Skip to content

Commit

Permalink
Prepare Eventsourced and Generic Akka Runtime for production use (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
notxcain committed May 7, 2018
1 parent d64c055 commit eefb1b0
Show file tree
Hide file tree
Showing 42 changed files with 485 additions and 224 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ lazy val akkaPersistenceSettings = commonProtobufSettings ++ Seq(
) )
) )


lazy val akkaGenericSettings = Seq( lazy val akkaGenericSettings = commonProtobufSettings ++ Seq(
libraryDependencies ++= Seq( libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion
) )
Expand Down
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,12 @@
syntax = "proto2";

package aecor.runtime.akkageneric.serialization;
option java_package = "aecor.runtime.akkageneric.serialization.msg";
option optimize_for = SPEED;

message KeyedCommand {
required string key = 1;
required bytes bytes = 2;
}


14 changes: 13 additions & 1 deletion modules/akka-cluster-runtime/src/main/resources/reference.conf
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -5,4 +5,16 @@ aecor {
idle-timeout = 60s idle-timeout = 60s
} }
} }

akka {
actor {
serialization-identifiers {
"aecor.runtime.akkageneric.serialization.MessageSerializer" = 140
}
serializers {
aecor-akka-generic-runtime-message-serializer = "aecor.runtime.akkageneric.serialization.MessageSerializer"
}
serialization-bindings {
"aecor.runtime.akkageneric.serialization.Message" = aecor-akka-generic-runtime-message-serializer
}
}
}
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import io.aecor.liberator.Invocation
import aecor.data.Behavior import aecor.data.Behavior
import aecor.encoding.{ KeyDecoder, KeyEncoder } import aecor.encoding.{ KeyDecoder, KeyEncoder }
import aecor.encoding.WireProtocol import aecor.encoding.WireProtocol
import aecor.runtime.akkageneric.GenericAkkaRuntime.Command import aecor.runtime.akkageneric.GenericAkkaRuntime.KeyedCommand
import aecor.runtime.akkageneric.GenericAkkaRuntimeActor.CommandResult
import aecor.runtime.akkageneric.serialization.Message
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.cluster.sharding.{ ClusterSharding, ShardRegion } import akka.cluster.sharding.{ ClusterSharding, ShardRegion }
import akka.pattern._ import akka.pattern._
Expand All @@ -19,28 +21,28 @@ import cats.implicits._
import scala.concurrent.Future import scala.concurrent.Future


object GenericAkkaRuntime { object GenericAkkaRuntime {
def apply[F[_]: Effect](system: ActorSystem): GenericAkkaRuntime[F] = def apply(system: ActorSystem): GenericAkkaRuntime =
new GenericAkkaRuntime(system) new GenericAkkaRuntime(system)
private final case class Command(entityId: String, bytes: ByteBuffer) private[akkageneric] final case class KeyedCommand(key: String, bytes: ByteBuffer) extends Message
} }


final class GenericAkkaRuntime[F[_]] private (system: ActorSystem)(implicit F: Effect[F]) { final class GenericAkkaRuntime private (system: ActorSystem) {
def deploy[K: KeyEncoder: KeyDecoder, M[_[_]]]( def deploy[K: KeyEncoder: KeyDecoder, M[_[_]], F[_]](
typeName: String, typeName: String,
createBehavior: K => Behavior[M, F], createBehavior: K => Behavior[M, F],
settings: GenericAkkaRuntimeSettings = GenericAkkaRuntimeSettings.default(system) settings: GenericAkkaRuntimeSettings = GenericAkkaRuntimeSettings.default(system)
)(implicit M: WireProtocol[M]): F[K => M[F]] = )(implicit M: WireProtocol[M], F: Effect[F]): F[K => M[F]] =
F.delay { F.delay {
val numberOfShards = settings.numberOfShards val numberOfShards = settings.numberOfShards


val extractEntityId: ShardRegion.ExtractEntityId = { val extractEntityId: ShardRegion.ExtractEntityId = {
case Command(entityId, c) => case KeyedCommand(entityId, c) =>
(entityId, GenericAkkaRuntimeActor.PerformOp(c)) (entityId, GenericAkkaRuntimeActor.Command(c))
} }


val extractShardId: ShardRegion.ExtractShardId = { val extractShardId: ShardRegion.ExtractShardId = {
case Command(correlationId, _) => case KeyedCommand(key, _) =>
String.valueOf(scala.math.abs(correlationId.hashCode) % numberOfShards) String.valueOf(scala.math.abs(key.hashCode) % numberOfShards)
case other => throw new IllegalArgumentException(s"Unexpected message [$other]") case other => throw new IllegalArgumentException(s"Unexpected message [$other]")
} }


Expand All @@ -63,13 +65,13 @@ final class GenericAkkaRuntime[F[_]] private (system: ActorSystem)(implicit F: E
new (Invocation[M, ?] ~> F) { new (Invocation[M, ?] ~> F) {
override def apply[A](fa: Invocation[M, A]): F[A] = F.suspend { override def apply[A](fa: Invocation[M, A]): F[A] = F.suspend {
val (bytes, decoder) = fa.invoke(M.encoder) val (bytes, decoder) = fa.invoke(M.encoder)
Effect[F] F.fromFuture {
.fromFuture { (shardRegionRef ? KeyedCommand(keyEncoder(key), bytes.asReadOnlyBuffer()))
(shardRegionRef ? Command(keyEncoder(key), bytes.asReadOnlyBuffer())) .asInstanceOf[Future[CommandResult]]
.asInstanceOf[Future[ByteBuffer]] }
.flatMap { result =>
F.fromEither(decoder.decode(result.bytes))
} }
.map(decoder.decode)
.flatMap(F.fromEither)
} }
} }
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import io.aecor.liberator.Invocation
import aecor.data.{ Behavior, PairT } import aecor.data.{ Behavior, PairT }
import aecor.encoding.KeyDecoder import aecor.encoding.KeyDecoder
import aecor.encoding.WireProtocol import aecor.encoding.WireProtocol
import aecor.runtime.akkageneric.GenericAkkaRuntimeActor.PerformOp import aecor.runtime.akkageneric.GenericAkkaRuntimeActor.{ Command, CommandResult }
import aecor.runtime.akkageneric.serialization.Message
import aecor.util.effect._ import aecor.util.effect._
import akka.actor.{ Actor, ActorLogging, Props, ReceiveTimeout, Stash, Status } import akka.actor.{ Actor, ActorLogging, Props, ReceiveTimeout, Stash, Status }
import akka.cluster.sharding.ShardRegion import akka.cluster.sharding.ShardRegion
Expand All @@ -27,7 +28,8 @@ private[aecor] object GenericAkkaRuntimeActor {
): Props = ): Props =
Props(new GenericAkkaRuntimeActor(createBehavior, idleTimeout)) Props(new GenericAkkaRuntimeActor(createBehavior, idleTimeout))


private[akkageneric] final case class PerformOp(op: ByteBuffer) private[akkageneric] final case class Command(bytes: ByteBuffer) extends Message
private[akkageneric] final case class CommandResult(bytes: ByteBuffer) extends Message
private[akkageneric] case object Stop private[akkageneric] case object Stop
} }


Expand Down Expand Up @@ -59,14 +61,13 @@ private[aecor] final class GenericAkkaRuntimeActor[I: KeyDecoder, M[_[_]], F[_]:
override def receive: Receive = withBehavior(createBehavior(id)) override def receive: Receive = withBehavior(createBehavior(id))


private def withBehavior(behavior: Behavior[M, F]): Receive = { private def withBehavior(behavior: Behavior[M, F]): Receive = {
case PerformOp(opBytes) => case Command(opBytes) =>
M.decoder M.decoder
.decode(opBytes) match { .decode(opBytes) match {
case Right(pair) => case Right(pair) =>
performInvocation(behavior.actions, pair.left, pair.right) performInvocation(behavior.actions, pair.left, pair.right)
case Left(decodingError) => case Left(decodingError) =>
val error = s"Failed to decode invocation, because of $decodingError" log.error(decodingError, "Failed to decode invocation")
log.error(error)
sender() ! Status.Failure(decodingError) sender() ! Status.Failure(decodingError)
} }


Expand Down Expand Up @@ -102,7 +103,7 @@ private[aecor] final class GenericAkkaRuntimeActor[I: KeyDecoder, M[_[_]], F[_]:
case Result(`opId`, value) => case Result(`opId`, value) =>
value match { value match {
case Success((newBehavior, reply)) => case Success((newBehavior, reply)) =>
sender() ! reply sender() ! CommandResult(reply)
become(withBehavior(newBehavior)) become(withBehavior(newBehavior))
case Failure(cause) => case Failure(cause) =>
sender() ! Status.Failure(cause) sender() ! Status.Failure(cause)
Expand Down
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,6 @@
package aecor.runtime.akkageneric.serialization

/**
* Marker trait for all protobuf-serializable messages in `aecor.runtime.akkageneric`.
*/
private[akkageneric] trait Message
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,65 @@
package aecor.runtime.akkageneric.serialization

import java.nio.ByteBuffer

import aecor.runtime.akkageneric.GenericAkkaRuntime.KeyedCommand
import aecor.runtime.akkageneric.GenericAkkaRuntimeActor.{ Command, CommandResult }
import akka.actor.ExtendedActorSystem
import akka.serialization.{ BaseSerializer, SerializerWithStringManifest }
import com.google.protobuf.ByteString

import scala.collection.immutable.HashMap

class MessageSerializer(val system: ExtendedActorSystem)
extends SerializerWithStringManifest
with BaseSerializer {

val KeyedCommandManifest = "A"
val CommandManifest = "B"
val CommandResultManifest = "C"

private val fromBinaryMap =
HashMap[String, Array[Byte] AnyRef](
KeyedCommandManifest -> keyedCommandFromBinary,
CommandManifest -> commandFromBinary,
CommandResultManifest -> commandResultFromBinary
)

override def manifest(o: AnyRef): String = o match {
case KeyedCommand(_, _) => KeyedCommandManifest
case Command(_) => CommandManifest
case CommandResult(_) => CommandResultManifest
case x => throw new IllegalArgumentException(s"Serialization of [$x] is not supported")
}

override def toBinary(o: AnyRef): Array[Byte] = o match {
case Command(bytes) =>
bytes.array()
case CommandResult(bytes) =>
bytes.array()
case x @ KeyedCommand(_, _) =>
entityCommandToBinary(x)
case x => throw new IllegalArgumentException(s"Serialization of [$x] is not supported")
}

override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
fromBinaryMap.get(manifest) match {
case Some(f) => f(bytes)
case other => throw new IllegalArgumentException(s"Unknown manifest [$other]")
}

private def entityCommandToBinary(a: KeyedCommand): Array[Byte] =
msg.KeyedCommand(a.key, ByteString.copyFrom(a.bytes)).toByteArray

private def keyedCommandFromBinary(bytes: Array[Byte]): KeyedCommand =
msg.KeyedCommand.parseFrom(bytes) match {
case msg.KeyedCommand(key, commandBytes) =>
KeyedCommand(key, commandBytes.asReadOnlyByteBuffer)
}

private def commandFromBinary(bytes: Array[Byte]): Command =
Command(ByteBuffer.wrap(bytes))

private def commandResultFromBinary(bytes: Array[Byte]): CommandResult =
CommandResult(ByteBuffer.wrap(bytes))
}
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package aecor.runtime.akkapersistence.readside
import java.util.UUID import java.util.UUID


import aecor.data.TagConsumer import aecor.data.TagConsumer
import aecor.util.KeyValueStore import aecor.runtime.KeyValueStore
import aecor.util.effect._ import aecor.util.effect._
import akka.persistence.cassandra._ import akka.persistence.cassandra._
import akka.persistence.cassandra.session.scaladsl.CassandraSession import akka.persistence.cassandra.session.scaladsl.CassandraSession
import cats.effect.Effect import cats.effect.Effect
import com.datastax.driver.core.Session import com.datastax.driver.core.Session
import cats.implicits._ import cats.implicits._

import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.{ ExecutionContext, Future }


object CassandraOffsetStore { object CassandraOffsetStore {
Expand Down
Original file line number Original file line Diff line number Diff line change
@@ -1,7 +1,7 @@
package aecor.runtime.akkapersistence.readside package aecor.runtime.akkapersistence.readside


import aecor.data.{ Committable, ConsumerId, EventTag, TagConsumer } import aecor.data.{ Committable, ConsumerId, EventTag, TagConsumer }
import aecor.util.KeyValueStore import aecor.runtime.KeyValueStore
import aecor.util.effect._ import aecor.util.effect._
import akka.NotUsed import akka.NotUsed
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
Expand Down
Original file line number Original file line Diff line number Diff line change
@@ -1,13 +1,26 @@
package aecor.runtime.akkapersistence.readside package aecor.runtime.akkapersistence.readside


import aecor.Has
import aecor.data.{ EntityEvent, EventTag, TagConsumer } import aecor.data.{ EntityEvent, EventTag, TagConsumer }
import aecor.util.KeyValueStore import aecor.runtime.KeyValueStore
import akka.NotUsed import akka.NotUsed
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
import cats.effect.Effect import cats.effect.Effect


final case class JournalEntry[O, I, A](offset: O, event: EntityEvent[I, A]) { final case class JournalEntry[O, K, A](offset: O, event: EntityEvent[K, A]) {
def map[B](f: A => B): JournalEntry[O, I, B] = copy(event = event.map(f)) def map[B](f: A => B): JournalEntry[O, K, B] = copy(event = event.map(f))
}

object JournalEntry {
implicit def aecorHasInstanceForEvent[X, O, I, A](
implicit A: Has[EntityEvent[I, A], X]
): Has[JournalEntry[O, I, A], X] =
A.contramap(_.event)

implicit def aecorHasInstanceForOffset[X, O, I, A](
implicit A: Has[O, X]
): Has[JournalEntry[O, I, A], X] = A.contramap(_.offset)

} }


trait JournalQuery[Offset, I, E] { trait JournalQuery[Offset, I, E] {
Expand Down
Original file line number Original file line Diff line number Diff line change
@@ -1,9 +1,9 @@
package aecor.runtime.akkapersistence.readside package aecor.runtime.akkapersistence.readside


import aecor.data.{ EntityEvent, Folded } import aecor.data.{ EntityEvent, Folded }
import aecor.runtime.KeyValueStore
import cats.Monad import cats.Monad
import cats.implicits._ import cats.implicits._
import aecor.util.KeyValueStore


object Projection { object Projection {
final case class Versioned[A](version: Long, a: A) final case class Versioned[A](version: Long, a: A)
Expand Down
44 changes: 44 additions & 0 deletions modules/core/src/main/scala/aecor/Has.scala
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,44 @@
package aecor

trait Has[T, A] {
def get(t: T): A
def contramap[D](f: D => T): Has[D, A] = Has.instance[D](x => get(f(x)))
}

object Has extends TupleInstances {
def apply[A, T](implicit instance: Has[T, A]): Has[T, A] = instance

final class Builder[T] {
@inline def apply[A](f: T => A): Has[T, A] = new Has[T, A] {
override def get(t: T): A = f(t)
}
}
def instance[T]: Builder[T] = new Builder[T]

implicit def refl[A]: Has[A, A] = instance[A](identity)
implicit def hasTuple[A, B, T](implicit TA: Has[T, A], TB: Has[T, B]): Has[T, (A, B)] =
instance[T](x => (TA.get(x), TB.get(x)))

trait HasSyntax {
implicit def toHasSyntaxIdOps[A](a: A): HasSyntaxIdOps[A] = new HasSyntaxIdOps(a)
}

final class HasSyntaxIdOps[T](val a: T) extends AnyVal {
def get[X](implicit T: Has[T, X]): X = T.get(a)
}
object syntax extends HasSyntax
}

trait TupleInstances {
implicit def tuple2HasA[X, A, B](implicit AX: Has[A, X]): Has[(A, B), X] =
Has.instance[(A, B)](x => AX.get(x._1))
implicit def tuple2HasB[X, A, B](implicit BX: Has[B, X]): Has[(A, B), X] =
Has.instance[(A, B)](x => BX.get(x._2))

implicit def tuple3HasA[X, A, B, C](implicit AX: Has[A, X]): Has[(A, B, C), X] =
Has.instance[(A, B, C)](x => AX.get(x._1))
implicit def tuple3HasB[X, A, B, C](implicit BX: Has[B, X]): Has[(A, B, C), X] =
Has.instance[(A, B, C)](x => BX.get(x._2))
implicit def tuple3HasC[X, A, B, C](implicit CX: Has[C, X]): Has[(A, B, C), X] =
Has.instance[(A, B, C)](x => CX.get(x._3))
}
4 changes: 4 additions & 0 deletions modules/core/src/main/scala/aecor/data/Committable.scala
Original file line number Original file line Diff line number Diff line change
@@ -1,5 +1,6 @@
package aecor.data package aecor.data


import aecor.Has
import cats.implicits._ import cats.implicits._
import cats.{ Applicative, Eval, Functor, Monad, Traverse } import cats.{ Applicative, Eval, Functor, Monad, Traverse }


Expand All @@ -12,6 +13,9 @@ final case class Committable[F[_], +A](commit: F[Unit], value: A) {
} }


object Committable { object Committable {
implicit def aecorHasInstance[F[_], A, B](implicit B: Has[B, A]): Has[Committable[F, B], A] =
B.contramap(_.value)

implicit def catsMonadAndTraversInstance[F[_]: Applicative] implicit def catsMonadAndTraversInstance[F[_]: Applicative]
: Monad[Committable[F, ?]] with Traverse[Committable[F, ?]] = : Monad[Committable[F, ?]] with Traverse[Committable[F, ?]] =
new Monad[Committable[F, ?]] with Traverse[Committable[F, ?]] { new Monad[Committable[F, ?]] with Traverse[Committable[F, ?]] {
Expand Down
9 changes: 9 additions & 0 deletions modules/core/src/main/scala/aecor/data/EntityEvent.scala
Original file line number Original file line Diff line number Diff line change
@@ -1,5 +1,14 @@
package aecor.data package aecor.data


import aecor.Has

final case class EntityEvent[I, A](entityId: I, sequenceNr: Long, payload: A) { final case class EntityEvent[I, A](entityId: I, sequenceNr: Long, payload: A) {
def map[B](f: A => B): EntityEvent[I, B] = copy(payload = f(payload)) def map[B](f: A => B): EntityEvent[I, B] = copy(payload = f(payload))
} }

object EntityEvent {
implicit def aecorHasInstanceForId[X, I, A](implicit I: Has[I, X]): Has[EntityEvent[I, A], X] =
Has.instance[EntityEvent[I, A]](x => I.get(x.entityId))
implicit def aecorHasInstanceForValue[X, I, A](implicit A: Has[A, X]): Has[EntityEvent[I, A], X] =
Has.instance[EntityEvent[I, A]](x => A.get(x.payload))
}
Original file line number Original file line Diff line number Diff line change
@@ -1,6 +1,6 @@
package aecor.data package aecor.data


import aecor.IsK import aecor.{ Has, IsK }
import cats.{ Applicative, FlatMap, Id, Monad, ~> } import cats.{ Applicative, FlatMap, Id, Monad, ~> }
import io.aecor.liberator.FunctorK import io.aecor.liberator.FunctorK
import cats.syntax.functor._ import cats.syntax.functor._
Expand Down Expand Up @@ -41,3 +41,9 @@ final case class EventsourcedBehaviorT[M[_[_]], F[_], S, E](actions: M[ActionT[F
} }


final case class Enriched[M, E](metadata: M, event: E) final case class Enriched[M, E](metadata: M, event: E)
object Enriched {
implicit def hasMetadata[M, E, X](implicit M: Has[M, X]): Has[Enriched[M, E], X] =
M.contramap(_.metadata)
implicit def hasEvent[M, E, X](implicit E: Has[E, X]): Has[Enriched[M, E], X] =
E.contramap(_.event)
}
Loading

0 comments on commit eefb1b0

Please sign in to comment.