# Parallel Scan Left
Having seen parallel map and parallel fold

map: apply function to each element

*  `List(1,3,8).map(x => x*x) == List(1, 9, 64)`

fold: combine elements with a given operation

* `List(1,3,8).fold(100)((s,x) => s + x) == 112`

we now examine parallel scanLeft:

scanLeft: list of the folds of all list prefixes

`List(1,3,8).scanLeft(100)((s,x) => s + x) == List(100, 101, 104, 112)`

`List(1,3,8).scanLeft(100)(_ + _) == List(100, 101, 104, 112)`

`List(a1, a2, a3).scanLeft(a0)(f) = List(b0, b1, b2, b3)`

where
* $b_0 = a_0$
* $b_1 = f(b_0, a_1)$
* $b_2 = f(b_1, a_2)$
* $b_3 = f(b_2, a_3)$

We assume throughout this segment that `f` is associative.

`scanRight` is different from `scanLeft`, even if `f` is associative

`List(1,3,8).scanRight(100)(_ + _) == List(112, 111, 108, 100)`

We consider only `scanLeft`, but `scanRight` is dual.
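As a quick check of the description above, the following sketch uses only the standard library to confirm that `scanLeft` yields the folds of all prefixes and that `scanRight` yields the folds of all suffixes. The value names (`xs`, `viaScan`, and so on) are illustrative and not part of the original material.

```scala
val xs = List(1, 3, 8)

// scanLeft computed directly
val viaScan = xs.scanLeft(100)(_ + _)

// the same result, computed as a fold over every prefix of xs
// (inits yields prefixes from longest to shortest, hence the reverse)
val viaPrefixFolds = xs.inits.toList.reverse.map(_.foldLeft(100)(_ + _))

// scanRight folds every suffix instead, combining from the right
val viaScanRight = xs.scanRight(100)(_ + _)
val viaSuffixFolds = xs.tails.toList.map(_.foldRight(100)(_ + _))
```

Computing each prefix fold independently, as `viaPrefixFolds` does, repeats work that `scanLeft` shares between neighbouring positions.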

## Sequential Definition

`List(a1, a2, ..., aN).scanLeft(a0)(f) = List(b0, b1, b2, ..., bN)`

where $b_0 = a_0$ and $b_i = f(b_{i-1}, a_i)$ for $1 \le i \le N$.

Give a sequential definition of `scanLeft`:

* take an array `inp`, an element `a0`, and binary operation `f`
* write the output to array `out`, assuming `out.length >= inp.length + 1`
```scala
def scanLeft[A](inp: Array[A], a0: A, f: (A,A) => A, out: Array[A]): Unit
```

In [1]:
def scanLeft[A](inp: Array[A],
                a0: A, f: (A,A) => A,
                out: Array[A]): Unit = {
  out(0) = a0
  var a = a0
  var i = 0
  while (i < inp.length) {
    a = f(a, inp(i))
    i = i + 1
    out(i) = a
  }
}

defined function scanLeft
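A small usage sketch of the sequential version follows. The function is repeated under the hypothetical name `scanLeftSeq` so that the block is self-contained, and `seqInp` and `seqOut` are illustrative names. The type argument is given explicitly because the lambda `_ + _` shares a parameter list with the arrays.

```scala
// Sequential scan with the same logic as scanLeft above (renamed for this sketch only).
def scanLeftSeq[A](inp: Array[A], a0: A, f: (A, A) => A, out: Array[A]): Unit = {
  out(0) = a0
  var a = a0
  var i = 0
  while (i < inp.length) {
    a = f(a, inp(i))
    i += 1
    out(i) = a
  }
}

val seqInp = Array(1, 3, 8)
val seqOut = new Array[Int](seqInp.length + 1) // one extra slot for a0
scanLeftSeq[Int](seqInp, 100, _ + _, seqOut)
// seqOut now holds 100, 101, 104, 112
```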

Can `scanLeft` be made parallel? Assume that `f` is associative.

Goal: an algorithm that runs in $\mathcal{O}(\log n)$ time given infinite parallelism.


At first, the task seems impossible; it seems that:
* the value of the last element in the sequence depends on all previous ones
* we need to wait for all previous partial results to be computed first
* such an approach takes $\mathcal{O}(n)$ time even with infinite parallelism


> Idea: give up on reusing all intermediate results

* do more work (more `f` applications)
* improve parallelism, more than compensating for the recomputation

Can you define the result of `scanLeft` using `map` and `reduce`?

Assume the input is given in an array `inp` and that you have `reduceSeg1` and `mapSeg` functions on array segments:
```scala
def reduceSeg1[A](inp: Array[A], left: Int, right: Int, a0: A, f: (A,A) => A): A

def mapSeg[A,B](inp: Array[A], left: Int, right: Int, fi: (Int,A) => B, out: Array[B]): Unit
```

The input is given in the array `inp`, and the boundaries of the segment of interest are `left` and `right`. The initial element is `a0`, and the binary operation is `f`.

`reduceSeg1` reduces that segment of the array, and `mapSeg` applies an operation to a given array segment and writes the results to the output array.

We use a slightly modified variant of map: the mapping function `fi` receives not only the element stored at a given position of the array, but also the index at which it is stored.

Can you implement `scanLeft` using map and reduce?

Here is one solution, which follows the definition of scan directly.

The element at position `i` of the output is the result of reducing the input segment up to position `i`. The output array is therefore obtained with a map over the input array, where the function `fi` passed to map reduces the array segment from `0` to `i`, starting from `a0`. This invocation of map fills the output array at positions `0` through `inp.length - 1` inclusive. We then only need to write the final element of the output array, which is computed by combining the element before it with the corresponding (last) element of the input array.
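The solution described above can be sketched as follows. This is a minimal sketch rather than a definitive implementation: `reduceSeg1` and `mapSeg` are given simple sequential bodies here as stand-ins for parallel versions with the same signatures, the wrapper object `MapReduceScan` and the name `scanLeftViaMapReduce` are hypothetical, and the empty-input guard is an addition.

```scala
object MapReduceScan {
  // Sequential stand-in: folds inp(left until right) into a0 using f.
  def reduceSeg1[A](inp: Array[A], left: Int, right: Int, a0: A, f: (A, A) => A): A = {
    var a = a0
    var i = left
    while (i < right) { a = f(a, inp(i)); i += 1 }
    a
  }

  // Sequential stand-in: out(i) = fi(i, inp(i)) for left <= i < right.
  def mapSeg[A, B](inp: Array[A], left: Int, right: Int,
                   fi: (Int, A) => B, out: Array[B]): Unit = {
    var i = left
    while (i < right) { out(i) = fi(i, inp(i)); i += 1 }
  }

  def scanLeftViaMapReduce[A](inp: Array[A], a0: A, f: (A, A) => A,
                              out: Array[A]): Unit = {
    // out(i) is the reduction of inp(0 until i), starting from a0
    val fi = (i: Int, _: A) => reduceSeg1(inp, 0, i, a0, f)
    mapSeg(inp, 0, inp.length, fi, out)
    val last = inp.length - 1
    // the final slot combines the previous output with the last input element
    if (last >= 0) out(last + 1) = f(out(last), inp(last))
    else out(0) = a0 // assumption: an empty input yields just a0
  }
}

val mrOut = new Array[Int](4)
MapReduceScan.scanLeftViaMapReduce[Int](Array(1, 3, 8), 100, _ + _, mrOut)
// mrOut now holds 100, 101, 104, 112
```

Every call to `fi` restarts the reduction from index `0`, which is exactly the recomputation the idea above accepts in exchange for parallelism.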



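If map and reduce are implemented in parallel, each with $\mathcal{O}(\log N)$ depth, then the overall depth remains $\mathcal{O}(\log N)$: map runs all the individual reductions in parallel, so the two logarithmic depths add rather than multiply. The price is extra work: each of the $N$ prefixes is reduced from scratch, so the total number of `f` applications is quadratic in $N$.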

In the previous solution we do not reuse any computation.

Can we reuse some of it?

Recall that reduce proceeds by applying the operations in a tree.

Idea: save the intermediate results of this parallel computation.

We first assume that the input collection is itself a tree.

Trees storing our input collection have values only in leaves:


In [2]:
sealed abstract class Tree[A]
case class Leaf[A](a: A) extends Tree[A]
case class Node[A](l: Tree[A], r: Tree[A]) extends Tree[A]

defined class Tree
defined class Leaf
defined class Node

Trees storing intermediate values also have (res) values in nodes:

In [3]:
sealed abstract class TreeRes[A] { val res: A }
case class LeafRes[A](override val res: A) extends TreeRes[A]
case class NodeRes[A](l: TreeRes[A],
                      override val res: A, r: TreeRes[A]) extends TreeRes[A]

defined class TreeRes
defined class LeafRes
defined class NodeRes

Can you define a `reduceRes` function that transforms a `Tree` into a `TreeRes`?

The implementation of `reduceRes`, whose signature appears in the code below, is simple. Leaves map to leaves with the same value. Nodes invoke the reduction recursively on the left and right subtrees with the same operation, and then build the resulting node. The resulting node keeps these left and right subtrees, but it also stores a new value, obtained by applying the given binary operation to the results of the left and right subtrees. The resulting tree therefore has the same shape as the original tree, with these additional values, and the value at its root is the result of reduce on the initial collection.

## Reduce that preserves the computation tree

In [4]:
def reduceRes[A](t: Tree[A], f: (A,A) => A): TreeRes[A] = t match {
  case Leaf(v) => LeafRes(v)
  case Node(l, r) => {
    val (tL, tR) = (reduceRes(l, f), reduceRes(r, f))
    NodeRes(tL, f(tL.res, tR.res), tR)
  }
}

defined function reduceRes

In [5]:
val t1 = Node(Node(Leaf(1), Leaf(3)), Node(Leaf(8), Leaf(50)))
val plus = (x:Int,y:Int) => x+y
val res0 = reduceRes(t1, plus)

t1: Node[Int] = Node(Node(Leaf(1), Leaf(3)), Node(Leaf(8), Leaf(50)))
plus: (Int, Int) => Int = ammonite.$sess.cmd4$Helper$$Lambda$2052/0x9b467828@3d4a71
res0: TreeRes[Int] = NodeRes(
  NodeRes(LeafRes(1), 4, LeafRes(3)),
  62,
  NodeRes(LeafRes(8), 58, LeafRes(50))
)

Of course, we would like to do this computation in parallel. All we need to do is wrap the two recursive invocations in a call to `parallel`. We call the resulting function `upsweep`, a name that suggests the bottom-up computation used to obtain the tree of results. The next cell defines the `task` and `parallel` helpers on top of a `ForkJoinPool`.

In [6]:
import java.util.concurrent._
import scala.util.DynamicVariable
  
val forkJoinPool = new ForkJoinPool

  abstract class TaskScheduler {
    def schedule[T](body: => T): ForkJoinTask[T]
    def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
      val right = task {
        taskB
      }
      val left = taskA
      (left, right.join())
    }
  }

  class DefaultTaskScheduler extends TaskScheduler {
    def schedule[T](body: => T): ForkJoinTask[T] = {
      val t = new RecursiveTask[T] {
        def compute = body
      }
      Thread.currentThread match {
        case wt: ForkJoinWorkerThread =>
          t.fork()
        case _ =>
          forkJoinPool.execute(t)
      }
      t
    }
  }

  val scheduler =
    new DynamicVariable[TaskScheduler](new DefaultTaskScheduler)

  def task[T](body: => T): ForkJoinTask[T] = {
    scheduler.value.schedule(body)
  }

  def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
    scheduler.value.parallel(taskA, taskB)
  }

  def parallel[A, B, C, D](taskA: => A, taskB: => B, taskC: => C, taskD: => D): (A, B, C, D) = {
    val ta = task { taskA }
    val tb = task { taskB }
    val tc = task { taskC }
    val td = taskD
    (ta.join(), tb.join(), tc.join(), td)
  }

import java.util.concurrent._
import scala.util.DynamicVariable
forkJoinPool: ForkJoinPool = java.util.concurrent.ForkJoinPool@1d5af79[Running, parallelism = 8, size = 0, active = 0, running = 0, steals = 0, tasks = 0, submissions = 0]
defined class TaskScheduler
defined class DefaultTaskScheduler
scheduler: DynamicVariable[TaskScheduler] = DynamicVariable(ammonite.$sess.cmd5$Helper$DefaultTaskScheduler@16d737a)
defined function task
defined function parallel
defined function parallel

## Parallel reduce that preserves the computation tree (upsweep)

In [7]:
def upsweep[A](t: Tree[A], f: (A,A) => A): TreeRes[A] = t match {
  case Leaf(v) => LeafRes(v)
  case Node(l, r) => {
    val (tL, tR) = parallel(upsweep(l, f), upsweep(r, f))
    NodeRes(tL, f(tL.res, tR.res), tR)
  }
}

defined function upsweep

Given this tree of results, we would now like to produce the `scanLeft` of our initial collection. For the collection 1, 3, 8, 50 and the initial element 100, the `scanLeft` should be `List(100, 101, 104, 112, 162)`.

The computation of `scanLeft` from the result tree of `upsweep` is called `downsweep`. It takes an initial element `a0`, which plays an important role here, the tree of results, and the binary operation `f`. It produces a new tree with the same number of leaves as the original one; the initial element is prepended afterwards. To understand how `downsweep` works, the key fact to remember is that `a0` denotes the reduce of all elements that come to the left of the current tree `t`.



At the very beginning, this is the initial element, 100. As we move down the tree, `a0` accumulates the elements that precede the current subtree. For example, for the right subtree of the example (leaves 8 and 50), we need to take into account 100, 1, and 3.

For a leaf, we simply apply the operation `f` to the given element `a0` and the element in the leaf.

The interesting case is `Node`. We again recursively do a downsweep on the left and right subtrees, which gives us two new trees that we then combine into a node. The key question is: which initial element do we pass to each of the two subtrees? The elements to the left of the left subtree are exactly those to the left of the entire tree, so we pass the same `a0`. For the right subtree, we need to take into account both the given `a0` and the reduce of the left subtree, so we pass `f(a0, l.res)`.

## Using tree with results to create the final collection

In [8]:
def downsweep[A](t: TreeRes[A], a0: A, f : (A,A) => A): Tree[A] = t match {
  case LeafRes(a) => Leaf(f(a0, a))
  case NodeRes(l, _, r) => {
    val (tL, tR) = parallel(downsweep[A](l, a0, f),
                            downsweep[A](r, f(a0, l.res), f))
    Node(tL, tR)
  }
}

defined function downsweep

Recall that `a0` denotes the reduce of all elements that come to the left of the current tree `t`. Applied to the result tree `res0` of the earlier example:

In [9]:
downsweep(res0, 100, plus)

res8: Tree[Int] = Node(Node(Leaf(101), Leaf(104)), Node(Leaf(112), Leaf(162)))

The downsweep result lacks only the initial element. Using the functions defined above, `scanLeft` prepends `a0` to it:

In [10]:
def scanLeft[A](t: Tree[A], a0: A, f: (A,A) => A): Tree[A] = {
  val tRes = upsweep(t, f)
  val scan1 = downsweep(tRes, a0, f)
  prepend(a0, scan1)
}
def prepend[A](x: A, t: Tree[A]): Tree[A] = t match {
  case Leaf(v) => Node(Leaf(x), Leaf(v))
  case Node(l, r) => Node(prepend(x, l), r)
}

defined function scanLeft
defined function prepend

In [11]:
scanLeft(t1, 100, plus)

res10: Tree[Int] = Node(
  Node(Node(Leaf(100), Leaf(101)), Leaf(104)),
  Node(Leaf(112), Leaf(162))
)
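To compare the tree-based scan with the list version, the following self-contained sketch flattens the resulting tree and checks it against `List.scanLeft`. It is a sequential restatement of `upsweep` and `downsweep`, without `parallel`, wrapped in a hypothetical object `TreeScanSketch`; the helpers `toList` and `scanLeftList` are illustrative additions, not part of the original material.

```scala
object TreeScanSketch {
  sealed abstract class Tree[A]
  case class Leaf[A](a: A) extends Tree[A]
  case class Node[A](l: Tree[A], r: Tree[A]) extends Tree[A]

  sealed abstract class TreeRes[A] { val res: A }
  case class LeafRes[A](override val res: A) extends TreeRes[A]
  case class NodeRes[A](l: TreeRes[A], override val res: A,
                        r: TreeRes[A]) extends TreeRes[A]

  // Bottom-up pass; the two recursive calls are independent of each other.
  def upsweep[A](t: Tree[A], f: (A, A) => A): TreeRes[A] = t match {
    case Leaf(v) => LeafRes(v)
    case Node(l, r) =>
      val (tL, tR) = (upsweep(l, f), upsweep(r, f))
      NodeRes(tL, f(tL.res, tR.res), tR)
  }

  // Top-down pass; a0 is the reduction of everything to the left of t.
  def downsweep[A](t: TreeRes[A], a0: A, f: (A, A) => A): Tree[A] = t match {
    case LeafRes(a) => Leaf(f(a0, a))
    case NodeRes(l, _, r) =>
      Node(downsweep(l, a0, f), downsweep(r, f(a0, l.res), f))
  }

  // Leaves in left-to-right order.
  def toList[A](t: Tree[A]): List[A] = t match {
    case Leaf(a) => List(a)
    case Node(l, r) => toList(l) ::: toList(r)
  }

  // The scan as a list: a0 followed by the leaves of the downsweep tree.
  def scanLeftList[A](t: Tree[A], a0: A, f: (A, A) => A): List[A] =
    a0 :: toList(downsweep(upsweep(t, f), a0, f))
}

val sketchTree = TreeScanSketch.Node(
  TreeScanSketch.Node(TreeScanSketch.Leaf(1), TreeScanSketch.Leaf(3)),
  TreeScanSketch.Node(TreeScanSketch.Leaf(8), TreeScanSketch.Leaf(50)))
val sketchScan = TreeScanSketch.scanLeftList[Int](sketchTree, 100, _ + _)
val sketchExpected = TreeScanSketch.toList(sketchTree).scanLeft(100)(_ + _)
// sketchScan and sketchExpected are both List(100, 101, 104, 112, 162)
```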

The previous definition on trees is good for understanding.

As with `map` and `reduce`, to make it more efficient, we use trees that have arrays in leaves instead of individual elements.

Exercise: define `scanLeft` on trees with such large leaves, using sequential scan left in the leaves.

Next step: parallel scan when the entire collection is an array

* we will still need to construct the intermediate tree

Now we go further and examine parallel scan for a collection represented as an array, so we start with one big array. Interestingly, even in this case we use a tree to store the intermediate results. This tree looks very similar to the one we have seen before.

In [12]:
sealed abstract class TreeResA[A] { val res: A }
case class Leaf[A](from: Int, to: Int,
                   override val res: A) extends TreeResA[A]
case class Node[A](l: TreeResA[A],
                   override val res: A, r: TreeResA[A]) extends TreeResA[A]

defined class TreeResA
defined class Leaf
defined class Node

`Node` stores the left and right subtrees, as well as the value we computed. The leaves change slightly. To process arrays efficiently, we stop splitting once the chunk being processed is small enough. We represent such a chunk by the indices `from` and `to`: a leaf does not store the actual elements, only the index range in the big array where they can be found, together with the reduced value `res` of that range.

In [13]:
val threshold = 50
def upsweep[A](inp: Array[A], from: Int, to: Int,
               f: (A,A) => A): TreeResA[A] = {
  if (to - from < threshold)
    Leaf(from, to, reduceSeg1(inp, from + 1, to, inp(from), f))
  else {
    val mid = from + (to - from)/2
    val (tL,tR) = parallel(upsweep(inp, from, mid, f),
                           upsweep(inp, mid, to, f))
    Node(tL, f(tL.res,tR.res), tR)
  }
}

def reduceSeg1[A](inp: Array[A], left: Int, right: Int,
                  a0: A, f: (A,A) => A): A = {
  var a = a0
  var i = left
  while (i < right) {
    a = f(a, inp(i))
    i = i+1
  }
  a
}

threshold: Int = 50
defined function upsweep
defined function reduceSeg1

In [14]:
def scanLeftSeg[A](inp: Array[A], left: Int, right: Int,
                   a0: A, f: (A,A) => A,
                   out: Array[A]) = {
  if (left < right) {
    var i = left
    var a = a0
    while (i < right) {
      a = f(a,inp(i))
      i = i+1
      out(i) = a
    }
  }
}

defined function scanLeftSeg

In [15]:
def downsweep[A](inp: Array[A],
                 a0: A, f: (A,A) => A,
                 t: TreeResA[A],
                 out: Array[A]): Unit = t match {
  case Leaf(from, to, res) =>
    scanLeftSeg(inp, from, to, a0, f, out)
  case Node(l, _, r) => {
    val (_,_) = parallel(
      downsweep(inp, a0, f, l, out),
      downsweep(inp, f(a0,l.res), f, r, out))
  }
}

defined function downsweep

In [16]:
def scanLeft[A](inp: Array[A],
                a0: A, f: (A,A) => A,
                out: Array[A]) = {
  val t = upsweep(inp, 0, inp.length, f)
  downsweep(inp, a0, f, t, out) // fills out[1..inp.length]
  out(0) = a0 // prepends a0
}

defined function scanLeft
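With `threshold = 50`, any input shorter than 50 elements is handled by a single leaf, so small examples never exercise the `Node` case. The sketch below is one way to check the splitting path: it restates the array-based `upsweep` and `downsweep` sequentially (no `parallel`), takes the threshold as a parameter, and compares the result with the standard library's `scanLeft`. The object `ArrayScanSketch`, the extra `threshold` parameter, and the `require` guard are assumptions made for this sketch only.

```scala
object ArrayScanSketch {
  sealed abstract class TreeResA[A] { val res: A }
  case class Leaf[A](from: Int, to: Int, override val res: A) extends TreeResA[A]
  case class Node[A](l: TreeResA[A], override val res: A,
                     r: TreeResA[A]) extends TreeResA[A]

  def reduceSeg1[A](inp: Array[A], left: Int, right: Int, a0: A, f: (A, A) => A): A = {
    var a = a0
    var i = left
    while (i < right) { a = f(a, inp(i)); i += 1 }
    a
  }

  // Builds the tree of partial reductions over inp(from until to).
  def upsweep[A](inp: Array[A], from: Int, to: Int, f: (A, A) => A,
                 threshold: Int): TreeResA[A] =
    if (to - from < threshold)
      Leaf(from, to, reduceSeg1(inp, from + 1, to, inp(from), f))
    else {
      val mid = from + (to - from) / 2
      val tL = upsweep(inp, from, mid, f, threshold) // independent of tR
      val tR = upsweep(inp, mid, to, f, threshold)
      Node(tL, f(tL.res, tR.res), tR)
    }

  // Writes out(from + 1 .. to) for every leaf; a0 is the reduction of all
  // elements to the left of the current subtree.
  def downsweep[A](inp: Array[A], a0: A, f: (A, A) => A,
                   t: TreeResA[A], out: Array[A]): Unit = t match {
    case Leaf(from, to, _) =>
      var a = a0
      var i = from
      while (i < to) { a = f(a, inp(i)); i += 1; out(i) = a }
    case Node(l, _, r) =>
      downsweep(inp, a0, f, l, out)
      downsweep(inp, f(a0, l.res), f, r, out)
  }

  def scanLeft[A](inp: Array[A], a0: A, f: (A, A) => A,
                  out: Array[A], threshold: Int): Unit = {
    // assumption: a non-empty input, and threshold >= 2 so that splitting terminates
    require(inp.length > 0 && threshold >= 2)
    val t = upsweep(inp, 0, inp.length, f, threshold)
    downsweep(inp, a0, f, t, out)
    out(0) = a0
  }
}

val bigInp = (1 to 200).toArray
val bigOut = new Array[Int](bigInp.length + 1)
ArrayScanSketch.scanLeft[Int](bigInp, 4, _ + _, bigOut, 8)
val bigExpected = bigInp.scanLeft(4)(_ + _)
// bigOut and bigExpected hold the same 201 values
```

The comparison relies on `f` being associative, as assumed throughout; integer addition satisfies this.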

In [18]:
val inp = Array(1,2,3)
val out = Array(1,1,1,1)
scanLeft(inp, 4, (x:Int, y:Int) => x+y, out)

inp: Array[Int] = Array(1, 2, 3)
out: Array[Int] = Array(4, 5, 7, 10)

In [19]:
out

res18: Array[Int] = Array(4, 5, 7, 10)