Permalink
Browse files

Use MonadState instead of Monad (#41)

  • Loading branch information...
notxcain committed Apr 4, 2018
1 parent 96a9600 commit 4e20b19a26485a08eab9bf49034a4b9021339e7c
Showing with 519 additions and 508 deletions.
  1. +1 −0 .sbtopts
  2. +6 −0 README.md
  3. +10 −4 build.sbt
  4. +18 −21 ...s/akka-persistence-runtime/src/main/scala/aecor/runtime/akkapersistence/readside/Projection.scala
  5. +1 −1 ...akka-persistence-runtime/src/main/scala/aecor/runtime/akkapersistence/serialization/Message.scala
  6. +16 −0 modules/core/src/main/scala/aecor/IsK.scala
  7. +19 −22 modules/core/src/main/scala/aecor/data/ActionT.scala
  8. +31 −29 modules/core/src/main/scala/aecor/data/EventsourcedBehaviorT.scala
  9. +28 −0 modules/core/src/main/scala/aecor/data/package.scala
  10. +1 −1 modules/core/src/main/scala/aecor/encoding/WireProtocol.scala
  11. +1 −1 modules/core/src/main/scala/aecor/util/{Clock.scala → ClockT.scala}
  12. +2 −2 modules/core/src/main/scala/aecor/util/JavaTimeClock.scala
  13. +2 −2 modules/example/src/main/scala/aecor/example/App.scala
  14. +1 −0 modules/example/src/main/scala/aecor/example/domain/transaction/TransactionAggregate.scala
  15. +2 −2 modules/schedule/src/main/scala/aecor/schedule/DefaultSchedule.scala
  16. +2 −2 modules/schedule/src/main/scala/aecor/schedule/Schedule.scala
  17. +80 −86 modules/test-kit/src/main/scala/aecor/testkit/E2eSupport.scala
  18. +6 −2 modules/test-kit/src/main/scala/aecor/testkit/EventJournal.scala
  19. +14 −35 modules/test-kit/src/main/scala/aecor/testkit/Eventsourced.scala
  20. +22 −0 modules/test-kit/src/main/scala/aecor/testkit/Processable.scala
  21. +13 −20 modules/test-kit/src/main/scala/aecor/testkit/StateClock.scala
  22. +43 −67 modules/test-kit/src/main/scala/aecor/testkit/StateEventJournal.scala
  23. +14 −19 modules/test-kit/src/main/scala/aecor/testkit/StateKeyValueStore.scala
  24. +17 −0 modules/test-kit/src/main/scala/aecor/testkit/testkit.scala
  25. +62 −0 modules/tests/src/main/scala/aecor/tests/TestScheduleEntryRepository.scala
  26. +24 −24 modules/tests/src/main/scala/aecor/tests/e2e/CounterViewRepository.scala
  27. +1 −1 modules/tests/src/main/scala/aecor/tests/e2e/NotificationProcess.scala
  28. +0 −75 modules/tests/src/main/scala/aecor/tests/e2e/TestScheduleEntryRepository.scala
  29. +2 −4 modules/tests/src/main/scala/aecor/tests/e2e/notification.scala
  30. +1 −1 modules/tests/src/test/scala/aecor/tests/AkkaPersistenceRuntimeSpec.scala
  31. +45 −70 modules/tests/src/test/scala/aecor/tests/EndToEndTest.scala
  32. +32 −15 modules/tests/src/test/scala/aecor/tests/GadtTest.scala
  33. +1 −1 modules/tests/src/test/scala/aecor/tests/GenericRuntimeSpec.scala
  34. +1 −1 modules/tests/src/test/scala/aecor/tests/StateRuntimeSpec.scala
View
@@ -0,0 +1 @@
-J-XX:MaxMetaspaceSize=512M
View
@@ -187,3 +187,9 @@ val deploySubscriptions: Task[SubscriptionId => Subscription[Task]] =
Tagging.const[SubscriptionId](EventTag("Subscription"))
)
```
# Projections
```
val journalQuery = runtime.journal
```
View
@@ -19,6 +19,7 @@ lazy val logbackVersion = "1.1.7"
lazy val cassandraDriverExtrasVersion = "3.1.0"
lazy val jsr305Version = "3.0.1"
lazy val boopickleVersion = "1.3.0"
lazy val monocleVersion = "1.5.0-cats"
lazy val monixVersion = "3.0.0-M3"
lazy val scalaCheckVersion = "1.13.4"
@@ -41,11 +42,9 @@ lazy val commonSettings = Seq(
resolvers += "jitpack" at "https://jitpack.io",
scalacOptions ++= commonScalacOptions,
addCompilerPlugin("org.spire-math" %% "kind-projector" % kindProjectorVersion),
addCompilerPlugin("org.scalameta" % "paradise" % scalametaParadiseVersion cross CrossVersion.patch),
parallelExecution in Test := false,
scalacOptions in (Compile, doc) := (scalacOptions in (Compile, doc)).value
.filter(_ != "-Xfatal-warnings"),
sources in (Compile, doc) := Nil
.filter(_ != "-Xfatal-warnings")
) ++ warnUnusedImport
lazy val aecorSettings = buildSettings ++ commonSettings ++ publishSettings
@@ -150,6 +149,7 @@ lazy val coreSettings = Seq(
)
lazy val boopickleWireProtocolSettings = Seq(
addCompilerPlugin("org.scalameta" % "paradise" % scalametaParadiseVersion cross CrossVersion.patch),
sources in (Compile, doc) := Nil,
scalacOptions in (Compile, console) := Seq(),
libraryDependencies ++= Seq(
@@ -159,6 +159,8 @@ lazy val boopickleWireProtocolSettings = Seq(
)
lazy val scheduleSettings = commonProtobufSettings ++ Seq(
sources in (Compile, doc) := Nil,
addCompilerPlugin("org.scalameta" % "paradise" % scalametaParadiseVersion cross CrossVersion.patch),
libraryDependencies ++= Seq(
"com.datastax.cassandra" % "cassandra-driver-extras" % cassandraDriverExtrasVersion,
"com.google.code.findbugs" % "jsr305" % jsr305Version % Compile
@@ -190,6 +192,7 @@ lazy val akkaGenericSettings = Seq(
lazy val exampleSettings = {
Seq(
addCompilerPlugin("org.scalameta" % "paradise" % scalametaParadiseVersion cross CrossVersion.patch),
resolvers += Resolver.sonatypeRepo("releases"),
libraryDependencies ++=
Seq(
@@ -209,11 +212,14 @@ lazy val exampleSettings = {
lazy val testKitSettings = Seq(
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-mtl-core" % "0.2.1"
"org.typelevel" %% "cats-mtl-core" % "0.2.3",
"com.github.julien-truffaut" %% "monocle-core" % monocleVersion,
"com.github.julien-truffaut" %% "monocle-macro" % monocleVersion
)
)
lazy val testingSettings = Seq(
addCompilerPlugin("org.scalameta" % "paradise" % scalametaParadiseVersion cross CrossVersion.patch),
libraryDependencies ++= Seq(
"org.scalacheck" %% "scalacheck" % scalaCheckVersion % Test,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
@@ -1,43 +1,40 @@
package aecor.runtime.akkapersistence.readside
import aecor.data.{ EntityEvent, Folded }
import cats.MonadError
import cats.Monad
import cats.implicits._
import aecor.util.KeyValueStore
object Projection {
trait ProjectionError[A, E, S] {
def illegalFold(event: E, state: S): A
def missingEvent(event: E, state: S): A
final case class Versioned[A](version: Long, a: A)
trait Failure[F[_], K, E, S] {
def illegalFold[A](event: EntityEvent[K, E], state: Option[S]): F[A]
def missingEvent[A](key: K, seqNr: Long): F[A]
}
def apply[F[_], Err, Key, Event, State](store: Store[F, Key, Versioned[State]],
zero: Event => Folded[State],
update: (Event, State) => Folded[State])(
implicit F: MonadError[F, Err],
Error: ProjectionError[Err, EntityEvent[Key, Event], Option[Versioned[State]]]
def apply[F[_]: Monad, Key, Event, State](
store: KeyValueStore[F, Key, Versioned[State]],
zero: Event => Folded[State],
update: (Event, State) => Folded[State],
failure: Failure[F, Key, Event, State]
): EntityEvent[Key, Event] => F[Unit] = {
case input @ EntityEvent(id, seqNr, event) =>
case input @ EntityEvent(key, seqNr, event) =>
for {
state <- store.readState(id)
state <- store.getValue(key)
currentVersion = state.fold(0L)(_.version)
_ <- if (seqNr <= currentVersion) {
().pure[F]
} else if (seqNr == currentVersion + 1) {
state
.map(_.a)
.fold(zero(event))(update(event, _))
.fold(F.raiseError[Unit](Error.illegalFold(input, state))) { a =>
store.saveState(id, Versioned(currentVersion + 1, a))
.fold(failure.illegalFold[Unit](input, state.map(_.a))) { a =>
store.setValue(key, Versioned(currentVersion + 1, a))
}
} else {
F.raiseError(Error.missingEvent(input, state))
failure.missingEvent(key, currentVersion + 1)
}
} yield ()
}
}
trait Store[F[_], I, S] {
def readState(i: I): F[Option[S]]
def saveState(i: I, s: S): F[Unit]
}
case class Versioned[A](version: Long, a: A)
@@ -3,4 +3,4 @@ package aecor.runtime.akkapersistence.serialization
/**
* Marker trait for all protobuf-serializable messages in `aecor.runtime.akkapersistence`.
*/
trait Message
private[aecor] trait Message
@@ -0,0 +1,16 @@
package aecor
import cats.~>
sealed abstract class IsK[F[_], G[_]] {
def substitute[A](fa: F[A]): G[A]
final def asFunctionK: F ~> G = new (F ~> G) {
final override def apply[A](fa: F[A]): G[A] = substitute(fa)
}
}
object IsK {
implicit def refl[F[_]]: IsK[F, F] = new IsK[F, F] {
final override def substitute[A](fa: F[A]): F[A] = fa
}
}
@@ -1,25 +1,22 @@
package aecor.data
import cats.{ Applicative, Functor, ~> }
import cats.implicits._
final case class Action[S, E, A](run: S => (List[E], A)) {
def liftF[F[_]: Applicative]: ActionT[F, S, E, A] = ActionT.lift(this)
}
object Action {
def read[S, E, A](f: S => A): Action[S, E, A] = Action(s => (List.empty[E], f(s)))
}
import cats.{ Applicative, FlatMap, ~> }
final case class ActionT[F[_], State, Event, Result](run: State => F[(List[Event], Result)])
extends AnyVal {
def mapState[State1](f: State1 => State): ActionT[F, State1, Event, Result] =
final case class ActionT[F[_], S, E, A](run: S => F[(List[E], A)]) extends AnyVal {
def mapState[S1](f: S1 => S): ActionT[F, S1, E, A] =
ActionT(run.compose(f))
def mapK[G[_]](f: F ~> G): ActionT[G, S, E, A] = ActionT { s =>
f(run(s))
}
}
object ActionT {
def lift[F[_]: Applicative, S, E, A](action: Action[S, E, A]): ActionT[F, S, E, A] =
liftK[F, S, E].apply(action)
def mapK[F[_], G[_], S, E](f: F ~> G): ActionT[F, S, E, ?] ~> ActionT[G, S, E, ?] =
new (ActionT[F, S, E, ?] ~> ActionT[G, S, E, ?]) {
override def apply[A](fa: ActionT[F, S, E, A]): ActionT[G, S, E, A] = fa.mapK(f)
}
def liftK[F[_]: Applicative, S, E]: Action[S, E, ?] ~> ActionT[F, S, E, ?] =
new (Action[S, E, ?] ~> ActionT[F, S, E, ?]) {
@@ -29,15 +26,15 @@ object ActionT {
}
}
def enrich[F[_]: Functor, S, E, M](
meta: F[M]
): Action[S, E, ?] ~> ActionT[F, S, Enriched[M, E], ?] =
new (Action[S, E, ?] ~> ActionT[F, S, Enriched[M, E], ?]) {
override def apply[A](action: Action[S, E, A]): ActionT[F, S, Enriched[M, E], A] =
def mapEventsF[F[_]: FlatMap, S, E, E1](
f: List[E] => F[List[E1]]
): ActionT[F, S, E, ?] ~> ActionT[F, S, E1, ?] =
new (ActionT[F, S, E, ?] ~> ActionT[F, S, E1, ?]) {
override def apply[A](action: ActionT[F, S, E, A]): ActionT[F, S, E1, A] =
ActionT { s =>
val (es, a) = action.run(s)
meta.map { m =>
(es.map(Enriched(m, _)), a)
action.run(s).flatMap {
case (es, a) =>
f(es).map((_, a))
}
}
}
@@ -1,41 +1,43 @@
package aecor.data
import cats.{ Applicative, Functor }
import aecor.IsK
import cats.{ Applicative, FlatMap, Id, Monad, ~> }
import io.aecor.liberator.FunctorK
import cats.syntax.functor._
final case class EventsourcedBehaviorT[M[_[_]], F[_], State, Event](
actions: M[ActionT[F, State, Event, ?]],
initialState: State,
applyEvent: (State, Event) => Folded[State]
)
final case class EventsourcedBehaviorT[M[_[_]], F[_], S, E](actions: M[ActionT[F, S, E, ?]],
initialState: S,
applyEvent: (S, E) => Folded[S]) {
def enrich[Env](env: F[Env])(implicit M: FunctorK[M],
F: FlatMap[F]): EventsourcedBehaviorT[M, F, S, Enriched[Env, E]] =
xmapEventsF(es => env.map(en => es.map(ev => Enriched(en, ev))))(_.event)
final case class EventsourcedBehavior[M[_[_]], S, E](actions: M[Action[S, E, ?]],
initialState: S,
applyEvent: (S, E) => Folded[S]) {
def enrich[F[_]: Functor, Env](
fm: F[Env]
)(implicit M: FunctorK[M]): EventsourcedBehaviorT[M, F, S, Enriched[Env, E]] =
def xmapEventsF[E1](
f: List[E] => F[List[E1]]
)(extract: E1 => E)(implicit M: FunctorK[M], F: FlatMap[F]): EventsourcedBehaviorT[M, F, S, E1] =
EventsourcedBehaviorT(
M.mapK(actions, ActionT.enrich(fm)),
M.mapK(actions, ActionT.mapEventsF(f)),
initialState,
(s, e) => applyEvent(s, e.event)
(s, e) => applyEvent(s, extract(e))
)
def lifted[F[_]: Applicative](implicit M: FunctorK[M]): EventsourcedBehaviorT[M, F, S, E] =
EventsourcedBehaviorT(M.mapK(actions, ActionT.liftK[F, S, E]), initialState, applyEvent)
}
def liftEnrich[G[_], Env](fm: G[Env])(
implicit M: FunctorK[M],
G: Monad[G],
F: F IsK Id
): EventsourcedBehaviorT[M, G, S, Enriched[Env, E]] =
lift[G].enrich(fm)
final case class Enriched[M, E](metadata: M, event: E)
def lift[G[_]](implicit G: Applicative[G],
F: IsK[F, Id],
M: FunctorK[M]): EventsourcedBehaviorT[M, G, S, E] =
mapK(new (F ~> G) {
final override def apply[A](fa: F[A]): G[A] =
G.pure(F.substitute(fa))
})
object EventsourcedBehavior {
def optional[M[_[_]], State, Event](
actions: M[Action[Option[State], Event, ?]],
init: Event => Folded[State],
applyEvent: (State, Event) => Folded[State]
): EventsourcedBehavior[M, Option[State], Event] =
EventsourcedBehavior(
actions,
Option.empty[State],
(os, e) => os.map(s => applyEvent(s, e)).getOrElse(init(e)).map(Some(_))
)
def mapK[G[_]](f: F ~> G)(implicit M: FunctorK[M]): EventsourcedBehaviorT[M, G, S, E] =
copy(actions = M.mapK(actions, ActionT.mapK(f)))
}
final case class Enriched[M, E](metadata: M, event: E)
@@ -1,5 +1,7 @@
package aecor
import cats.Id
package object data {
/**
@@ -9,4 +11,30 @@ package object data {
@deprecated("Use Action", "0.16.0")
type Handler[S, E, A] = Action[S, E, A]
type Action[S, E, A] = ActionT[Id, S, E, A]
object Action {
def apply[S, E, A](run: S => (List[E], A)): Action[S, E, A] = ActionT[Id, S, E, A](run)
def read[S, E, A](f: S => A): Action[S, E, A] = Action(s => (List.empty[E], f(s)))
}
type EventsourcedBehavior[M[_[_]], S, E] = EventsourcedBehaviorT[M, Id, S, E]
object EventsourcedBehavior {
def apply[M[_[_]], S, E](actions: M[Action[S, E, ?]],
initialState: S,
applyEvent: (S, E) => Folded[S]): EventsourcedBehavior[M, S, E] =
EventsourcedBehaviorT[M, Id, S, E](actions, initialState, applyEvent)
def optional[M[_[_]], State, Event](
actions: M[Action[Option[State], Event, ?]],
init: Event => Folded[State],
applyEvent: (State, Event) => Folded[State]
): EventsourcedBehavior[M, Option[State], Event] =
EventsourcedBehavior(
actions,
Option.empty[State],
(os, e) => os.map(s => applyEvent(s, e)).getOrElse(init(e)).map(Some(_))
)
}
}
@@ -36,6 +36,7 @@ object WireProtocol {
final case class DecodingFailure(message: String, underlyingException: Option[Throwable] = None)
extends RuntimeException(message, underlyingException.orNull)
type DecodingResult[A] = Either[DecodingFailure, A]
type DecodingResultT[F[_], A] = F[DecodingResult[A]]
object DecodingResult {
def fromTry[A](a: Try[A]): DecodingResult[A] =
a match {
@@ -50,6 +51,5 @@ object WireProtocol {
case Success(value) => Right(value)
}
}
}
}
@@ -4,7 +4,7 @@ import java.time._
import cats.Apply
trait Clock[F[_]] {
trait ClockT[F[_]] {
def zone: F[ZoneId]
def instant: F[Instant]
def zonedDateTime(implicit F: Apply[F]): F[ZonedDateTime] =
@@ -4,12 +4,12 @@ import java.time.{ Instant, ZoneId }
import cats.effect.Sync
class JavaTimeClock[F[_]](underlying: java.time.Clock)(implicit F: Sync[F]) extends Clock[F] {
class JavaTimeClock[F[_]](underlying: java.time.Clock)(implicit F: Sync[F]) extends ClockT[F] {
override def zone: F[ZoneId] = F.delay(underlying.getZone)
override def instant: F[Instant] = F.delay(underlying.instant())
}
object JavaTimeClock {
def apply[F[_]: Sync](underlying: java.time.Clock): Clock[F] =
def apply[F[_]: Sync](underlying: java.time.Clock): ClockT[F] =
new JavaTimeClock[F](underlying)
}
@@ -63,15 +63,15 @@ object App {
runtime
.deploy(
"Transaction",
EventsourcedTransactionAggregate.behavior.enrich(metaProvider),
EventsourcedTransactionAggregate.behavior.liftEnrich(metaProvider),
tagging
)
val deployAccounts: Task[AccountId => Account[Task]] =
runtime
.deploy(
"Account",
EventsourcedAccount.behavior.enrich(metaProvider),
EventsourcedAccount.behavior.liftEnrich(metaProvider),
Tagging.const[AccountId](EventTag("Account"))
)
@@ -9,6 +9,7 @@ final case class TransactionId(value: String) extends AnyVal
final case class From[A](value: A) extends AnyVal
final case class To[A](value: A) extends AnyVal
@boopickleWireProtocol
trait TransactionAggregate[F[_]] {
def create(fromAccountId: From[AccountId], toAccountId: To[AccountId], amount: Amount): F[Unit]
Oops, something went wrong.

0 comments on commit 4e20b19

Please sign in to comment.