Exercises regarding standard implementation of Par type
Javier de Silóniz Sandino committed Aug 30, 2016
1 parent c92a4b9 commit 99b4e53
Showing 2 changed files with 163 additions and 3 deletions.
152 changes: 150 additions & 2 deletions src/main/scala/fpinscalalib/FunctionalParallelismSection.scala
Expand Up @@ -250,13 +250,161 @@ object FunctionalParallelismSection extends FlatSpec with Matchers with org.scal
* map2(pa, unit(()))((a,_) => f(a))
* }}}
*
* Let's try to implement `sortPar` via `map`:
* Let's try to implement `sortPar` via `map`. Remember that you can use the underscore notation to write anonymous
* functions:
**/
def parSortPar(res0: List[Int] => List[Int]): Unit = {
def parSortParAssert(res0: List[Int] => List[Int]): Unit = {
  def sortPar(parList: Par[List[Int]]) = map(parList)(res0)

  val executorService = Executors.newFixedThreadPool(2)
  val parList = unit(List(1, 3, 2))
  Par.run(executorService)(sortPar(parList)).get() shouldBe List(1, 2, 3)
}

/**
* What else can we implement using our API? Could we map over a list in parallel? Unlike `map2`, which combines two
* parallel computations, `parMap` (let’s call it) needs to combine N parallel computations. Let’s see how far we can
* get implementing `parMap` in terms of existing combinators:
*
* {{{
* def parMap[A,B](ps: List[A])(f: A => B): Par[List[B]] = {
*   val fbs: List[Par[B]] = ps.map(asyncF(f))
*   // ...
* }
* }}}
*
* Remember, `asyncF` converts an `A => B` to an `A => Par[B]` by forking a parallel computation to produce the
* result. So we can fork off our N parallel computations pretty easily, but we need some way of collecting their
* results. Are we stuck? Well, just from inspecting the types, we can see that we need some way of converting our
* `List[Par[B]]` to the `Par[List[B]]` required by the return type of `parMap`. Let's try to write the function
* `sequence` that will help us achieve that:
*
* {{{
* def sequence[A](l: List[Par[A]]): Par[List[A]] =
*   l.foldRight[Par[List[A]]](unit(List()))((h,t) => map2(h,t)(_ :: _))
* }}}
*
* Once we have `sequence`, we can complete our implementation of `parMap`:
*
* {{{
* def parMap[A,B](ps: List[A])(f: A => B): Par[List[B]] = fork {
*   val fbs: List[Par[B]] = ps.map(asyncF(f))
*   sequence(fbs)
* }
* }}}
*
* Note that we’ve wrapped our implementation in a call to `fork`. With this implementation, `parMap` will return
* immediately, even for a huge input list. When we later call `run`, it will fork a single asynchronous computation
* which itself spawns N parallel computations, and then waits for these computations to finish,
* collecting their results into a list.
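*
* As a rough, self-contained sketch of how these pieces fit together (not the library's actual source), the
* following assumes the simple `ExecutorService => Future[A]` representation of `Par` and uses
* `CompletableFuture.completedFuture` as a stand-in for an already-completed `Future`. The names `ParMapSketch`
* and `demo` are hypothetical:
*
* {{{
* import java.util.concurrent.{Callable, CompletableFuture, ExecutorService, Executors, Future}
*
* object ParMapSketch {
*   // Assumed representation: a Par is a function awaiting an ExecutorService.
*   type Par[A] = ExecutorService => Future[A]
*
*   def unit[A](a: A): Par[A] = _ => CompletableFuture.completedFuture[A](a)
*
*   // Starts both computations before waiting on either, then combines the results.
*   def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C] = es => {
*     val (fa, fb) = (pa(es), pb(es))
*     CompletableFuture.completedFuture[C](f(fa.get(), fb.get()))
*   }
*
*   // Submits the evaluation of `pa` to the pool; the submitted task blocks on the inner result.
*   def fork[A](pa: => Par[A]): Par[A] =
*     es => es.submit(new Callable[A] { def call(): A = pa(es).get() })
*
*   def asyncF[A, B](f: A => B): A => Par[B] = a => fork(unit(f(a)))
*
*   def sequence[A](ps: List[Par[A]]): Par[List[A]] =
*     ps.foldRight[Par[List[A]]](unit(List()))((h, t) => map2(h, t)(_ :: _))
*
*   def parMap[A, B](as: List[A])(f: A => B): Par[List[B]] =
*     fork(sequence(as.map(asyncF(f))))
*
*   // Hypothetical usage: doubles each element on a cached (unbounded) thread pool.
*   def demo(): List[Int] = {
*     val es = Executors.newCachedThreadPool()
*     try parMap(List(1, 2, 3))(_ * 2)(es).get()
*     finally es.shutdown()
*   }
* }
* }}}
*
* Under these assumptions, `ParMapSketch.demo()` yields `List(2, 4, 6)`. The sketch uses a cached thread pool
* because, with this `fork`, nested forks can block forever on a pool of bounded size.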
*
* We can also implement `parFilter`, a function which filters elements of a list in parallel:
*
* {{{
* def parFilter[A](l: List[A])(f: A => Boolean): Par[List[A]] = {
*   val pars: List[Par[List[A]]] =
*     l map asyncF((a: A) => if (f(a)) List(a) else List())
*   map(sequence(pars))(_.flatten)
* }
* }}}
**/

def parFilterAssert(res0: List[Int]): Unit = {
  def parFilter[A](l: List[A])(f: A => Boolean): Par[List[A]] = {
    val pars: List[Par[List[A]]] =
      l map (asyncF((a: A) => if (f(a)) List(a) else List()))
    map(sequence(pars))(_.flatten)
  }

  val filterOp = parFilter(List(1, 2, 3, 4, 5))(_ < 4)
  val executorService = Executors.newCachedThreadPool()
  val result = Par.run(executorService)(filterOp).get()
  result shouldBe res0
}

/** = The algebra of an API =
*
* Like any design choice, choosing laws has consequences — it places constraints on what the operations can mean,
* determines what implementation choices are possible, and affects what other properties can be true. Let’s look at
* an example. We’ll just make up a possible law that seems reasonable. This might be used as a test case if we were
* writing tests for our library:
*/

def parLawMappingAssert(res0: Int) = {
  val lawLeftSide = map(unit(1))(_ + 1)
  val executorService = Executors.newFixedThreadPool(2)
  Par.run(executorService)(lawLeftSide).get() shouldBe Par.run(executorService)(unit(res0)).get()
}

/**
* We’re saying that mapping over `unit(1)` with the `_ + 1` function is in some sense equivalent to `unit(2)`.
* For now, let’s say two `Par` objects are equivalent if for any valid `ExecutorService` argument, their `Future`
* results have the same value. We can check that this holds for a particular `ExecutorService` with a function like
* this:
*/

def parEqualAssert(res0: Boolean) = {
  def equal[A](e: ExecutorService)(p: Par[A], p2: Par[A]): Boolean = p(e).get == p2(e).get

  val lawLeftSide = map(unit(1))(_ + 1)
  val executorService = Executors.newFixedThreadPool(2)
  equal(executorService)(lawLeftSide, unit(2)) shouldBe res0
}

/**
* The preceding could be generalized this way:
*
* {{{
* map(unit(x))(f) == unit(f(x))
* }}}
*
* Here we’re saying this should hold for any choice of `x` and `f`, not just `1` and the `_ + 1` function. This
* places some constraints on our implementation. Our implementation of `unit` can’t, say, inspect the value it receives
* and decide to return a parallel computation with a result of `42` when the input is `1` — it can only pass along
* whatever it receives.
*
* Let’s see if we can simplify this law further. We said we wanted this law to hold for any choice of `x` and `f`.
* Something interesting happens if we substitute the identity function for `f`. We can simplify both sides of the
* equation and get a new law that’s considerably simpler:
*
* {{{
* map(unit(x))(f) == unit(f(x))
* map(unit(x))(id) == unit(id(x))
* map(unit(x))(id) == unit(x)
* map(y)(id) == y
* }}}
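*
* The two laws above can be spot-checked on a handful of sample values. This is only a sketch: it assumes the
* `ExecutorService => Future[A]` representation of `Par`, a deliberately simple non-parallel `map`, and
* `CompletableFuture.completedFuture` in place of a hand-written completed `Future`. `MapLawSketch` and `lawsHold`
* are hypothetical names, and passing for a few samples is evidence for the laws, not a proof:
*
* {{{
* import java.util.concurrent.{CompletableFuture, ExecutorService, Executors, Future}
*
* object MapLawSketch {
*   type Par[A] = ExecutorService => Future[A]
*
*   def unit[A](a: A): Par[A] = _ => CompletableFuture.completedFuture[A](a)
*
*   // Simplified `map`: waits for `pa`, applies `f`, and wraps the result.
*   def map[A, B](pa: Par[A])(f: A => B): Par[B] =
*     es => CompletableFuture.completedFuture[B](f(pa(es).get()))
*
*   // Two Pars are considered equal if their results match for the given ExecutorService.
*   def equal[A](es: ExecutorService)(p: Par[A], p2: Par[A]): Boolean =
*     p(es).get() == p2(es).get()
*
*   def lawsHold(): Boolean = {
*     val es = Executors.newFixedThreadPool(2)
*     try {
*       val f = (n: Int) => n * 2
*       val id = (n: Int) => n
*       List(-1, 0, 1, 42).forall { x =>
*         equal(es)(map(unit(x))(f), unit(f(x))) && // map(unit(x))(f) == unit(f(x))
*         equal(es)(map(unit(x))(id), unit(x))      // map(y)(id) == y, with y = unit(x)
*       }
*     } finally es.shutdown()
*   }
* }
* }}}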
*
* Let’s consider a stronger property — that `fork` should not affect the result of a parallel computation:
*
* {{{
* fork(x) == x
* }}}
*
* Surprisingly, this simple property places strong constraints on our implementation of `fork`. After you’ve written
* down a law like this, take off your implementer hat, put on your debugger hat, and try to break your law. We’re
* expecting that `fork(x) == x` for all choices of `x`, and any choice of `ExecutorService`. We have a good sense of
* what `x` could be — it’s some expression making use of `fork`, `unit`, and `map2` (and other combinators derived
* from these). What about `ExecutorService`? There’s actually a rather subtle problem that will occur in most
* implementations of `fork`. When using an `ExecutorService` backed by a thread pool of bounded size (see
* `Executors.newFixedThreadPool`), it’s very easy to run into a deadlock.
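*
* To make the deadlock concrete, here is a minimal sketch, assuming the `ExecutorService => Future[A]`
* representation and a `fork` that submits a task which blocks on the inner result. `ForkDeadlockSketch` and
* `deadlocks` are hypothetical names, and the one-second timeout exists only so that the demonstration terminates:
*
* {{{
* import java.util.concurrent.{Callable, CompletableFuture, ExecutorService, Executors, Future}
* import java.util.concurrent.{TimeUnit, TimeoutException}
*
* object ForkDeadlockSketch {
*   type Par[A] = ExecutorService => Future[A]
*
*   def unit[A](a: A): Par[A] = _ => CompletableFuture.completedFuture[A](a)
*
*   // The submitted task occupies a pool thread while it waits for `pa` to finish.
*   def fork[A](pa: => Par[A]): Par[A] =
*     es => es.submit(new Callable[A] { def call(): A = pa(es).get() })
*
*   // Returns true if `fork(fork(unit(43)))` fails to complete on a single-threaded pool.
*   def deadlocks(): Boolean = {
*     val es = Executors.newFixedThreadPool(1)
*     try {
*       // The outer task takes the only thread, then waits on an inner task that can never start.
*       fork(fork(unit(43)))(es).get(1, TimeUnit.SECONDS)
*       false
*     } catch {
*       case _: TimeoutException => true
*     } finally es.shutdownNow() // interrupts the stuck worker so the JVM can exit
*   }
* }
* }}}
*
* With these definitions, `ForkDeadlockSketch.deadlocks()` returns `true`: the outer task holds the pool's only
* thread while waiting for the inner task, which stays queued.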
*
* Can we fix `fork` to work on fixed-size thread pools? Let’s look at a different implementation:
*
* {{{
* def fork[A](fa: => Par[A]): Par[A] = es => fa(es)
* }}}
*
* This certainly avoids deadlock. The only problem is that we aren’t actually forking a separate logical thread to
* evaluate `fa`. So `fork(hugeComputation)(es)`, for some `ExecutorService` `es`, would run `hugeComputation` in the
* main thread, which is exactly what we wanted to avoid by calling `fork`. This is still a useful combinator,
* though, since it lets us delay instantiation of a computation until it’s actually needed. Let’s give it a name,
* `delay`:
*
* {{{
* def delay[A](fa: => Par[A]): Par[A] = es => fa(es)
* }}}
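*
* The difference between `delay` and a thread-submitting `fork` can be observed by recording which thread evaluates
* the computation. The sketch below assumes the `ExecutorService => Future[A]` representation; `DelaySketch` and
* `demo` are hypothetical names:
*
* {{{
* import java.util.concurrent.{Callable, CompletableFuture, ExecutorService, Executors, Future}
*
* object DelaySketch {
*   type Par[A] = ExecutorService => Future[A]
*
*   def unit[A](a: A): Par[A] = _ => CompletableFuture.completedFuture[A](a)
*
*   def fork[A](pa: => Par[A]): Par[A] =
*     es => es.submit(new Callable[A] { def call(): A = pa(es).get() })
*
*   def delay[A](pa: => Par[A]): Par[A] = es => pa(es)
*
*   // Returns (delay ran on the caller's thread, fork ran on the caller's thread).
*   def demo(): (Boolean, Boolean) = {
*     val es = Executors.newFixedThreadPool(1)
*     try {
*       val caller = Thread.currentThread()
*       // `unit` is strict, so its argument is evaluated wherever the by-name Par is forced.
*       val viaDelay = delay(unit(Thread.currentThread()))(es).get()
*       val viaFork = fork(unit(Thread.currentThread()))(es).get()
*       (viaDelay eq caller, viaFork eq caller)
*     } finally es.shutdown()
*   }
* }
* }}}
*
* Here `DelaySketch.demo()` returns `(true, false)`: `delay` postpones the work but still runs it on the calling
* thread, whereas `fork` hands it to a pool thread.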
*
* But we’d really like to be able to run arbitrary computations over fixed-size thread pools. In order to do that,
* we’ll need to pick a different representation of `Par`.
*/
}
14 changes: 13 additions & 1 deletion src/test/scala/fpinscalalib/FunctionalParalellismSpec.scala
Expand Up @@ -13,6 +13,18 @@ class FunctionalParalellismSpec extends Spec with Checkers {
}

def `par sortPar asserts` = {
  FunctionalParallelismSection.parSortPar((l: List[Int]) => l.sorted)
  FunctionalParallelismSection.parSortParAssert((l: List[Int]) => l.sorted)
}

def `par filter asserts` = {
  FunctionalParallelismSection.parFilterAssert(List(1, 2, 3))
}

def `par law mapping asserts` = {
  FunctionalParallelismSection.parLawMappingAssert(2)
}

def `par equal asserts` = {
  FunctionalParallelismSection.parEqualAssert(true)
}
}
