Skip to content

Commit

Permalink
Runtime overhaul and modularization (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
notxcain committed Aug 7, 2017
1 parent 207353d commit 13f991a
Show file tree
Hide file tree
Showing 132 changed files with 4,254 additions and 2,276 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
@@ -1,7 +1,7 @@
language: scala
scala:
- 2.11.8
- 2.12.0
- 2.11.11-bin-typelevel-4
- 2.12.2-bin-typelevel-4

jdk:
- oraclejdk8
Expand Down
126 changes: 63 additions & 63 deletions README.md
Expand Up @@ -26,26 +26,9 @@ scalacOptions += "-Ypartial-unification"

### Defining and running behavior

Let's start with entity operations:
Let's start with defining domain events:

```scala
sealed trait SubscriptionOp[A] {
def subscriptionId: String
}
object SubscriptionOp {
case class CreateSubscription(subscriptionId: String, userId: String, productId: String, planId: String) extends SubscriptionOp[Unit]
case class PauseSubscription(subscriptionId: String) extends SubscriptionOp[Unit]
case class ResumeSubscription(subscriptionId: String) extends SubscriptionOp[Unit]
case class CancelSubscription(subscriptionId: String) extends SubscriptionOp[Unit]
}
```

Entity events with persistent encoder and decoder:

```scala
import aecor.data.Folded.syntax._
import cats.syntax.option._

sealed trait SubscriptionEvent
object SubscriptionEvent {
case class SubscriptionCreated(subscriptionId: String, userId: String, productId: String, planId: String) extends SubscriptionEvent
Expand All @@ -58,10 +41,12 @@ object SubscriptionEvent {
}
```

Then we define how this events affect aggregate state:

`Folder[F, E, S]` instance represents the ability to fold `E`s into `S`, with effect `F` on each step.
Aecor runtime uses `Folded[A]` data type, with two possible states:
`Next(a)` - says that a should be used as a state for next folding step
`Impossible` - says that folding should be aborted (underlying runtime actor throws `IllegalStateException`)
`Next(a)` - says that `a` should be used as a state for next folding step
`Impossible` - says that folding should be aborted (e.g. underlying runtime actor throws `IllegalStateException`)

```scala
sealed trait SubscriptionStatus
Expand All @@ -71,89 +56,104 @@ object SubscriptionStatus {
case object Cancelled extends SubscriptionStatus
}

case class Subscription(status: SubscriptionStatus)
case class Subscription(status: SubscriptionStatus) {
def applyEvent(e: SubscriptionEvent): Folded[Subscription] = e match {
case e: SubscriptionCreated =>
impossible
case e: SubscriptionPaused =>
subscription.copy(status = Paused).next
case e: SubscriptionResumed =>
subscription.copy(status = Active).next
case e: SubscriptionCancelled =>
subscription.copy(status = Cancelled).next
}
}
object Subscription {
import SubscriptionStatus._

implicit def folder: Folder[Folded, SubscriptionEvent, Option[Subscription]] =
Folder.instance(Option.empty[Subscription]) {
case Some(subscription) => {
case e: SubscriptionCreated =>
impossible
case e: SubscriptionPaused =>
subscription.copy(status = Paused).some.next
case e: SubscriptionResumed =>
subscription.copy(status = Active).some.next
case e: SubscriptionCancelled =>
subscription.copy(status = Cancelled).some.next
}
case None => {
case SubscriptionCreated(subscriptionId, userId, productId, planId) =>
Subscription(Active).some.next
case _ =>
impossible
}
}
def init(e: SubscriptionEvent): Folded[Subscription] = e match {
case SubscriptionCreated(subscriptionId, userId, productId, planId) =>
Subscription(Active).next
case _ => impossible
}
def folder: Folder[Folded, SubscriptionEvent, Option[Subscription]] =
Folder.optionInstance(init)(_.applyEvent)
}

```


Now let's define a behavior that converts operation to its handler.
A `Handler[State, Event, Reply]` is just a wrapper around `State => (Seq[Event], Reply)`,
A `Handler[F, State, Event, Reply]` is just a wrapper around `State => (Seq[Event], Reply)`,
you can think of it as `Kleisli[(Seq[Event], ?), State, Reply]`
i.e. a side-effecting function with a side effect being a sequence of events representing state change caused by operation.

```scala
val behavior = Lambda[SubscriptionOp ~> Handler[Option[Subscription], SubscriptionEvent, ?]] {

sealed trait SubscriptionOp[A] {
def subscriptionId: String
}
object SubscriptionOp {
case class CreateSubscription(subscriptionId: String, userId: String, productId: String, planId: String) extends SubscriptionOp[Unit]
case class PauseSubscription(subscriptionId: String) extends SubscriptionOp[Unit]
case class ResumeSubscription(subscriptionId: String) extends SubscriptionOp[Unit]
case class CancelSubscription(subscriptionId: String) extends SubscriptionOp[Unit]
}

def handler[F[_]](implicit F: Applicative[F]) = Lambda[SubscriptionOp ~> Handler[F, Option[Subscription], SubscriptionEvent, ?]] {
case CreateSubscription(subscriptionId, userId, productId, planId) => {
case Some(subscription) =>
// Do nothing reply with ()
Seq.empty -> ()
F.pure(Seq.empty -> ())
case None =>
// Produce event and reply with ()
Seq(SubscriptionCreated(subscriptionId, userId, productId, planId)) -> ()
F.pure(Seq(SubscriptionCreated(subscriptionId, userId, productId, planId)) -> ())
}
case PauseSubscription(subscriptionId) => {
case Some(subscription) if subscription.status == Active =>
Seq(SubscriptionPaused(subscriptionId)) -> ()
F.pure(Seq(SubscriptionPaused(subscriptionId)) -> ())
case _ =>
Seq.empty -> ()
F.pure(Seq.empty -> ())
}
case ResumeSubscription(subscriptionId) => {
case Some(subscription) if subscription.status == Paused =>
Seq(SubscriptionResumed(subscriptionId)) -> ()
F.pure(Seq(SubscriptionResumed(subscriptionId)) -> ())
case _ =>
Seq.empty -> ()
F.pure(Seq.empty -> ())
}
case CancelSubscription(subscriptionId) => {
case Some(subscription) =>
Seq(SubscriptionCancelled(subscriptionId)) -> ()
F.pure(Seq(SubscriptionCancelled(subscriptionId)) -> ())
case _ =>
Seq.empty -> ()
F.pure(Seq.empty -> ())
}
}
```

Then you define a correlation function, entity name and a value provided by correlation function form unique primary key for aggregate.
It should not be changed in the future, at least without prior event migration.
It should not be changed between releases, at least without prior event migration.

```scala
def correlation: Correlation[SubscriptionOp] = {
def mk[A](op: SubscriptionOp[A]): CorrelationF[A] = op.subscriptionId
FunctionK.lift(mk _)
}
def correlation: Correlation[SubscriptionOp] = _.subscriptionId
```

After that we are ready to launch.

```scala
implicit val system = ActorSystem("foo")
import monix.cats._
import monix.eval.Task
import aecor.effect.monix._
import aecor.runtime.akkapersistence.AkkaPersistenceRuntime

val system = ActorSystem("foo")

val subscriptions: SubscriptionOp ~> Future =
AkkaRuntime(system).start(
entityName = "Subscription",
behavior,
val subscriptions: SubscriptionOp ~> Task =
AkkaPersistenceRuntime(
system,
"Subscription",
correlation,
Tagging(EventTag("Payment")
EventsourcedBehavior(
handler,
Subscription.folder
),
Tagging.const[SubscriptionEvent](EventTag("Payment"))
)
```
@@ -0,0 +1,58 @@
package aecor.runtime.akkageneric

import aecor.data.{ Behavior, Correlation }
import aecor.effect.{ Async, Capture, CaptureFuture }
import aecor.runtime.akkageneric.GenericAkkaRuntime.CorrelatedCommand
import akka.actor.ActorSystem
import akka.cluster.sharding.{ ClusterSharding, ShardRegion }
import akka.pattern._
import akka.util.Timeout
import cats.~>

import scala.concurrent.Future

object GenericAkkaRuntime {
def apply[F[_]: Async: CaptureFuture: Capture](system: ActorSystem): GenericAkkaRuntime[F] =
new GenericAkkaRuntime(system)
private final case class CorrelatedCommand[A](entityId: String, command: A)
}

class GenericAkkaRuntime[F[_]: Async: CaptureFuture: Capture](system: ActorSystem) {
def start[Op[_]](typeName: String,
correlation: Correlation[Op],
behavior: Behavior[F, Op],
settings: GenericAkkaRuntimeSettings =
GenericAkkaRuntimeSettings.default(system)): F[Op ~> F] =
Capture[F]
.capture {
val numberOfShards = settings.numberOfShards

val extractEntityId: ShardRegion.ExtractEntityId = {
case CorrelatedCommand(entityId, c) =>
(entityId, GenericAkkaRuntimeActor.PerformOp(c.asInstanceOf[Op[_]]))
}

val extractShardId: ShardRegion.ExtractShardId = {
case CorrelatedCommand(entityId, _) =>
(scala.math.abs(entityId.hashCode) % numberOfShards).toString
case other => throw new IllegalArgumentException(s"Unexpected message [$other]")
}

val props = GenericAkkaRuntimeActor.props(behavior, settings.idleTimeout)

val shardRegionRef = ClusterSharding(system).start(
typeName = typeName,
entityProps = props,
settings = settings.clusterShardingSettings,
extractEntityId = extractEntityId,
extractShardId = extractShardId
)

implicit val timeout = Timeout(settings.askTimeout)
new (Op ~> F) {
override def apply[A](fa: Op[A]): F[A] = CaptureFuture[F].captureFuture {
(shardRegionRef ? CorrelatedCommand(correlation(fa), fa)).asInstanceOf[Future[A]]
}
}
}
}
@@ -0,0 +1,84 @@
package aecor.runtime.akkageneric

import java.util.UUID

import aecor.data.Behavior
import aecor.effect.Async
import aecor.effect.Async.ops._
import aecor.runtime.akkageneric.GenericAkkaRuntimeActor.PerformOp
import akka.actor.{ Actor, ActorLogging, Props, ReceiveTimeout, Stash, Status }
import akka.cluster.sharding.ShardRegion
import akka.pattern.pipe

import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }

object GenericAkkaRuntimeActor {
def props[F[_]: Async, Op[_]](behavior: Behavior[F, Op], idleTimeout: FiniteDuration): Props =
Props(new GenericAkkaRuntimeActor(behavior, idleTimeout))

final case class PerformOp[Op[_], A](op: Op[A])
case object Stop
}

private[aecor] final class GenericAkkaRuntimeActor[F[_]: Async, Op[_]](behavior: Behavior[F, Op],
idleTimeout: FiniteDuration)
extends Actor
with Stash
with ActorLogging {

import context._

private case class Result(id: UUID, value: Try[(Behavior[F, Op], Any)])

setIdleTimeout()

override def receive: Receive = withBehavior(behavior)

private def withBehavior(behavior: Behavior[F, Op]): Receive = {
case PerformOp(op) =>
val opId = UUID.randomUUID()
behavior
.run(op.asInstanceOf[Op[Any]])
.unsafeRun
.map(x => Result(opId, Success(x)))
.recover {
case NonFatal(e) => Result(opId, Failure(e))
}
.pipeTo(self)(sender)

become {
case Result(`opId`, value) =>
value match {
case Success((newBehavior, reply)) =>
sender() ! reply
become(withBehavior(newBehavior))
case Failure(cause) =>
sender() ! Status.Failure(cause)
throw cause
}
unstashAll()
case _ =>
stash()
}
case ReceiveTimeout =>
passivate()
case GenericAkkaRuntimeActor.Stop =>
context.stop(self)
case Result(_, _) =>
log.debug(
"Ignoring result of another operation. Probably targeted previous instance of actor."
)
}

private def passivate(): Unit = {
log.debug("Passivating...")
context.parent ! ShardRegion.Passivate(GenericAkkaRuntimeActor.Stop)
}

private def setIdleTimeout(): Unit = {
log.debug("Setting idle timeout to [{}]", idleTimeout)
context.setReceiveTimeout(idleTimeout)
}
}
@@ -1,4 +1,4 @@
package aecor.aggregate
package aecor.runtime.akkageneric

import java.util.concurrent.TimeUnit

Expand All @@ -7,24 +7,24 @@ import akka.cluster.sharding.ClusterShardingSettings

import scala.concurrent.duration._

final case class AkkaRuntimeSettings(numberOfShards: Int,
idleTimeout: FiniteDuration,
askTimeout: FiniteDuration,
clusterShardingSettings: ClusterShardingSettings)
final case class GenericAkkaRuntimeSettings(numberOfShards: Int,
idleTimeout: FiniteDuration,
askTimeout: FiniteDuration,
clusterShardingSettings: ClusterShardingSettings)

object AkkaRuntimeSettings {
object GenericAkkaRuntimeSettings {

/**
* Reads config from `aecor.akka-runtime`, see reference.conf for details
* @param system Actor system to get config from
* @return default settings
*/
def default(system: ActorSystem): AkkaRuntimeSettings = {
def default(system: ActorSystem): GenericAkkaRuntimeSettings = {
val config = system.settings.config.getConfig("aecor.akka-runtime")
def getMillisDuration(path: String): FiniteDuration =
Duration(config.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)

AkkaRuntimeSettings(
GenericAkkaRuntimeSettings(
config.getInt("number-of-shards"),
getMillisDuration("idle-timeout"),
getMillisDuration("ask-timeout"),
Expand Down

0 comments on commit 13f991a

Please sign in to comment.