Scala library implementing the concept of functional pipes/conduits.

scala-conduit

Currently the library is experimental and the interface is subject to change.

Overview

A Pipe is a component that receives objects of a given type and responds with objects of another type. It is similar to UNIX pipelines, except that instead of receiving and sending bytes it receives and sends specific objects. When a pipe finishes processing, it also produces a final result.

There are two main ways to compose pipes:

  • Sequence them. When the first pipe finishes processing, the second pipe continues. This is similar to running UNIX commands one after another with ;.
  • Fuse them. The output of the first pipe (called upstream) is used as the input for the second one (called downstream). And if the first pipe finishes processing, the second pipe also receives its final result. This is similar to combining UNIX pipelines with |.

Unlike UNIX pipelines, when a pipeline is run, it is executed in a single thread and values are only generated on demand. An upstream pipe is only invoked when a downstream pipe requests input, and it is suspended again once it produces the requested value. So if a downstream pipe requests no input at all, the upstream pipe is never invoked.
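The demand-driven behaviour can be illustrated with a small analogy. The sketch below does not use this library's API at all; it relies only on the standard library's Iterator, and the names OnDemandSketch, produced, upstream and run are made up for the illustration. It shows an unbounded producer that generates only as many values as the consumer asks for.

```scala
object OnDemandSketch {
  // Records every value the producer actually generated.
  val produced = scala.collection.mutable.ArrayBuffer.empty[Int]

  // "Upstream": an unbounded producer that notes each value it generates.
  def upstream: Iterator[Int] =
    Iterator.from(1).map { n => produced += n; n }

  // "Downstream": transforms values, but requests only three of them.
  def run(): List[Int] = {
    produced.clear()
    upstream.map(_ * 10).take(3).toList
  }
}
```

After calling OnDemandSketch.run(), the produced buffer holds exactly three values even though the producer is unbounded, because nothing is generated until the consumer requests it.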

This library aims at deterministic and correct handling of resources. Each pipe operation has a finalizer that is executed when the processing is interrupted for whatever reason.
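As a rough illustration of the goal (not of how the library implements it), the sketch below uses a plain try/finally so that a cleanup action runs even when processing stops early. The names FinalizerSketch, withFinalizer and firstLine are hypothetical and exist only for this example.

```scala
import java.io.{BufferedReader, StringReader}

object FinalizerSketch {
  // Hypothetical helper: evaluates `body` and always runs `finalizer` afterwards,
  // whether `body` returns normally or throws.
  def withFinalizer[A](finalizer: () => Unit)(body: => A): A =
    try body finally finalizer()

  // Reads only the first line and stops; the reader is closed regardless.
  // Returns the line (if any) and whether the finalizer has run.
  def firstLine(text: String): (Option[String], Boolean) = {
    var closed = false
    val reader = new BufferedReader(new StringReader(text))
    val line = withFinalizer(() => { reader.close(); closed = true }) {
      Option(reader.readLine())
    }
    (line, closed)
  }
}
```

In the library itself the finalizer is attached to pipe operations rather than to a block of code, but the guarantee it aims for is the same: the resource is released no matter where processing ends.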

The library is conceptually similar to Haskell's conduit library.

Requires Scala 2.10.

Scaladoc

Generated documentation is available here. It is updated only from time to time so it might not reflect the latest changes.

Core concepts

Note: To understand how type parameters of pipes work, you need to understand Scala's covariant and contravariant types.
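As a quick refresher, here is a minimal sketch of variance using a hypothetical Stage trait (it is not part of this library). A type parameter marked - may be replaced by a more specific type when the value is used, and one marked + by a more general type.

```scala
object VarianceSketch {
  // Hypothetical stand-in for a component that consumes I and produces O.
  trait Stage[-I, +O] { def apply(in: I): O }

  // Accepts any input and produces a String.
  val describe: Stage[Any, String] = new Stage[Any, String] {
    def apply(in: Any): String = "value: " + in
  }

  // Contravariance in I: a stage that accepts Any can be used where Int is supplied.
  // Covariance in O: a stage that produces String can be used where AnyRef is expected.
  val narrowed: Stage[Int, AnyRef] = describe
}
```

The assignment to narrowed compiles only because of the variance annotations; without them, Stage[Any, String] and Stage[Int, AnyRef] would be unrelated types.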

Pipe

A general pipe is defined as

sealed trait GenPipe[-U,-I,+O,+R]

where:

  • U is the type of the value received from upstream when there is no more input available. This is the final result of the upstream pipe.
  • I is the type of values received from upstream.
  • O is the type of output values produced by this pipe and sent downstream.
  • R is the final result of the pipe when it finishes processing.

In the majority of cases a pipe doesn't care about the result of its upstream pipe, so U is set to Any. This is so common that we define an alias

type Pipe[-I,+O,+R]     = GenPipe[Any,I,O,R]

Sources and sinks

A source is a pipe that doesn't care about any possible input, so its input type is Any. Usually a source never requests input from upstream.

A sink is a pipe that never produces output.

We also need to distinguish pipes that cannot receive any input. Their I parameter is set to Nothing. We'll use this kind of pipe to express how to continue processing when input is exhausted.

We define type aliases for these concepts as:

type Sink[-I,+R]        = Pipe[I,Nothing,R]
type Source[+O,+R]      = Pipe[Any,O,R]
type NoInput[-U,+O,+R]  = GenPipe[U,Nothing,O,R]
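To see how these aliases relate to each other through variance, the following self-contained sketch repeats the declarations inside a hypothetical AliasSketch object and adds a placeholder value, Stub, which is an assumption made only so the example has something to assign. The library constructs pipes differently.

```scala
object AliasSketch {
  sealed trait GenPipe[-U, -I, +O, +R]

  // Hypothetical placeholder value; not how the library constructs pipes.
  case object Stub extends GenPipe[Any, Any, Nothing, Nothing]

  type Pipe[-I, +O, +R]    = GenPipe[Any, I, O, R]
  type Sink[-I, +R]        = Pipe[I, Nothing, R]
  type Source[+O, +R]      = Pipe[Any, O, R]
  type NoInput[-U, +O, +R] = GenPipe[U, Nothing, O, R]

  val source: Source[String, Unit] = Stub
  val sink: Sink[Int, Unit]        = Stub

  // A source accepts Any input, so it fits where a pipe with Int input is expected.
  val sourceAsPipe: Pipe[Int, String, Unit] = source
  // A sink outputs Nothing, so it fits where a pipe with String output is expected.
  val sinkAsPipe: Pipe[Int, String, Unit] = sink
  // A source also fits where a pipe that cannot receive any input is expected.
  val sourceAsNoInput: NoInput[Any, String, Unit] = source
}
```

Each assignment type-checks purely because of the variance annotations on GenPipe, which is why sources and sinks can be passed wherever a more general pipe type is required.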

TODO

Examples

See Util.scala, IO.scala and NIO.scala for full code.

Read a file and output its contents as String lines.

First we create a source pipe that takes no input and lists a given file as its output:

def readLines(r: BufferedReader): Source[String,Unit] = {
  implicit val fin = closeFin(r);
  untilF[Any,Any,String](Option(r.readLine).map(respond[String] _));
}

Note the implicit finalizer fin. Using Scala's implicit feature it is applied to all pipe operations so that the reader is closed properly when the pipe is interrupted. Here closeFin is simply

implicit def closeFin(implicit c: Closeable): Finalizer =
  Finalizer { System.err.println("Closing " + c); c.close() }

Using readLines we can construct a pipe that takes BufferedReaders as its input and outputs their contents:

val readLines: Pipe[BufferedReader,String,Unit] =
  unfold[BufferedReader,String](readLines _);

Finally, we compose it with a simple mapping pipe that converts Files into BufferedReaders:

val readLinesFile: Pipe[File,String,Unit] = {
  import Finalizer.empty
  mapF((f: File) => new BufferedReader(
                      new InputStreamReader(
                        new FileInputStream(f), "UTF-8"))
    ) >-> readLines
}

Display the contents of all Scala files in the current directory

runPipe(
  listRec(new File(".")) >->
  filter[File](_.getName().endsWith(".scala")) >->
  readLinesFile >->
  log
)

where

def listRec(dir: File): Source[File,Unit] = ...

is a source pipe that doesn't take any input and recursively outputs all files in a directory (similar to UNIX find),

def filter[A,R](p: A => Boolean): GenPipe[R,A,A,R] = ...

is a pipe that checks its input values and passes through only those that satisfy a given predicate,

val readLinesFile: Pipe[File,String,Unit] = ...

readLinesFile is a pipe that for each File received on its input outputs all its lines as Strings,

def writeLines(w: Writer): Sink[String,Unit] = ...
val log = writeLines(new OutputStreamWriter(System.out));

log receives Strings and prints them to standard output.

Copyright

Copyright 2013, Petr Pudlák

Contact: petr.pudlak.name.

LGPLv3

This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License along with this program. If not, see http://www.gnu.org/licenses/.