Skip to content

Async unsugared #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Oct 4, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
10b076a
Merge pull request #5 from rssh/async-unsugared-read-exception-on-close
rssh Sep 27, 2014
5b6fd0f
fixed double reading from closed channel.
rssh Sep 28, 2014
300c671
checked cleanup
rssh Sep 28, 2014
47e6b3b
make coroutines test to run
rssh Sep 28, 2014
9a4d021
add GoAsync
rssh Sep 28, 2014
536b168
defer functionality removed from FloeTermination
rssh Sep 28, 2014
77e7c5e
1. added Fibonnachy suite.
rssh Sep 30, 2014
f67dca8
imlemented zip for inputs.
rssh Oct 1, 2014
dcc9fd3
working zipper
rssh Oct 1, 2014
d9a3e01
comment about submitted compiler bug.
rssh Oct 1, 2014
270f43e
Defers in progress
rssh Oct 2, 2014
e38afec
defers debugged.
rssh Oct 2, 2014
a30e2c6
added implementation of go and goscope
rssh Oct 3, 2014
dec94eb
move examples to new implementation
rssh Oct 3, 2014
3d7bb10
test with 'go' added.
rssh Oct 3, 2014
651abb7
implemented partial-syntax sugar
rssh Oct 4, 2014
4c3d826
renamed 'GopherAPIExtension' to 'Gopher'
rssh Oct 4, 2014
889a7fd
dependencies update
rssh Oct 4, 2014
928f81c
removed debug output
rssh Oct 4, 2014
487ed1f
removed sources of previous version
rssh Oct 4, 2014
ab3a13a
new docs.
rssh Oct 4, 2014
071d97d
scaladoc updated.
rssh Oct 4, 2014
391da00
updare README.md to be readable in github
rssh Oct 4, 2014
f57c8bf
github markdown in progress
rssh Oct 4, 2014
e97082a
githib0flawored mardown
rssh Oct 4, 2014
4e539db
more .md adoption
rssh Oct 4, 2014
1c7f335
adopt markdown to git
rssh Oct 4, 2014
e2b584d
better title
rssh Oct 4, 2014
90d3f47
yet better title
rssh Oct 4, 2014
fb3fc48
added package file for scaladoc
rssh Oct 4, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
313 changes: 174 additions & 139 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,31 +1,46 @@

Fully asyncronics implementation of go-like channels/selectors in scala
-----------------------------------------------------------------------
-----------------------------------------------------------------------
## Gopher: asynchronous implementation of go-like channels/selectors in scala

Requirements:
------------
### Dependences:

* scala 2.11.2 +
* scala-async plugin which you must install by hands from
https://github.com/scala/async.git (on the time of writing this document
livrary was not yet published).
* akka 2.3.6 +
* scala-async 0.9.2

Download:
---------
#### Download:

libraryDependencies += "com.github.rssh" %% "scala-gopher" % "0.9.1"
libraryDependencies += "com.github.rssh" %% "scala-gopher" % "0.99.0"


What-s inside:
--------------
--------------
## Overview

Scala-gopher is a scala library, build on top of akka and SIP-22 async, which provide implementation of
CSP [Communicate Sequential Processes] primitives, known as 'Go-like channels'. Also analogs of go/defer/recover control flow constructions are provided.

Note, that this is not an emulation of go language constructions in scala, but rather reimplementation of key ideas in 'scala-like' maner.



### Initialization

You need instance of gopherApi for creating channels and selectors. The most easy way is to use one as Akka extension:

import akka.actors._
import gopher._

......

val actorSystem = ActorSystem.create("system")
val gopherApi = Gopher(actorSystem)

Scope
-----
In akka.conf we can place config values in 'gopher' entry.

## Control flow constructions:

### goScope

Library define 'goScope' expression, which allows to use inside
goScope go-like 'defer', 'panic' and 'recover' expression.
`goScope[T](body: =>T)` is expression, which allows to use inside `body` go-like 'defer' and 'recover' expression.

Typical usage:

import gopher._
Expand All @@ -52,175 +67,195 @@

}

Here statements inside defer block are executed at the end of goScope block
in reverse order.
Here statements inside defer block are executed at the end of goScope block in reverse order.

Basically, inside goScope we can use two pseudofunctions:

Take a look at introduction article about go-like control flow constructs:
http://blog.golang.org/defer-panic-and-recover
* `defer(body: =>Unit):Unit` - defer execution of `body` until the end of `go` or `goScope` block and previous defered blocks.
* `recover[T](f:PartialFunction[Throwable,T]):Boolean` -- can be used only within `defer` block with next semantics:
* * if exeception was raised inside `go` or `goScope` than `recover` try to apply `f` to this exception and
* * * if `f` is applicable - set `f(e)` as return value of the block and return true
* * * otherwise - do nothing and return false
* * during normal exit - return fase.

Basically, goScope wrap it's argument into try/finalize block and rewrite
*defer* calls to construction of list of code blocks, which are
executed in finalize. *panic* calls are just throwing exception and
*recover* (which can be executed only inside *defer*) is returning a value
of recover argument instead rethrowing exception:
You can look on `defer` as on stackable finally clauses, and on `defer` with `recover` inside as on `catch` clause. Small example:

val s = goScope{
defer{ recover("CCC") }
panic("panic message")
val s = goScope{
defer{ recover{
case ex: Throwable => "CCC"
} }
throw new Exception("")
"QQQ"
}

will set *s* to "CCC".

One question -- what to do if exception is occured inside one of defer blocks (?). By default we assume that statements inside *defer* have cleanup
semantics, so exception inside ones are suppressed. You can override this
semantics by calling special construction *throwSuppressed* inside first
*defer* block (which will evaluated last) i.e.

goScope{
defer{ throwSuppressed }
..........
}

will throw suppressed exceptions if one exists.

Also we can get list of suppressed using suppressedExceptions construction:

goScope{
defer{
if (suppressedExceptions.notEmpty) {
System.err.println("suppresses exceptions:")
for(e <- suppressedExceptions) e.printStackTrace
}
}
..........
}
will set `s` to "CCC".


Go statement
------------

Go statement starts the execution of expression in independent thread.
We map one to Future call which use current implicit execution statement,
so next 'go-like' code
### go

go {
expr:A
}
`go[T](body: =>T)(implicit ex:ExecutionContext):Future[T]` starts asyncronics execution of `body` in provided execution context. Inside go we can use `defer`/`recover` clauses and blocked read/write channel operations.

have type *Future[A]* and mapped into plain scala as combination of *Future*
and *goScope* .
Basucally, go implemented on top of [SIP-22](http://docs.scala-lang.org/sips/pending/async.html) async and share the same limitations.

Channels
--------
## Channels

Channels is a way for organizing go-like message passing. Basically you
can look on it as on classic blocked queue. Different 'goroutines', executed
in different flows can exchange messages via channels.
You can look on channel as on classic blocked queue with fixed size. Different execution flows can exchange messages via channels.


val channel = makeChannel[Int];
val channel = gopherApi.makeChannel[Int];

go {
for(i <- 1 to 10) channel <~ i
channel.write(a)
}

......
go {
val i1 = channel?
val i2 = channel?
val i = channel.read
}


*channel <~ i* - send i to channel (it is possible to use '!' as synonym, to
provide interface, simular to actors), *i = channel ?* - blocked read
of channell. Note, that unlike akka, default read and write operations are
blocked. Unlike go, we also provide 'immediate' and 'timeouted' versions
of read/write operations.

Select loop
----------

May-be one of most unusual language constructions in go is
'select statement' which work in somewhat simular to unix 'select' syscall:
from set of blocking operations select one which is ready for input/output
and run it.

The common pattern of channel processing in go language is wrap select
operarion into endless loop.

Gopher provides simular functionality with 'select loops':

* `channel.write(x)` - send x to channel and wait until one will be send (it is possible us as synonims `channel<~x` and `channel!x` if you prefere short syntax)
* `channel.read` or `(channel ?)` - blocking read

Blocking operations can be used only inside `go` or `Async.await` blocks.

Outside we can use asynchronics version:

* `channel.awrite(x)` will write `x` and return to us `Future[Unit]` which will be executed after x will send
* `channel.aread` will reaturn feature to value, which will be readed.

Also channels can be closed, after this attempt to write will cause throwing of 'ClosedChannelException', reading is possible up to 'last written value', after this attempt to read will cause same exception.

Note, that closing channels is not mandatory, unreachable channels are garbage-collected regardless of they are closed or not.

Also you can use only 'Input' or 'Output' interfaces, where appropriative read/write operations is defined.
For input we have defined usual collection functions, like `map`, `zip`, `takeN` . Scala Iterable can be represented as `channels.Input` via method `asInput`

Also note, that you can provide own Input and Output implementations by implementing callback `cbread` and `cbwrite` methods.


import gopher._
## Select loops

'select statement' is somewhat simular to unix 'select' syscall:
from set of blocking operations select one which is ready for input/output and run it.

for( s <- select )
The common pattern of channel processing in go language is wrap select operation into endless loop.

Gopher provides simular functionality with 'select loops':

go{
for( s <- gopherApi.select.forever)
s match {
case `channelA` ~> (i:Int) => ..do-something-with-i
case `channelB' ~> (ch: Char) => .. do-something-with-b
case i:channelA.read => ..do-something-with-i
case ch:channelB.read .. do-something-with-b
}
}

Here we read in loop from channelA or channelB.

Body of select loop must consists only from one *match* statement where
patterns in *case* clauses must have form *channel ~> (v:Type)*
(for reading from channel) or *channel <~ v* (for writing).
Body of select loop must consists only from one `match` statement where
patterns in `case` clauses must have form

* `v:channel.read` (for reading from channel)
* `v:channel.write if (v==expr])` (for writing `expr` into channel).
* `_` - for 'idle' action.

For endless loop inside go we can use shortcut with syntax of partial function:

gopherApi.select.forever{
case i:channelA.read => ..do-something-with-i
case ch:channelB.read .. do-something-with-b
}


Yet one example:
Inside case actions we can use blocing read/writes and await operations. For ending loop we must call doExit in implicit instance of `FlowTermination[T]` (for forever loop this is `FlowTermination[T]`)

Example:

val channel = makeChannel[Int](100)
val channel = gopherApi.makeChannel[Int](100)

val producer = channel.awrite(1 to 1000)

go {
for( i <- 1 to 1000)
channel <~ i
}
@volatile var sum = 0;
val consumer = gopherApi.select.forever{
case i: channerl.read =>
sum = sum + i
if (i==1000) {
implictily[FlowTermination[Unit]].doExit(())
}
}

var sum = 0;
val consumer = go {
for(s <- select) {
s match {
case `channel` ~> (i:Int) =>
sum = sum + i
if (i==1000) s.shutdown()
}
}
sum
}

Await.ready(consumer, 5.second)
Await.ready(consumer, 5.second)

Note the use of *s.shutdown* method for ending select loop.

For using select operation not enclosed in loop, scala-gopher provide
*select.once* syntax:

```
gopherApi.select.once{
case i: channelA.read => s"Readed(${i})"
case x:channelB.write if (x==1) => s"Written(${x})"
}
```

for(s <- select.once) s match {
case `channelA` ~> (i:Type) => ...do read ..
case `channelB` <~ 1 => .. when writed ..
}

Which is corresponding to one plain select in go language.



Interaction with Actors
-----------------------

We can bind channel output to actor (i.e. all, what we will write to channel
will be readed to actor) with call
Such form can be called from any environment and will return `Future[String]`. Inside `go` you can wrap this in await of use 'for' syntax as with `forever`

bindChannelRead[A](read: InputChannel[A], actor: ActorRef)
```
go {
.....
val s = for(s <-gopherApi.select.once)
s match {
case i: channelA.read => s"Readed(${i})"
case x: channelB.write if (x==1) => s"Written(${x})"
}

}
```

and bind channel to actorsystem, by creating actor which will push all input
into channel:

bindChannelWrite[A: ClassTag](write: channels.OutputChannel[A],
name: String)
(implicit as: ActorSystem): ActorRef
## Unsugared interfaces

It's not worse to know that exists gopher API without macro-based syntax sugar.

(
new ForeverSelectorBuilder(gopherApi)
.reading(ch1){ x => something-x }
.writing(ch2,y){ y => something-y }
.idle(something idle).go
)

can be used instead appropriative macro-based call.

And for really tricky things exists even low-level interface, which can combine computations by adding to functional interfaces, simular to continuations:

{
val selector = new Selector[Unit](gopherApi)
selector.addReader(ch1, cont=>Some{gen=> something-x
Future successful cont
}
)
selector.addWriter(ch2, cont=>Some{(y,{something y;
Future successful cont
})})
selector.addIdle(cont => {..do-something-when-idle; Future successful cont})
}

Please, consult with source code for details.


Additional Informatiom
## Additional Informatiom
----------------------

* API reference: http://rssh.github.io/scala-gopher/api/index.html#package
* source code: https://github.com/rssh/scala-gopher

Some related links:

* [Communicating Sequential Processes book by Tony Hoare](http://www.usingcsp.com])
* [brief history of CSP in Bell-labs](http://swtch.com/~rsc/thread/)
* [introduction article about go defer/recover](http://blog.golang.org/defer-panic-and-recover)

4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ scalacOptions ++= Seq("-unchecked","-deprecation" /* ,"-Ymacro-debug-lite" */ )

libraryDependencies += "org.scala-lang" % "scala-reflect" % "2.11.2"

libraryDependencies += "org.scala-lang.modules" %% "scala-async" % "0.9.3-SNAPSHOT"
libraryDependencies += "org.scala-lang.modules" %% "scala-async" % "0.9.2"

libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.0" % "test"

Expand All @@ -22,7 +22,7 @@ libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.6"

//testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-n", "Now")

version:="0.9.4-SNAPSHOT"
version:="0.99.0"


publishMavenStyle := true
Expand Down
Loading