Permalink
Browse files

Welcome, Aecor!

  • Loading branch information...
notxcain committed Apr 29, 2016
0 parents commit ffac47357485aa3dc1c95ebcc30386fbd3430be2
Showing with 2,082 additions and 0 deletions.
  1. +81 −0 .gitignore
  2. +21 −0 LICENSE
  3. +15 −0 README.md
  4. +128 −0 build.sbt
  5. +14 −0 core/src/main/protobuf/Messages.proto
  6. 0 core/src/main/resources/reference.conf
  7. +25 −0 core/src/main/scala/aecor/core/CallerDeliveryIdAtLeastOnceDelivery.scala
  8. +67 −0 core/src/main/scala/aecor/core/EventBus.scala
  9. +30 −0 core/src/main/scala/aecor/core/OrderedAtLeastOnceDelivery.scala
  10. +38 −0 core/src/main/scala/aecor/core/concurrent/Deferred.scala
  11. +15 −0 core/src/main/scala/aecor/core/entity/CommandContract.scala
  12. +17 −0 core/src/main/scala/aecor/core/entity/CommandHandler.scala
  13. +14 −0 core/src/main/scala/aecor/core/entity/CommandHandlerResult.scala
  14. +167 −0 core/src/main/scala/aecor/core/entity/EntityActor.scala
  15. +46 −0 core/src/main/scala/aecor/core/entity/EntityActorRegion.scala
  16. +12 −0 core/src/main/scala/aecor/core/entity/EntityName.scala
  17. +32 −0 core/src/main/scala/aecor/core/entity/EntityRef.scala
  18. +11 −0 core/src/main/scala/aecor/core/entity/EventProjector.scala
  19. +20 −0 core/src/main/scala/aecor/core/logging/PersistentActorLogging.scala
  20. +29 −0 core/src/main/scala/aecor/core/message/Correlation.scala
  21. +12 −0 core/src/main/scala/aecor/core/message/Deduplication.scala
  22. +21 −0 core/src/main/scala/aecor/core/message/Message.scala
  23. +34 −0 core/src/main/scala/aecor/core/message/Passivation.scala
  24. +69 −0 core/src/main/scala/aecor/core/process/CompoundConsumerSettings.scala
  25. +159 −0 core/src/main/scala/aecor/core/process/ProcessActor.scala
  26. +37 −0 core/src/main/scala/aecor/core/process/ProcessActorRegion.scala
  27. +49 −0 core/src/main/scala/aecor/core/process/ProcessEventAdapter.scala
  28. +14 −0 core/src/main/scala/aecor/core/serialization/Encoder.scala
  29. +45 −0 core/src/main/scala/aecor/util/FunctionBuilder.scala
  30. +34 −0 example/src/main/protobuf/Account.proto
  31. +117 −0 example/src/main/resources/application.conf
  32. +21 −0 example/src/main/resources/logback.xml
  33. +9 −0 example/src/main/resources/requests/AuthorizePayment.json
  34. +7 −0 example/src/main/resources/requests/CreditAccount.json
  35. +5 −0 example/src/main/resources/requests/OpenAccount.json
  36. +282 −0 example/src/main/scala/aecor/example/App.scala
  37. +100 −0 example/src/main/scala/aecor/example/domain/Account.scala
  38. +8 −0 example/src/main/scala/aecor/example/domain/Amount.scala
  39. +113 −0 example/src/main/scala/aecor/example/domain/CardAuthorization.scala
  40. +64 −0 example/src/main/scala/aecor/example/domain/CardAuthorizationProcess.scala
  41. +61 −0 example/src/main/scala/aecor/example/domain/Counter.scala
  42. +27 −0 haproxy.config
  43. BIN lib/aspectj-1.8.9.jar
  44. BIN lib/libsigar-universal64-macosx.dylib
  45. +1 −0 project/build.properties
  46. +10 −0 project/plugins.sbt
  47. +1 −0 version.sbt
@@ -0,0 +1,81 @@
+#### joe made this: https://goel.io/joe
+
+#####=== JetBrains ===#####
+# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm
+
+*.iml
+
+## Directory-based project format:
+.idea/
+# if you remove the above rule, at least ignore the following:
+
+# User-specific stuff:
+# .idea/workspace.xml
+# .idea/tasks.xml
+# .idea/dictionaries
+
+# Sensitive or high-churn files:
+# .idea/dataSources.ids
+# .idea/dataSources.xml
+# .idea/sqlDataSources.xml
+# .idea/dynamic.xml
+# .idea/uiDesigner.xml
+
+# Gradle:
+# .idea/gradle.xml
+# .idea/libraries
+
+# Mongo Explorer plugin:
+# .idea/mongoSettings.xml
+
+## File-based project format:
+*.ipr
+*.iws
+
+## Plugin-specific files:
+
+# IntelliJ
+out/
+
+# mpeltonen/sbt-idea plugin
+.idea_modules/
+
+# JIRA plugin
+atlassian-ide-plugin.xml
+
+# Crashlytics plugin (for Android Studio and IntelliJ)
+com_crashlytics_export_strings.xml
+crashlytics.properties
+crashlytics-build.properties
+
+#####=== SBT ===#####
+# Simple Build Tool
+# http://www.scala-sbt.org/release/docs/Getting-Started/Directories.html#configuring-version-control
+
+target/
+lib_managed/
+src_managed/
+project/boot/
+.history
+.cache
+
+#####=== Scala ===#####
+
+*.class
+*.log
+
+# sbt specific
+.cache
+.history
+.lib/
+dist/*
+target/
+lib_managed/
+src_managed/
+project/boot/
+project/plugins/project/
+
+# Scala-IDE specific
+.scala_dependencies
+.worksheet
+
21 LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2016 Denis Mikhaylov
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
@@ -0,0 +1,15 @@
+# Aecor
+### Scalable, type safe CQRS DDD framework
+
+Aecor is an opinionated framework for building scalable, distributed CQRS DDD services written in Scala. It uses [Akka](https://github.com/akka/akka) as a runtime for pure functional behaviors (`Any => Unit`, not even once) and clustering.
+With the help of [Cats](https://github.com/typelevel/cats/) and [Shapeless](https://github.com/milessabin/shapeless) to reach type safety.
+
+Aecor works on Scala 2.11 with Java 8.
+
+The name `Aecor` (_lat. ocean_) is inspired by a vision of modern distributed applications, as an ocean of messages with pure behaviors floating in it.
+
+Main goals:
+1. Type safety
+2. Pure behaviors
+3. Easy to implement Aggregate Root behaviors
+4. A simple DSL to describe Business Processes
128 build.sbt
@@ -0,0 +1,128 @@
+import com.trueaccord.scalapb.{ScalaPbPlugin => PB}
+
+lazy val buildSettings = Seq(
+ organization := "io.aecor",
+ scalaVersion := "2.11.8"
+)
+
+lazy val commonSettings = Seq(
+ scalacOptions ++= commonScalacOptions,
+ resolvers ++= Seq(
+ Resolver.sonatypeRepo("releases"),
+ Resolver.sonatypeRepo("snapshots"),
+ "Twitter Repository" at "http://maven.twttr.com",
+ "Websudos releases" at "https://dl.bintray.com/websudos/oss-releases/"
+ ),
+ libraryDependencies ++= Seq(
+ "com.github.mpilquist" %% "simulacrum" % "0.7.0",
+ "org.typelevel" %% "machinist" % "0.4.1",
+ compilerPlugin("org.spire-math" %% "kind-projector" % "0.6.3"),
+ compilerPlugin("com.milessabin" % "si2712fix-plugin" % "1.1.0" cross CrossVersion.full)
+ ),
+ parallelExecution in Test := false,
+ scalacOptions in (Compile, doc) := (scalacOptions in (Compile, doc)).value.filter(_ != "-Xfatal-warnings")
+) ++ warnUnusedImport
+
+lazy val scalacheckVersion = "1.13.0"
+
+lazy val aecorSettings = buildSettings ++ commonSettings
+
+lazy val aecor = project.in(file("."))
+ .settings(moduleName := "aecor")
+ .settings(aecorSettings)
+ .aggregate(core, example, tests, bench)
+ .dependsOn(core, example, tests % "test-internal -> test", bench % "compile-internal;test-internal -> test")
+
+lazy val core = project
+ .settings(moduleName := "aecor-core")
+ .settings(aecorSettings:_*)
+ .settings(coreSettings)
+ .settings(libraryDependencies += "org.scalacheck" %% "scalacheck" % scalacheckVersion % "test")
+
+lazy val bench = project.dependsOn(core, example)
+ .settings(moduleName := "aecor-bench")
+ .settings(aecorSettings)
+ .enablePlugins(JmhPlugin)
+
+lazy val tests = project.dependsOn(core, example)
+ .settings(moduleName := "aecor-tests")
+ .settings(aecorSettings)
+ .settings(testingDependencies: _*)
+
+lazy val example = project.dependsOn(core)
+ .settings(moduleName := "aecor-example")
+ .settings(aecorSettings)
+
+val circeVersion = "0.4.1"
+val akkaVersion = "2.4.7"
+val akkaStreamKafka = "0.11-M3"
+val akkaPersistenceCassandra = "0.15"
+val catsVersion = "0.5.0"
+val akkaHttpJson = "1.6.0"
+val phantomVersion = "1.25.4"
+val kamonVersion = "0.6.1"
+
+lazy val coreSettings = Seq(
+ libraryDependencies ++= Seq(
+ "com.typesafe.akka" %% "akka-http-experimental",
+ "com.typesafe.akka" %% "akka-cluster-sharding",
+ "com.typesafe.akka" %% "akka-persistence",
+ "com.typesafe.akka" %% "akka-slf4j",
+ "com.typesafe.akka" %% "akka-contrib",
+ "com.typesafe.akka" %% "akka-persistence-query-experimental"
+ ).map(_ % akkaVersion),
+ libraryDependencies ++= Seq(
+ "com.typesafe.akka" %% "akka-persistence-cassandra" % akkaPersistenceCassandra,
+ "com.typesafe.akka" %% "akka-stream-kafka" % akkaStreamKafka,
+ "com.github.romix.akka" %% "akka-kryo-serialization" % "0.4.1",
+ "org.fusesource" % "sigar" % "1.6.4",
+ "ch.qos.logback" % "logback-classic" % "1.1.7"
+ ),
+ libraryDependencies ++= Seq(
+ "io.kamon" %% "kamon-core",
+ "io.kamon" %% "kamon-jmx",
+ "io.kamon" %% "kamon-akka",
+ "io.kamon" %% "kamon-akka-remote_akka-2.4",
+ "io.kamon" %% "kamon-autoweave"
+ ).map(_ % kamonVersion),
+ libraryDependencies ++= Seq(
+ "io.circe" %% "circe-core",
+ "io.circe" %% "circe-generic",
+ "io.circe" %% "circe-parser"
+ ).map(_ % circeVersion),
+ libraryDependencies += "org.typelevel" %% "cats" % catsVersion,
+ libraryDependencies += "de.heikoseeberger" %% "akka-http-circe" % akkaHttpJson,
+ libraryDependencies += "com.websudos" %% "phantom-dsl" % phantomVersion,
+ PB.flatPackage in PB.protobufConfig := true
+) ++ PB.protobufSettings
+
+lazy val testingDependencies = Seq(
+ libraryDependencies += "org.scalacheck" %% "scalacheck" % scalacheckVersion,
+ libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.0-RC1" % "test",
+ libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test"
+)
+
+lazy val commonScalacOptions = Seq(
+ "-deprecation",
+ "-encoding", "UTF-8",
+ "-feature",
+ "-language:existentials",
+ "-language:higherKinds",
+ "-language:implicitConversions",
+ "-language:experimental.macros",
+ "-unchecked",
+ "-Xfatal-warnings",
+ "-Xlint",
+ "-Yinline-warnings",
+ "-Yno-adapted-args",
+ "-Ywarn-dead-code",
+ "-Ywarn-numeric-widen",
+ "-Ywarn-value-discard",
+ "-Ywarn-unused-import",
+ "-Xfuture"
+)
+
+lazy val warnUnusedImport = Seq(
+ scalacOptions in (Compile, console) ~= {_.filterNot("-Ywarn-unused-import" == _)},
+ scalacOptions in (Test, console) <<= (scalacOptions in (Compile, console))
+)
@@ -0,0 +1,14 @@
+option java_package = "io.aecor.message.protobuf";
+option optimize_for = SPEED;
+
+message DomainEvent {
+ required string id = 1;
+ required bytes payload = 4;
+ required Timestamp timestamp = 5;
+ required string causedBy = 6;
+}
+
+message Timestamp {
+ //UTC timestamp
+ required uint64 millis = 1;
+}
No changes.
@@ -0,0 +1,25 @@
+package aecor.core
+
+import akka.actor.ActorPath
+import akka.persistence.AtLeastOnceDelivery
+
+trait CallerDeliveryIdAtLeastOnceDelivery { self: AtLeastOnceDelivery =>
+ private val anyToDeliveryId = scala.collection.mutable.Map.empty[Any, Long]
+
+ def deliver[T](destination: ActorPath, message: T)(f: T => Any): Unit = {
+ deliver(destination) { deliveryId =>
+ val deliveryAck = f(message)
+ anyToDeliveryId += deliveryAck -> deliveryId
+ message
+ }
+ }
+
+ def confirmDelivery(ack: Any): Boolean = {
+ anyToDeliveryId.get(ack) match {
+ case Some(deliveryId) =>
+ confirmDelivery(deliveryId)
+ case None =>
+ false
+ }
+ }
+}
@@ -0,0 +1,67 @@
+package aecor.core
+
+import aecor.core.EventBus.PublishMessage
+import akka.actor.{Actor, ActorLogging, ActorRef, Stash}
+import akka.kafka.ProducerSettings
+import akka.kafka.scaladsl.Producer
+import akka.pattern.pipe
+import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
+import akka.stream._
+import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}
+import io.aecor.message.protobuf.Messages.DomainEvent
+import org.apache.kafka.clients.producer.ProducerRecord
+
+object EventBus {
+ case class PublishMessage[Reply](topic: String, key: String, message: DomainEvent, replyWith: Reply, replyTo: ActorRef)
+}
+
+class EventBus(producerSettings: ProducerSettings[String, DomainEvent], bufferSize: Int) extends Actor with ActorLogging with Stash {
+ implicit val materializer = ActorMaterializer(ActorMaterializerSettings(context.system).withSupervisionStrategy(_ => Supervision.Restart))
+ import materializer._
+
+ type Queue = SourceQueueWithComplete[Producer.Message[String, DomainEvent, (ActorRef, Any)]]
+
+ def offering(queue: Queue): Receive = {
+ case PublishMessage(topic, key, message, passThrough, replyTo) =>
+ val record = new ProducerRecord(topic, key, message)
+ val producerMessage = Producer.Message(record, replyTo -> passThrough)
+ queue.offer(producerMessage).pipeTo(self)
+ context.become(waitingForOfferResult(producerMessage, queue))
+ }
+ def waitingForOfferResult(producerMessage: Producer.Message[String, DomainEvent, (ActorRef, Any)], queue: Queue): Receive = {
+ case qor: QueueOfferResult => qor match {
+ case Enqueued =>
+ unstashAll()
+ context.become(offering(queue))
+ case Dropped =>
+ log.warning("Queue dropped offered message")
+ unstashAll()
+ context.become(offering(queue))
+ case Failure(cause) =>
+ log.error(cause, "Offering failed")
+ log.info("Recreating queue")
+ val newQueue = createQueue
+ newQueue.offer(producerMessage).pipeTo(self)
+ context.become(waitingForOfferResult(producerMessage, newQueue))
+ case QueueClosed =>
+ log.info("Queue closed")
+ log.info("Recreating queue")
+ val newQueue = createQueue
+ newQueue.offer(producerMessage).pipeTo(self)
+ context.become(waitingForOfferResult(producerMessage, newQueue))
+ }
+ case _ => stash()
+ }
+
+ def createQueue = Source
+ .queue[Producer.Message[String, DomainEvent, (ActorRef, Any)]](bufferSize, OverflowStrategy.dropNew)
+ .viaMat(Producer.flow(producerSettings))(Keep.left)
+ .map(_.message.passThrough)
+ .to(Sink.foreach {
+ case (replyTo, replyWith) => replyTo ! replyWith
+ })
+ .run
+
+
+ override def receive: Receive = offering(createQueue)
+}
@@ -0,0 +1,30 @@
+package aecor.core
+
+import akka.actor.ActorPath
+import akka.persistence.AtLeastOnceDelivery
+
+trait OrderedAtLeastOnceDelivery extends AtLeastOnceDelivery {
+ type DeliveryId = Long
+
+ private case class Delivery(destination: ActorPath, deliveryIdToMessage: (DeliveryId) => Any)
+
+ private val deliveryQueue = scala.collection.mutable.Queue.empty[Delivery]
+
+ override def deliver(destination: ActorPath)(deliveryIdToMessage: (DeliveryId) => Any): Unit = {
+ if (super.numberOfUnconfirmed == 0) {
+ super.deliver(destination)(deliveryIdToMessage)
+ } else {
+ deliveryQueue.enqueue(Delivery(destination, deliveryIdToMessage))
+ }
+ }
+
+ override def numberOfUnconfirmed: Int = deliveryQueue.length + super.numberOfUnconfirmed
+
+ override def confirmDelivery(deliveryId: DeliveryId): Boolean = {
+ if (deliveryQueue.nonEmpty) {
+ val Delivery(destination, deliveryIdToMessage) = deliveryQueue.dequeue()
+ super.deliver(destination)(deliveryIdToMessage)
+ }
+ super.confirmDelivery(deliveryId)
+ }
+}
Oops, something went wrong.

0 comments on commit ffac473

Please sign in to comment.