Skip to content

Commit

Permalink
#65 support testing of Process Managers [wip]
Browse files Browse the repository at this point in the history
  • Loading branch information
pawelkaczor committed Dec 20, 2017
1 parent f423af5 commit ea03e18
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 232 deletions.
@@ -0,0 +1,181 @@
package pl.newicom.dddd.test.pm

import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import akka.util.Timeout
import pl.newicom.dddd.aggregate.{Command, DomainEvent}
import pl.newicom.dddd.delivery.protocol.alod.Processed
import pl.newicom.dddd.messaging.MetaData
import pl.newicom.dddd.messaging.event.EventMessage
import pl.newicom.dddd.office.OfficeRef
import pl.newicom.dddd.office.SimpleOffice.Batch
import pl.newicom.dddd.test.pm.GivenWhenThenPMTestFixture.{EventsHandler, ExpectedEvents, PastEvents, WhenContext, testProbe}

import scala.concurrent.duration._
import scala.reflect.ClassTag
import scala.util.Success

/**
* Given
*/
case class Given(es: Seq[DomainEvent] = Seq.empty)(implicit s: ActorSystem, eh: EventsHandler, timeout: FiniteDuration) {
val pastEvents: PastEvents = PastEvents(eh(es).toList)

def when[E <: DomainEvent](f: (WhenContext[_]) => WhenContext[E]): When[E] =
when(f(fakeWhenContext(pastEvents)))

def when[E <: DomainEvent](wc: WhenContext[E]): When[E] = when(wc, () => {
eh(Seq(wc.event)).map(_.payload).map(Success(_)).foreach(s.eventStream.publish)
})

private def when[E <: DomainEvent](wc: WhenContext[E], whenFun: () => Unit): When[E] = {
When(wc.copy(pastEvents = pastEvents), whenFun)
}

private def fakeWhenContext(pastEvents: PastEvents = PastEvents()) = WhenContext(Seq(new DomainEvent), pastEvents)

}

/**
* When
*/
case class When[E <: DomainEvent](wc: WhenContext[E], whenFun: () => Unit)(implicit s: ActorSystem, timeout: FiniteDuration) {

def expectEvents(events: DomainEvent*): Unit = {
val probe = testProbe(whenFun)
events.foreach { _ =>
probe.expectMsgAnyOf[DomainEvent](timeout, events.map(Success(_)): _*)
}
}

def expectEvent(e: DomainEvent): Unit = {
expectEventMatching(
matcher = {
case actual if actual == e => e
},
s"Success($e)"
)
}

def expect(f: (WhenContext[E]) => ExpectedEvents): Unit =
f(wc) match {
case ExpectedEvents(Seq(e)) =>
expectEvent(e)
case ExpectedEvents(events) =>
expectEvents(events :_*)
}

def expectEventMatching(matcher: PartialFunction[Any, Any], hint: String = ""): Any = {
testProbe(whenFun).expectMsgPF[Any](timeout, hint) {
case Success(result) if matcher.isDefinedAt(result) => result
}
}
}

/**
* Fixture
*/
object GivenWhenThenPMTestFixture {

type EventsHandler = Seq[DomainEvent] => Seq[EventMessage]

case class Commands[C <: Command](commands: Seq[C]) {
def &(c: C): Commands[C] = Commands[C](commands :+ c)
}

case class WhenContext[E <: DomainEvent](
event: E,
pastEvents: PastEvents = PastEvents())

case class PastEvents(list: List[EventMessage] = List.empty) {
private val map: Map[Class[_], List[DomainEvent]] =
list.map(_.payload).groupBy(_.getClass)

private def event[E](selection: List[E] => E)(implicit ct: ClassTag[E]) =
map.get(ct.runtimeClass)
.map(es => es.asInstanceOf[List[E]]).map(selection)
.getOrElse(null.asInstanceOf[E])

def first[E](implicit ct: ClassTag[E]): E = event[E](_.head)
def last[E](implicit ct: ClassTag[E]): E = event[E](_.last)
}

case class ExpectedEvents(events: Seq[DomainEvent]) {
def &(e: DomainEvent): ExpectedEvents = ExpectedEvents(events :+ e)
}

def testProbe(f: () => Unit)(implicit system: ActorSystem): TestProbe = {
new TestProbe(system) {
var initialized = false

def initialize(): Unit = {
system.eventStream.subscribe(this.ref, classOf[Success[_]])
f()
}

override def receiveOne(max: Duration): AnyRef = {
if (!initialized) {
initialize(); initialized = true
}
super.receiveOne(max)
}
}
}

implicit def whenContextToEvent[E <: DomainEvent](wc: WhenContext[E]): E = wc.event

implicit def whenContextToPastEvents[E <: DomainEvent](wc: WhenContext[E]): PastEvents = wc.pastEvents

implicit def eventToWhenContext[E <: DomainEvent](e: E): WhenContext[E] = WhenContext(e)

}

abstract class GivenWhenThenPMTestFixture(_system: ActorSystem) extends TestKit(_system) with ImplicitSender {

implicit val timeout: FiniteDuration = Timeout(5.seconds).duration

def officeUnderTest: OfficeRef

def ensureOfficeTerminated(): Unit

// DSL

def given(es: List[DomainEvent]): Given = given(es :_*)

def given(es: DomainEvent*): Given = Given(es)

def when[E <: DomainEvent](wc: WhenContext[E]): When[E] = Given().when(wc)

def last[E](implicit wc: WhenContext[_], ct: ClassTag[E]): E = past

def past[E](implicit wc: WhenContext[_], ct: ClassTag[E]): E =
wc.pastEvents.last[E]

def first[E](implicit wc: WhenContext[_], ct: ClassTag[E]): E =
wc.pastEvents.first[E]

protected def eventMetaDataProvider(e: DomainEvent): MetaData

implicit def toExpectedEvents(e: DomainEvent): ExpectedEvents =
ExpectedEvents(Seq(e))


// Private methods

private implicit def eventsHandler: EventsHandler = {
def em(e: DomainEvent, deliveryId: Int) =
EventMessage(e)
.withMetaData(eventMetaDataProvider(e))
.withDeliveryId(deliveryId)

(es: Seq[DomainEvent]) =>
if (es.isEmpty) {
Seq.empty
} else {
val batch = Batch(es.map(e => em(e, es.indexOf(e) + 1)))
officeUnderTest.actor ! batch
expectMsgAllClassOf(timeout, es.map(_ => classOf[Processed]): _*).flatMap(_ => batch.msgs)
}
}.andThen(r => { if (r.nonEmpty) { ensureOfficeTerminated() }; r})

}
87 changes: 87 additions & 0 deletions akka-ddd-test/src/main/scala/pl/newicom/dddd/test/pm/PMSpec.scala
@@ -0,0 +1,87 @@
package pl.newicom.dddd.test.pm

import akka.actor._
import akka.testkit.TestKit
import org.scalacheck.Gen
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, WordSpecLike}
import org.slf4j.Logger
import org.slf4j.LoggerFactory.getLogger
import pl.newicom.dddd.actor.ActorFactory
import pl.newicom.dddd.aggregate._
import pl.newicom.dddd.messaging.MetaAttribute.Correlation_Id
import pl.newicom.dddd.messaging.MetaData
import pl.newicom.dddd.office.OfficeFactory._
import pl.newicom.dddd.office.SimpleOffice._
import pl.newicom.dddd.office.{LocalOfficeId, OfficeListener, OfficeRef}
import pl.newicom.dddd.process.{Saga, SagaActorFactory}
import pl.newicom.dddd.test.ar.ARSpec.sys
import pl.newicom.dddd.utils.UUIDSupport._

import scala.concurrent.duration._
import scala.reflect.ClassTag


/**
* @param sharePM if set to true, the same PM instance will be used in all tests, default is false
*/
abstract class PMSpec[PM <: Saga : SagaActorFactory : LocalOfficeId](_system: Option[ActorSystem] = None, val sharePM: Boolean = false)(implicit pmClassTag: ClassTag[PM])
extends GivenWhenThenPMTestFixture(_system.getOrElse(sys(pmClassTag.runtimeClass))) with WordSpecLike with BeforeAndAfterAll with BeforeAndAfter {

val logger: Logger = getLogger(getClass)

override def officeUnderTest: OfficeRef = {
implicit val _ = new OfficeListener[PM]
if (_officeUnderTest == null) _officeUnderTest = office[PM]
_officeUnderTest
}

private var _officeUnderTest: OfficeRef = _

implicit var _pmIdGen: Gen[EntityId] = _

val testSuiteId: String = uuid10

before {
_pmIdGen = Gen.const[String](if (sharePM) testSuiteId else uuid10)
}

after {
ensureOfficeTerminated() //will nullify _officeUnderTest
}

override def afterAll() {
TestKit.shutdownActorSystem(system)
}

def pmId(implicit pmIdGen: Gen[EntityId]): EntityId = pmIdGen.sample.get

override def eventMetaDataProvider(e: DomainEvent): MetaData =
MetaData(Correlation_Id -> pmId)

implicit def topLevelParent[T : LocalOfficeId](implicit system: ActorSystem): ActorFactory[T] = {
new ActorFactory[T] {
override def getChild(name: String): Option[ActorRef] = None
override def createChild(props: Props, name: String): ActorRef = {
system.actorOf(props, name)
}
}
}

def ensureTerminated(actor: ActorRef): Any = {
watch(actor) ! PoisonPill
fishForMessage(1.seconds) {
case Terminated(_) =>
unwatch(actor)
true
case _ => false
}
}

override def ensureOfficeTerminated(): Unit = {
if (_officeUnderTest != null) {
ensureTerminated(_officeUnderTest.actor)
}
_officeUnderTest = null
}

}

0 comments on commit ea03e18

Please sign in to comment.