Implementation of CSP constructions (Communicating Sequential Processes, i.e. Go-like channels) in Scala

Gopher: asynchronous implementation of go-like channels/selectors in scala


Dependencies:

  • scala 2.12.x or 2.11.x
  • akka 2.5.x +
  • scala-async 0.9.6

Download:

libraryDependencies += "com.github.rssh" %% "scala-gopher" % "0.99.10"

(or 0.99.11-SNAPSHOT for development version).

Scala-gopher is open source (license is Apache2); binaries are available from the maven-central repository.

Overview

Scala-gopher is a Scala library, built on top of Akka and SIP-22 async, which provides an implementation of CSP [Communicating Sequential Processes] primitives, known as 'Go-like channels'. Analogs of the go/defer/recover control-flow constructions are also provided.

Note that this is not an emulation of Go language structures in Scala, but rather a reimplementation of the main ideas in a 'Scala-like' manner.

Initialization

You need an instance of gopherApi to create channels and selectors. The easiest way is to obtain one as an Akka extension:

  import akka.actor._
  import gopher._

  ......
 
  val actorSystem = ActorSystem.create("system")
  val gopherApi = Gopher(actorSystem)    

Configuration values can be placed in the 'gopher' entry of akka.conf.

Control flow constructions:

goScope

goScope[T](body: =>T) is an expression that allows Go-like 'defer' and 'recover' expressions to be used inside body.

Typical usage:

import gopher._
import java.io._

object CopyFile {

  def main(args: Array[String]): Unit = {
    // Scala's args do not include the program name, so exactly two paths are expected.
    if (args.length != 2) {
      System.err.println("usage: copy in out")
      sys.exit(1)
    }
    copy(new File(args(0)), new File(args(1)))
  }

  def copy(inf: File, outf: File): Long =
    goScope {
      val in = new FileInputStream(inf)
      defer {
        in.close()
      }
      val out = new FileOutputStream(outf)
      defer {
        out.close()
      }
      out.getChannel().transferFrom(in.getChannel(), 0, Long.MaxValue)
    }

}

Here, the statements inside the defer blocks are executed at the end of the goScope block, in reverse order.

Inside goScope we can use two pseudo-functions:

  • defer(body: =>Unit):Unit - defers execution of body until the end of the go or goScope block; deferred blocks are executed in reverse order of their registration.
  • recover[T](f:PartialFunction[Throwable,T]):Boolean - can be used only within a defer block, with the following semantics:
    • if an exception was raised inside go or goScope, then recover tries to apply f to this exception and
      • if f is applicable - sets f(e) as the return value of the block and returns true
      • otherwise - does nothing and returns false
    • during normal exit - returns false.

You can think of defer as stackable finally clauses, and of defer with recover inside as a catch clause. A small example:

val s = goScope{ 
           defer{ recover{
                     case ex: Throwable => "CCC"
           }    } 
           throw new Exception("")
           "QQQ" 
        }

will set s to "CCC".
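The reverse-order execution of deferred blocks can be illustrated without the library. The following is a minimal sketch in plain Scala, assuming nothing about scala-gopher's internals: `DeferSketch`, `Scope` and `withScope` are hypothetical names introduced only for this illustration, and recover is deliberately left out.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of 'stackable finally' semantics; not scala-gopher code.
object DeferSketch {

  final class Scope {
    private val deferred = ArrayBuffer.empty[() => Unit]

    // Register an action to run when the enclosing scope ends.
    def defer(body: => Unit): Unit = { deferred += (() => body); () }

    // Run registered actions in reverse (last-in, first-out) order.
    def runDeferred(): Unit = deferred.reverseIterator.foreach(action => action())
  }

  // Evaluate body and always run the deferred actions afterwards,
  // whether body returned normally or threw.
  def withScope[T](body: Scope => T): T = {
    val scope = new Scope
    try body(scope)
    finally scope.runDeferred()
  }

  def demo(): List[String] = {
    val log = ArrayBuffer.empty[String]
    withScope { scope =>
      scope.defer { log += "close in"; () }
      scope.defer { log += "close out"; () }
      log += "copy"
      ()
    }
    log.toList // List(copy, close out, close in)
  }
}
```

In this model the action registered last ("close out") runs first, which mirrors the order in which the two streams of the CopyFile example are closed.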

go

go[T](body: =>T)(implicit ex:ExecutionContext):Future[T] starts asynchronous execution of body in the provided execution context. Inside go we can use defer/recover clauses and blocking read/write channel operations.

go is implemented on top of SIP-22 async and shares the same limitations. In addition to the async/await transform, go provides lifting of asynchronous expressions inside some well-known higher-order functions (i.e. it is possible to use async operations inside for loops). Details are available in the tech report: https://arxiv.org/abs/1611.00602 .

Channels

Channels are used for asynchronous communication between execution flows.

When using a channel inside a go block, you can think of it as a classic blocking queue of fixed size with the methods read and write:

 val channel = gopherApi.makeChannel[Int];

 go {
   channel.write(a)
 }
 ......
 go {
   val i = channel.read 
 }

  • channel.write(x) - sends x to the channel and waits until it has been sent (channel<~x and channel!x can be used as synonyms if you prefer a shorter syntax)
  • channel.read or (channel ?) - blocking read

Blocking operations can be used only inside go or Async.await blocks.

Outside such blocks we can use the asynchronous versions:

  • channel.awrite(x) will write x and return a Future[Unit], which will be completed after x has been sent
  • channel.aread will return a future of the value that will be read.

Also, channels can be closed. After this, an attempt to write will throw 'ClosedChannelException'. Reading will still be possible up to the last written value; after that, an attempt to read will throw the same exception. Each channel also provides a done input for firing close events.

Note that closing channels is not mandatory; unreachable channels are garbage-collected regardless of whether they are closed or not.

Channels can be buffered or unbuffered. In an unbuffered channel, write returns control to the caller only after the other side has actually started processing; a buffered channel forces the producer to wait only if the internal channel buffer is full.
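As a rough analogy, the same distinction exists between two standard JVM queues. The sketch below uses only `java.util.concurrent` and does not touch scala-gopher: a bounded `ArrayBlockingQueue` stands in for a buffered channel and a `SynchronousQueue` for an unbuffered one. The name `BufferingSketch` is hypothetical, and gopher channels are asynchronous rather than thread-blocking, so this only illustrates when a writer has to wait.

```scala
import java.util.concurrent.{ArrayBlockingQueue, SynchronousQueue}

// JVM-queue analogy for buffered vs. unbuffered channels; not scala-gopher code.
object BufferingSketch {

  def demo(): (Boolean, Boolean, Boolean, Boolean) = {
    // 'Buffered': writes succeed immediately until the buffer is full.
    val buffered = new ArrayBlockingQueue[Int](2)
    val first  = buffered.offer(1) // accepted, buffer has room
    val second = buffered.offer(2) // accepted, buffer is now full
    val third  = buffered.offer(3) // rejected, a writer would have to wait here

    // 'Unbuffered': a write succeeds only when a reader is already waiting.
    val unbuffered = new SynchronousQueue[Int]()
    val handoff = unbuffered.offer(1) // rejected, nobody is reading yet

    (first, second, third, handoff) // (true, true, false, false)
  }
}
```

The non-blocking `offer` is used here instead of `put` so that the example reports where a writer would wait rather than actually stalling the calling thread.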

Also, you can use only the Input or Output interfaces, where the appropriate read/write operations are defined. For Input, the usual collection functions exist, such as map, zip, takeN, fold, etc. A Scala Iterable can be represented as a channels.Input via the method gopherApi.iterableInput. We can also use Scala futures as channels that produce one value and then close. To obtain such an input, use gopherApi.futureInput.

The | (i.e. or) operator is used to merge inputs, i.e. (x|y).read will read a value from channel x or y, whichever becomes available.

For each input and output you can create a facility with a tracked timeout, i.e. if in is an input, then

 val (inReady, inTimeouts) = in.withInputTimeouts(10 seconds)

will return two inputs, where reading from inReady will return the same as reading from in. If waiting for a read takes longer than 10 seconds, the value of the timeout will be available in inTimeouts. Similarly, we can create an output with timeouts:

 val (outReady, outTimeouts) = out.withOutputTimeouts(10 seconds)
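The idea of separating 'value arrived' from 'waiting took too long' can be sketched with a standard JVM queue. This is an analogy only, assuming nothing about how scala-gopher implements withInputTimeouts: `TimeoutSketch` and `readWithTimeout` are hypothetical names, and the two sides of the `Either` play the roles of inTimeouts (Left) and inReady (Right).

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.concurrent.duration._

// JVM-queue analogy for reads with a tracked timeout; not scala-gopher code.
object TimeoutSketch {

  // Right(value) if a value arrives within `limit`,
  // Left(limit) if the wait timed out.
  def readWithTimeout[A <: AnyRef](
      queue: LinkedBlockingQueue[A],
      limit: FiniteDuration
  ): Either[FiniteDuration, A] =
    // poll returns null on timeout; Option(...) turns that into None.
    Option(queue.poll(limit.toMillis, TimeUnit.MILLISECONDS)) match {
      case Some(value) => Right(value)
      case None        => Left(limit)
    }

  def demo(): (Either[FiniteDuration, String], Either[FiniteDuration, String]) = {
    val queue = new LinkedBlockingQueue[String]()
    val timedOut = readWithTimeout(queue, 50.millis) // nothing written yet
    queue.put("hello")
    val ready = readWithTimeout(queue, 50.millis)    // value is already there
    (timedOut, ready)
  }
}
```

Unlike this blocking sketch, the library exposes the two outcomes as two separate inputs, so both can take part in a select.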

Also, note that you can provide your own Input and Output implementations by implementing the callback methods cbread and cbwrite.

Select loops and folds

The 'select statement' is somewhat similar to the Unix 'select' syscall: from a set of blocking operations, select one that is ready for input/output and run it.

The usual pattern of channel processing in the Go language is to wrap a select operation in an endless loop.

Gopher provides similar functionality:

go{
  for( s <- gopherApi.select.forever) 
    s match {
      case i:channelA.read => ..do-something-with-i
      case ch:channelB.read => ..do-something-with-b
    }
}

Here we read in the loop from channelA or channelB.

The body of a select loop must consist of a single match statement, where the left-hand sides of the case clauses must have one of the following forms:

  • v:channel.read (for reading from channel)
  • v:Type if (v==read(ch)) (for reading from channel or future)
  • v:channel.write if (v==expr) (for writing expr into channel).
  • v:Type if (v==write(ch,expr)) (for writing expr into channel).
  • _ - for 'idle' action.

For an endless loop inside go, we can use a shortcut with partial-function syntax:

     gopherApi.select.forever{ 
         case i:channelA.read => ... do-something-with-i
         case ch:channelB.read => ... do-something-with-b
     }

Inside case actions, we can use blocking reads/writes and await operations. Calling doExit on the implicit instance of FlowTermination[T] (for a forever loop this is FlowTermination[Unit]) exits the loop; the select.exit and select.shutdown macros are shortcuts for this.

Example:

val channel = gopherApi.makeChannel[Int](100)
     
val producer = channel.awriteAll(1 to 1000)
     
@volatile var sum = 0
val consumer = gopherApi.select.forever{
        case i: channel.read  =>
                  sum = sum + i
                  if (i==1000)  {
                    select.shutdown()
                  }
}
     
Await.ready(consumer, 5.second)

A combination of a variable and a select loop is better modeled with the help of the 'fold over select' construction:

val sum = gopherApi.select.afold(0) { (state, selector) =>
   selector match {
        case i: channel.read  =>
                          val nstate = state + i
                          if (i==1000) {
                            select.exit(nstate)
                          }
                          nstate
   }
}

More than one variable in the state can be modeled with partial-function case syntax:

val fib = select.afold((0,1)) { case ((x,y), s) =>
    s match {
      case x:channel.write => (y,y+x)
      case q:quit.read => select.exit((x,y))
    }
}

Also, we can use 'map over select' to represent the results of handling different events as the input side of a channel:

val multiplexed = select amap {
   case x:ch1.read => (s1,x)
   case y:ch2.read => (s2,y)
 } 

For a select operation not enclosed in a loop, scala-gopher provides the select.once syntax:

gopherApi.select.once{
  case i: channelA.read => s"Readed(${i})"
  case x:channelB.write if (x==1) => s"Written(${x})" 
}

Such a form can be called from any environment and will return Future[String]. Inside go you can wrap this in await or use 'for' syntax, as with forever.

go {
  .....
  val s = for(s <-gopherApi.select.once) 
             s match {
               case i: channelA.read => s"Readed(${i})"
               case x: channelB.write if (x==1) => s"Written(${x})" 
             }
      
}  

and afold becomes fold:

go {
  ...
  val sum = select.fold(0) { (n,s) =>
             s match {
               case x: channelA.read => n+x
               case q: quit.read => select.exit(n)
             }
            }
}

and amap becomes map:

val multiplexed = for(s <- select) yield
                    s match {
                      case x:ch1.read => (s1,x)
                      case y:ch2.read => (s2,y)
                    }

Done signals

Sometimes it is useful to receive a message when some Input becomes closed. Such inputs are named 'CloseableInputs' and provide a way to receive a close notification in a selector using the done pseudo-type.

 select.foreach{
   case x:ch.read => Console.println(s"received: ${x}")
   case _:ch.done => Console.println(s"done")
                     select.exit(())
 }

Note that you must exit from the current flow in the done handler; otherwise done signals will be generated repeatedly in a loop.

Effected{Input,Output,Channel}

One useful pattern, often used in CSP-style programming, is to have a channel from which we read (or to which we write) as part of the state. In the Go language, this is usually modelled as a mutable variable that is changed inside the same select statement in which it is read or written.

In scala-gopher, we can use the 'EffectedChannel' technique: an entity that holds a channel, can be used in read/write operations, and can be changed only via an effect (an operation that accepts the previous state and returns the next one).

Let's look at the example:

 def generate(n:Int, quit:Promise[Boolean]):Channel[Int] =
  {
    val channel = makeChannel[Int]()
    channel.awriteAll(2 to n) andThen (_ => quit success true)
    channel
  }

 def filter(in:Channel[Int]):Input[Int] =
  {
     val filtered = makeChannel[Int]()
     val sieve = makeEffectedInput(in)
     sieve.aforeach { prime =>
            sieve <<= (_.filter(_ % prime != 0))
            filtered <~ prime
      }
    filtered
  }

Here, in 'filter', we generate a set of prime numbers and build a sieve of Eratosthenes by sequentially applying the 'filter' effect to the state of the sieve EffectedInput.

Transputers

The logic of data transformation between channels can be encapsulated in a special Transputer concept. (The word 'transputer' was chosen as a reminder of the INMOS processor, for which one of the first CSP languages, Occam, was developed.) You can view a transputer as a representation of a restartable process that consists of:

  • Set of named input and output ports.
  • Logic for propagating information from the input to the output ports.
  • Possible state
  • Logic of error recovering.

That is, a Transputer is similar to an Actor, with the following difference: whereas an Actor reacts to incoming messages from its mailbox and sends signals to other actors, a Transputer processes incoming messages from its input ports and sends outgoing messages to its output ports. And whereas operations inside an Actor must not block, operations inside a Transputer can wait.

Transputers are built hierarchically with the help of 3 operations:

  • select (logic is execution of a select statement)
  • parallel combination (logic is parallel execution of parts)
  • replication (logic is parallel execution of a set of identical transputers)

Select transputer

Let's look at a simple example: a transputer with two input ports and one output. When the same number arrives from inA and inB, the transputer prints Bingo on the console; in every case it writes the result of the comparison to out:

 trait BingoTransputer extends SelectTransputer
 {
    val inA = InPort[Int]
    val inB = InPort[Int]
    val out = OutPort[Boolean]

    loop {
      case x:inA.read =>
             val y = inB.read
             out.write(x==y)
             if (x==y) {
               Console.println(s"Bingo: ${x}")
             }
    }

 }

The select loop is described in the loop statement.

To create a transputer, we can use the gopherApi.makeTransputer call:

val bingo = gopherApi.makeTransputer[BingoTransputer]

After the transputer is created, we can create channels, connect them to the ports, and start the transputer.

val inA = makeChannel[Int]()
bingo.inA.connect(inA)
val inB = makeChannel[Int]()
bingo.inB.connect(inB)
val out = makeChannel[Boolean]()
bingo.out.connect(out)

val shutdownFuture = bingo.start()

Then, after we write the values (1,1) to inA and inB, true will become available for reading from out.

Error recovery

On an exception from a loop statement, the transputer will be restarted with its ports connected to the same channels. This behavior is the default; we can configure it by setting a recovery policy:

val t = makeTransputer[MyType].recover {
           case ex: MyException => SupervisorStrategy.Escalate
        }

The recovery policy is a partial function from Throwable to akka SupervisorStrategy.Directive. Escalated exceptions are passed to parent transputers or to the TransputerSupervisor actor, which handles failures according to the akka default supervisor strategy.

How many times a transputer can be restarted within a given period can be configured via the failureLimit call:

 t.failureLimit(maxFailures = 20, windowDuration = 10 seconds)

This setting means that if 20 failures occur within 10 seconds, the exception Transputer.TooManyFailures will be escalated to the parent.
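One way to picture such a limit is a sliding window over failure timestamps. The sketch below is a hypothetical model in plain Scala, not scala-gopher's implementation: `FailureWindowSketch`, `FailureWindow` and `record` are names invented for this illustration, and the choice to report the limit as reached on the maxFailures-th failure inside the window is an assumption based on the sentence above.

```scala
import scala.collection.immutable.Queue

// Hypothetical sliding-window failure counter; not scala-gopher code.
object FailureWindowSketch {

  final case class FailureWindow(
      maxFailures: Int,
      windowMillis: Long,
      stamps: Queue[Long] = Queue.empty
  ) {
    // Record a failure at time `now` (milliseconds). Returns the updated
    // window and whether the limit has been reached within the window.
    def record(now: Long): (FailureWindow, Boolean) = {
      // Forget failures that have fallen out of the window, then add this one.
      val recent = stamps.dropWhile(t => now - t >= windowMillis) :+ now
      (copy(stamps = recent), recent.size >= maxFailures)
    }
  }

  def demo(): (Boolean, Boolean) = {
    val empty = FailureWindow(maxFailures = 3, windowMillis = 10000L)

    // Three failures within two seconds: the limit is reached.
    val (b1, _)     = empty.record(0L)
    val (b2, _)     = b1.record(1000L)
    val (_, burst)  = b2.record(2000L)

    // Three failures spread over eighteen seconds: the oldest one expires.
    val (s1, _)     = empty.record(0L)
    val (s2, _)     = s1.record(9000L)
    val (_, spaced) = s2.record(18000L)

    (burst, spaced) // (true, false)
  }
}
```

With maxFailures = 20 and windowMillis = 10000 the same model corresponds to the failureLimit settings shown above.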

Par transputers

'Par' is a group of transputers running in parallel. A par transputer can be created with the help of the plus operator:

val par = (t1 + t2 + t3)
par.start()

When one of t1, t2, ... stops or fails, all other members of par are stopped. After this, par can be restarted according to the current recovery policy.

Replication

A replicated transputer is a set of identical transputers t_{i}, running in parallel. It can be created with the gopherApi.replicate call. The following code fragment:

val r = gopherApi.replicate[MyTransputer](10)

will produce ten copies of MyTransputer (r will be a container transputer for them). Ports of all replicated internal transputers will be shared with the ports of the container. (I.e. if we write something to an input port, it will be read by one of the replicas; if one of the replicas writes something to an out port, this will be visible in the out port of the container.)

The mapping from a container port to the replica ports can be changed from sharing to other approaches, such as duplicating or distributing, by applying port transformations.

For example, the following code fragment:

r.inA.duplicate()
 .inB.distribute( _.hashCode )

will make port inA duplicated across replicas (i.e. a message sent to container port inA will be received by each instance), while messages from inB will be distributed by hash code, i.e. messages with the same hash code will be directed to the same replica. Such behavior is useful when the replicated transputer keeps some state information about messages.
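The property that matters for stateful replicas is that equal keys always map to the same replica. The sketch below shows one plausible way to achieve that in plain Scala. It is an assumption for illustration, not the routing scala-gopher actually uses: `DistributeSketch` and `replicaIndex` are hypothetical names, and reducing the key modulo the number of replicas is just one possible scheme.

```scala
// Hypothetical key-based routing; not scala-gopher code.
object DistributeSketch {

  // Map a message to a replica index in [0, replicas) using a key function.
  // floorMod keeps the index non-negative even for negative hash codes.
  def replicaIndex[A](message: A, replicas: Int)(key: A => Int): Int =
    java.lang.Math.floorMod(key(message), replicas)

  def demo(): Map[Int, List[String]] = {
    val messages = List("a", "b", "a", "c", "b")
    // Group the messages by the replica they would be routed to.
    messages.groupBy(m => replicaIndex(m, 4)(_.hashCode))
  }
}
```

Here both "a" messages end up in the same group, as do both "b" messages, which is what allows a replica to keep per-key state.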

Stopping and recovery of a replicated transputer work the same as in par (i.e. stopping/failing of one instance will cause stopping/failing of the container).

Also note that we can obtain the sequence of replicated instances with the help of the ReplicateTransformer.replicated method.

Unsugared interfaces

It is worth knowing that a gopher API without macro-based syntax sugar also exists.

(
   new ForeverSelectorBuilder(gopherApi)
          .reading(ch1){ x => something-x }
          .writing(ch2,y){ y => something-y }
          .idle(something idle).go
)

can be used instead of the corresponding macro-based call.

Moreover, for tricky cases there is an even lower-level interface, which combines computations by adding them to functional interfaces, similar to continuations:

{
  val selector = new Selector[Unit](gopherApi)
  selector.addReader(ch1, cont=>Some{ in => something-x
                                        Future successful cont
                                    }
                    )
 selector.addWriter(ch2, cont=>Some{(y,{something y;
                                           Future successful cont
                    })})                  
 selector.addIdle(cont => {..do-something-when-idle; Future successful cont})
} 

Please consult the source code for details.

Additional Information