Akka Typed Session

Add-on to Akka Typed that tracks effects for use with Session Types.


Assume this message protocol:

case class AuthRequest(credentials: String)(replyTo: ActorRef[AuthResult])

sealed trait AuthResult
case object AuthRejected extends AuthResult
case class AuthSuccess(token: ActorRef[Command]) extends AuthResult

sealed trait Command
case object DoIt extends Command

The process is as follows: first an AuthRequest is sent, which is answered by an AuthResult; that result may or may not unlock further communication via Command messages. A more formal definition of this sequence is the following:

trait Protocol {
  type Session <: Effects
}

object MyProto extends Protocol {
  type Session = //
    Send[AuthRequest] :: // first ask for authentication
    Read[AuthResult] :: // then read the response
    Choice[(Halt :: _0) :+: _0 :+: CNil] :: // then possibly terminate if rejected
    Send[Command] :: _0 // then send a command
}

The first part of this exchange is the familiar ask pattern, also known as request–response. We can use the process DSL to factor this out into a utility function:

def opAsk[T, U](target: ActorRef[T], msg: ActorRef[U] => T) =
  OpDSL[U] { implicit opDSL =>
    for {
      self <- opProcessSelf
      _ <- opSend(target, msg(self))
    } yield opRead
  }

Here the OpDSL constructor provides an environment in which the behavior behind a typed ActorRef[U] can be defined. opProcessSelf is an operation that, when run, yields the aforementioned ActorRef[U]. opSend is an operation that, when run, sends the given message to the given target. As the last step in this mini-protocol, opRead awaits the reception of a message sent to self, i.e. the received message will be of type U.
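
For readers unfamiliar with the ask pattern itself, the following is a minimal plain-Scala analogy built on Future and Promise. It does not use the process DSL or Akka at all: Ref, ask, Ping and echo are hypothetical names invented for this sketch, and Ref merely stands in for a typed ActorRef. The shape is the same as in opAsk: obtain a reply address, embed it in the request, send the request, then wait for a reply of type U.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object AskSketch {
  // Hypothetical stand-in for a typed ActorRef[T]: anything that accepts messages of type T.
  final case class Ref[T](tell: T => Unit)

  // Request-response: create a one-shot reply reference ("self"), embed it in the
  // request via `msg`, send the request, and hand back a Future for the reply.
  def ask[T, U](target: Ref[T], msg: Ref[U] => T): Future[U] = {
    val reply = Promise[U]()
    val self = Ref[U](u => { reply.trySuccess(u); () })
    target.tell(msg(self))
    reply.future
  }

  // Hypothetical request type that carries its reply address in a second parameter list.
  final case class Ping(text: String)(val replyTo: Ref[String])

  // A trivial responder that upper-cases the text and replies to the sender.
  val echo: Ref[Ping] = Ref[Ping](p => p.replyTo.tell(p.text.toUpperCase))

  // The reply arrives synchronously here, so the Future is already completed.
  val answer: String =
    Await.result(ask(echo, (r: Ref[String]) => Ping("hello")(r)), 1.second)

  def main(args: Array[String]): Unit = println(answer) // prints "HELLO"
}
```

Unlike this analogy, the process DSL version does not block: opRead is itself an operation that is sequenced into the surrounding process.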

It should be noted that in this process DSL algebra the .map() combinator has the same behavior as .flatMap() where possible, i.e. it will flatten if the returned value is an Operation. This is necessary in order to avoid memory leaks for infinite processing loops (as seen e.g. in server processes that respond to an unbounded number of requests).

Now we can use this ask operation in the context of the larger overall process. Calling such a compound operation is done via the opCall operator.

val auth: ActorRef[AuthRequest] = ??? // assume we get the authentication endpoint from somewhere

val p = OpDSL[String] { implicit opDSL =>
  for {
    AuthSuccess(token) <- opCall(opAsk(auth, AuthRequest("secret")).named("getAuth"))
  } yield opSend(token, DoIt)
}

The resulting type of p is not just an Operation with String for the self-type, yielding Unit (the result of opSend), but it also tracks the effects that occur when executing the whole process. We can assert that the externally visible effects of sending and receiving messages match the protocol definition given above by asking the compiler to construct a proof:

def vetProtocol[E <: Effects, F <: Effects](p: Protocol, op: Operation[_, _, E])(
  implicit f: E.ops.FilterAux[E, ExternalEffect, F], ev: F <:< p.Session): Unit = ()

vetProtocol(MyProto, p)

This works in two steps: first the list of effects E (which is somewhat like an HList with a special node for infinite loops) is filtered so that only effects remain that are subtypes of ExternalEffect, yielding the list F. Then this list is compared to the Session type member of the given protocol to see whether it is a subtype.

The result is that the whole program only compiles if the process performs all the required externally visible effects in the right order. If a step is forgotten or duplicated then the vetProtocol invocation will raise a type error.
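
The two steps can be pictured with a runtime analogue. The sketch below is only an illustration under stated assumptions: the library performs the filtering and the comparison on types at compile time, whereas this code works on ordinary values at run time. All names in it (Effect, Internal, externalOnly, conforms) are hypothetical, and only the success path of the protocol is modelled; the Choice branch for rejection is left out.

```scala
object ProtocolCheckSketch {
  // Hypothetical value-level effects; the library tracks effects as types instead.
  sealed trait Effect
  sealed trait ExternalEffect extends Effect
  final case class Send(message: String) extends ExternalEffect
  final case class Read(message: String) extends ExternalEffect
  final case class Internal(note: String) extends Effect // hypothetical non-external effect

  // Step 1: keep only the externally visible effects, preserving their order.
  def externalOnly(effects: List[Effect]): List[ExternalEffect] =
    effects.collect { case e: ExternalEffect => e }

  // Step 2: compare the filtered effects with what the protocol prescribes.
  def conforms(effects: List[Effect], session: List[ExternalEffect]): Boolean =
    externalOnly(effects) == session

  // Success path of the protocol: authenticate, read the result, send a command.
  val session: List[ExternalEffect] =
    List(Send("AuthRequest"), Read("AuthResult"), Send("Command"))

  // A process that also performs an internal step, which the filter removes.
  val good: List[Effect] =
    List(Internal("bookkeeping"), Send("AuthRequest"), Read("AuthResult"), Send("Command"))

  // A duplicated step and a forgotten step, both of which should be rejected.
  val duplicated: List[Effect] = good :+ Send("Command")
  val forgotten: List[Effect] = good.filterNot(_ == Read("AuthResult"))

  def main(args: Array[String]): Unit = {
    println(conforms(good, session))       // true
    println(conforms(duplicated, session)) // false
    println(conforms(forgotten, session))  // false
  }
}
```

In the library the equivalent of a false result is a compile error at the vetProtocol call site rather than a value computed at run time.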


See the LICENSE file for details on licensing and CONTRIBUTING for the contributor’s guide.

Copyright 2017 Roland Kuhn