Skip to content

Commit

Permalink
First pass of implementation and benchmarks
Browse files Browse the repository at this point in the history
Benchmark results:

[info] Result "termination.StreamBenchmark.zipAndAdd":
[info]   569.189 ±(99.9%) 5.915 ms/op [Average]
[info]   (min, avg, max) = (520.228, 569.189, 628.885), stdev = 25.043
[info]   CI (99.9%): [563.274, 575.104] (assumes normal distribution)

[info] Result "church.StreamBenchmark.zipAndAdd":
[info]   400.804 ±(99.9%) 5.202 ms/op [Average]
[info]   (min, avg, max) = (381.642, 400.804, 600.779), stdev = 22.024
[info]   CI (99.9%): [395.603, 406.006] (assumes normal distribution)
  • Loading branch information
Noel Welsh committed Mar 29, 2017
1 parent 17fa1bf commit d5e0918
Show file tree
Hide file tree
Showing 10 changed files with 278 additions and 0 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -0,0 +1,3 @@
# Church Encodings

Code demonstrating a reactive stream implementation with and without church encoded representation.
20 changes: 20 additions & 0 deletions benchmarks/src/main/scala/church/StreamBenchmark.scala
@@ -0,0 +1,20 @@
package church

import org.openjdk.jmh.annotations._
import java.util.concurrent.TimeUnit


class StreamBenchmark {
@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
def zipAndAdd(): Int = {
val input1 = Iterator.range(0, 10000000)
val input2 = Iterator.range(0, 10000000)

val ones1 = Stream.fromIterator(input1).map(_ => 1)
val ones2 = Stream.fromIterator(input2).map(_ => 1)
val sum = ones1.zip(ones2).map{ case (a, b) => a + b }
sum.foldLeft(0){ _ + _ }
}
}
20 changes: 20 additions & 0 deletions benchmarks/src/main/scala/termination/StreamBenchmark.scala
@@ -0,0 +1,20 @@
package termination

import org.openjdk.jmh.annotations._
import java.util.concurrent.TimeUnit


class StreamBenchmark {
@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
def zipAndAdd(): Int = {
val input1 = Iterator.range(0, 10000000)
val input2 = Iterator.range(0, 10000000)

val ones1 = Stream.fromIterator(input1).map(_ => 1)
val ones2 = Stream.fromIterator(input2).map(_ => 1)
val sum = ones1.zip(ones2).map{ case (a, b) => a + b }
sum.foldLeft(0){ _ + _ }
}
}
13 changes: 13 additions & 0 deletions build.sbt
@@ -0,0 +1,13 @@
name := "church-and-state"

organization := "io.underscore"

version := "0.1-SNAPSHOT"

scalaVersion in ThisBuild := "2.12.1"

lazy val core = project

lazy val benchmarks = project.
dependsOn(core % "compile->test").
enablePlugins(JmhPlugin)
46 changes: 46 additions & 0 deletions core/src/main/scala/baseline/Stream.scala
@@ -0,0 +1,46 @@
package baseline

/**
* Baseline implementation of reactive streams
*/
sealed trait Stream[A] {
import Stream._

def zip[B](that: Stream[B]): Stream[(A,B)] =
Zip(this, that)

def map[B](f: A => B): Stream[B] =
Map(this, f)

def foldLeft[B](zero: B)(f: (A, B) => B): B = {
def next[A](stream: Stream[A]): A =
stream match {
case FromIterator(source) => source.next()
case Map(source, f) => f(next(source))
case Zip(left, right) => (next(left), next(right))
}

var result: B = zero

// Never terminates
while(true) {
result = f(next(this), result)
}
result
}
}
object Stream {
def fromIterator[A](source: Iterator[A]): Stream[A] =
FromIterator(source)

def always[A](element: A): Stream[A] =
FromIterator(Iterator.continually(element))

def apply[A](elements: A*): Stream[A] =
FromIterator(Iterator(elements: _*))

// Stream algebraic data type
final case class Zip[A,B](left: Stream[A], right: Stream[B]) extends Stream[(A,B)]
final case class Map[A,B](source: Stream[A], f: A => B) extends Stream[B]
final case class FromIterator[A](source: Iterator[A]) extends Stream[A]
}
69 changes: 69 additions & 0 deletions core/src/main/scala/church/Observable.scala
@@ -0,0 +1,69 @@
package church

sealed trait Observable[A] {
import Observable._

def next(receiver: Receiver[A]): Unit

def foldLeft[B](zero: B)(f: (A, B) => B): B = {
var result: B = zero
val receiver = new Receiver[A]()

next(receiver)
while(!receiver.isEmpty) {
result = f(receiver.get, result)
next(receiver)
}
result
}
}
object Observable {
def fromStream[A](stream: Stream[A]): Observable[A] =
stream match {
case Stream.Zip(l, r) => Zip(fromStream(l), fromStream(r))
case Stream.Map(s, f) => Map(fromStream(s), f)
case Stream.FromIterator(s) => FromIterator(s)
}

def fromIterator[A](source: Iterator[A]): Observable[A] =
FromIterator(source)

def always[A](element: A): Observable[A] =
FromIterator(Iterator.continually(element))

def apply[A](elements: A*): Observable[A] =
FromIterator(Iterator(elements: _*))

// Observable algebraic data type
final case class Zip[A,B](left: Observable[A], right: Observable[B]) extends Observable[(A,B)] {
private val leftReceiver = new Receiver[A]()
private val rightReceiver = new Receiver[B]()

def next(receiver: Receiver[(A,B)]): Unit = {
left.next(leftReceiver)
if(leftReceiver.isEmpty) {
receiver.none
} else {
right.next(rightReceiver)
if(rightReceiver.isEmpty) {
receiver.none
} else {
receiver.some((leftReceiver.get, rightReceiver.get))
}
}
}
}
final case class Map[A,B](source: Observable[A], f: A => B) extends Observable[B] {
private val upstream = new Receiver[A]()

def next(receiver: Receiver[B]): Unit = {
source.next(upstream)
if(upstream.isEmpty) receiver.none else receiver.some(f(upstream.get))
}

}
final case class FromIterator[A](source: Iterator[A]) extends Observable[A] {
def next(receiver: Receiver[A]): Unit =
if(source.hasNext) receiver.some(source.next) else receiver.none
}
}
15 changes: 15 additions & 0 deletions core/src/main/scala/church/Receiver.scala
@@ -0,0 +1,15 @@
package church

final class Receiver[A] {
var isEmpty: Boolean = true
var get: A = _

def some(a: A): Unit = {
isEmpty = false
get = a
}

def none: Unit = {
isEmpty = true
}
}
34 changes: 34 additions & 0 deletions core/src/main/scala/church/Stream.scala
@@ -0,0 +1,34 @@
package church

/**
* Implementation of reactive streams that takes the baseline implementation and adds a signal that indicates when a stream has terminated.
*/
sealed trait Stream[A] {
import Stream._

def zip[B](that: Stream[B]): Stream[(A,B)] =
Zip(this, that)

def map[B](f: A => B): Stream[B] =
Map(this, f)

def foldLeft[B](zero: B)(f: (A, B) => B): B = {
val observable = Observable.fromStream(this)
observable.foldLeft(zero)(f)
}
}
object Stream {
def fromIterator[A](source: Iterator[A]): Stream[A] =
FromIterator(source)

def always[A](element: A): Stream[A] =
FromIterator(Iterator.continually(element))

def apply[A](elements: A*): Stream[A] =
FromIterator(Iterator(elements: _*))

// Stream algebraic data type
final case class Zip[A,B](left: Stream[A], right: Stream[B]) extends Stream[(A,B)]
final case class Map[A,B](source: Stream[A], f: A => B) extends Stream[B]
final case class FromIterator[A](source: Iterator[A]) extends Stream[A]
}
57 changes: 57 additions & 0 deletions core/src/main/scala/termination/Stream.scala
@@ -0,0 +1,57 @@
package termination

/**
* Implementation of reactive streams that takes the baseline implementation and adds a signal that indicates when a stream has terminated.
*/
sealed trait Stream[A] {
import Stream._

def zip[B](that: Stream[B]): Stream[(A,B)] =
Zip(this, that)

def map[B](f: A => B): Stream[B] =
Map(this, f)

def foldLeft[B](zero: B)(f: (A, B) => B): B = {
// Use `Option` to indicate if the stream has terminated. `None` indicates no more values are available.
def next[A](stream: Stream[A]): Option[A] =
stream match {
case FromIterator(source) =>
if(source.hasNext) Some(source.next()) else None
case Map(source, f) =>
next(source).map(f)
case Zip(left, right) =>
for {
l <- next(left)
r <- next(right)
} yield (l, r)
}

var result: B = zero
var run: Boolean = true

while(run) {
next(this) match {
case None => run = false
case Some(a) =>
result = f(a, result)
}
}
result
}
}
object Stream {
def fromIterator[A](source: Iterator[A]): Stream[A] =
FromIterator(source)

def always[A](element: A): Stream[A] =
FromIterator(Iterator.continually(element))

def apply[A](elements: A*): Stream[A] =
FromIterator(Iterator(elements: _*))

// Stream algebraic data type
final case class Zip[A,B](left: Stream[A], right: Stream[B]) extends Stream[(A,B)]
final case class Map[A,B](source: Stream[A], f: A => B) extends Stream[B]
final case class FromIterator[A](source: Iterator[A]) extends Stream[A]
}
1 change: 1 addition & 0 deletions project/plugins.sbt
@@ -0,0 +1 @@
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.24")

0 comments on commit d5e0918

Please sign in to comment.