Skip to content

Commit

Permalink
Merge branch 'feature/issue_60' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
Renato Cavalcanti committed Sep 15, 2016
2 parents 3054f40 + b1ceebd commit bb359dd
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 84 deletions.
51 changes: 36 additions & 15 deletions modules/akka/src/main/scala/io/funcqrs/akka/AggregateActor.scala
Expand Up @@ -82,11 +82,11 @@ class AggregateActor[A <: AggregateLike](

case cmd: Command =>
log.debug("Received cmd: {}", cmd)
val eventualEvents = interpreter.onCommand(aggregateState, cmd)
val eventualEvents = interpreter.applyCommand(aggregateState, cmd)
val origSender = sender()

eventualEvents map {
events => Successful(events, origSender)
case (events, nextState) => Successful(events, nextState, origSender)
} recover {
case NonFatal(cause) => FailedCommand(cause, origSender)
} pipeTo self
Expand All @@ -105,7 +105,7 @@ class AggregateActor[A <: AggregateLike](
val busyReceive: Receive = {

case AggregateActor.StateRequest(requester) => sendState(requester)
case Successful(events, origSender) => onSuccess(events, origSender)
case Successful(events, nextState, origSender) => onSuccess(events, nextState, origSender)
case failedCmd: FailedCommand => onFailure(failedCmd)

case cmd: Command =>
Expand Down Expand Up @@ -149,16 +149,10 @@ class AggregateActor[A <: AggregateLike](
}
}

/**
* Apply event on the AggregateRoot.
*/
def applyEvent(event: Event): State[Aggregate] =
interpreter.onEvent(aggregateState, event)

protected def onEvent(evt: Event): Unit = {
log.debug("Reapplying event {}", evt)
eventsSinceLastSnapshot += 1
aggregateState = applyEvent(evt)
aggregateState = interpreter.onEvent(aggregateState, evt)
log.debug("State after event {}", aggregateState)
changeState(Available)
}
Expand Down Expand Up @@ -191,13 +185,41 @@ class AggregateActor[A <: AggregateLike](
}
}

private def onSuccess(events: Events, origSender: ActorRef): Unit = {
private def onSuccess(events: Events, updatedState: State[Aggregate], origSender: ActorRef): Unit = {

if (events.nonEmpty) {
persistAll(events) { evt => afterEventPersisted(evt) }

var eventsCount = events.size

persistAll(events) { _ =>

eventsCount -= 1
eventsSinceLastSnapshot += 1

// are we on last event?
if (eventsCount == 0) {

// we only update the internal aggregate state once all events are persisted
aggregateState = updatedState

// have we crossed the snapshot threshold?
if (eventsSinceLastSnapshot >= eventsPerSnapshot) {
aggregateState match {
case Initialized(aggregate) =>
log.debug("{} events reached, saving snapshot", eventsPerSnapshot)
saveSnapshot(aggregate)
case _ =>
}
eventsSinceLastSnapshot = 0
}
// send feedback to command sender
origSender ! events
}
}
}
origSender ! events

changeState(Available)

}

/**
Expand All @@ -210,7 +232,6 @@ class AggregateActor[A <: AggregateLike](
*/
protected def afterEventPersisted(evt: Event): Unit = {

aggregateState = applyEvent(evt)
eventsSinceLastSnapshot += 1

if (eventsSinceLastSnapshot >= eventsPerSnapshot) {
Expand All @@ -227,7 +248,7 @@ class AggregateActor[A <: AggregateLike](
/**
* Internal representation of a completed update command.
*/
private case class Successful(events: Events, origSender: ActorRef)
private case class Successful(events: Events, nextState: State[Aggregate], origSender: ActorRef)

private case class FailedCommand(cause: Throwable, origSender: ActorRef)

Expand Down
Expand Up @@ -23,7 +23,7 @@ class AsyncInterpreter[A <: AggregateLike](val behavior: Behavior[A]) extends In
case (cmd, FutureCommandHandlerInvoker(handler)) => handler(cmd)
}

protected def fromTry(events: Try[Events]): Future[Events] = Future.fromTry(events)
protected def fromTry[B](any: Try[B]): Future[B] = Future.fromTry(any)
}

object AsyncInterpreter {
Expand Down
Expand Up @@ -28,8 +28,8 @@ class IdentityInterpreter[A <: AggregateLike](val behavior: Behavior[A], atMost:
case (cmd, FutureCommandHandlerInvoker(handler)) => Await.result(handler(cmd), atMost)
}

protected def fromTry(events: Try[Events]): Identity[Events] =
events.get // yes, we force a 'get'. Nothing can be done if we can't handle an event
protected def fromTry[B](any: Try[B]): Identity[B] =
any.get // yes, we force a 'get'. Nothing can be done if we can't handle an event

}

Expand Down
Expand Up @@ -38,53 +38,37 @@ abstract class Interpreter[A <: AggregateLike, F[_]: MonadOps] extends Aggregate
protected def interpret: InterpreterFunction

/**
* Lift a [[Try]] of [[Events]] to a [[F]] of [[Events]]
* Convert a [[Try]] to a [[F]]
*
* Internally, interpreters will check if all emitted events can be applied. In other words, it checks
* if event handlers were defined for each emitted event for each state transition.
* In the occurrence of missing behavior, we have no other choice than emitting a [[MissingBehaviorException]].
* This is wrapped in a [[Try]] that must be converted to the correct error type for F[_]
*
* In the occurrence of missing event handlers, we have no other choice than emitting a [[MissingEventHandlerException]].
* This is wrapped in a [[Try]] that must be lift to F[_]
*
* @param events - the produced events
* @param any - the produced events
* @return
*/
protected def fromTry(events: Try[Events]): F[Events]
protected def fromTry[B](any: Try[B]): F[B]

final def onCommand(state: State[Aggregate], cmd: Command): F[Events] = {
private final def onCommand(state: State[Aggregate], cmd: Command): F[Events] = {

val actionsTry =
Try(behavior(state)).recoverWith {
case _: MatchError =>
Failure(new MissingBehaviorException(s"No behavior defined for current aggregate state"))
val tryActions =
if (behavior.isDefinedAt(state)) {
Success(behavior(state))
} else {
Failure(new MissingBehaviorException(s"No behavior defined for current aggregate state"))
}

val result = actionsTry.map { actions =>
// produce all events by apply Command
val events = interpret(cmd, actions.onCommand(cmd))

events.flatMap { evts =>
fromTry(tryHandleAllEvents(state, evts))
}
// produce all events by applying Command
val tryEvents = tryActions.map { actions =>
interpret(cmd, actions.onCommand(cmd))
}

// Try[F[Events]]
result match {
// Try[F[Events]] -> F[Events]
tryEvents match {
case Success(eventsInF) => eventsInF
case Failure(ex) => fromTry(Failure(ex))
}
}

private final def tryHandleAllEvents(state: State[Aggregate], events: Events): Try[Events] = {
// apply all emitted events to aggregate state to verify that
// we have event handlers for them all
Try(onEvents(state, events))
.map(_ => events) // if successful, return a Success(events)
.recoverWith {
case _: MatchError => Failure(new MissingBehaviorException(s"No behavior defined for current aggregate state"))
}
}

/**
* Apply all 'evt' on passed 'state'.
*
Expand All @@ -93,8 +77,13 @@ abstract class Interpreter[A <: AggregateLike, F[_]: MonadOps] extends Aggregate
* @throws MissingEventHandlerException if no Event handler is defined for the passed event.
* @return new aggregate state after applying event
*/
final def onEvent(state: State[Aggregate], evt: Event): State[Aggregate] =
Initialized(behavior(state).onEvent(evt))
final def onEvent(state: State[Aggregate], evt: Event): State[Aggregate] = {
if (behavior.isDefinedAt(state)) {
Initialized(behavior(state).onEvent(evt))
} else {
throw new MissingBehaviorException(s"No behavior defined for current aggregate state")
}
}

/**
* Apply all 'evts' on passed 'state'.
Expand All @@ -104,16 +93,22 @@ abstract class Interpreter[A <: AggregateLike, F[_]: MonadOps] extends Aggregate
* @throws MissingEventHandlerException if no Event handler is defined for one of the passed events.
* @return new aggregate state after applying all events
*/
final def onEvents(state: State[Aggregate], evts: Events): State[Aggregate] = {
evts.foldLeft(state) {
case (aggState, evt) => onEvent(aggState, evt)
}
}
private final def onEvents(state: State[Aggregate], evts: Events): F[State[Aggregate]] = {

final def applyCommand(cmd: Command, state: State[Aggregate]): F[(Events, State[A])] = {
onCommand(state, cmd).map { evts: Events =>
(evts, onEvents(state, evts))
}
val tried =
Try { // don't let exceptions leak
evts.foldLeft(state) {
case (aggState, evt) => onEvent(aggState, evt)
}
}

fromTry(tried)
}

final def applyCommand(state: State[Aggregate], cmd: Command): F[(Events, State[A])] =
for {
evts <- onCommand(state, cmd)
updatedAgg <- onEvents(state, evts)
} yield (evts, updatedAgg)

}
Expand Up @@ -28,7 +28,7 @@ class TryInterpreter[A <: AggregateLike](val behavior: Behavior[A], atMost: Dura
case (cmd, FutureCommandHandlerInvoker(handler)) => Try(Await.result(handler(cmd), atMost))
}

protected def fromTry(events: Try[Events]): Try[Events] = events
protected def fromTry[B](any: Try[B]): Try[B] = any

}

Expand Down
Expand Up @@ -101,7 +101,7 @@ class InMemoryBackend extends Backend[Identity] {
}

private def handle(state: State[Aggregate], cmd: Command): interpreter.Events = {
val (events, updatedAgg) = interpreter.applyCommand(cmd, state)
val (events, updatedAgg) = interpreter.applyCommand(state, cmd)
aggregateState = updatedAgg
publishEvents(events)
events
Expand Down
26 changes: 5 additions & 21 deletions modules/tests/src/test/scala/io/funcqrs/InterpreterTest.scala
Expand Up @@ -4,7 +4,7 @@ import java.time.OffsetDateTime

import io.funcqrs.behavior._
import io.funcqrs.interpreters.IdentityInterpreter
import io.funcqrs.model.TimerTrackerProtocol.{ TimerCreated, TimerStarted, _ }
import io.funcqrs.model.TimerTrackerProtocol._
import io.funcqrs.model.{ BusyTracker, IdleTracker, TimeTracker, TrackerId }
import org.scalatest.{ FunSuite, Matchers }
/**
Expand All @@ -15,25 +15,6 @@ class InterpreterTest extends FunSuite with Matchers {

val initialState = Uninitialized[TimeTracker](TrackerId.generate)

test("A interpreter will fail a Command if missing event handlers for a given event") {

// a bogus TimeTracker behavior
// can't start timer due to empty actions for Idle state
def behavior: Behavior[TimeTracker] =
Behavior {
factoryActions(TrackerId.generate)
} {
case _ => Actions.empty
}

val interpreter = IdentityInterpreter(behavior)

// missing handler for TimerStarted event
intercept[MissingEventHandlerException] {
interpreter.onCommand(initialState, CreateAndStartTracking("test"))
}
}

test("A interpreter will fail a Command if missing behavior for a given state") {
// a bogus TimeTracker behavior
// can't start timer due to missing case for Idle state
Expand All @@ -49,12 +30,15 @@ class InterpreterTest extends FunSuite with Matchers {

// missing behavior for Idle state
intercept[MissingBehaviorException] {
interpreter.onCommand(initialState, CreateAndStartTracking("test"))
interpreter.applyCommand(initialState, CreateAndStartTracking("test"))
}
}

def factoryActions(trackerId: TrackerId) =
actions[TimeTracker]
.handleCommand {
cmd: CreateTracker.type => TimerCreated(EventId(), cmd.id)
}
.handleCommand.manyEvents {
cmd: CreateAndStartTracking =>
List(
Expand Down

0 comments on commit bb359dd

Please sign in to comment.