Skip to content

Commit

Permalink
Rename, refactor, break things
Browse files Browse the repository at this point in the history
  • Loading branch information
notxcain committed Jul 20, 2016
1 parent 0893999 commit b2c1001
Show file tree
Hide file tree
Showing 22 changed files with 132 additions and 62 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,4 @@ project/plugins/project/

# Ensime
.ensime
.ensime_cache/
13 changes: 7 additions & 6 deletions core/src/main/protobuf/Messages.proto
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package aecor.core.serialization.protobuf;
option optimize_for = SPEED;

message EntityEventEnvelope {
required string entityId = 1;
required uint64 sequenceNr = 2;
required bytes event = 3;
required uint64 timestamp = 4; //UTC millis
required string causedBy = 5;
message ExternalEntityEventEnvelope {
required string id = 1;
required string entityId = 2;
required uint64 sequenceNr = 3;
required bytes event = 4;
required uint64 timestamp = 5; //UTC millis
}

message PersistentEntityEventEnvelope {
required string id = 1;
required uint32 serializerId = 2;
required string manifest = 3;
required bytes payload = 4;
Expand Down
11 changes: 8 additions & 3 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ cassandra-query-journal {
aecor {
entity {
number-of-shards = 30
ask-timeout = 60s
default-idle-timeout = 120s
idle-timeout {
# per entity name idle timeout here
}
}
process {
number-of-shards = 30
Expand All @@ -19,14 +24,14 @@ aecor {
akka {
actor {
serialization-identifiers {
"aecor.core.entity.serialization.persistence.EntityActorEventSerializer" = 100
"aecor.core.entity.serialization.persistence.PersistentEntityEventEnvelopeSerializer" = 100
}
serializers {
entity-actor-event = "aecor.core.entity.serialization.persistence.EntityActorEventSerializer"
persistent-entity-event-envelope = "aecor.core.entity.serialization.persistence.PersistentEntityEventEnvelopeSerializer"
}
serialization-bindings {
"java.io.Serializable" = none
"aecor.core.entity.EntityActorEvent" = entity-actor-event
"aecor.core.entity.PersistentEntityEventEnvelope" = persistent-entity-event-envelope
}
}
persistence {
Expand Down
11 changes: 5 additions & 6 deletions core/src/main/scala/aecor/core/entity/EntityActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private [aecor] case class EntityActorState[State](entityState: State, processed
case _ => None
}

def hasProcessedCommandWithId(messageId: MessageId): Boolean = processedCommands(messageId)
def shouldHandleCommand(messageId: MessageId): Boolean = processedCommands(messageId)

def applyEntityEvent[Event](projector: EventProjector[State, Event])(event: Event, causedBy: MessageId): EntityActorState[State] =
copy(entityState = projector(entityState, event), processedCommands = processedCommands + causedBy)
Expand Down Expand Up @@ -105,7 +105,7 @@ private [aecor] class EntityActor[State: ClassTag, Command: ClassTag, Event: Cla
def receiveCommandMessage: Receive = {
case m @ Message(id, command: Command, ack) =>
log.debug("Received command message [{}]", m)
if (state.hasProcessedCommandWithId(id)) {
if (state.shouldHandleCommand(id)) {
sender() ! EntityResponse(ack, Accepted)
log.debug("Message already processed")
} else {
Expand All @@ -120,7 +120,7 @@ private [aecor] class EntityActor[State: ClassTag, Command: ClassTag, Event: Cla
def runResult(causedBy: MessageId, ack: Any)(result: CommandHandlerResult[Rejection, Event]): Unit = result match {
case Accept(events) =>
val envelopes = events.map { event =>
Tagged(PersistentEntityEventEnvelope(event, Instant.now, causedBy), Set(entityName))
Tagged(PersistentEntityEventEnvelope(MessageId.generate, event, Instant.now, causedBy), Set(entityName))
}.toList
persistAll(envelopes) {
case Tagged(e: PersistentEntityEventEnvelope[Event], _) =>
Expand All @@ -129,22 +129,21 @@ private [aecor] class EntityActor[State: ClassTag, Command: ClassTag, Event: Cla
deferAsync(NotUsed) { _ =>
sender() ! EntityResponse(ack, Accepted)
}
context.become(receiveCommand)
case Reject(rejection) =>
sender() ! EntityResponse(ack, Rejected(rejection))
context.become(receiveCommand)
case Defer(deferred) =>
deferred.map(HandleCommandHandlerResult).asFuture.pipeTo(self)(sender)
context.become {
case HandleCommandHandlerResult(deferredResult) =>
log.debug("Command handler result [{}]", deferredResult)
runResult(causedBy, ack)(deferredResult)
unstashAll()
context.become(receiveCommand)
case failure @ Status.Failure(e) =>
log.error(e, "Deferred reaction failed")
sender() ! failure
context.become(receiveCommand)
unstashAll()
context.become(receiveCommand)
case _ =>
stash()
}
Expand Down
5 changes: 2 additions & 3 deletions core/src/main/scala/aecor/core/entity/EntityRef.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ package aecor.core.entity
import aecor.core.message.{Message, MessageId}
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern._
import akka.util.Timeout

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.Future
import scala.reflect.ClassTag

abstract class EntityRef[Entity] {
private [aecor] def actorRef: ActorRef
def handle[Command](idempotencyKey: String, command: Command)(implicit ec: ExecutionContext, timeout: Timeout, contract: CommandContract[Entity, Command]): Future[Result[contract.Rejection]]
def handle[Command](idempotencyKey: String, command: Command)(implicit contract: CommandContract[Entity, Command]): Future[Result[contract.Rejection]]
}

class RemoteEntityRef[C: ClassTag, R](handler: (MessageId, C) => Future[Result[R]])(implicit actorSystem: ActorSystem) {
Expand Down
16 changes: 9 additions & 7 deletions core/src/main/scala/aecor/core/entity/EntityShardRegion.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardReg
import akka.pattern._
import akka.util.Timeout

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.Future
import scala.reflect.ClassTag
import scala.concurrent.duration._

object EntityShardRegion {
def apply(actorSystem: ActorSystem): EntityShardRegion = new EntityShardRegion(actorSystem)
Expand All @@ -29,14 +28,14 @@ class EntityShardRegion(actorSystem: ActorSystem) {
entityName: EntityName[Entity]
): EntityRef[Entity] = {

val config = actorSystem.settings.config
val config = new EntityShardRegionConfig(actorSystem.settings.config.getConfig("aecor.entity"))

val numberOfShards = config.getInt("aecor.entity.number-of-shards")

def extractEntityId: ShardRegion.ExtractEntityId = {
case m @ Message(_, c: Command, _) (correlation(c), m)
}

val numberOfShards = config.numberOfShards
def extractShardId: ShardRegion.ExtractShardId = {
case m @ Message(_, c: Command, _) => ExtractShardId(correlation(c), numberOfShards)
}
Expand All @@ -46,7 +45,7 @@ class EntityShardRegion(actorSystem: ActorSystem) {
behavior.initialState(entity),
behavior.commandHandler(entity),
behavior.eventProjector(entity),
2.minutes
config.idleTimeout(entityName.value)
)

val shardRegionRef = ClusterSharding(actorSystem).start(
Expand All @@ -58,10 +57,13 @@ class EntityShardRegion(actorSystem: ActorSystem) {
)

new EntityRef[Entity] {
implicit val askTimeout = Timeout(config.askTimeout)
import actorSystem.dispatcher

override private[aecor] val actorRef: ActorRef = shardRegionRef

override def handle[C](id: String, command: C)(implicit ec: ExecutionContext, timeout: Timeout, contract: CommandContract[Entity, C]): Future[Result[contract.Rejection]] =
(actorRef ? Message(MessageId(id), command, NotUsed)).mapTo[EntityResponse[contract.Rejection, NotUsed]].map(_.result)
override def handle[C](idempotencyKey: String, command: C)(implicit contract: CommandContract[Entity, C]): Future[Result[contract.Rejection]] =
(actorRef ? Message(MessageId(idempotencyKey), command, NotUsed)).mapTo[EntityResponse[contract.Rejection, NotUsed]].map(_.result)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package aecor.core.entity

import aecor.util.ConfigHelpers._
import com.typesafe.config.Config

import scala.concurrent.duration.FiniteDuration

class EntityShardRegionConfig(config: Config) {
val numberOfShards: Int = config.getInt("number-of-shards")
val askTimeout: FiniteDuration = config.getMillisDuration("ask-timeout")
val defaultIdleTimeout: FiniteDuration = config.getMillisDuration("default-idle-timeout")
def idleTimeout(entityName: String): FiniteDuration = {
val key = s"idle-timeout.$entityName"
if (config.hasPath(key)) config.getMillisDuration(key) else defaultIdleTimeout
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import aecor.core.message.MessageId

import scala.reflect.ClassTag

case class PersistentEntityEventEnvelope[+Event](event: Event, timestamp: Instant, causedBy: MessageId) {
case class PersistentEntityEventEnvelope[+Event](id: MessageId, event: Event, timestamp: Instant, causedBy: MessageId) {
def cast[EE: ClassTag]: Option[PersistentEntityEventEnvelope[EE]] = event match {
case e: EE => Some(this.asInstanceOf[PersistentEntityEventEnvelope[EE]])
case other => None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import com.google.protobuf.ByteString

import scala.util.Try

class EntityActorEventCodec(actorSystem: ExtendedActorSystem) extends Codec[PersistentEntityEventEnvelope[AnyRef]] {
class PersistentEntityEventEnvelopeCodec(actorSystem: ExtendedActorSystem) extends Codec[PersistentEntityEventEnvelope[AnyRef]] {

lazy val serialization = SerializationExtension(actorSystem)

Expand All @@ -23,7 +23,7 @@ class EntityActorEventCodec(actorSystem: ExtendedActorSystem) extends Codec[Pers
override def decode(bytes: Array[Byte], manifest: String): Try[PersistentEntityEventEnvelope[AnyRef]] =
pb.PersistentEntityEventEnvelope.validate(bytes).flatMap { dto =>
serialization.deserialize(dto.payload.toByteArray, dto.serializerId, dto.manifest).map { event =>
PersistentEntityEventEnvelope(event, Instant.ofEpochMilli(dto.timestamp), MessageId(dto.causedBy))
PersistentEntityEventEnvelope(MessageId(dto.id), event, Instant.ofEpochMilli(dto.timestamp), MessageId(dto.causedBy))
}
}

Expand All @@ -37,8 +37,9 @@ class EntityActorEventCodec(actorSystem: ExtendedActorSystem) extends Codec[Pers
if (serializer.includeManifest) event.getClass.getName
else PersistentRepr.Undefined
}
pb.PersistentEntityEventEnvelope(serializer.identifier, serManifest, ByteString.copyFrom(serializer.toBinary(event)), timestamp.toEpochMilli, causedBy.value).toByteArray
pb.PersistentEntityEventEnvelope(e.id.value, serializer.identifier, serManifest, ByteString.copyFrom(serializer.toBinary(event)), timestamp.toEpochMilli, causedBy.value).toByteArray
}
}

class EntityActorEventSerializer(actorSystem: ExtendedActorSystem) extends CodecSerializer[PersistentEntityEventEnvelope[AnyRef]](actorSystem, new EntityActorEventCodec(actorSystem))
class PersistentEntityEventEnvelopeSerializer(actorSystem: ExtendedActorSystem)
extends CodecSerializer[PersistentEntityEventEnvelope[AnyRef]](actorSystem, new PersistentEntityEventEnvelopeCodec(actorSystem))
4 changes: 2 additions & 2 deletions core/src/main/scala/aecor/core/process/ProcessSharding.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import aecor.core.message._
import aecor.core.process.ProcessActor.ProcessBehavior
import aecor.core.process.ProcessSharding.{Control, TopicName}
import aecor.core.serialization.PureDeserializer
import aecor.core.serialization.protobuf.EntityEventEnvelope
import aecor.core.serialization.protobuf.ExternalEntityEventEnvelope
import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props, Terminated}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import akka.kafka.ConsumerMessage.{Committable, CommittableMessage}
Expand Down Expand Up @@ -39,7 +39,7 @@ case class ProcessInputEnvelope[Input](eventId: MessageId, input: Input)

class ProcessInputDeserializer[Input](config: Map[TopicName, (Array[Byte] => Option[Input])]) extends PureDeserializer[Option[ProcessInputEnvelope[Input]]] {
override def deserialize(topic: TopicName, data: Array[Byte]): Option[ProcessInputEnvelope[Input]] = {
val envelope = EntityEventEnvelope.parseFrom(data)
val envelope = ExternalEntityEventEnvelope.parseFrom(data)
config.get(topic).flatMap(f => f(envelope.event.toByteArray)).map { input =>
ProcessInputEnvelope(MessageId(s"${envelope.entityId}#${envelope.sequenceNr}"), input)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ private [aecor] class ScheduleActor(scheduleName: String, timeBucket: String)

def putInEnvelope(event: ScheduleActorEvent): PersistentEntityEventEnvelope[ScheduleActorEvent] =
PersistentEntityEventEnvelope(
MessageId.generate,
event,
Instant.now(),
MessageId(s"time-${LocalDateTime.now()}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ package aecor.core.serialization

import java.util

import aecor.core.serialization.protobuf.EntityEventEnvelope
import aecor.core.serialization.protobuf.ExternalEntityEventEnvelope
import org.apache.kafka.common.serialization.{Deserializer, Serializer}

trait PureDeserializer[A] extends Deserializer[A] {
final override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
final override def close(): Unit = ()
}

class EntityEventEnvelopeSerde extends Serializer[EntityEventEnvelope] with PureDeserializer[(String, EntityEventEnvelope)] {
override def serialize(topic: String, data: EntityEventEnvelope): Array[Byte] =
class EntityEventEnvelopeSerde extends Serializer[ExternalEntityEventEnvelope] with PureDeserializer[(String, ExternalEntityEventEnvelope)] {
override def serialize(topic: String, data: ExternalEntityEventEnvelope): Array[Byte] =
data.toByteArray

override def deserialize(topic: String, data: Array[Byte]): (String, EntityEventEnvelope) =
(topic, EntityEventEnvelope.parseFrom(data))
override def deserialize(topic: String, data: Array[Byte]): (String, ExternalEntityEventEnvelope) =
(topic, ExternalEntityEventEnvelope.parseFrom(data))
}
6 changes: 3 additions & 3 deletions core/src/main/scala/aecor/core/streaming/Replicator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag


case class JournalEntry[A](persistenceId: String, sequenceNr: Long, event: A, timestamp: Instant, causedBy: MessageId)
case class JournalEntry[A](persistenceId: String, sequenceNr: Long, eventId: MessageId, event: A, timestamp: Instant, causedBy: MessageId)

class Replicator(extendedCassandraReadJournal: ExtendedCassandraReadJournal) {

Expand All @@ -23,8 +23,8 @@ class Replicator(extendedCassandraReadJournal: ExtendedCassandraReadJournal) {
def committableEventSourceFor[A] = new MkCommittable[A] {
override def apply[E](consumerId: String)(implicit name: EntityName[A], contract: EventContract.Aux[A, E], E: ClassTag[E], ec: ExecutionContext): Source[CommittableMessage[JournalEntry[E]], NotUsed] =
extendedCassandraReadJournal.committableEventsByTag(name.value, consumerId).collect {
case CommittableMessage(committable, UUIDEventEnvelope(_, pid, sequenceNr, PersistentEntityEventEnvelope(event: E, timestamp, causedBy))) =>
CommittableMessage(committable, JournalEntry(pid, sequenceNr, event, timestamp, causedBy))
case CommittableMessage(committable, UUIDEventEnvelope(_, persistenceId, sequenceNr, PersistentEntityEventEnvelope(id, event: E, timestamp, causedBy))) =>
CommittableMessage(committable, JournalEntry(persistenceId, sequenceNr, id, event, timestamp, causedBy))
}
}
}
Expand Down
18 changes: 18 additions & 0 deletions core/src/main/scala/aecor/util/ConfigHelpers.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package aecor.util

import java.util.concurrent.TimeUnit

import com.typesafe.config.Config

import scala.concurrent.duration.{Duration, FiniteDuration}

object ConfigHelpers {
final implicit class ConfigOps(val config: Config) extends AnyVal {
def getMillisDuration(path: String): FiniteDuration = getDuration(path, TimeUnit.MILLISECONDS)

def getNanosDuration(path: String): FiniteDuration = getDuration(path, TimeUnit.NANOSECONDS)

private def getDuration(path: String, unit: TimeUnit): FiniteDuration =
Duration(config.getDuration(path, unit), unit)
}
}
4 changes: 3 additions & 1 deletion example/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ akka {
kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
}
serialization-bindings {
"scala.Serializable" = kryo
"aecor.example.domain.Account$Event" = kryo
"aecor.example.domain.CardAuthorization$Event" = kryo
"java.io.Serializable" = none
}
}
persistence {
Expand Down
2 changes: 1 addition & 1 deletion example/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
<logger name="org.apache.kafka.clients.consumer.internals.AbstractCoordinator" level="WARN" />
<logger name="com.datastax.driver.core.Connection" level="WARN" />
<logger name="com.websudos.phantom" level="WARN" />
<logger name="akka.persistence.cassandra.query.EventsByTagPublisher" level="DEBUG" />
<logger name="akka.persistence.cassandra.query.EventsByTagPublisher" level="INFO" />
</configuration>
2 changes: 1 addition & 1 deletion example/src/main/resources/requests/AuthorizePayment.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"AuthorizePayment": {
"cardAuthorizationId": "14",
"cardAuthorizationId": "1",
"accountId": "DMITRY",
"amount": 5000,
"acquireId": 1,
Expand Down
2 changes: 1 addition & 1 deletion example/src/main/scala/aecor/example/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ object App extends App {
Kamon.start()
val config = ConfigFactory.load()
val actorSystem = ActorSystem(config.getString("cluster.system-name"))
actorSystem.actorOf(RootActor.props, "root")
actorSystem.actorOf(AppActor.props, "root")
actorSystem.registerOnTermination {
System.exit(1)
}
Expand Down
Loading

0 comments on commit b2c1001

Please sign in to comment.