Constructing Dataflow Graphs with Function Composition Style
Arrow is a domain-specific language enabling users to create dataflow graphs easily in a declarative way.
- Type Checking
- Graph Construction
- Graph Drawing
- Concurrent Runtime
- Record/Replay
- Distributed Execution Support
import arrow._

object Example {
  def main(args: Array[String]): Unit = {
    val graph = new ArrowGraph         // create a new graph
    import graph._                     // import the `|>` operator
    val func1 = (_: Int) + 1           // a plain Scala function can be used directly
    val node2 = Node((_: Int) * 2)     // a node can also be created explicitly
    Stream(0, 1, 2) |> func1 |> node2  // draw the dataflow
    val output = run(node2)            // output: Future[IndexedSeq[Int]]
    // will return [2, 4, 6]
  }
}
- Create a producer node:
    val producer: Node[Int, Int] = ...
  or a function:
    val producer: Int => Int = ...
- Create a consumer node:
    val consumer: Node[Int, Int] = ...
  or a function:
    val consumer: Int => Int = ...
- Connect the nodes and supply an input stream with the polymorphic |> operator:
    val flow = Stream(1, 2, 3) |> producer |> consumer
  The consumer then outputs a stream of Ints.
  Note that the producer and the consumer can each be either a Node or a function. The input and output types must match across every connection.
  |> is associative:
    val flow = Stream(1, 2, 3) |> (producer |> consumer)
  You can also use <|:
    val flow = consumer <| producer <| Stream(1, 2, 3)
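
The associativity claim can be checked outside Arrow with ordinary functions. The following is a minimal sketch, assuming |> behaves like left-to-right function composition on plain functions; PipeSketch, PipeOps and render are hypothetical names introduced for illustration and are not part of Arrow's API.

```scala
object PipeSketch {
  // Hypothetical stand-in for Arrow's `|>`: plain left-to-right composition.
  // This is an assumption for illustration, not Arrow's implementation.
  implicit class PipeOps[A, B](val f: A => B) extends AnyVal {
    def |>[C](g: B => C): A => C = f andThen g
  }

  val producer: Int => Int = (x: Int) => x + 1
  val consumer: Int => Int = (x: Int) => x * 2
  val render: Int => String = (n: Int) => s"<$n>"

  // Both groupings build the same function.
  val leftFirst: Int => String = (producer |> consumer) |> render
  val rightFirst: Int => String = producer |> (consumer |> render)

  def main(args: Array[String]): Unit = {
    val inputs = List(1, 2, 3)
    println(inputs.map(leftFirst))  // List(<4>, <6>, <8>)
    println(inputs.map(rightFirst)) // List(<4>, <6>, <8>)
  }
}
```

Both groupings produce the same results for every input, which is what lets a chain of |> connections be written without parentheses.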
- Create a producer (a node or a function; here, a function):
    val producer: _ => Int = ...
  Here and below, _ means the type is unimportant.
- Create a collection of consumers (List can be replaced by any subtype of Traversable):
    val consumers: List[Int => _] = ...
- Connect them in a single line:
    producer |> consumers
  instead of:
    for (consumer <- consumers) { producer |> consumer }
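
The one-to-many connection above can be pictured with plain functions. This is only a sketch under the assumption that connecting a producer to a collection of consumers amounts to connecting it to each consumer in turn, as the for loop suggests; BroadcastSketch and broadcast are hypothetical names and do not come from Arrow.

```scala
object BroadcastSketch {
  // Hypothetical helper, not part of Arrow: compose one producer with each consumer.
  def broadcast[A, B, C](producer: A => B, consumers: Seq[B => C]): Seq[A => C] =
    consumers.map(consumer => producer andThen consumer)

  val producer: Int => Int = (x: Int) => x + 1
  val consumers: List[Int => String] = List(
    (n: Int) => s"double=${n * 2}",
    (n: Int) => s"square=${n * n}"
  )

  // One composed flow per consumer, all fed by the same producer.
  val flows: Seq[Int => String] = broadcast(producer, consumers)

  def main(args: Array[String]): Unit =
    flows.foreach(flow => println(flow(3))) // prints double=8, then square=16
}
```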

val producers: List[_ => Int] = ...
val consumer: Int => _ = ...
producers |> consumer

val producer: _ => List[Int] = ...
val consumers: List[Int => _] = ...
producer |> consumers

val producers: List[_ => Int] = ...
val consumer: List[Int] => _ = ...
producers |> consumer
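
The last pattern, a List of producers feeding a consumer that takes a List[Int], can be read from its types as "collect every producer's output into one list". The sketch below illustrates that reading with plain functions. It assumes all producers receive the same input, which Arrow may not require; JoinSketch and join are hypothetical names, not Arrow's API.

```scala
object JoinSketch {
  // Hypothetical helper, not part of Arrow: run every producer on the same
  // input and hand the collected results to a consumer that takes a List.
  def join[A, B, C](producers: List[A => B], consumer: List[B] => C): A => C =
    input => consumer(producers.map(producer => producer(input)))

  val producers: List[Int => Int] = List(
    (x: Int) => x + 1,
    (x: Int) => x * 2,
    (x: Int) => x - 3
  )
  val consumer: List[Int] => Int = (xs: List[Int]) => xs.sum

  val joined: Int => Int = join(producers, consumer)

  def main(args: Array[String]): Unit =
    println(joined(10)) // 11 + 20 + 7 = 38
}
```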

import shapeless._ // for `HList`
val producer: _ => (Int :: Double :: HNil) = ...
val consumers: (Int => _) :: (Double => _) :: HNil = ...
producer |> consumers

import shapeless._
val producers: (_ => Int) :: (_ => Double) :: HNil = ...
val consumer: (Int :: Double :: HNil) => _ = ...
producers |> consumer

import shapeless._
val producers: (_ => Int) :: (_ => Double) :: HNil = ...
val consumers: (Int => _) :: (Double => _) :: HNil = ...
producers |> consumers

import shapeless._
val flow_lhs: _ => String = ...
val flow_rhs: String => Int = ...
val flow = flow_lhs |> flow_rhs // inputs `_`, outputs `Int`
val producers = flow :: xx :: HNil
val consumers: (Int => _) :: HNil = ...
producers |> consumers