A concurrent programming library combining monadic and streaming I/O in Scala.
By releasing Molecule as open source, NOKIA Bell Labs supports research into easing the industry's transition to network function virtualization on cloud computing platforms.
There is a mailing list for discussions.
- User-level threading model with low-overhead context switches on unmodified JVMs.
- Type-safe communication channels.
- High-performance and convenient stream processing primitives that batch data transparently.
- Incremental combinator parsers that work over non-blocking sockets (à la Attoparsec).
- Exceptions and graceful termination handling.
- Higher maintainability, reliability and flexibility compared to applications written against callback interfaces in plain Java.
Both the paper explaining the rationale and the design principles of Molecule, and the latest API documentation are available online.
Molecule: Using Monadic and Streaming I/O to Compose Process Networks on the JVM by Sébastien Bocq and Koen Daenen. ACM SIGPLAN conference on Object-Oriented Programming, Systems, Languages, and Applications (OOPSLA 2012), Tucson, Arizona, USA, 2012. [pdf]
ACM, 2012. This is the authors' version of the work. It is posted here by permission of the ACM for your personal use. Not for redistribution. For referencing, please refer to the definitive version, published in SIGPLAN Not. 47, 10 (October 2012), 315-334. http://doi.acm.org/10.1145/2398857.2384640
Note that the API has evolved since publication. Check the scaladoc and the examples for the current version.
Note: many other examples are available for study in the molecule-*-examples
directories.
This example will walk you through the implementation of a simple process type. First, we show how instances of this process type can interact with the command line. Then, we show how to bind instances to Telnet sessions by implementing the minimal support for the Telnet protocol over binary streams using Molecule's NIO interfaces and incremental parser combinators.
Here is how a process that interacts on the command line is defined and then launched.
import molecule._
import molecule.io._

object HelloYou extends ProcessType1x1[String, String, Unit] {

  def main(in: Input[String], out: Output[String]) = for {
    _ <- out.write("What is your name?")
    name <- in.read()
    _ <- out.write("Hello " + name + "!")
  } yield ()

  import molecule.platform.Platform
  import molecule.channel.Console

  def main(args: Array[String]): Unit = {
    // Create an execution platform
    val platform = Platform("hello-you")

    // Launch an instance of HelloYou on the platform
    // and block the main thread with `get_!` until it is terminated.
    platform.launch(HelloYou(Console.stdinLine, Console.stdoutLine)).get_!()
  }
}
Before defining a new process type, we must import two packages. The first import brings in Molecule's main package. The second brings in the abstract monadic process type, along with various useful monadic combinators defined as value members in the io package object. Process types are patterned after function types in Scala:
abstract class ProcessTypeixj[I1, ... , Ii, O1, ..., Oj, R] {
  final def apply(i1: IChan[I1], ..., oj: OChan[Oj]): Process[R] = ...
  protected def main(i1: Input[I1], ..., oj: Output[Oj]): IO[R]
}
The abstract class is parameterized by the types Ii and Oj of the input and output channel interfaces of a process, followed by its result type R. The apply method is used as a factory method to create lightweight processes. Since HelloYou inherits from ProcessType1x1[String, String, Unit], the process type HelloYou is a factory of process instances that use one input of type String and one output of type String, and terminate with a result of type Unit. All the processes it creates share the same behavior, which is defined by the implementation of its main method:
def main(in: Input[String], out: Output[String]) = for {
  _ <- out.write("What is your name?")
  name <- in.read()
  _ <- out.write("Hello " + name + "!")
} yield ()
It prompts for a name on its output, reads the name on its input, says hello on its output and then returns ().
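The for-comprehension above is ordinary Scala syntax sugar for nested flatMap and map calls. The sketch below shows that equivalence on a toy model. ToyIO, ToyInput and ToyOutput are hypothetical stand-ins invented for this illustration; they are not Molecule's IO, Input or Output types and say nothing about how Molecule schedules processes.

```scala
import scala.collection.mutable

// Toy model for illustration only; this is NOT Molecule's implementation.
object DesugarSketch {
  // A deferred computation that produces a value of type A when run.
  final class ToyIO[A](val run: () => A) {
    def map[B](f: A => B): ToyIO[B] = new ToyIO(() => f(run()))
    def flatMap[B](f: A => ToyIO[B]): ToyIO[B] = new ToyIO(() => f(run()).run())
  }

  // Hypothetical in-memory stand-ins for the input and output channels.
  final class ToyInput(lines: mutable.Queue[String]) {
    def read(): ToyIO[String] = new ToyIO(() => lines.dequeue())
  }
  final class ToyOutput(val written: mutable.Buffer[String]) {
    def write(s: String): ToyIO[Unit] = new ToyIO(() => { written += s; () })
  }

  // Same shape as HelloYou.main, written with a for-comprehension.
  def sugared(in: ToyInput, out: ToyOutput): ToyIO[Unit] = for {
    _ <- out.write("What is your name?")
    name <- in.read()
    _ <- out.write("Hello " + name + "!")
  } yield ()

  // The equivalent chain of flatMap and map calls.
  def desugared(in: ToyInput, out: ToyOutput): ToyIO[Unit] =
    out.write("What is your name?").flatMap { _ =>
      in.read().flatMap { name =>
        out.write("Hello " + name + "!").map(_ => ())
      }
    }
}
```

In this toy model nothing happens until run() is called on the resulting value, which is the property that lets a runtime decide when and where each step executes.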
We can then create a HelloYou process attached to the command line by "applying" its process type to the standard Console.stdinLine and Console.stdoutLine channels, which are defined in the channel package:
val platform = Platform("hello-you")
platform.launch(HelloYou(Console.stdinLine, Console.stdoutLine)).get_!()
The stdinLine input channel, of type IChan[String], streams each line typed on the standard input. The stdoutLine output channel, of type OChan[String], does the reverse and prints each string it receives on consecutive lines of the standard output.
A Platform creates the user-level threads that execute processes over a handful of native threads. This number is configurable and by default matches the number of cores available on the underlying hardware (see the Platform factory methods). The launch method is declared like this:
abstract class Platform {
  final def launch[R: Message](process: Process[R]): RIChan[R] = ...
}
The type RIChan is the type of channels that deliver a single message, a bit like a Future in java.util.concurrent. Since the process instance is executed asynchronously, the main thread must block until the process has terminated; otherwise the application would exit immediately, before anyone has time to type a name. This is done using the get_! method of the result channel, which blocks the main thread until the process returns its result ().
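For readers who know java.util.concurrent better than Molecule, the analogy can be sketched with the standard library alone. This is not Molecule code: the thread pool below merely stands in for a platform sized to the number of cores, and Future.get plays the role that get_! plays on the result channel. The object and method names are made up for this illustration.

```scala
import java.util.concurrent.{Callable, Executors, Future}

// Standard-library analogy only; none of this is Molecule's API.
object BlockingResultSketch {
  def runAndWait(): String = {
    // One worker thread per available core, mirroring the default described above.
    val cores = Runtime.getRuntime.availableProcessors()
    val pool = Executors.newFixedThreadPool(cores)
    try {
      // The task runs asynchronously on a pool thread.
      val result: Future[String] = pool.submit(new Callable[String] {
        def call(): String = "done"
      })
      // Blocks the calling thread until the task has produced its result.
      result.get()
    } finally pool.shutdown()
  }
}
```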
We will now create a "Telnet servlet container" that instantiates a new process each time a Telnet client connects to it. For simplicity, we will just filter out the initial Telnet negotiation commands: those that start with the IAC byte, followed by one byte identifying the operation and a second byte indicating the option. To do so, we create an incremental binary parser that we will use to parse Telnet messages from ByteBuffer streams read on non-blocking TCP sockets:
import java.nio.ByteBuffer
import molecule.parsers.bytebuffer._

object TelnetLineAdapter {

  // "Interpret As Command" marker byte
  val IAC = 255.toByte

  abstract class TelnetMsg
  case class Data(cb: ByteBuffer) extends TelnetMsg
  case class Command(b1: Byte, b2: Byte) extends TelnetMsg {
    override def toString() = "Command(" + unsigned(b1) + "," + unsigned(b2) + ")"
  }

  // A Telnet message is either plain data or a 3-byte command
  lazy val telnetMsg: Parser[ByteBuffer, TelnetMsg] = data | command

  val data = splitAt(IAC) ^^ { Data(_) }

  val command = (IAC ~ byteArray(2)) ^^ {
    case _ ~ arr => Command(arr(0), arr(1))
  }
}
Note: readers not familiar with parser combinators are invited to look at this excellent introduction by Daniel Spiewak.
In the case of Telnet, the binary stream carries either some binary Data or a Telnet Command that starts with the IAC byte. The splitAt parser splits each ByteBuffer that it receives at the position where the IAC byte occurs, or fails if the first byte of the received ByteBuffer matches IAC. Using the telnetMsg parser, we can now create a process type adapter that adapts process types that interact over string channels into process types that interact over raw byte buffer channels:
abstract class TelnetLineAdapter[R: Message](ptype: ProcessType1x1[String, String, R])
    extends ProcessType1x1[ByteBuffer, ByteBuffer, R] {

  import molecule.parsers.charbuffer
  import java.nio.CharBuffer

  def main(in: Input[ByteBuffer], out: Output[ByteBuffer]) =
    handover {
      ptype(
        in.parse(telnetMsg).collect {
          case Data(bb) => bb
        }.map(decode("US-ASCII")).parse(charbuffer.line(2048)),
        out.map(encode("US-ASCII")).map { s: String => CharBuffer.wrap(s.replaceAll("\n", "\r\n") + "\r\n") }
      )
    }
}
Telnet commands are filtered out from the byte buffer input stream using the collect streaming primitive. The resulting stream is then converted into a stream of strings through ASCII-decoded CharBuffers, which are then parsed into lines of at most 2048 characters. Lines sent on the output are wrapped into character buffers and then encoded into ASCII byte buffers (output channels being contravariant, transformations must be read from right to left).
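To make the data transformations concrete, here is a minimal sketch of the same steps in plain Scala over whole byte arrays. It does not use Molecule: all names are hypothetical, the input is assumed to arrive in one piece (unlike the incremental parser above), a command truncated at the end of the input is simply dropped, and the 2048-character line limit is not enforced.

```scala
import java.nio.charset.StandardCharsets.US_ASCII

// Plain-Scala illustration of the adapter's transformations; not Molecule code.
object TelnetFilterSketch {
  val IAC: Byte = 255.toByte

  // Drops every 3-byte sequence IAC <operation> <option> and keeps the rest.
  def stripCommands(bytes: Array[Byte]): Array[Byte] = {
    val kept = Array.newBuilder[Byte]
    var i = 0
    while (i < bytes.length) {
      if (bytes(i) == IAC) i += 3 // skip IAC and the two bytes that follow
      else { kept += bytes(i); i += 1 }
    }
    kept.result()
  }

  // Input direction: raw bytes -> ASCII text -> lines.
  def toLines(bytes: Array[Byte]): List[String] =
    new String(stripCommands(bytes), US_ASCII).split("\r?\n").toList

  // Output direction: normalize line endings as in the adapter, then encode.
  def encodeLine(s: String): Array[Byte] =
    (s.replaceAll("\n", "\r\n") + "\r\n").getBytes(US_ASCII)
}
```

The adapter performs the equivalent steps on streams of buffers, so it never needs the whole input in memory at once.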
Now, we are ready to expose HelloYou processes over individual Telnet connections using Molecule's NIO layer, like this:
import molecule.nio._
val HelloYouTelnet = new TelnetLineAdapter(HelloYou) {} // the adapter class is declared abstract, hence the anonymous subclass
val ns = NetSystem(Platform("hello-you"))
ns.launchTcpServer("localhost", 8888, HelloYouTelnet)
The launchTcpServer method of a NetSystem launches a new instance of the adapted HelloYou process type each time it accepts a new TCP connection on the specified socket address. Each process is connected to the byte buffer input and output streams of the socket connected to the client. This socket, configured in non-blocking mode, is closed automatically once both channels are closed. This occurs as soon as the process terminates, thanks to the automatic resource management implemented by monadic processes. The nice thing about this server is that it can efficiently handle more than one thousand Telnet sessions in one megabyte of memory without blocking any native thread. Also, now that we have created this adapter, we can reuse it to expose any interactive process over Telnet, as long as this process interacts line by line over string channels.
Note: Similar examples can be found in molecule-io-examples and molecule-net-examples. See section "Running The Examples" for instructions on how to run these examples.
Molecule is available on the Sonatype OSS Maven repository (which is mirrored on the central Maven repository as well):
group id: com.github.molecule-labs
artifact ids (scala 2.9): molecule-core_2.9.3, molecule-io_2.9.3, molecule-parsers_2.9.3, molecule-net_2.9.3
artifact ids (scala 2.10): molecule-core_2.10, molecule-io_2.10, molecule-parsers_2.10, molecule-net_2.10
artifact ids (scala 2.11): molecule-core_2.11, molecule-io_2.11, molecule-parsers_2.11, molecule-net_2.11
latest versions:
0.5.3 (for molecule-core)
0.5.4 (for molecule-parsers)
0.5.2 (for other packages)
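As a sketch, the coordinates above could be declared in an sbt build as follows. This assumes an sbt project whose Scala version is one of those listed, and that each version listed above is published for that Scala version; the `%%` operator appends the Scala version suffix (for example `_2.11`) to the artifact id.

```scala
// build.sbt (sketch): versions taken from the list above
libraryDependencies ++= Seq(
  "com.github.molecule-labs" %% "molecule-core"    % "0.5.3",
  "com.github.molecule-labs" %% "molecule-io"      % "0.5.2",
  "com.github.molecule-labs" %% "molecule-parsers" % "0.5.4",
  "com.github.molecule-labs" %% "molecule-net"     % "0.5.2"
)
```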
Alternatively you can download the Jar files directly from Sonatype:
for scala 2.11
for scala 2.10
for scala 2.9.3
Using sbt:
> git clone git://github.com/molecule-labs/molecule.git/
> cd molecule
> sbt collect-jar
From the command line: first make a build as described above and stay in the molecule folder.
java -cp ~/.ivy2/cache/org.scala-lang/scala-library/jars/scala-library-2.9.3.jar:\
./target/2.9.3/molecule-core.jar:./target/2.9.3/molecule-io.jar:\
./target/2.9.3/molecule-net.jar:\
./target/2.9.3/molecule-core-examples.jar:\
./target/2.9.3/molecule-io-examples.jar:\
./target/2.9.3/molecule-net-examples.jar:\
./target/2.9.3/molecule-parsers.jar \
molecule.examples.io.HelloYou
Note: On Windows, use your full home directory instead of '~' and use ';' instead of ':' as the classpath separator (and put the command on one long line; escaping the line breaks with a backslash will probably not work).
main example classes:
- molecule.examples.io.HelloYou (interactive in command line)
- molecule.examples.io.EchoYou (interactive in command line)
- molecule.examples.io.stopwatch.StopWatch (interactive, opens 3 graphical windows)
- molecule.examples.io.primesieve.PrimeSieve (logs primes)
- molecule.examples.io.chameneos.ChameneosRedux (logs a single number)
- molecule.examples.core.Clock (logs time)
- molecule.examples.net.echoyou.EchoYouTelnet
(This starts a Telnet server that acts like the second example.
It will log the IP address and port to connect to.
To connect, open another terminal window and type
$ telnet <IP-address> <port>
You can connect multiple clients to the same server.
It should also work from another machine if your
firewall does not block connections on the given port.)
Alternatively, you can check out the sources and run the examples from your favorite IDE with the Scala plugin installed.
Copyright 2012-2018 NOKIA Bell Labs
Licensed under the Apache License, Version 2.0