Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

closes #195 #217

Merged
merged 1 commit into from
Jan 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions FlashCards_Service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -173,5 +173,11 @@
</dependencies>
<build>
<finalName>FlashCards_Service</finalName>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* [y] hybris Platform
*
* Copyright (c) 2000-2016 hybris AG
* All rights reserved.
*
* This software is the confidential and proprietary information of hybris
* ("Confidential Information"). You shall not disclose such Confidential
* Information and shall use it only in accordance with the terms of the
* license agreement you entered into with hybris.
*/
package org.robbins.flashcards;

public enum SaveResultStatus {
SUCCESS,
FAILURE
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@

import javax.inject.Inject;

import org.robbins.flashcards.akka.actor.BatchSavingCoordinator;
import org.robbins.flashcards.akka.message.BatchSaveResultMessage;
import org.robbins.flashcards.akka.message.BatchSaveStartMessage;
import org.robbins.flashcards.akka.message.Messages;
import org.robbins.flashcards.dto.AbstractAuditableDto;
import org.robbins.flashcards.dto.BatchLoadingReceiptDto;
import org.robbins.flashcards.exceptions.RepositoryException;
Expand Down Expand Up @@ -66,7 +64,7 @@ public BatchLoadingReceiptDto save(final GenericCrudFacade facade, final List<Ab
configureCreatedBy(dtos);

try {
final BatchSaveResultMessage receiptMessage = saveBatchWithAkka(type, facade, dtos);
final Messages.BatchSaveResultMessage receiptMessage = saveBatchWithAkka(type, facade, dtos);

LOGGER.debug("Batch save complete: {}", receiptMessage);
return completeBatchLoadingReceipt(receiptMessage);
Expand All @@ -80,15 +78,15 @@ private void configureCreatedBy(final List<AbstractAuditableDto> dtos) {
dtos.forEach(dto -> DtoAuditingUtil.configureCreatedByAndTime(dto, getAuditingUserId()));
}

private BatchSaveResultMessage saveBatchWithAkka(final String type, final GenericCrudFacade facade,
private Messages.BatchSaveResultMessage saveBatchWithAkka(final String type, final GenericCrudFacade facade,
final List<AbstractAuditableDto> dtos) throws Exception {

final BatchLoadingReceiptDto batchLoadingReceiptDto = createBatchLoadingReceipt(type, dtos);
final BatchSaveStartMessage startBatchSaveMessage = new BatchSaveStartMessage(batchLoadingReceiptDto, facade,
final Messages.BatchSaveStartMessage startBatchSaveMessage = new Messages.BatchSaveStartMessage(batchLoadingReceiptDto, facade,
dtos);
final FiniteDuration duration = FiniteDuration.create(1, TimeUnit.HOURS);
final ClassTag<BatchSaveResultMessage> classTag = Util.classTag(BatchSaveResultMessage.class);
final Future<BatchSaveResultMessage> receiptFuture = ask(batchSavingCoordinator, startBatchSaveMessage,
final ClassTag<Messages.BatchSaveResultMessage> classTag = Util.classTag(Messages.BatchSaveResultMessage.class);
final Future<Messages.BatchSaveResultMessage> receiptFuture = ask(batchSavingCoordinator, startBatchSaveMessage,
Timeout.durationToTimeout(duration))
.mapTo(classTag);
return Await.result(receiptFuture, duration);
Expand All @@ -114,8 +112,8 @@ private BatchLoadingReceiptDto createBatchLoadingReceipt(final String type, fina
// return receipt;
// }

private BatchLoadingReceiptDto completeBatchLoadingReceipt(final BatchSaveResultMessage receiptMessage) {
BatchLoadingReceiptDto receipt = receiptMessage.getReceiptDto();
private BatchLoadingReceiptDto completeBatchLoadingReceipt(final Messages.BatchSaveResultMessage receiptMessage) {
BatchLoadingReceiptDto receipt = receiptMessage.receiptDto();
receipt.setEndTime(new Date());
receipt = facade.save(receipt);
return receipt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

import java.util.List;

import org.robbins.flashcards.akka.message.SingleBatchSaveResultMessage;
import org.robbins.flashcards.akka.message.SingleBatchSaveStartMessage;
import org.robbins.flashcards.akka.message.Messages;
import org.robbins.flashcards.dto.AbstractAuditableDto;
import org.robbins.flashcards.dto.AbstractPersistableDto;
import org.robbins.flashcards.dto.BatchLoadingReceiptDto;
import org.robbins.flashcards.facade.base.GenericCrudFacade;
import org.slf4j.Logger;
Expand Down Expand Up @@ -39,35 +37,35 @@ public static Props props() {
public void preStart() throws Exception
{
super.preStart();
context().parent().tell(new BatchSavingCoordinator.GiveMeWork(), self());
context().parent().tell(new Messages.GiveMeWork(), self());
}

@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder
.match(SingleBatchSaveStartMessage.class, startSave -> doSave(startSave, sender()))
.match(Messages.SingleBatchSaveStartMessage.class, startSave -> doSave(startSave, sender()))
.matchAny(o -> LOGGER.error("Received Unknown message"))
.build();
}

private void doSave(final SingleBatchSaveStartMessage startSaveMessage, final ActorRef sender) {
private void doSave(final Messages.SingleBatchSaveStartMessage startSaveMessage, final ActorRef sender) {
LOGGER.trace("Received SingleBatchSaveStartMessage message: {}", startSaveMessage.toString());

this.facade = startSaveMessage.getFacade();
this.batchId = startSaveMessage.getBatchId();
this.facade = startSaveMessage.facade();
this.batchId = startSaveMessage.batchId();

final SingleBatchSaveResultMessage result = saveBatch(startSaveMessage.getDtos());
final Messages.SingleBatchSaveResultMessage result = saveBatch(startSaveMessage.dtos());

LOGGER.trace("Sending SingleBatchSaveResultMessage message: {}", result.toString());
sender.tell(result, self());
}

private SingleBatchSaveResultMessage saveBatch(final List<AbstractAuditableDto> batch) {
private Messages.SingleBatchSaveResultMessage saveBatch(final List<AbstractAuditableDto> batch) {
final BatchLoadingReceiptDto receipt = facade.save(batch);
successCount = receipt.getSuccessCount();
failureCount = getFailureCount(batch.size());

final SingleBatchSaveResultMessage result = new SingleBatchSaveResultMessage(successCount, failureCount, batchId);
final Messages.SingleBatchSaveResultMessage result = new Messages.SingleBatchSaveResultMessage(successCount, failureCount, batchId);
resetCounters();
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

import org.robbins.flashcards.akka.message.BatchSaveResultMessage;
import org.robbins.flashcards.akka.message.BatchSaveStartMessage;
import org.robbins.flashcards.akka.message.SingleBatchSaveResultMessage;
import org.robbins.flashcards.akka.message.SingleBatchSaveStartMessage;
import org.robbins.flashcards.akka.message.Messages;
import org.robbins.flashcards.dto.AbstractAuditableDto;
import org.robbins.flashcards.dto.BatchLoadingReceiptDto;
import org.slf4j.Logger;
Expand Down Expand Up @@ -38,8 +35,8 @@ public class BatchSavingCoordinator extends AbstractActor
private final Map<Long, ActorRef> parents = new ConcurrentHashMap<>();
private final Map<Long, BatchLoadingReceiptDto> batchesInProgress = new ConcurrentHashMap<>();

private final List<WorkQueueItem> workQueue = new ArrayList<>();
private final Map<ActorRef, BatchSaveStartMessage> workInProgress = new HashMap<>();
private final List<Messages.WorkQueueItem> workQueue = new ArrayList<>();
private final Map<ActorRef, Messages.BatchSaveStartMessage> workInProgress = new HashMap<>();
private final List<ActorRef> idleWorkers = new ArrayList<>();

public BatchSavingCoordinator()
Expand Down Expand Up @@ -72,14 +69,14 @@ private void createBatchSavingActors()
public PartialFunction<Object, BoxedUnit> receive()
{
return ReceiveBuilder
.match(BatchSaveStartMessage.class, this::handleBatchSaveStart)
.match(SingleBatchSaveResultMessage.class, this::handleSingleBatchSaveResult)
.match(GiveMeWork.class, w -> scheduleWork(sender()))
.match(Messages.BatchSaveStartMessage.class, this::handleBatchSaveStart)
.match(Messages.SingleBatchSaveResultMessage.class, this::handleSingleBatchSaveResult)
.match(Messages.GiveMeWork.class, w -> scheduleWork(sender()))
.matchAny(o -> LOGGER.info("Received Unknown message"))
.build();
}

private void handleBatchSaveStart(final BatchSaveStartMessage startMessage)
private void handleBatchSaveStart(final Messages.BatchSaveStartMessage startMessage)
{
LOGGER.trace("Entering handleBatchSaveStart for message: {}", startMessage);
addMessageToWorkQueue(startMessage, sender());
Expand All @@ -89,7 +86,7 @@ private void handleBatchSaveStart(final BatchSaveStartMessage startMessage)
}
}

private void handleSingleBatchSaveResult(final SingleBatchSaveResultMessage resultMessage)
private void handleSingleBatchSaveResult(final Messages.SingleBatchSaveResultMessage resultMessage)
{
LOGGER.trace("Entering handleSingleBatchSaveResult for message: {}", resultMessage);

Expand All @@ -112,13 +109,13 @@ private void scheduleWork(final ActorRef worker)
else
{
// take the first item out of the Work queue and add it to the Work In Progress queue
final WorkQueueItem workQueueItem = workQueue.remove(0);
workInProgress.put(worker, workQueueItem.startMessage);
final Messages.WorkQueueItem workQueueItem = workQueue.remove(0);
workInProgress.put(worker, workQueueItem.startMessage());
LOGGER.trace("workInProgress size: {}", workInProgress.size());

LOGGER.debug("Sending SingleBatchSaveStartMessage message with batch id: '{}' to worker '{}'", workQueueItem.batchId, worker.toString());
worker.tell(new SingleBatchSaveStartMessage(workQueueItem.batchId, workQueueItem.batchPartition,
workQueueItem.startMessage.getFacade()), self());
LOGGER.debug("Sending SingleBatchSaveStartMessage message with batch id: '{}' to worker '{}'", workQueueItem.batchId(), worker.toString());
worker.tell(new Messages.SingleBatchSaveStartMessage(workQueueItem.batchId(), workQueueItem.batchPartition(),
workQueueItem.startMessage().facade()), self());
}

if (!workQueue.isEmpty() && !idleWorkers.isEmpty())
Expand All @@ -127,32 +124,32 @@ private void scheduleWork(final ActorRef worker)
}
}

private void addMessageToWorkQueue(final BatchSaveStartMessage startMessage, final ActorRef sender)
private void addMessageToWorkQueue(final Messages.BatchSaveStartMessage startMessage, final ActorRef sender)
{
LOGGER.debug("Received BatchSaveStartMessage message: {}", startMessage.toString());

final BatchLoadingReceiptDto receipt = startMessage.getReceipt();
final BatchLoadingReceiptDto receipt = startMessage.receipt();
parents.put(receipt.getId(), sender());

batchesInProgress.put(receipt.getId(), receipt);
LOGGER.debug("Batches in progress: {}", batchesInProgress.size());

final List<List<AbstractAuditableDto>> batches = Lists.partition(startMessage.getDtos(), receipt.getBatchSize());
LOGGER.debug("Splitting batch of {} into {} sub-batches", startMessage.getDtos().size(), batches.size());
final List<List<AbstractAuditableDto>> batches = Lists.partition(startMessage.dtos(), receipt.getBatchSize());
LOGGER.debug("Splitting batch of {} into {} sub-batches", startMessage.dtos().size(), batches.size());

batches.stream()
.forEach(batch -> workQueue.add(new WorkQueueItem(batch, startMessage, sender, receipt.getId())));
.forEach(batch -> workQueue.add(new Messages.WorkQueueItem(batch, startMessage, sender, receipt.getId())));
LOGGER.trace("workQueue size: {}", workQueue.size());
}

private void batchSaveFinish(final SingleBatchSaveResultMessage saveResult)
private void batchSaveFinish(final Messages.SingleBatchSaveResultMessage saveResult)
{
LOGGER.debug("Received SingleBatchSaveResultMessage message: {}", saveResult.toString());

final ActorRef parent = parents.get(saveResult.getBatchId());
final BatchLoadingReceiptDto batch = batchesInProgress.get(saveResult.getBatchId());
batch.setSuccessCount(batch.getSuccessCount() + saveResult.getSuccessCount());
batch.setFailureCount(batch.getFailureCount() + saveResult.getFailureCount());
final ActorRef parent = parents.get(saveResult.batchId());
final BatchLoadingReceiptDto batch = batchesInProgress.get(saveResult.batchId());
batch.setSuccessCount(batch.getSuccessCount() + saveResult.successCount());
batch.setFailureCount(batch.getFailureCount() + saveResult.failureCount());

if (isBatchComplete(batch))
{
Expand All @@ -169,72 +166,8 @@ private void completeBatchSave(final BatchLoadingReceiptDto batch, final ActorRe
{
batchesInProgress.remove(batch.getId());

final BatchSaveResultMessage batchSaveResultMessage = new BatchSaveResultMessage(batch);
final Messages.BatchSaveResultMessage batchSaveResultMessage = new Messages.BatchSaveResultMessage(batch);
LOGGER.debug("Sending BatchSaveResult: {}", batchSaveResultMessage);
parent.tell(batchSaveResultMessage, self());
}

public static class WorkQueueItem
{
final Long batchId;
final ActorRef sender;
final BatchSaveStartMessage startMessage;
final List<AbstractAuditableDto> batchPartition;

public WorkQueueItem(final List<AbstractAuditableDto> batchPartition, final BatchSaveStartMessage startMessage, final ActorRef sender, final Long batchId)
{
this.batchPartition = batchPartition;
this.startMessage = startMessage;
this.sender = sender;
this.batchId = batchId;
}

@Override
public boolean equals(final Object o)
{
if (this == o)
{
return true;
}
if (o == null || getClass() != o.getClass())
{
return false;
}

final WorkQueueItem that = (WorkQueueItem) o;

if (!batchPartition.equals(that.batchPartition))
{
return false;
}
if (!batchId.equals(that.batchId))
{
return false;
}
if (!sender.equals(that.sender))
{
return false;
}
if (!startMessage.equals(that.startMessage))
{
return false;
}

return true;
}

@Override
public int hashCode()
{
int result = batchId.hashCode();
result = 31 * result + sender.hashCode();
result = 31 * result + startMessage.hashCode();
return result;
}
}

public static class GiveMeWork
{

}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
package org.robbins.flashcards.akka.actor;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import org.robbins.flashcards.akka.message.SingleSaveResultMessage;
import org.robbins.flashcards.akka.message.SingleSaveStartMessage;
import org.robbins.flashcards.SaveResultStatus;
import org.robbins.flashcards.akka.message.Messages;
import org.robbins.flashcards.conversion.DtoConverter;
import org.robbins.flashcards.dto.AbstractPersistableDto;
import org.robbins.flashcards.exceptions.FlashCardsException;
Expand All @@ -14,6 +10,11 @@
import org.robbins.flashcards.repository.FlashCardsAppRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;

Expand Down Expand Up @@ -41,22 +42,22 @@ public static Props props(final FlashCardsAppRepository repository,
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder
.match(SingleSaveStartMessage.class, startSave -> doStartSave(startSave, sender()))
.match(Messages.SingleSaveStartMessage.class, startSave -> doStartSave(startSave, sender()))
.matchAny(o -> LOGGER.error("Received Unknown message"))
.build();
}

private void doStartSave(final SingleSaveStartMessage startSaveMessage, final ActorRef sender) {
private void doStartSave(final Messages.SingleSaveStartMessage startSaveMessage, final ActorRef sender) {
LOGGER.trace("Received SingleSaveStart message: {}", startSaveMessage.toString());

SingleSaveResultMessage result = saveItem(startSaveMessage.getDto());
Messages.SingleSaveResultMessage result = saveItem(startSaveMessage.dto());

LOGGER.trace("Sending SingleSaveResultMessage message: {}", result.toString());
sender.tell(result, self());
}

private SingleSaveResultMessage saveItem(final AbstractPersistableDto dto) {
SingleSaveResultMessage.SaveResultStatus resultStatus = SingleSaveResultMessage.SaveResultStatus.SUCCESS;
private Messages.SingleSaveResultMessage saveItem(final AbstractPersistableDto dto) {
SaveResultStatus resultStatus = SaveResultStatus.SUCCESS;

try {
final AbstractAuditable entity = (AbstractAuditable) converter.getEntity(dto);
Expand All @@ -65,9 +66,9 @@ private SingleSaveResultMessage saveItem(final AbstractPersistableDto dto) {

} catch (FlashCardsException e) {
LOGGER.error("Unable to create Dto {}, error: {}", dto.toString(), e.getMessage());
resultStatus = SingleSaveResultMessage.SaveResultStatus.FAILURE;
resultStatus = SaveResultStatus.FAILURE;
}

return new SingleSaveResultMessage(resultStatus);
return new Messages.SingleSaveResultMessage(resultStatus);
}
}
Loading