diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..48176e9 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,19 @@ +sudo: false +dist: trusty +language: scala + +script: + - sbt test + +cache: + directories: + - $HOME/.ivy2/cache + - $HOME/.sbt/boot/ + +before_cache: + # Tricks to avoid unnecessary cache updates + - find $HOME/.ivy2 -name "ivydata-*.properties" -delete + - find $HOME/.sbt -name "*.lock" -delete + +jdk: + - openjdk8 diff --git a/build.sbt b/build.sbt index 45d26f1..f4675ea 100644 --- a/build.sbt +++ b/build.sbt @@ -1,9 +1,18 @@ name := "akka-typed-session" version := "0.1.0-SNAPSHOT" organization := "com.rolandkuhn" -scalaVersion := "2.12.1" +scalaVersion := "2.12.2" + +scalacOptions += "-deprecation" +logBuffered in Test := false + +val akkaVersion = "2.5.99-TYPED-M1" +resolvers += "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/" libraryDependencies ++= Seq( "com.chuusai" %% "shapeless" % "2.3.2", - "com.typesafe.akka" %% "akka-typed-experimental" % "2.5-M1" + "com.typesafe.akka" %% "akka-typed" % akkaVersion, + "com.typesafe.akka" %% "akka-typed-testkit" % akkaVersion % "test", + "org.scalatest" %% "scalatest" % "3.0.1" % "test", + "org.scalacheck" %% "scalacheck" % "1.13.4" % "test" ) diff --git a/old/ScalaProcess2.scala b/old/ScalaProcess2.scala deleted file mode 100644 index 0dd7feb..0000000 --- a/old/ScalaProcess2.scala +++ /dev/null @@ -1,877 +0,0 @@ -/** - * Copyright (C) 2016 Lightbend Inc. - */ -package com.rolandkuhn.akka_typed_session - -import akka.typed._ -import scala.concurrent.duration._ -import akka.{ actor ⇒ a } -import scala.util.control.NoStackTrace -import scala.annotation.implicitNotFound - -/** - * A DSL for writing reusable behavior pieces that are executed concurrently - * within Actors. - * - * Terminology: - * - * - a Process has a 1:1 relationship with an ActorRef - * - an Operation is a step that a Process takes and that produces a value - * - Processes are concurrent, but not distributed: all failures stop the entire Actor - * - each Process has its own identity (due to ActorRef), and the Actor has its own - * identity (an ActorRef[ActorCmd[_]]); processSelf is the Process’ identity, actorSelf is the Actor’s - * - process timeout means failure - * - every Actor has a KV store for state - * - * - querying by key (using a single element per slot) - * - updating is an Operation that produces events that are applied to the state - * - persistence can be plugged in transparently (NOT YET IMPLEMENTED) - * - recovery means acquiring state initially (which might trigger internal replay) - */ -object ScalaProcess2 { - - /** - * Exception type that is thrown by the `retry` facility when after the - * given number of retries still no value has been obtained. - */ - final class RetriesExceeded(message: String) extends RuntimeException(message) with NoStackTrace - - import language.implicitConversions - /** - * This implicit expresses that operations that do not use their input channel can be used in any context. - */ - private implicit def nothingIsSomething[T, U, E <: E1, E1 <: Effects](op: Operation[Nothing, T, E]): Operation[U, T, E1] = - op.asInstanceOf[Operation[U, T, E1]] - - /** - * This is a compile-time marker for the type of self-reference expected by - * the process that is being described. No methods can be called on a value - * of this type. It is used as follows: - * - * {{{ - * OpDSL[MyType] { implicit opDSL => - * ... // use Operation operators here - * } - * }}} - */ - sealed trait OpDSL extends Any { - type Self - } - - /** - * This object offers different constructors that provide a scope within - * which [[Operation]] values can be created using the `op*` methods. The - * common characteristic of these constructors is that they lift their - * contents completely into the resulting process description, in other - * words the code within is only evaluated once the [[Operation]] has been - * called, forked, or spawned within an Actor. - * - * It is strongly recommended to always use the same name for the required - * implicit function argument (`opDSL` in the examples below) in order to - * achieve proper scoping for nested declarations. - * - * Usage for single-shot processes: - * {{{ - * OpDSL[MyType] { implicit opDSL => - * for { - * x <- step1 - * y <- step2 - * ... - * } ... - * } - * }}} - * - * Usage for bounded repetition (will run the whole process three times - * in this example and yield a list of the three results): - * {{{ - * OpDSL.loop[MyType](3) { implicit opDSL => - * for { - * x <- step1 - * y <- step2 - * ... - * } ... - * } - * }}} - * - * Usage for infinite repetition, for example when writing a server process: - * {{{ - * OpDSL.loopInf[MyType] { implicit opDSL => - * for { - * x <- step1 - * y <- step2 - * ... - * } ... - * } - * }}} - */ - object OpDSL { - private val _unit: Operation[Nothing, Null, _0] = opUnit(null)(null: OpDSL { type Self = Nothing }) - private def unit[S, Out]: Operation[S, Out, _0] = _unit.asInstanceOf[Operation[S, Out, _0]] - - def loopInf[S]: NextLoopInf[S] = nextLoopInf.asInstanceOf[NextLoopInf[S]] - trait NextLoopInf[S] { - def apply[U, E <: Effects](body: OpDSL { type Self = S } ⇒ Operation[S, U, E]): Operation[S, Nothing, Loop[E]] = { - lazy val l: Operation[S, Nothing, E] = unit[S, OpDSL { type Self = S }].flatMap(body).withEffects[_0].flatMap(_ ⇒ l) - l.withEffects[Loop[E]] - } - } - private object nextLoopInf extends NextLoopInf[Nothing] - - def apply[T]: Next[T] = next.asInstanceOf[Next[T]] - trait Next[T] { - def apply[U, E <: Effects](body: OpDSL { type Self = T } ⇒ Operation[T, U, E]): Operation[T, U, E] = - unit[T, OpDSL { type Self = T }].flatMap(body) - } - private object next extends Next[Nothing] - - trait NextStep[T] { - def apply[U, E <: Effects](mailboxCapacity: Int, body: OpDSL { type Self = T } ⇒ Operation[T, U, E])( - implicit opDSL: OpDSL): Operation[opDSL.Self, U, E] = - Call(Process("nextStep", Duration.Inf, mailboxCapacity, body(null)), None) - } - object nextStep extends NextStep[Nothing] - } - - /** - * Helper to make `Operation.map` or `Operation.foreach` behave like `flatMap` when needed. - */ - sealed trait MapAdapter[Self, Out, Mapped, EOut <: Effects] { - def lift[O](f: O ⇒ Out): O ⇒ Operation[Self, Mapped, EOut] - } - /** - * Helper to make `Operation.map` or `Operation.foreach` behave like `flatMap` when needed. - */ - object MapAdapter extends MapAdapterLow { - private val _adapter = - new MapAdapter[Any, Operation[Any, Any, _0], Any, _0] { - override def lift[O](f: O ⇒ Operation[Any, Any, _0]): O ⇒ Operation[Any, Any, _0] = f - } - - implicit def mapAdapterOperation[Self, M, E <: Effects]: MapAdapter[Self, Operation[Self, M, E], M, E] = - _adapter.asInstanceOf[MapAdapter[Self, Operation[Self, M, E], M, E]] - } - /** - * Helper to make `Operation.map` or `Operation.foreach` behave like `flatMap` when needed. - */ - trait MapAdapterLow { - private val _adapter = - new MapAdapter[Any, Any, Any, _0] { - override def lift[O](f: O ⇒ Any): O ⇒ Operation[Any, Any, _0] = o ⇒ Return(f(o)) - } - - implicit def mapAdapterAny[Self, Out]: MapAdapter[Self, Out, Out, _0] = - _adapter.asInstanceOf[MapAdapter[Self, Out, Out, _0]] - } - - /** - * A Process runs the given operation steps in a context that provides the - * needed [[ActorRef]] of type `S` as the self-reference. Every process is - * allotted a maximum lifetime after which the entire Actor fails; you may - * set this to `Duration.Inf` for a server process. For non-fatal timeouts - * take a look at [[ScalaProcess#forAndCancel]]. - * - * The `name` of a Process is used as part of the process’ ActorRef name and - * must therefore adhere to the path segment grammar of the URI specification. - */ - final case class Process[S, +Out, E <: Effects]( - name: String, timeout: Duration, mailboxCapacity: Int, operation: Operation[S, Out, E]) { - if (name != "") a.ActorPath.validatePathElement(name) - - /** - * Execute the given computation and process step after having completed - * the current step. The current step’s computed value will be used as - * input for the next computation. - */ - def flatMap[T, EE <: Effects](f: Out ⇒ Operation[S, T, EE])(implicit p: E.ops.Prepend[E, EE]): Process[S, T, p.Out] = - copy(operation = FlatMap(operation, f)) - - /** - * Map the value computed by this process step by the given function, - * flattening the result if it is an [[Operation]] (by executing the - * operation and using its result as the mapped value). - * - * The reason behind flattening when possible is to allow the formulation - * of infinite process loops (as performed for example by server processes - * that respond to any number of requests) using for-comprehensions. - * Without this flattening a final pointless `map` step would be added - * for each iteration, eventually leading to an OutOfMemoryError. - */ - def map[T, Mapped, EOut <: Effects](f: Out ⇒ T)( - implicit ev: MapAdapter[S, T, Mapped, EOut], - p: E.ops.Prepend[E, EOut]): Process[S, Mapped, p.Out] = flatMap(ev.lift(f)) - - /** - * Only continue this process if the given predicate is fulfilled, terminate - * it otherwise. - */ - def filter(p: Out ⇒ Boolean): Process[S, Out, E] = flatMap(o ⇒ if (p(o)) Return(o) else ShortCircuit) - - /** - * Only continue this process if the given predicate is fulfilled, terminate - * it otherwise. - */ - def withFilter(p: Out ⇒ Boolean): Process[S, Out, E] = flatMap(o ⇒ if (p(o)) Return(o) else ShortCircuit) - - /** - * Create a copy with modified timeout parameter. - */ - def withTimeout(timeout: Duration): Process[S, Out, E] = copy(timeout = timeout) - - /** - * Create a copy with modified mailbox capacity. - */ - def withMailboxCapacity(mailboxCapacity: Int): Process[S, Out, E] = copy(mailboxCapacity = mailboxCapacity) - - /** - * Convert to a runnable [[Behavior]], e.g. for being used as the guardian of an [[ActorSystem]]. - */ - def toBehavior: Behavior[ActorCmd[S]] = ??? - } - - sealed trait Effect - sealed trait SessionEffect extends Effect - - object E { - sealed abstract class Read[-T] extends SessionEffect - sealed abstract class Send[+T] extends SessionEffect - sealed abstract class Fork[+E <: Effects] extends Effect - sealed abstract class Spawn[+E <: Effects] extends Effect - sealed abstract class Choice[+C <: :+:[_, _ <: :+:[_, _]]] extends SessionEffect - sealed abstract class Halt extends Effect - - object ops { - import language.higherKinds - - @implicitNotFound("Cannot prepend ${First} to ${Second} (e.g. due to infinite loop in the first argument)") - sealed trait Prepend[First <: Effects, Second <: Effects] { - type Out <: Effects - } - type PrependAux[F <: Effects, S <: Effects, O <: Effects] = Prepend[F, S] { type Out = O } - - sealed trait PrependLowLow { - implicit def prepend[H <: Effect, T <: Effects, S <: Effects]( - implicit ev: Prepend[T, S]): PrependAux[H :: T, S, H :: ev.Out] = null - } - sealed trait PrependLow extends PrependLowLow { - implicit def prependNil[F <: _0, S <: Effects]: PrependAux[F, S, S] = null - } - object Prepend extends PrependLow { - implicit def prependToNil[F <: Effects, S <: _0]: PrependAux[F, S, F] = null - } - - sealed trait Filter[E <: Effects, U] { - type Out <: Effects - } - type FilterAux[E <: Effects, U, O <: Effects] = Filter[E, U] { type Out = O } - - sealed trait FilterLow { - implicit def notFound[H <: Effect, T <: Effects, U](implicit f: Filter[T, U], ev: NoSub[H, U]): FilterAux[H :: T, U, f.Out] = null - implicit def loop[E <: Effects, U](implicit f: Filter[E, U]): FilterAux[Loop[E], U, Loop[f.Out]] = null - } - object Filter extends FilterLow { - implicit def nil[U]: FilterAux[_0, U, _0] = null - implicit def found[H <: Effect, T <: Effects, U >: H](implicit f: Filter[T, U]): FilterAux[H :: T, U, H :: f.Out] = null - } - - sealed trait NoSub[T, U] - implicit def noSub1[T, U]: NoSub[T, U] = null - implicit def noSub2[T <: U, U]: NoSub[T, U] = null - implicit def noSub3[T <: U, U]: NoSub[T, U] = null - } - } - - sealed trait Effects - sealed abstract class ::[+H <: Effect, +T <: Effects] extends Effects - sealed abstract class Loop[+E <: Effects] extends Effects - - sealed trait Choices - sealed abstract class :+:[+H <: Effects, +T <: Choices] extends Choices - - sealed trait Processes - final case class :|:[+H <: Process[_, _, _], +T <: Processes](h: H, t: T) extends Processes - - sealed abstract class _0 extends Effects with Choices with Processes - case object _0 extends _0 - - object Processes { - import language.higherKinds - - sealed trait lub[P <: Processes] { - type Out - } - type lubAux[P <: Processes, O] = lub[P] { type Out = O } - - object lub { - sealed trait LUB[T1, T2, Out] - implicit def LUB[L, T1 <: L, T2 <: L]: LUB[T1, T2, L] = null - - implicit def nil: lubAux[_0, Nothing] = null - implicit def cons[S, O, E <: Effects, T <: Processes, LT, L]( - implicit l: lubAux[T, LT], L: LUB[O, LT, L]): lubAux[Process[S, O, E] :|: T, L] = null - } - - trait ForkMapper[Val[_], Eff[_ <: Effects] <: Effect] { - def apply[S, O, E <: Effects](p: Process[S, O, E]) = ??? - } - - sealed trait fork[P <: Processes, TC[_ <: Effects] <: Effect] { - type Out <: Effects - } - type forkAux[P <: Processes, TC[_ <: Effects] <: Effect, O <: Effects] = fork[P, TC] { type Out = O } - - object fork { - implicit def nil[T[_ <: Effects] <: Effect]: forkAux[_0, T, _0] = null - implicit def cons[S, O, E <: Effects, TC[_ <: Effects] <: Effect, T <: Processes, TE <: Effects]( - implicit f: forkAux[T, TC, TE]): forkAux[Process[S, O, E] :|: T, TC, TC[E] :: TE] = null - } - } - - implicitly[Processes.lub.LUB[Int, Nothing, Int]] - implicitly[Processes.lubAux[_0, Nothing]] - implicitly[Processes.lubAux[Process[Any, Int, _0] :|: _0, Int]] - implicitly[Processes.lubAux[Process[Any, Long, _0] :|: Process[Any, Int, _0] :|: _0, AnyVal]] - - implicitly[Processes.forkAux[_0, E.Fork, _0]] - implicitly[Processes.forkAux[Process[String, String, _0] :|: _0, E.Fork, E.Fork[_0] :: _0]] - implicitly[Processes.forkAux[Process[Any, Any, E.Read[Int] :: _0] :|: Process[String, String, _0] :|: _0, E.Fork, E.Fork[E.Read[Int] :: _0] :: E.Fork[_0] :: _0]] - - private implicit class WithEffects[S, O](op: Operation[S, O, _]) { - def withEffects[E <: Effects]: Operation[S, O, E] = op.asInstanceOf[Operation[S, O, E]] - } - implicit class WithoutEffects[S, O](op: Operation[S, O, _]) { - def ignoreEffects: Operation[S, O, _0] = op.asInstanceOf[Operation[S, O, _0]] - } - - def opChoice[S, O, L <: Effects, R <: Effects]( - p: Boolean, l: ⇒ Operation[S, O, L], r: ⇒ Operation[S, O, R]): Operation[S, O, E.Choice[L :+: R :+: _0] :: _0] = - (if (p) l else r).withEffects[E.Choice[L :+: R :+: _0] :: _0] - - object EffectsTest { - import E.ops - type A = E.Read[Any] - type B = E.Send[Any] - type C = E.Fork[_0] - type D = E.Spawn[_0] - - //implicitly[Effects.NoSub[String, Any]] - //implicitly[Effects.NoSub[String, String]] - implicitly[ops.NoSub[String, Int]] - implicitly[ops.NoSub[Any, String]] - - implicitly[ops.PrependAux[_0, _0, _0]] - implicitly[ops.PrependAux[_0, A :: B :: _0, A :: B :: _0]] - implicitly[ops.PrependAux[A :: B :: _0, _0, A :: B :: _0]] - implicitly[ops.PrependAux[A :: B :: _0, C :: D :: _0, A :: B :: C :: D :: _0]] - - implicitly[ops.FilterAux[A :: B :: C :: D :: _0, SessionEffect, A :: B :: _0]] - implicitly[ops.FilterAux[Loop[_0], SessionEffect, Loop[_0]]] - implicitly[ops.FilterAux[A :: Loop[_0], SessionEffect, A :: Loop[_0]]] - implicitly[ops.FilterAux[A :: B :: C :: Loop[D :: A :: C :: B :: D :: _0], SessionEffect, A :: B :: Loop[A :: B :: _0]]] - - trait Protocol { - type Session <: Effects - } - object Protocol { - @implicitNotFound("The effects of ${E2} do not match the expected session type ${E1}") - sealed trait Eq[E1 <: Effects, E2 <: Effects] - implicit def eq[E1 <: Effects, E2 <: Effects](implicit ev: E1 =:= E2): Eq[E1, E2] = null - } - - object MyProto extends Protocol { - type Session = E.Read[String] :: E.Send[String] :: Loop[E.Read[String] :: _0] - } - - def vetProtocol[E <: Effects, F <: Effects](p: Protocol, op: Operation[_, _, E])( - implicit f: E.ops.FilterAux[E, SessionEffect, F], ev: Protocol.Eq[p.Session, F]): Unit = () - - val p = OpDSL[String] { implicit opDSL ⇒ - opProcessSelf - .flatMap(_ ⇒ opRead) - .flatMap(_ ⇒ - opSchedule(Duration.Zero, "", null) - .flatMap(_ ⇒ OpDSL.loopInf(_ ⇒ opRead)) - ) - } - - vetProtocol(MyProto, p) - - } - - /** - * An Operation is a step executed by a [[Process]]. It exists in a context - * characterized by the process’ ActorRef of type `S` and computes - * a value of type `Out` when executed. - * - * Operations are created by using the `op*` methods of [[ScalaProcess]] - * inside an [[OpDSL]] environment. - */ - sealed trait Operation[S, +Out, E <: Effects] { - /** - * Execute the given computation and process step after having completed - * the current step. The current step’s computed value will be used as - * input for the next computation. - */ - def flatMap[T, EE <: Effects](f: Out ⇒ Operation[S, T, EE])(implicit p: E.ops.Prepend[E, EE]): Operation[S, T, p.Out] = FlatMap(this, f) - - /** - * Map the value computed by this process step by the given function, - * flattening the result if it is an [[Operation]] (by executing the - * operation and using its result as the mapped value). - * - * The reason behind flattening when possible is to allow the formulation - * of infinite process loops (as performed for example by server processes - * that respond to any number of requests) using for-comprehensions. - * Without this flattening a final pointless `map` step would be added - * for each iteration, eventually leading to an OutOfMemoryError. - */ - def map[T, Mapped, EOut <: Effects](f: Out ⇒ T)( - implicit ev: MapAdapter[S, T, Mapped, EOut], - p: E.ops.Prepend[E, EOut]): Operation[S, Mapped, p.Out] = flatMap(ev.lift(f)) - - /** - * Only continue this process if the given predicate is fulfilled, terminate - * it otherwise. - */ - def filter(p: Out ⇒ Boolean): Operation[S, Out, E] = flatMap(o ⇒ if (p(o)) Return(o) else ShortCircuit) - - /** - * Only continue this process if the given predicate is fulfilled, terminate - * it otherwise. - */ - def withFilter(p: Out ⇒ Boolean): Operation[S, Out, E] = flatMap(o ⇒ if (p(o)) Return(o) else ShortCircuit) - - /** - * Wrap as a [[Process]] with infinite timeout and a mailbox capacity of 1. - * Small processes that are called or chained often interact in a fully - * sequential fashion, where these defaults make sense. - */ - def named(name: String): Process[S, Out, E] = Process(name, Duration.Inf, 1, this) - - /** - * Wrap as a [[Process]] with the given mailbox capacity and infinite timeout. - */ - def withMailboxCapacity(mailboxCapacity: Int): Process[S, Out, E] = named("").withMailboxCapacity(mailboxCapacity) - - /** - * Wrap as a [[Process]] with the given timeout and a mailbox capacity of 1. - */ - def withTimeout(timeout: Duration): Process[S, Out, E] = named("").withTimeout(timeout) - - /** - * Wrap as a [[Process]] but without a name and convert to a [[Behavior]]. - */ - def toBehavior: Behavior[ActorCmd[S]] = named("").toBehavior - - } - - /* - * These are the private values that make up the core algebra. - */ - - final case class FlatMap[S, Out1, Out2, E1 <: Effects, E2 <: Effects, E <: Effects]( - first: Operation[S, Out1, E1], then: Out1 ⇒ Operation[S, Out2, E2]) extends Operation[S, Out2, E] { - override def toString: String = s"FlatMap($first)" - } - case object ShortCircuit extends Operation[Nothing, Nothing, _0] { - override def flatMap[T, E <: Effects](f: Nothing ⇒ Operation[Nothing, T, E])( - implicit p: E.ops.Prepend[_0, E]): Operation[Nothing, T, p.Out] = this.asInstanceOf[Operation[Nothing, T, p.Out]] - } - - case object System extends Operation[Nothing, ActorSystem[Nothing], _0] - case object Read extends Operation[Nothing, Nothing, E.Read[Any] :: _0] - case object ProcessSelf extends Operation[Nothing, ActorRef[Any], _0] - case object ActorSelf extends Operation[Nothing, ActorRef[ActorCmd[Nothing]], _0] - final case class Return[T](value: T) extends Operation[Nothing, T, _0] - final case class Call[S, T, E <: Effects](process: Process[S, T, E], replacement: Option[T]) extends Operation[Nothing, T, E] - final case class Fork[S, E <: Effects](process: Process[S, Any, E]) extends Operation[Nothing, SubActor[S], E.Fork[E] :: _0] - final case class Spawn[S, E <: Effects](process: Process[S, Any, E], deployment: DeploymentConfig) extends Operation[Nothing, ActorRef[ActorCmd[S]], E.Spawn[E] :: _0] - final case class Schedule[T](delay: FiniteDuration, msg: T, target: ActorRef[T]) extends Operation[Nothing, a.Cancellable, E.Send[T] :: _0] - sealed trait AbstractWatchRef { type Msg } - final case class WatchRef[T](watchee: ActorRef[Nothing], msg: T, target: ActorRef[T], onFailure: Throwable ⇒ Option[T]) - extends Operation[Nothing, a.Cancellable, _0] with AbstractWatchRef { - type Msg = T - override def equals(other: Any) = super.equals(other) - override def hashCode() = super.hashCode() - } - //final case class Replay[T](key: StateKey[T]) extends Operation[Nothing, T] - //final case class Snapshot[T](key: StateKey[T]) extends Operation[Nothing, T] - final case class State[S, T <: StateKey[S], Ev, Ex](key: T { type Event = Ev }, afterUpdates: Boolean, transform: S ⇒ (Seq[Ev], Ex)) extends Operation[Nothing, Ex, _0] - final case class StateR[S, T <: StateKey[S], Ev](key: T { type Event = Ev }, afterUpdates: Boolean, transform: S ⇒ Seq[Ev]) extends Operation[Nothing, S, _0] - final case class Forget[T](key: StateKey[T]) extends Operation[Nothing, akka.Done, _0] - final case class Cleanup(cleanup: () ⇒ Unit) extends Operation[Nothing, akka.Done, _0] - - /* - * The core operations: keep these minimal! - */ - - /** - * Obtain a reference to the ActorSystem in which this process is running. - */ - def opSystem(implicit opDSL: OpDSL): Operation[opDSL.Self, ActorSystem[Nothing], _0] = System - - /** - * Read a message from this process’ input channel. - */ - def opRead(implicit opDSL: OpDSL): Operation[opDSL.Self, opDSL.Self, E.Read[opDSL.Self] :: _0] = Read - - /** - * Obtain this process’ [[ActorRef]], not to be confused with the ActorRef of the Actor this process is running in. - */ - def opProcessSelf(implicit opDSL: OpDSL): Operation[opDSL.Self, ActorRef[opDSL.Self], _0] = ProcessSelf - - /** - * Obtain the [[ActorRef]] of the Actor this process is running in. - */ - def opActorSelf(implicit opDSL: OpDSL): Operation[opDSL.Self, ActorRef[ActorCmd[Nothing]], _0] = ActorSelf - - /** - * Lift a plain value into a process that returns that value. - */ - def opUnit[U](value: U)(implicit opDSL: OpDSL): Operation[opDSL.Self, U, _0] = Return(value) - - /** - * Execute the given process within the current Actor, await and return that process’ result. - * If the process does not return a result (due to a non-matching `filter` expression), the - * replacement value is used if the provided Option contains a value. - */ - def opCall[Self, Out, E <: Effects](process: Process[Self, Out, E], replacement: Option[Out] = None)( - implicit opDSL: OpDSL): Operation[opDSL.Self, Out, E] = - Call(process, replacement) - - /** - * Create and execute a process with a self reference of the given type, - * await and return that process’ result. This is equivalent to creating - * a process with [[OpDSL]] and using `call` to execute it. A replacement - * value is not provided; if recovery from a halted subprocess is desired - * please use `opCall` directly. - */ - def opNextStep[T]: OpDSL.NextStep[T] = - OpDSL.nextStep.asInstanceOf[OpDSL.NextStep[T]] - - /** - * Execute the given process within the current Actor, concurrently with the - * current process. The value computed by the forked process cannot be - * observed, instead you would have the forked process send a message to the - * current process to communicate results. The returned [[SubActor]] reference - * can be used to send messages to the forked process or to cancel it. - */ - def opFork[Self, E <: Effects](process: Process[Self, Any, E])(implicit opDSL: OpDSL): Operation[opDSL.Self, SubActor[Self], E.Fork[E] :: _0] = - Fork(process) - - /** - * Execute the given process in a newly spawned child Actor of the current - * Actor. The new Actor is fully encapsulated behind the [[ActorRef]] that - * is returned. - * - * The mailboxCapacity for the Actor is configured using the optional - * [[DeploymentConfig]] while the initial process’ process mailbox is - * limited based on the [[Process]] configuration as usual. When sizing - * the Actor mailbox capacity you need to consider that communication - * between the processes hosted by that Actor and timeouts also go through - * this mailbox. - */ - def opSpawn[Self, E <: Effects](process: Process[Self, Any, E], deployment: DeploymentConfig = EmptyDeploymentConfig)( - implicit opDSL: OpDSL): Operation[opDSL.Self, ActorRef[ActorCmd[Self]], E.Spawn[E] :: _0] = - Spawn(process, deployment) - - /** - * Schedule a message to be sent after the given delay has elapsed. - */ - def opSchedule[T](delay: FiniteDuration, msg: T, target: ActorRef[T])(implicit opDSL: OpDSL): Operation[opDSL.Self, a.Cancellable, E.Send[T] :: _0] = - Schedule(delay, msg, target) - - /** - * Watch the given [[ActorRef]] and send the specified message to the given - * target when the watched actor has terminated. The returned Cancellable - * can be used to unwatch the watchee, which will inhibit the message from - * being dispatched—it might still be delivered if it was previously dispatched. - * - * If `onFailure` is provided it can override the value to be sent if the - * watched Actor failed and was a child Actor of the Actor hosting this process. - */ - def opWatch[T](watchee: ActorRef[Nothing], msg: T, target: ActorRef[T], onFailure: Throwable ⇒ Option[T] = any2none)( - implicit opDSL: OpDSL): Operation[opDSL.Self, a.Cancellable, _0] = - WatchRef(watchee, msg, target, onFailure) - - val any2none = (_: Any) ⇒ None - private val _any2Nil = (state: Any) ⇒ Nil → state - private def any2Nil[T] = _any2Nil.asInstanceOf[T ⇒ (Nil.type, T)] - - /** - * Read the state stored for the given [[StateKey]], suspending this process - * until after all outstanding updates for the key have been completed if - * `afterUpdates` is `true`. - */ - def opReadState[T](key: StateKey[T], afterUpdates: Boolean = true)(implicit opDSL: OpDSL): Operation[opDSL.Self, T, _0] = - State[T, StateKey[T], key.Event, T](key, afterUpdates, any2Nil) - - /** - * Update the state stored for the given [[StateKey]] by emitting events that - * are applied to the state in order, suspending this process - * until after all outstanding updates for the key have been completed if - * `afterUpdates` is `true`. The return value is determined by the transform - * function based on the current state; if you want to return the state that - * results from having applied the emitted events then please see - * [[ScalaProcess#opUpdateAndReadState]]. - */ - def opUpdateState[T, Ev, Ex](key: StateKey[T] { type Event = Ev }, afterUpdates: Boolean = true)( - transform: T ⇒ (Seq[Ev], Ex))(implicit opDSL: OpDSL): Operation[opDSL.Self, Ex, _0] = - State(key, afterUpdates, transform) - - /** - * Update the state by emitting a sequence of events, returning the updated state. The - * process is suspended until after all outstanding updates for the key have been - * completed if `afterUpdates` is `true`. - */ - def opUpdateAndReadState[T, Ev](key: StateKey[T] { type Event = Ev }, afterUpdates: Boolean = true)( - transform: T ⇒ Seq[Ev])(implicit opDSL: OpDSL): Operation[opDSL.Self, T, _0] = - StateR(key, afterUpdates, transform) - - /** - * FIXME not yet implemented - * - * Instruct the Actor to persist the state for the given [[StateKey]] after - * all currently outstanding updates for this key have been completed, - * suspending this process until done. - */ - //def opTakeSnapshot[T](key: PersistentStateKey[T])(implicit opDSL: OpDSL): Operation[opDSL.Self, T] = - // Snapshot(key) - - /** - * FIXME not yet implemented - * - * Restore the state for the given [[StateKey]] from persistent event storage. - * If a snapshot is found it will be used as the starting point for the replay, - * otherwise events are replayed from the beginning of the event log, starting - * with the given initial data as the state before the first event is applied. - */ - //def opReplayPersistentState[T](key: PersistentStateKey[T])(implicit opDSL: OpDSL): Operation[opDSL.Self, T] = - // Replay(key) - - /** - * Remove the given [[StateKey]] from this Actor’s storage. The slot can be - * filled again using `updateState` or `replayPersistentState`. - */ - def opForgetState[T](key: StateKey[T])(implicit opDSL: OpDSL): Operation[opDSL.Self, akka.Done, _0] = - Forget(key) - - /** - * Run the given cleanup handler after the operations that will be chained - * off of this one, i.e. this operation must be further transformed to make - * sense. - * - * Usage with explicit combinators: - * {{{ - * opCleanup(() => doCleanup()) - * .flatMap { _ => - * ... - * } // doCleanup() will run here - * .flatMap { ... } - * }}} - * - * Usage with for-expressions: - * {{{ - * (for { - * resource <- obtainResource - * _ <- opCleanup(() => doCleanup(resource)) - * ... - * } yield ... - * ) // doCleanup() will run here - * .flatMap { ... } - * }}} - * - * Unorthodox usage: - * {{{ - * (for { - * resource <- obtainResource - * ... - * } yield opCleanup(() => doCleanup(resource)) - * ) // doCleanup() will run here - * .flatMap { ... } - * }}} - */ - def opCleanup(cleanup: () ⇒ Unit)(implicit opDSL: OpDSL): Operation[opDSL.Self, akka.Done, _0] = - Cleanup(cleanup) - - /** - * Terminate processing here, ignoring further transformations. If this process - * has been called by another process then the `replacement` argument to `opCall` - * determines whether the calling process continues or halts as well: if no - * replacement is given, processing cannot go on. - */ - def opHalt(implicit opDSL: OpDSL): Operation[opDSL.Self, Nothing, _0] = ShortCircuit - - // FIXME opChildList - // FIXME opProcessList - // FIXME opTerminate - // FIXME opStopChild - // FIXME opAsk(Main) - // FIXME opParallel - // FIXME opUpdate(Read)SimpleState - - /* - * State Management - */ - - /** - * A key into the Actor’s state map that allows access both for read and - * update operations. Updates are modeled by emitting events of the specified - * type. The updates are applied to the state in the order in which they are - * emitted. For persistent state data please refer to [[PersistentStateKey]] - * and for ephemeral non-event-sourced data take a look at [[SimpleStateKey]]. - */ - trait StateKey[T] { - type Event - def apply(state: T, event: Event): T - def initial: T - } - - /** - * Event type emitted in conjunction with [[SimpleStateKey]], the only - * implementation is [[SetState]]. - */ - sealed trait SetStateEvent[T] { - def value: T - } - /** - * Event type that instructs the state of a [[SimpleStateKey]] to be - * replaced with the given value. - */ - final case class SetState[T](override val value: T) extends SetStateEvent[T] with Seq[SetStateEvent[T]] { - def iterator: Iterator[SetStateEvent[T]] = Iterator.single(this) - def apply(idx: Int): SetStateEvent[T] = - if (idx == 0) this - else throw new IndexOutOfBoundsException(s"$idx (for single-element sequence)") - def length: Int = 1 - } - - /** - * Use this key for state that shall neither be persistent nor event-sourced. - * In effect this turns `updateState` into access to a State monad identified - * by this key instance. - * - * Beware that reference equality is used to identify this key: you should - * create the key as a `val` inside a top-level `object`. - */ - final class SimpleStateKey[T](override val initial: T) extends StateKey[T] { - type Event = SetStateEvent[T] - def apply(state: T, event: SetStateEvent[T]) = event.value - override def toString: String = f"SimpleStateKey@$hashCode%08X($initial)" - } - - /** - * The data for a [[StateKey]] of this kind can be marked as persistent by - * invoking `replayPersistentState`—this will first replay the stored events - * and subsequently commit all emitted events to the journal before applying - * them to the state. - * - * FIXME persistence is not yet implemented - */ - //trait PersistentStateKey[T] extends StateKey[T] { - // def clazz: Class[Event] - //} - - /* - * Derived operations - */ - - /** - * Fork the given processes the return the first value emitted by any one of - * them. As soon as one process has yielded its value all others are canceled. - */ - def firstOf[P <: Processes, T](processes: P)(implicit l: Processes.lubAux[P, T], f: Processes.fork[P, E.Fork]): Operation[T, T, f.Out] = { - def forkAll[P0 <: Processes](self: ActorRef[T], index: Int = 0, p: P0 = processes, acc: List[SubActor[Nothing]] = Nil)( - implicit opDSL: OpDSL { type Self = T }, f: Processes.fork[P0, E.Fork]): Operation[T, List[SubActor[Nothing]], f.Out] = - p match { - case _0 ⇒ opUnit(acc) - case x :|: xs ⇒ - opFork(x.copy(name = s"$index-${x.name}").map(self ! _)) - .flatMap(sub ⇒ forkAll(self, index + 1, xs, sub :: acc)) - } - OpDSL[T] { implicit opDSL ⇒ - for { - self ← opProcessSelf - subs ← forkAll(self) - value ← opRead - } yield { - subs.foreach(_.cancel()) - value - } - } - } - - /** - * Suspend the process for the given time interval and deliver the specified - * value afterwards. This is especially useful as a timeout value for `firstOf`. - */ - def delay[T](time: FiniteDuration, value: T): Operation[T, T, _0] = - OpDSL[T] { implicit opDSL ⇒ - for { - self ← opProcessSelf - _ ← opSchedule(time, value, self) - } yield opRead - }.ignoreEffects - - /** - * Fork the given process, but also fork another process that will cancel the - * first process after the given timeout. - */ - def forkAndCancel[T, E <: Effects](timeout: FiniteDuration, process: Process[T, Any, E])( - implicit opDSL: OpDSL): Operation[opDSL.Self, SubActor[T], E.Fork[E] :: E.Fork[E.Send[Boolean] :: E.Read[Boolean] :: _0] :: _0] = { - def guard(sub: SubActor[T]) = OpDSL[Boolean] { implicit opDSL ⇒ - for { - self ← opProcessSelf - _ ← opWatch(sub.ref, false, self) - _ ← opSchedule(timeout, true, self) - cancel ← opRead - if cancel - } yield sub.cancel() - } - - for { - sub ← opFork(process) - _ ← opFork(guard(sub).named("cancelAfter")) - } yield sub - } - - /** - * Retry the given process the specified number of times, always bounding - * the wait time by the given timeout and canceling the fruitless process. - * If the number of retries is exhausted, the entire Actor will be failed. - */ - // FIXME figure out effects - // def retry[S, T](timeout: FiniteDuration, retries: Int, ops: Process[S, T])(implicit opDSL: OpDSL): Operation[opDSL.Self, T] = { - // opCall(firstOf(ops.map(Some(_)), delay(timeout, None).named("retryTimeout")).named("firstOf")) - // .map { - // case Some(res) ⇒ opUnit(res) - // case None if retries > 0 ⇒ retry(timeout, retries - 1, ops) - // case None ⇒ throw new RetriesExceeded(s"process ${ops.name} has been retried $retries times with timeout $timeout") - // } - // } - - /** - * The main ActorRef of an Actor hosting [[Process]] instances accepts this - * type of messages. The “main process” is the one with which the Actor is - * spawned and which may fork or call other processes. Its input of type `T` - * can be reached using [[MainCmd]] messages. Other subtypes are used for - * internal purposes. - */ - sealed trait ActorCmd[+T] - /** - * Send a message to the “main process” of an Actor hosting processes. Note - * that this message is routed via the host Actor’s behavior and then through - * the [[Process]] mailbox of the main process. - */ - case class MainCmd[+T](cmd: T) extends ActorCmd[T] - trait InternalActorCmd[+T] extends ActorCmd[T] - - /** - * Forking a process creates a sub-Actor within the current actor that is - * executed concurrently. This sub-Actor [[Process]] has its own [[ActorRef]] - * and it can be canceled. - */ - trait SubActor[-T] { - def ref: ActorRef[T] - def cancel(): Unit - } -} diff --git a/src/main/scala/com/rolandkuhn/akka_typed_session/ScalaProcess2.scala b/src/main/scala/com/rolandkuhn/akka_typed_session/ScalaDSL.scala similarity index 94% rename from src/main/scala/com/rolandkuhn/akka_typed_session/ScalaProcess2.scala rename to src/main/scala/com/rolandkuhn/akka_typed_session/ScalaDSL.scala index ff4a4a8..47e3257 100644 --- a/src/main/scala/com/rolandkuhn/akka_typed_session/ScalaProcess2.scala +++ b/src/main/scala/com/rolandkuhn/akka_typed_session/ScalaDSL.scala @@ -11,6 +11,7 @@ import scala.annotation.implicitNotFound import shapeless._ import shapeless.ops._ import shapeless.test.illTyped +import akka.typed.scaladsl.Actor /** * A DSL for writing reusable behavior pieces that are executed concurrently @@ -31,7 +32,7 @@ import shapeless.test.illTyped * - persistence can be plugged in transparently (NOT YET IMPLEMENTED) * - recovery means acquiring state initially (which might trigger internal replay) */ -object ScalaProcess2 { +object ScalaDSL { /** * Exception type that is thrown by the `retry` facility when after the @@ -236,7 +237,7 @@ object ScalaProcess2 { /** * Convert to a runnable [[Behavior]], e.g. for being used as the guardian of an [[ActorSystem]]. */ - def toBehavior: Behavior[ActorCmd[S]] = ??? + def toBehavior: Behavior[ActorCmd[S]] = Actor.deferred(ctx => new internal.ProcessInterpreter(this, ctx).execute(ctx)) } sealed trait Effect @@ -379,6 +380,7 @@ object ScalaProcess2 { * inside an [[OpDSL]] environment. */ sealed trait Operation[S, +Out, E <: Effects] { + /** * Execute the given computation and process step after having completed * the current step. The current step’s computed value will be used as @@ -451,7 +453,7 @@ object ScalaProcess2 { object Impl { final case class FlatMap[S, Out1, Out2, E1 <: Effects, E2 <: Effects, E <: Effects]( - first: Operation[S, Out1, E1], then: Out1 ⇒ Operation[S, Out2, E2]) extends Operation[S, Out2, E] { + first: Operation[S, Out1, E1], andThen: Out1 ⇒ Operation[S, Out2, E2]) extends Operation[S, Out2, E] { override def toString: String = s"FlatMap($first)" } case object ShortCircuit extends Operation[Nothing, Nothing, E.Halt :: _0] { @@ -463,14 +465,14 @@ object ScalaProcess2 { case object Read extends Operation[Nothing, Nothing, E.Read[Any] :: _0] case object ProcessSelf extends Operation[Nothing, ActorRef[Any], _0] case object ActorSelf extends Operation[Nothing, ActorRef[ActorCmd[Nothing]], _0] - final case class Choice[S, T, C <: Coproduct, E <: Coproduct](ops: C)(implicit val u: coproduct.Unifier[C]) extends Operation[S, T, E.Choice[E] :: _0] + final case class Choice[S, T, E <: Coproduct](ops: Operation[S, T, _0]) extends Operation[S, T, E.Choice[E] :: _0] final case class Return[T](value: T) extends Operation[Nothing, T, _0] final case class Call[S, T, E <: Effects](process: Process[S, T, E], replacement: Option[T]) extends Operation[Nothing, T, E] final case class Fork[S, E <: Effects](process: Process[S, Any, E]) extends Operation[Nothing, SubActor[S], E.Fork[E] :: _0] - final case class Spawn[S, E <: Effects](process: Process[S, Any, E], deployment: DeploymentConfig) extends Operation[Nothing, ActorRef[ActorCmd[S]], E.Spawn[E] :: _0] + final case class Spawn[S, E <: Effects](process: Process[S, Any, E], deployment: Props) extends Operation[Nothing, ActorRef[ActorCmd[S]], E.Spawn[E] :: _0] final case class Schedule[T](delay: FiniteDuration, msg: T, target: ActorRef[T]) extends Operation[Nothing, a.Cancellable, E.Send[T] :: _0] sealed trait AbstractWatchRef { type Msg } - final case class WatchRef[T](watchee: ActorRef[Nothing], msg: T, target: ActorRef[T], onFailure: Throwable ⇒ Option[T]) + final case class WatchRef[T](watchee: ActorRef[Nothing], target: ActorRef[T], msg: T, onFailure: Throwable ⇒ Option[T]) extends Operation[Nothing, a.Cancellable, _0] with AbstractWatchRef { type Msg = T override def equals(other: Any) = super.equals(other) @@ -531,23 +533,29 @@ object ScalaProcess2 { * CNil] :: _0] * }}} */ - def opChoice[S, O, E <: Effects](p: Boolean, op: ⇒ Operation[S, O, E]): OpChoice[Operation[S, O, E], CNil, E :+: CNil, Any] = - if (p) new OpChoice(Some(Coproduct(op))) else new OpChoice(None) + def opChoice[S, O, E <: Effects](p: Boolean, op: ⇒ Operation[S, O, E]): OpChoice[S, Operation[S, O, _0], CNil, E :+: CNil, Operation[S, O, _0], O] = + if (p) new OpChoice(Some(Coproduct(op.ignoreEffects))) + else new OpChoice(None) + + class OpChoice[S, H <: Operation[S, _, _], T <: Coproduct, E0 <: Coproduct, +O <: Operation[S, Output, _], Output](ops: Option[H :+: T])( + implicit u: coproduct.Unifier.Aux[H :+: T, O]) { - class OpChoice[H, T <: Coproduct, E0 <: Coproduct, +O](ops: Option[H :+: T])(implicit u: coproduct.Unifier.Aux[H :+: T, O]) { - def elseIf[S, O, E <: Effects](p: Boolean, op: => Operation[S, O, E]): OpChoice[Operation[S, O, E], H :+: T, E :+: E0, Any] = { + def elseIf[O1 >: Output, E1 <: Effects](p: => Boolean, op: => Operation[S, O1, E1])( + implicit u1: coproduct.Unifier.Aux[Operation[S, O1, _0] :+: H :+: T, Operation[S, O1, _0]]): OpChoice[S, Operation[S, O1, _0], H :+: T, E1 :+: E0, Operation[S, O1, _0], O1] = { val ret = ops match { - case Some(c) => Some(c.extendLeft[Operation[S, O, E]]) - case None => if (p) Some(Coproduct[Operation[S, O, E] :+: H :+: T](op)) else None + case Some(c) => Some(c.extendLeft[Operation[S, O1, _0]]) + case None => if (p) Some(Coproduct[Operation[S, O1, _0] :+: H :+: T](op.ignoreEffects)) else None } new OpChoice(ret) } - def orElse[S, O, E <: Effects](op: => Operation[S, O, E]): Operation[S, O, E.Choice[E :+: E0] :: _0] = { + + def orElse[O1 >: Output, E1 <: Effects](op: => Operation[S, O1, E1])( + implicit u1: coproduct.Unifier.Aux[Operation[S, O1, _0] :+: H :+: T, Operation[S, O1, _0]]): Operation[S, O1, E.Choice[E1 :+: E0] :: _0] = { val ret = ops match { - case Some(c) => c.extendLeft[Operation[S, O, E]] - case None => Coproduct[Operation[S, O, E] :+: H :+: T](op) + case Some(c) => c.extendLeft[Operation[S, O1, _0]] + case None => Coproduct[Operation[S, O1, _0] :+: H :+: T](op.ignoreEffects) } - Impl.Choice[S, O, Operation[S, O, E] :+: H :+: T, E :+: E0](ret) + Impl.Choice[S, O1, E1 :+: E0](u1(ret)) } } @@ -592,7 +600,7 @@ object ScalaProcess2 { * between the processes hosted by that Actor and timeouts also go through * this mailbox. */ - def opSpawn[Self, E <: Effects](process: Process[Self, Any, E], deployment: DeploymentConfig = EmptyDeploymentConfig)( + def opSpawn[Self, E <: Effects](process: Process[Self, Any, E], deployment: Props = Props.empty)( implicit opDSL: OpDSL): Operation[opDSL.Self, ActorRef[ActorCmd[Self]], E.Spawn[E] :: _0] = Impl.Spawn(process, deployment) @@ -613,7 +621,7 @@ object ScalaProcess2 { */ def opWatch[T](watchee: ActorRef[Nothing], target: ActorRef[T], msg: T, onFailure: Throwable ⇒ Option[T] = any2none)( implicit opDSL: OpDSL): Operation[opDSL.Self, a.Cancellable, _0] = - Impl.WatchRef(watchee, msg, target, onFailure) + Impl.WatchRef(watchee, target, msg, onFailure) val any2none = (_: Any) ⇒ None private val _any2Nil = (state: Any) ⇒ Nil → state diff --git a/src/main/scala/com/rolandkuhn/akka_typed_session/ProcessImpl.scala b/src/main/scala/com/rolandkuhn/akka_typed_session/internal/ProcessImpl.scala similarity index 83% rename from src/main/scala/com/rolandkuhn/akka_typed_session/ProcessImpl.scala rename to src/main/scala/com/rolandkuhn/akka_typed_session/internal/ProcessImpl.scala index e4acfd2..16f9977 100644 --- a/src/main/scala/com/rolandkuhn/akka_typed_session/ProcessImpl.scala +++ b/src/main/scala/com/rolandkuhn/akka_typed_session/internal/ProcessImpl.scala @@ -1,12 +1,13 @@ /** * Copyright (C) 2016 Lightbend Inc. */ -package akka.typed +package com.rolandkuhn.akka_typed_session package internal import akka.{ actor ⇒ a } import java.util.concurrent.ArrayBlockingQueue -import ScalaProcess._ +import akka.typed._ +import akka.typed.scaladsl.Actor import ScalaDSL._ import scala.concurrent.duration._ import scala.annotation.tailrec @@ -41,7 +42,7 @@ import akka.event.Logging * TODO: * enable noticing when watchee failed */ -private[typed] object ProcessInterpreter { +private[akka_typed_session] object ProcessInterpreter { sealed trait TraversalState case object HasValue extends TraversalState @@ -75,77 +76,81 @@ private[typed] object ProcessInterpreter { override def isCancelled: Boolean = true } - val wrapReturn = (o: Any) ⇒ Return(o).asInstanceOf[Operation[Any, Any]] + val wrapReturn = (o: Any) ⇒ Impl.Return(o).asInstanceOf[Operation[Any, Any, _0]] case class Timeout(deadline: Deadline) extends InternalActorCmd[Nothing] } -private[typed] class ProcessInterpreter[T](initial: ⇒ Process[T, Any]) extends Behavior[ActorCmd[T]] { +/** + * Important: must call .execute(ctx) upon creation and return its result as the + * next behavior! + */ +private[akka_typed_session] class ProcessInterpreter[T]( + initial: ⇒ Process[T, Any, _], + ctx: scaladsl.ActorContext[ActorCmd[T]]) extends ExtensibleBehavior[ActorCmd[T]] { import ProcessInterpreter._ // FIXME data structures to be optimized private var internalTriggers = Map.empty[Traversal[_], Traversal[_]] private val queue = new LinkedList[Traversal[_]] private var processRoots = Set.empty[Traversal[_]] - private var mainProcess: Traversal[T] = _ private var timeouts = emptyTimeouts private var timeoutTask = notScheduled - private var watchMap = Map.empty[ActorRef[Nothing], Set[AbstractWatchRef]] + private var watchMap = Map.empty[ActorRef[Nothing], Set[Impl.AbstractWatchRef]] private var stateMap = Map.empty[StateKey[_], Any] + private val mainProcess: Traversal[T] = new Traversal(initial, ctx) + + if (mainProcess.state == HasValue) triggerCompletions(ctx, mainProcess) + else processRoots += mainProcess - def management(ctx: ActorContext[ActorCmd[T]], msg: Signal): Behavior[ActorCmd[T]] = { + def receiveSignal(ctx: ActorContext[ActorCmd[T]], msg: Signal): Behavior[ActorCmd[T]] = { msg match { - case PreStart ⇒ - mainProcess = new Traversal(initial, ctx) - if (mainProcess.state == HasValue) triggerCompletions(ctx, mainProcess) - else processRoots += mainProcess - execute(ctx) case PostStop ⇒ processRoots.foreach(_.cancel()) - Same + Actor.same case t @ Terminated(ref) ⇒ watchMap.get(ref) match { - case None ⇒ Unhandled + case None ⇒ Actor.unhandled case Some(set) ⇒ - if (t.failure == null) set.foreach { case w: WatchRef[tpe] ⇒ w.target ! w.msg } - else set.foreach { case w: WatchRef[tpe] ⇒ w.target ! w.onFailure(t.failure).getOrElse(w.msg) } + if (t.failure == null) set.foreach { case w: Impl.WatchRef[tpe] ⇒ w.target ! w.msg } + else set.foreach { case w: Impl.WatchRef[tpe] ⇒ w.target ! w.onFailure(t.failure).getOrElse(w.msg) } watchMap -= ref - Same + Actor.same } - case _ ⇒ Same + case _ ⇒ Actor.same } } - def message(ctx: ActorContext[ActorCmd[T]], msg: ActorCmd[T]): Behavior[ActorCmd[T]] = { + def receiveMessage(ctx: ActorContext[ActorCmd[T]], msg: ActorCmd[T]): Behavior[ActorCmd[T]] = { // for paranoia: if Timeout message is lost due to bounded mailbox (costs 50ns if nonEmpty) if (timeouts.nonEmpty && Deadline.now.time.toNanos - timeouts.head.time.toNanos >= 0) throw new TimeoutException("process timeout expired") msg match { case t: Traversal[_] ⇒ - if (Debug) println(s"${ctx.self} got message for $t") + if (Debug) println(s"${ctx.asScala.self} got message for $t") if (t.isAlive) { t.registerReceipt() if (t.state == NeedsExternalInput) { t.dispatchInput(t.receiveOne(), t) - triggerCompletions(ctx, t) + triggerCompletions(ctx.asScala, t) } } - execute(ctx) + execute(ctx.asScala) case Timeout(_) ⇒ // won’t get here anyway due to the clock check above, but is included for documentation - Same + Actor.same case MainCmd(cmd) ⇒ mainProcess.ref ! cmd - Same - case _ ⇒ Unhandled + Actor.same + case _ ⇒ Actor.unhandled } } /** * Consume the queue of outstanding triggers. */ - private def execute(ctx: ActorContext[ActorCmd[T]]): Behavior[ActorCmd[T]] = { + def execute(ctx: scaladsl.ActorContext[ActorCmd[T]]): Behavior[ActorCmd[T]] = { while (!queue.isEmpty()) { val traversal = queue.poll() if (traversal.state == NeedsTrampoline) traversal.dispatchTrampoline() @@ -156,14 +161,14 @@ private[typed] class ProcessInterpreter[T](initial: ⇒ Process[T, Any]) extends val refs = ctx.children.map(_.path.name) println(s"${ctx.self} execute run finished, roots = $roots, children = $refs, timeouts = $timeouts, watchMap = $watchMap") } - if (processRoots.isEmpty) Stopped else Same + if (processRoots.isEmpty) Actor.stopped else this } /** * This only notifies potential listeners of the computed value of a finished * process; the process must clean itself up beforehand. */ - @tailrec private def triggerCompletions(ctx: ActorContext[ActorCmd[T]], traversal: Traversal[_]): Unit = + @tailrec private def triggerCompletions(ctx: scaladsl.ActorContext[ActorCmd[T]], traversal: Traversal[_]): Unit = if (traversal.state == HasValue) { if (Debug) println(s"${ctx.self} finished $traversal") internalTriggers.get(traversal) match { @@ -175,7 +180,7 @@ private[typed] class ProcessInterpreter[T](initial: ⇒ Process[T, Any]) extends } } - def addTimeout(ctx: ActorContext[ActorCmd[T]], f: FiniteDuration): Deadline = { + def addTimeout(ctx: scaladsl.ActorContext[ActorCmd[T]], f: FiniteDuration): Deadline = { var d = Deadline.now + f while (timeouts contains d) d += 1.nanosecond if (Debug) println(s"${ctx.self} adding $d") @@ -184,7 +189,7 @@ private[typed] class ProcessInterpreter[T](initial: ⇒ Process[T, Any]) extends d } - def removeTimeout(ctx: ActorContext[ActorCmd[T]], d: Deadline): Unit = { + def removeTimeout(ctx: scaladsl.ActorContext[ActorCmd[T]], d: Deadline): Unit = { if (Debug) println(s"${ctx.self} removing $d") timeouts -= d if (timeouts.isEmpty) { @@ -196,17 +201,17 @@ private[typed] class ProcessInterpreter[T](initial: ⇒ Process[T, Any]) extends } } - def scheduleTimeout(ctx: ActorContext[ActorCmd[T]], d: Deadline): Unit = { + def scheduleTimeout(ctx: scaladsl.ActorContext[ActorCmd[T]], d: Deadline): Unit = { if (Debug) println(s"${ctx.self} scheduling $d") timeoutTask.cancel() timeoutTask = ctx.schedule(d.timeLeft, ctx.self, Timeout(d)) } - def watch(ctx: ActorContext[ActorCmd[T]], w: WatchRef[_]): Cancellable = { + def watch(ctx: scaladsl.ActorContext[ActorCmd[T]], w: Impl.WatchRef[_]): Cancellable = { val watchee = w.watchee - val set: Set[AbstractWatchRef] = watchMap.get(watchee) match { + val set: Set[Impl.AbstractWatchRef] = watchMap.get(watchee) match { case None ⇒ - ctx.watch[Nothing](watchee) + ctx.watch(watchee) Set(w) case Some(s) ⇒ s + w } @@ -220,7 +225,7 @@ private[typed] class ProcessInterpreter[T](initial: ⇒ Process[T, Any]) extends val next = s - w if (next.isEmpty) { watchMap -= watchee - ctx.unwatch[Nothing](watchee) + ctx.unwatch(watchee) } else { watchMap = watchMap.updated(watchee, next) } @@ -245,9 +250,9 @@ private[typed] class ProcessInterpreter[T](initial: ⇒ Process[T, Any]) extends stateMap = stateMap.updated(key, value) } - private class Traversal[Tself](val process: Process[Tself, Any], ctx: ActorContext[ActorCmd[T]]) - extends InternalActorCmd[Nothing] with Function1[Tself, ActorCmd[T]] - with SubActor[Tself] { + private class Traversal[Tself](val process: Process[Tself, Any, _], ctx: scaladsl.ActorContext[ActorCmd[T]]) + extends InternalActorCmd[Nothing] with Function1[Tself, ActorCmd[T]] + with SubActor[Tself] { val deadline = process.timeout match { case f: FiniteDuration ⇒ addTimeout(ctx, f) @@ -291,20 +296,23 @@ private[typed] class ProcessInterpreter[T](initial: ⇒ Process[T, Any]) extends override def toString: String = if (Debug) { val stackList = stack.toList.map { - case null ⇒ "" - case t: Traversal[_] ⇒ "Traversal" - case FlatMap(_, _) ⇒ "FlatMap" - case other ⇒ other.toString + case null ⇒ "" + case t: Traversal[_] ⇒ "Traversal" + case Impl.FlatMap(_, _) ⇒ "FlatMap" + case other ⇒ other.toString } s"Traversal(${ref.path.name}, ${process.name}, $state, $stackList, $ptr)" } else super.toString - @tailrec private def depth(op: Operation[_, Any], d: Int = 0): Int = + @tailrec private def depth(op: Operation[_, Any, _], d: Int = 0): Int = { + import Impl._ op match { case FlatMap(next, _) ⇒ depth(next, d + 1) + case Choice(ops) => depth(ops, d) case Read | Call(_, _) | Cleanup(_) ⇒ d + 2 case _ ⇒ d + 1 } + } /* * The state defines what is on the stack: @@ -365,12 +373,16 @@ private[typed] class ProcessInterpreter[T](initial: ⇒ Process[T, Any]) extends */ def state: TraversalState = _state - private def initialize(node: Operation[_, Any]): TraversalState = { - @tailrec def rec(node: Operation[_, Any]): TraversalState = + private def initialize(node: Operation[_, Any, _]): TraversalState = { + import Impl._ + + @tailrec def rec(node: Operation[_, Any, _]): TraversalState = node match { case FlatMap(first, _) ⇒ push(node) rec(first) + case Choice(ops) => + rec(ops) case ShortCircuit ⇒ push(NoValue) valueOrTrampoline() @@ -451,7 +463,7 @@ private[typed] class ProcessInterpreter[T](initial: ⇒ Process[T, Any]) extends * not empty right now */ ensureSpace(depth(node) + 1) - rec(FlatMap(node.asInstanceOf[Operation[Any, Any]], wrapReturn)) + rec(FlatMap(node.asInstanceOf[Operation[Any, Any, Effects]], wrapReturn)) case _ ⇒ ensureSpace(depth(node)) rec(node) @@ -463,13 +475,13 @@ private[typed] class ProcessInterpreter[T](initial: ⇒ Process[T, Any]) extends _state match { case NeedsInternalInput ⇒ assert(source eq pop()) - val Call(proc, replacement) = pop() + val Impl.Call(proc, replacement) = pop() assert(source.process eq proc) if (value != NoValue) push(value) else push(replacement.getOrElse(NoValue)) _state = valueOrTrampoline() case NeedsExternalInput ⇒ assert(this eq pop()) - assert(Read eq pop()) + assert(Impl.Read eq pop()) push(value) _state = valueOrTrampoline() case _ ⇒ throw new AssertionError @@ -481,7 +493,7 @@ private[typed] class ProcessInterpreter[T](initial: ⇒ Process[T, Any]) extends assert(_state == NeedsTrampoline) val value = pop() pop() match { - case FlatMap(_, cont) ⇒ + case Impl.FlatMap(_, cont) ⇒ val contOps = cont(value) if (Debug) println(s"${ctx.self} flatMap yielded $contOps") _state = initialize(contOps) @@ -542,7 +554,8 @@ private[typed] class ProcessInterpreter[T](initial: ⇒ Process[T, Any]) extends } private def shutdown(): Unit = { - ref.sorry.sendSystem(Terminate()) + if (Debug) println(s"${ctx.self} shutting down") + ctx.stop(ref) toRead = -1 processRoots -= this if (deadline != null) removeTimeout(ctx, deadline) diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf new file mode 100644 index 0000000..8624e9a --- /dev/null +++ b/src/test/resources/application.conf @@ -0,0 +1,2 @@ +akka.actor.debug.lifecycle = on +akka.loglevel = DEBUG \ No newline at end of file diff --git a/src/test/scala/com/rolandkuhn/akka_typed_session/AkkaSpec.scala b/src/test/scala/com/rolandkuhn/akka_typed_session/AkkaSpec.scala new file mode 100644 index 0000000..35dadb6 --- /dev/null +++ b/src/test/scala/com/rolandkuhn/akka_typed_session/AkkaSpec.scala @@ -0,0 +1,118 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package com.rolandkuhn.akka_typed_session + +import org.scalactic.Constraint + +import language.postfixOps +import org.scalatest.{ BeforeAndAfterAll, WordSpecLike } +import org.scalatest.Matchers +import akka.actor.ActorSystem +import akka.event.{ Logging, LoggingAdapter } + +import scala.concurrent.duration._ +import scala.concurrent.Future +import com.typesafe.config.{ Config, ConfigFactory } +import akka.dispatch.Dispatchers +import akka.testkit._ +import akka.testkit.TestEvent._ +import org.scalactic.ConversionCheckedTripleEquals +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.time.Span + +object AkkaSpec { + val testConf: Config = ConfigFactory.parseString(""" + akka { + loggers = ["akka.testkit.TestEventListener"] + typed.loggers = ["akka.typed.testkit.TestEventListener"] + loglevel = "WARNING" + stdout-loglevel = "WARNING" + actor { + default-dispatcher { + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = 8 + parallelism-factor = 2.0 + parallelism-max = 8 + } + } + } + } + """) + + def mapToConfig(map: Map[String, Any]): Config = { + import scala.collection.JavaConverters._ + ConfigFactory.parseMap(map.asJava) + } + + def getCallerName(clazz: Class[_]): String = { + val s = (Thread.currentThread.getStackTrace map (_.getClassName) drop 1) + .dropWhile(_ matches "(java.lang.Thread|.*AkkaSpec.?$|.*StreamSpec.?$)") + val reduced = s.lastIndexWhere(_ == clazz.getName) match { + case -1 ⇒ s + case z ⇒ s drop (z + 1) + } + reduced.head.replaceFirst(""".*\.""", "").replaceAll("[^a-zA-Z_0-9]", "_") + } + +} + +abstract class AkkaSpec(_system: ActorSystem) + extends TestKit(_system) with WordSpecLike with Matchers with BeforeAndAfterAll + with ConversionCheckedTripleEquals with ScalaFutures { + + implicit val patience = PatienceConfig(testKitSettings.DefaultTimeout.duration, Span(100, org.scalatest.time.Millis)) + + def this(config: Config) = this(ActorSystem( + AkkaSpec.getCallerName(getClass), + ConfigFactory.load(config.withFallback(AkkaSpec.testConf)))) + + def this(s: String) = this(ConfigFactory.parseString(s)) + + def this(configMap: Map[String, _]) = this(AkkaSpec.mapToConfig(configMap)) + + def this() = this(ActorSystem(AkkaSpec.getCallerName(getClass), AkkaSpec.testConf)) + + val log: LoggingAdapter = Logging(system, this.getClass) + + override val invokeBeforeAllAndAfterAllEvenIfNoTestsAreExpected = true + + final override def beforeAll { + atStartup() + } + + final override def afterAll { + beforeTermination() + shutdown() + afterTermination() + } + + protected def atStartup() {} + + protected def beforeTermination() {} + + protected def afterTermination() {} + + def spawn(dispatcherId: String = Dispatchers.DefaultDispatcherId)(body: ⇒ Unit): Unit = + Future(body)(system.dispatchers.lookup(dispatcherId)) + + def muteDeadLetters(messageClasses: Class[_]*)(sys: ActorSystem = system): Unit = + if (!sys.log.isDebugEnabled) { + def mute(clazz: Class[_]): Unit = + sys.eventStream.publish(Mute(DeadLettersFilter(clazz)(occurrences = Int.MaxValue))) + if (messageClasses.isEmpty) mute(classOf[AnyRef]) + else messageClasses foreach mute + } + + // for ScalaTest === compare of Class objects + implicit def classEqualityConstraint[A, B]: Constraint[Class[A], Class[B]] = + new Constraint[Class[A], Class[B]] { + def areEqual(a: Class[A], b: Class[B]) = a == b + } + + implicit def setEqualityConstraint[A, T <: Set[_ <: A]]: Constraint[Set[A], T] = + new Constraint[Set[A], T] { + def areEqual(a: Set[A], b: T) = a == b + } +} diff --git a/src/test/scala/com/rolandkuhn/akka_typed_session/ProcessSpec.scala b/src/test/scala/com/rolandkuhn/akka_typed_session/ProcessSpec.scala index e59f831..017a294 100644 --- a/src/test/scala/com/rolandkuhn/akka_typed_session/ProcessSpec.scala +++ b/src/test/scala/com/rolandkuhn/akka_typed_session/ProcessSpec.scala @@ -1,20 +1,21 @@ /** * Copyright (C) 2016 Lightbend Inc. */ -package akka.typed +package com.rolandkuhn.akka_typed_session -import akka.typed.ScalaProcess._ +import akka.typed._ +import ScalaDSL._ import akka.typed.patterns.Receptionist._ import scala.concurrent.duration._ -import akka.typed.AskPattern._ +import akka.typed.scaladsl.AskPattern._ import org.scalatest.Succeeded import akka.actor.InvalidActorNameException import akka.Done -import akka.typed.Effect._ import java.util.concurrent.TimeoutException import org.scalatest.prop.PropertyChecks import scala.collection.immutable.TreeSet import scala.util.Random +import akka.typed.testkit._ object ProcessSpec { @@ -97,7 +98,7 @@ class ProcessSpec extends TypedSpec { _ = store ! GetData(self) } yield opRead }) - } req.replyTo ! Response(data.msg) + } yield req.replyTo ! Response(data.msg) } val server = @@ -105,11 +106,12 @@ class ProcessSpec extends TypedSpec { for { _ ← opSpawn(backend.named("backend")) self ← opProcessSelf - _ ← retry(1.second, 3, register(self, RequestService).named("register")) - backend ← retry(1.second, 3, getBackend.named("getBackend")) + // _ ← retry(1.second, 3, register(self, RequestService).named("register")) + // backend ← retry(1.second, 3, getBackend.named("getBackend")) + _ <- opCall(register(self, RequestService).named("register")) + backend <- opCall(getBackend.named("getBackend")) } yield OpDSL.loopInf { _ ⇒ - for (req ← opRead) - forkAndCancel(5.seconds, talkWithBackend(backend.addresses.head, req).named("worker")) + for (req ← opRead) yield forkAndCancel(5.seconds, talkWithBackend(backend.addresses.head, req).named("worker")) } } @@ -118,14 +120,21 @@ class ProcessSpec extends TypedSpec { for { serverRef ← opSpawn(server.named("server").withMailboxCapacity(20)) self ← opProcessSelf - } yield OpDSL.loop(2) { _ ⇒ - for { - _ ← opUnit(serverRef ! MainCmd(Request("hello", self))) - msg ← opRead - } msg should ===(Response("yeehah")) - }.map { results ⇒ - results should ===(List(Succeeded, Succeeded)) - } + // } yield OpDSL.loop(2) { _ ⇒ + // for { + // _ ← opUnit(serverRef ! MainCmd(Request("hello", self))) + // msg ← opRead + // } yield msg should ===(Response("yeehah")) + // }.map { results ⇒ + // results should ===(List(Succeeded, Succeeded)) + // } + _ ← opUnit(serverRef ! MainCmd(Request("hello", self))) + msg1 ← opRead + succ1 = msg1 should ===(Response("yeehah")) + _ ← opUnit(serverRef ! MainCmd(Request("hello", self))) + msg2 ← opRead + succ2 = msg2 should ===(Response("yeehah")) + } yield (succ1, succ2) should ===((Succeeded, Succeeded)) }.withTimeout(3.seconds).toBehavior }) } @@ -139,7 +148,7 @@ class ProcessSpec extends TypedSpec { self ← opProcessSelf _ = child ! MainCmd(self) msg ← opRead - } msg should ===(Done) + } yield msg should ===(Done) }.withTimeout(3.seconds).toBehavior }) @@ -152,7 +161,7 @@ class ProcessSpec extends TypedSpec { self ← opProcessSelf _ = child ! MainCmd(self) msg ← opRead - } msg should ===(Done) + } yield msg should ===(Done) }.withTimeout(3.seconds).toBehavior }) @@ -161,8 +170,8 @@ class ProcessSpec extends TypedSpec { for { self ← opProcessSelf child ← opSpawn(opUnit(()).named("unit")) - _ ← opWatch(child, Done, self) - } opRead + _ ← opWatch(child, self, Done) + } yield opRead }.withTimeout(3.seconds).toBehavior }) @@ -172,7 +181,7 @@ class ProcessSpec extends TypedSpec { self ← opProcessSelf filter = muteExpectedException[TimeoutException](occurrences = 1) child ← opSpawn(opRead.withTimeout(10.millis)) - _ ← opWatch(child, null, self, Some(_)) + _ ← opWatch(child, self, null, Some(_)) thr ← opRead } yield { thr shouldBe a[TimeoutException] @@ -186,10 +195,10 @@ class ProcessSpec extends TypedSpec { for { self ← opProcessSelf child ← opSpawn(opUnit(()).named("unit")) - cancellable ← opWatch(child, "dead", self) - _ ← opSchedule(50.millis, "alive", self) + cancellable ← opWatch(child, self, "dead") + _ ← opSchedule(50.millis, self, "alive") msg ← { cancellable.cancel(); opRead } - } msg should ===("alive") + } yield msg should ===("alive") }.withTimeout(3.seconds).toBehavior }) @@ -199,9 +208,9 @@ class ProcessSpec extends TypedSpec { self ← opProcessSelf filter = muteExpectedException[TimeoutException](occurrences = 1) child ← opSpawn(opRead.named("read").withTimeout(10.millis)) - _ ← opWatch(child, Done, self) + _ ← opWatch(child, self, Done) _ ← opRead - } filter.awaitDone(100.millis) + } yield filter.awaitDone(100.millis) }.withTimeout(3.seconds).toBehavior }) @@ -210,7 +219,7 @@ class ProcessSpec extends TypedSpec { for { self ← opProcessSelf _ ← opFork(OpDSL[String] { _ ⇒ self ! ""; opRead }.named("read").withTimeout(1.second)) - } opRead + } yield opRead }.named("child").withTimeout(100.millis) OpDSL[Done] { implicit opDSL ⇒ @@ -219,7 +228,7 @@ class ProcessSpec extends TypedSpec { start = Deadline.now filter = muteExpectedException[TimeoutException](occurrences = 1) child ← opSpawn(childProc) - _ ← opWatch(child, Done, self) + _ ← opWatch(child, self, Done) _ ← opRead } yield { // weird: without this I get diverging implicits on the `>` @@ -232,7 +241,7 @@ class ProcessSpec extends TypedSpec { def `must name process refs appropriately`(): Unit = sync(runTest("naming") { OpDSL[Done] { implicit opDSL ⇒ - opProcessSelf.foreach { self ⇒ + opProcessSelf.map { self ⇒ val name = self.path.name withClue(s" name=$name") { name.substring(0, 1) should ===("$") @@ -249,6 +258,12 @@ class ProcessSpec extends TypedSpec { object `A ProcessDSL (native)` extends CommonTests with NativeSystem { + private def assertStopping(ctx: EffectfulActorContext[_], n: Int): Unit = { + val stopping = ctx.getAllEffects() + stopping.size should ===(n) + stopping.collect { case Effect.Stopped(_) => true }.size should ===(n) + } + def `must reject invalid process names early`(): Unit = { a[InvalidActorNameException] mustBe thrownBy { opRead(null).named("$hello") @@ -265,10 +280,10 @@ class ProcessSpec extends TypedSpec { val ctx = new EffectfulActorContext("name", OpDSL[ActorRef[Done]] { implicit opDSL ⇒ opRead }.named("read").toBehavior, 1, system) - val Spawned(name) = ctx.getEffect() + val Effect.Spawned(name) :: Nil = ctx.getAllEffects() withClue(s" name=$name") { name.substring(0, 1) should ===("$") - name.substring(name.length - 5) should ===("-read") + // FIXME #22938 name.substring(name.length - 5) should ===("-read") } ctx.getAllEffects() should ===(Nil) } @@ -276,10 +291,10 @@ class ProcessSpec extends TypedSpec { def `must read`(): Unit = { val ret = Inbox[Done]("readRet") val ctx = new EffectfulActorContext("read", OpDSL[ActorRef[Done]] { implicit opDSL ⇒ - opRead.foreach(_ ! Done) + opRead.map(_ ! Done) }.named("read").toBehavior, 1, system) - val Spawned(procName) = ctx.getEffect() + val Effect.Spawned(procName) = ctx.getEffect() ctx.hasEffects should ===(false) val procInbox = ctx.childInbox[ActorRef[Done]](procName) @@ -292,7 +307,7 @@ class ProcessSpec extends TypedSpec { case other ⇒ fail(s"expected SubActor, got $other") } ctx.run(t) - ctx.getAllEffects() should ===(Nil) + assertStopping(ctx, 1) ctx.selfInbox.receiveAll() should ===(Nil) ret.receiveAll() should ===(List(Done)) ctx.isAlive should ===(false) @@ -306,7 +321,7 @@ class ProcessSpec extends TypedSpec { }.named("called"))) }.named("call").toBehavior, 1, system) - val Spawned(procName) = ctx.getEffect() + val Effect.Spawned(procName) = ctx.getEffect() ctx.hasEffects should ===(false) val procInbox = ctx.childInbox[ActorRef[Done]](procName) @@ -319,9 +334,9 @@ class ProcessSpec extends TypedSpec { case other ⇒ fail(s"expected SubActor, got $other") } ctx.run(t) - val Spawned(calledName) = ctx.getEffect() + val Effect.Spawned(calledName) = ctx.getEffect() - ctx.getAllEffects() should ===(Nil) + assertStopping(ctx, 2) ctx.selfInbox.receiveAll() should ===(Nil) ret.receiveAll() should ===(List(Done)) ctx.isAlive should ===(false) @@ -336,10 +351,10 @@ class ProcessSpec extends TypedSpec { } }.named("call").toBehavior, 1, system) - val Spawned(procName) = ctx.getEffect() + val Effect.Spawned(procName) = ctx.getEffect() val procInbox = ctx.childInbox[ActorRef[Done]](procName) - val Spawned(forkName) = ctx.getEffect() + val Effect.Spawned(forkName) = ctx.getEffect() val forkInbox = ctx.childInbox[ActorRef[Done]](forkName) ctx.hasEffects should ===(false) @@ -355,7 +370,7 @@ class ProcessSpec extends TypedSpec { ctx.run(t1) forkInbox.receiveAll() should ===(List(ret.ref)) - ctx.getAllEffects() should ===(Nil) + assertStopping(ctx, 1) val t2 = ctx.selfInbox.receiveMsg() t2 match { @@ -364,7 +379,7 @@ class ProcessSpec extends TypedSpec { } ctx.run(t2) - ctx.getAllEffects() should ===(Nil) + assertStopping(ctx, 1) ctx.selfInbox.receiveAll() should ===(Nil) ret.receiveAll() should ===(List(Done)) ctx.isAlive should ===(false) @@ -379,11 +394,11 @@ class ProcessSpec extends TypedSpec { proc ← opProcessSelf actor ← opActorSelf value ← opUnit(42) - } ret.ref ! Info(sys, proc, actor, value) + } yield ret.ref ! Info(sys, proc, actor, value) }.named("things").toBehavior, 1, system) - val Spawned(procName) = ctx.getEffect() - ctx.hasEffects should ===(false) + val Effect.Spawned(procName) = ctx.getEffect() + assertStopping(ctx, 1) ctx.isAlive should ===(false) val Info(sys, proc, actor, value) = ret.receiveMsg() @@ -399,11 +414,11 @@ class ProcessSpec extends TypedSpec { for { self ← opProcessSelf if false - } opRead + } yield opRead }.toBehavior, 1, system) - val Spawned(procName) = ctx.getEffect() - ctx.hasEffects should ===(false) + val Effect.Spawned(procName) = ctx.getEffect() + assertStopping(ctx, 1) ctx.isAlive should ===(false) } @@ -413,17 +428,17 @@ class ProcessSpec extends TypedSpec { for { self ← opProcessSelf if false - } opRead + } yield opRead for { _ ← opCall(callee.named("callee")) - } opRead + } yield opRead }.toBehavior, 1, system) - val Spawned(procName) = ctx.getEffect() - val Spawned(calleeName) = ctx.getEffect() - calleeName should endWith("-callee") - ctx.hasEffects should ===(false) + val Effect.Spawned(procName) = ctx.getEffect() + val Effect.Spawned(calleeName) = ctx.getEffect() + // FIXME #22938 calleeName should endWith("-callee") + assertStopping(ctx, 2) ctx.isAlive should ===(false) } @@ -434,7 +449,7 @@ class ProcessSpec extends TypedSpec { for { self ← opProcessSelf if false - } opRead + } yield opRead for { result ← opCall(callee.named("callee"), Some("hello")) @@ -444,10 +459,10 @@ class ProcessSpec extends TypedSpec { } }.toBehavior, 1, system) - val Spawned(_) = ctx.getEffect() - val Spawned(calleeName) = ctx.getEffect() - calleeName should endWith("-callee") - ctx.hasEffects should ===(false) + val Effect.Spawned(_) = ctx.getEffect() + val Effect.Spawned(calleeName) = ctx.getEffect() + // FIXME #22938 calleeName should endWith("-callee") + assertStopping(ctx, 1) ctx.isAlive should ===(true) received should ===("hello") } @@ -462,15 +477,15 @@ class ProcessSpec extends TypedSpec { _ = call(0) _ ← opCleanup(() ⇒ call(1)) _ ← opUnit(call(2)) - } opCleanup(() ⇒ call(3)) - ).foreach { msg ⇒ + } yield opCleanup(() ⇒ call(3)) + ).map { msg ⇒ msg should ===(Done) call(4) } }.toBehavior, 1, system) - val Spawned(_) = ctx.getEffect() - ctx.getAllEffects() should ===(Nil) + val Effect.Spawned(_) = ctx.getEffect() + assertStopping(ctx, 1) ctx.isAlive should ===(false) calls.reverse should ===(List(0, 2, 3, 1, 4)) } @@ -486,23 +501,23 @@ class ProcessSpec extends TypedSpec { _ ← opUnit(call(10)) _ ← opCleanup(() ⇒ call(11)) if false - } call(12) + } yield call(12) (for { _ ← opProcessSelf _ = call(0) _ ← opCleanup(() ⇒ call(1)) _ ← opCall(callee.named("callee")) - } opCleanup(() ⇒ call(3)) - ).foreach { _ ⇒ + } yield opCleanup(() ⇒ call(3)) + ).map { _ ⇒ call(4) } }.toBehavior, 1, system) - val Spawned(_) = ctx.getEffect() - val Spawned(calleeName) = ctx.getEffect() - calleeName should endWith("-callee") - ctx.getAllEffects() should ===(Nil) + val Effect.Spawned(_) = ctx.getEffect() + val Effect.Spawned(calleeName) = ctx.getEffect() + // FIXME #22938 calleeName should endWith("-callee") + assertStopping(ctx, 2) ctx.isAlive should ===(false) calls.reverse should ===(List(0, 10, 11, 1)) } @@ -519,24 +534,24 @@ class ProcessSpec extends TypedSpec { _ ← opCleanup(() ⇒ call(11)) _ ← opCleanup(() ⇒ call(12)) if false - } call(13) + } yield call(13) (for { _ ← opProcessSelf _ = call(0) _ ← opCleanup(() ⇒ call(1)) _ ← opCall(callee.named("callee"), Some("hello")) - } opCleanup(() ⇒ call(3)) - ).foreach { msg ⇒ + } yield opCleanup(() ⇒ call(3)) + ).map { msg ⇒ msg should ===(Done) call(4) } }.toBehavior, 1, system) - val Spawned(_) = ctx.getEffect() - val Spawned(calleeName) = ctx.getEffect() - calleeName should endWith("-callee") - ctx.getAllEffects() should ===(Nil) + val Effect.Spawned(_) = ctx.getEffect() + val Effect.Spawned(calleeName) = ctx.getEffect() + // FIXME #22938 calleeName should endWith("-callee") + assertStopping(ctx, 2) ctx.isAlive should ===(false) calls.reverse should ===(List(0, 10, 12, 11, 3, 1, 4)) } @@ -551,13 +566,13 @@ class ProcessSpec extends TypedSpec { _ ← opCleanup(() ⇒ call(1)) _ ← opCleanup(() ⇒ throw new Exception("expected")) _ ← opRead - } opCleanup(() ⇒ call(3)) - ).foreach { _ ⇒ + } yield opCleanup(() ⇒ call(3)) + ).map { _ ⇒ call(4) } }.toBehavior, 1, system) - val Spawned(mainName) = ctx.getEffect() + val Effect.Spawned(mainName) = ctx.getEffect() ctx.getAllEffects() should ===(Nil) ctx.run(MainCmd("")) @@ -566,11 +581,7 @@ class ProcessSpec extends TypedSpec { a[Exception] shouldBe thrownBy { ctx.run(t) } - calls.reverse should ===(List(3)) - - ctx.signal(PostStop) - - ctx.getAllEffects() should ===(Nil) + assertStopping(ctx, 1) calls.reverse should ===(List(3, 1, 0)) } @@ -584,14 +595,14 @@ class ProcessSpec extends TypedSpec { (for { _ ← opCleanup(() ⇒ call(0)) _ ← opCleanup(() ⇒ call(1)) - } opRead).named("fork")) + } yield opRead).named("fork")) _ ← opRead - } throw new Exception("expected") + } yield throw new Exception("expected") }.toBehavior, 1, system) - val Spawned(mainName) = ctx.getEffect() - val Spawned(forkName) = ctx.getEffect() - forkName should endWith("-fork") + val Effect.Spawned(mainName) = ctx.getEffect() + val Effect.Spawned(forkName) = ctx.getEffect() + // FIXME #22938 forkName should endWith("-fork") ctx.getAllEffects() should ===(Nil) ctx.run(MainCmd("")) @@ -600,11 +611,7 @@ class ProcessSpec extends TypedSpec { a[Exception] shouldBe thrownBy { ctx.run(t) } - calls.reverse should ===(List()) - - ctx.signal(PostStop) - - ctx.getAllEffects() should ===(Nil) + assertStopping(ctx, 2) calls.reverse should ===(List(1, 0)) } @@ -628,18 +635,20 @@ class ProcessSpec extends TypedSpec { _ = publish(i2) Done ← opForgetState(Key) i3 ← opReadState(Key) - } publish(i3) + } yield publish(i3) }.toBehavior, 1, system) - val Spawned(_) = ctx.getEffect() - ctx.getAllEffects() should ===(Nil) + val Effect.Spawned(_) = ctx.getEffect() + assertStopping(ctx, 1) ctx.isAlive should ===(false) values.reverse should ===(List(0, 5, 2, 4, 0)) } } - object `A ProcessDSL (adapted)` extends CommonTests with AdaptedSystem + object `A ProcessDSL (adapted)` extends CommonTests with AdaptedSystem { + pending // awaiting fix for #22934 in akka/akka + } object `A TimeoutOrdering` extends PropertyChecks { diff --git a/src/test/scala/com/rolandkuhn/akka_typed_session/TypedSpec.scala b/src/test/scala/com/rolandkuhn/akka_typed_session/TypedSpec.scala new file mode 100644 index 0000000..bf7d046 --- /dev/null +++ b/src/test/scala/com/rolandkuhn/akka_typed_session/TypedSpec.scala @@ -0,0 +1,254 @@ +/** + * Copyright (C) 2014-2017 Lightbend Inc. + */ +package com.rolandkuhn.akka_typed_session + +import org.scalatest.refspec.RefSpec +import org.scalatest.Matchers +import org.scalatest.BeforeAndAfterAll + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.concurrent.Future +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import akka.util.Timeout + +import scala.reflect.ClassTag +import akka.actor.ActorInitializationException +import akka.typed._ + +import language.existentials +import akka.testkit.TestEvent.Mute +import akka.typed.scaladsl.Actor._ +import org.scalatest.concurrent.ScalaFutures +import org.scalactic.TypeCheckedTripleEquals +import org.scalactic.CanEqual + +import scala.util.control.NonFatal +import akka.typed.scaladsl.AskPattern + +import scala.util.control.NoStackTrace +import akka.typed.testkit.{ Inbox, TestKitSettings } +import org.scalatest.time.Span +import akka.testkit.EventFilter + +/** + * Helper class for writing tests for typed Actors with ScalaTest. + */ +class TypedSpecSetup extends RefSpec with Matchers with BeforeAndAfterAll with ScalaFutures with TypeCheckedTripleEquals { + + // TODO hook this up with config like in akka-testkit/AkkaSpec? + implicit val akkaPatience = PatienceConfig(3.seconds, Span(100, org.scalatest.time.Millis)) + +} + +/** + * Helper class for writing tests against both ActorSystemImpl and ActorSystemAdapter. + */ +abstract class TypedSpec(val config: Config) extends TypedSpecSetup { + import TypedSpec._ + import AskPattern._ + + def this() = this(ConfigFactory.empty) + + def this(config: String) = this(ConfigFactory.parseString(config)) + + // extension point + def setTimeout: Timeout = Timeout(1.minute) + + private var nativeSystemUsed = false + lazy val nativeSystem: ActorSystem[TypedSpec.Command] = { + val sys = ActorSystem(AkkaSpec.getCallerName(classOf[TypedSpec]), guardian(), config = Some(config withFallback AkkaSpec.testConf)) + nativeSystemUsed = true + sys + } + private var adaptedSystemUsed = false + lazy val adaptedSystem: ActorSystem[TypedSpec.Command] = { + val sys = ActorSystem.adapter(AkkaSpec.getCallerName(classOf[TypedSpec]), guardian(), config = Some(config withFallback AkkaSpec.testConf)) + adaptedSystemUsed = true + sys + } + + implicit val timeout = setTimeout + implicit def scheduler = nativeSystem.scheduler + + trait StartSupport { + def system: ActorSystem[TypedSpec.Command] + + private val nameCounter = Iterator.from(0) + def nextName(prefix: String = "a"): String = s"$prefix-${nameCounter.next()}" + + def start[T](behv: Behavior[T]): ActorRef[T] = { + import akka.typed.scaladsl.AskPattern._ + import akka.typed.testkit.scaladsl._ + implicit val testSettings = TestKitSettings(system) + Await.result(system ? TypedSpec.Create(behv, nextName()), 3.seconds.dilated) + } + } + + trait NativeSystem { + def system: ActorSystem[TypedSpec.Command] = nativeSystem + } + + trait AdaptedSystem { + def system: ActorSystem[TypedSpec.Command] = adaptedSystem + } + + override def afterAll(): Unit = { + if (nativeSystemUsed) + Await.result(nativeSystem.terminate, timeout.duration) + if (adaptedSystemUsed) + Await.result(adaptedSystem.terminate, timeout.duration) + } + + // TODO remove after basing on ScalaTest 3 with async support + import akka.testkit._ + def await[T](f: Future[T]): T = Await.result(f, timeout.duration * 1.1) + + lazy val blackhole = await(nativeSystem ? Create(immutable[Any] { case _ ⇒ same }, "blackhole")) + + /** + * Run an Actor-based test. The test procedure is most conveniently + * formulated using the [[StepWise$]] behavior type. + */ + def runTest[T: ClassTag](name: String)(behavior: Behavior[T])(implicit system: ActorSystem[Command]): Future[Status] = + system ? (RunTest(name, behavior, _, timeout.duration)) + + // TODO remove after basing on ScalaTest 3 with async support + def sync(f: Future[Status])(implicit system: ActorSystem[Command]): Unit = { + def unwrap(ex: Throwable): Throwable = ex match { + case ActorInitializationException(_, _, ex) ⇒ ex + case other ⇒ other + } + + try await(f) match { + case Success ⇒ () + case Failed(ex) ⇒ + unwrap(ex) match { + case ex2: TypedSpec.SimulatedException ⇒ + throw ex2 + case _ ⇒ + println(system.printTree) + throw unwrap(ex) + } + case Timedout ⇒ + println(system.printTree) + fail("test timed out") + } catch { + case ex: TypedSpec.SimulatedException ⇒ + throw ex + case NonFatal(ex) ⇒ + println(system.printTree) + throw ex + } + } + + def muteExpectedException[T <: Exception: ClassTag]( + message: String = null, + source: String = null, + start: String = "", + pattern: String = null, + occurrences: Int = Int.MaxValue)(implicit system: ActorSystem[Command]): EventFilter = { + val filter = EventFilter(message, source, start, pattern, occurrences) + system.eventStream.publish(Mute(filter)) + filter + } + + /** + * Group assertion that ensures that the given inboxes are empty. + */ + def assertEmpty(inboxes: Inbox[_]*): Unit = { + inboxes foreach (i ⇒ withClue(s"inbox $i had messages")(i.hasMessages should be(false))) + } + + // for ScalaTest === compare of Class objects + implicit def classEqualityConstraint[A, B]: CanEqual[Class[A], Class[B]] = + new CanEqual[Class[A], Class[B]] { + def areEqual(a: Class[A], b: Class[B]) = a == b + } + + implicit def setEqualityConstraint[A, T <: Set[_ <: A]]: CanEqual[Set[A], T] = + new CanEqual[Set[A], T] { + def areEqual(a: Set[A], b: T) = a == b + } +} + +object TypedSpec { + import akka.{ typed ⇒ t } + + sealed abstract class Start + case object Start extends Start + + sealed trait Command + case class RunTest[T](name: String, behavior: Behavior[T], replyTo: ActorRef[Status], timeout: FiniteDuration) extends Command + case class Terminate(reply: ActorRef[Status]) extends Command + case class Create[T](behavior: Behavior[T], name: String)(val replyTo: ActorRef[ActorRef[T]]) extends Command + + sealed trait Status + case object Success extends Status + case class Failed(thr: Throwable) extends Status + case object Timedout extends Status + + class SimulatedException(message: String) extends RuntimeException(message) with NoStackTrace + + def guardian(outstanding: Map[ActorRef[_], ActorRef[Status]] = Map.empty): Behavior[Command] = + immutable[Command] { + case (ctx, r: RunTest[t]) ⇒ + val test = ctx.spawn(r.behavior, r.name) + ctx.schedule(r.timeout, r.replyTo, Timedout) + ctx.watch(test) + guardian(outstanding + ((test, r.replyTo))) + case (_, Terminate(reply)) ⇒ + reply ! Success + stopped + case (ctx, c: Create[t]) ⇒ + c.replyTo ! ctx.spawn(c.behavior, c.name) + same + } onSignal { + case (ctx, t @ Terminated(test)) ⇒ + outstanding get test match { + case Some(reply) ⇒ + if (t.failure eq null) reply ! Success + else reply ! Failed(t.failure) + guardian(outstanding - test) + case None ⇒ same + } + case _ ⇒ same + } + + def getCallerName(clazz: Class[_]): String = { + val s = (Thread.currentThread.getStackTrace map (_.getClassName) drop 1) + .dropWhile(_ matches "(java.lang.Thread|.*TypedSpec.?$)") + val reduced = s.lastIndexWhere(_ == clazz.getName) match { + case -1 ⇒ s + case z ⇒ s drop (z + 1) + } + reduced.head.replaceFirst(""".*\.""", "").replaceAll("[^a-zA-Z_0-9]", "_") + } +} + +class TypedSpecSpec extends TypedSpec { + + object `A TypedSpec` { + + trait CommonTests { + implicit def system: ActorSystem[TypedSpec.Command] + + def `must report failures`(): Unit = { + val f = + if (system == nativeSystem) muteExpectedException[TypedSpec.SimulatedException](occurrences = 1) + else muteExpectedException[ActorInitializationException](occurrences = 1) + a[TypedSpec.SimulatedException] must be thrownBy { + sync(runTest("failure")(deferred[String] { ctx => + throw new TypedSpec.SimulatedException("expected") + })) + } + f.assertDone(1.second) + } + } + + object `when using the native implementation` extends CommonTests with NativeSystem + object `when using the adapted implementation` extends CommonTests with AdaptedSystem + } +}