# Case Study: MapReduce



For this case study we are going to look at the very popular concept of **MapReduce**. This is a commonly used technique in the industry, especially when working with big data. Many tools support this paradigm; a very popular one is **Spark**.

The basic idea is easy to decipher from the name itself: if we pull it apart into _Map_ and _Reduce_, we get the two main operations.

1. **Map**: iterate over a collection of values, applying a function to each one to produce a new value.
2. **Reduce**: apply a combining operation across all of the mapped values to produce a smaller result (a single combined value, groups, etc.).

These functional operations exist in Scala: **Map** most closely corresponds to Scala's `map` function, and **Reduce** to `fold`.
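As a minimal sketch using only the standard library, here are the two steps on a small list of words (the words themselves are arbitrary sample data): `map` transforms every element, and `foldLeft` reduces the results to a single value.

```scala
// Hypothetical sample data
val words = List("map", "reduce", "fold")

// Map: apply a function to every element, producing a list of the same length
val wordLengths: List[Int] = words.map(_.length) // List(3, 6, 4)

// Reduce: combine all elements into a single value, starting from 0
val totalLength: Int = wordLengths.foldLeft(0)(_ + _) // 13
```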

![MapReduce - foldMap](images/foldmap.png)

Let's start by making sure we have everything we need loaded and imported in this notebook.

```scala
import $ivy.`org.typelevel::cats-core:2.0.0`
```

```scala
// Required to enable higher-kinded types in Scala (support for F[_])
import scala.language.higherKinds

import scala.concurrent._
import scala.concurrent.duration._

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import cats.{Monad, Monoid}
import cats.syntax.semigroup._ // for |+|
import cats.instances.int._ // for Monoid
import cats.instances.string._ // for Monoid
import cats.instances.future._ // for Applicative
import cats.instances.list._ // for Traverse
import cats.syntax.traverse._ // for sequence
```

To begin our discussion, let's make sure we have a clear understanding of what the `map` function's signature looks like. In general terms, `map` applies a function `A => B` to an `F[A]` and returns an `F[B]`.

Below is an illustration of this signature (from the *Scala with Cats* book).

![Cats Image - MapReduce Functor and Fold](images/mapreduce-functor.png)
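To see the `F[A]`, `A => B`, `F[B]` shape in code, here is a small sketch using Cats' `Functor` type class. `lengthsOf` is a hypothetical helper written for any `F[_]` that has a `Functor` instance; it is shown here with `List` and `Option`.

```scala
import scala.language.higherKinds

import cats.Functor
import cats.instances.list._   // Functor[List]
import cats.instances.option._ // Functor[Option]

// Hypothetical helper: given an F[String] and the function String => Int, produce an F[Int]
def lengthsOf[F[_]: Functor](fa: F[String]): F[Int] =
  Functor[F].map(fa)(_.length)

val fromList: List[Int] = lengthsOf(List("map", "reduce")) // List(3, 6)
val fromOption: Option[Int] = lengthsOf(Option("fold"))    // Some(4)
```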

So any data type that has both a _Functor_ and a _Foldable_ instance can be set up for map-reduce. There is one caveat: when we split the work into batches and process them separately, we lose control over how the items are grouped together during the reduction. The reduction therefore has to be **associative**, and it also needs an **identity** element to start each batch from (together, these make the result type a Monoid):

```scala
reduce(a1, reduce(a2, a3)) == reduce(reduce(a1, a2), a3)
```
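The two laws can be spot-checked on a few sample values. The sketch below uses the Cats `Monoid` instances for `Int` (addition) and `String` (concatenation); the helper names `isAssociative` and `hasIdentity` are hypothetical, and checking a handful of values is a sanity check, not a proof. Subtraction is included as an operation that is not associative.

```scala
import cats.Monoid
import cats.instances.int._    // Monoid[Int]: addition, empty = 0
import cats.instances.string._ // Monoid[String]: concatenation, empty = ""
import cats.syntax.semigroup._ // |+|

// Hypothetical helper: regrouping the same three values must not change the result
def isAssociative[A: Monoid](a1: A, a2: A, a3: A): Boolean =
  (a1 |+| (a2 |+| a3)) == ((a1 |+| a2) |+| a3)

// Hypothetical helper: combining with `empty` on either side leaves a value unchanged
def hasIdentity[A: Monoid](a: A): Boolean =
  (Monoid[A].empty |+| a) == a && (a |+| Monoid[A].empty) == a

val intOk = isAssociative(1, 2, 3) && hasIdentity(42)
val stringOk = isAssociative("map", "-", "reduce") && hasIdentity("fold")

// Counterexample: subtraction gives different answers depending on grouping
val leftGrouped = (1 - 2) - 3  // -4
val rightGrouped = 1 - (2 - 3) // 2
```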

## Implementing Foldmap

To start, let's implement `foldMap`. `foldMap` is essentially a small map-reduce in its own right, run on a single thread. Let's first define a signature that does the following:

- accepts a sequence of type `Vector[A]`
- accepts a function of type `A => B`, where there is a Monoid for `B`


```scala
// Signature only: the body is filled in below
def foldMap[A, B: Monoid](values: Vector[A])(func: A => B): B = ???
```

Now, with the signature in place, let's fill in the body so that it does the following:

1. start with a sequence of items of type `A`
2. map over the sequence to produce a sequence of items of type `B`
3. use the Monoid to reduce the items to a single `B`

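```scala
// 1. start with a Vector of A
// 2. map each A to a B
// 3. reduce the Bs with the Monoid, starting from its empty value
def foldMap[A, B: Monoid](as: Vector[A])(func: A => B): B =
  as.map(func).foldLeft(Monoid[B].empty)(_ |+| _)
```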

Great, let's quickly try out our implementation.

```scala
foldMap(Vector(1, 2, 3))(identity)                       // 6
foldMap(Vector(1, 2, 3))(_.toString + "! ")              // "1! 2! 3! "
foldMap("Hello world!".toVector)(_.toString.toUpperCase) // "HELLO WORLD!"
```
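A classic MapReduce example is counting words, and it fits `foldMap` directly once `B` is a `Map`. The sketch below assumes Cats' `Monoid` instance for `Map` (from `cats.instances.map._`), which merges two maps and combines the values of shared keys with the value type's Semigroup. The sample lines are made up, and `foldMap` is repeated so the block runs on its own.

```scala
import cats.Monoid
import cats.instances.int._    // Semigroup[Int], used to add up the counts
import cats.instances.map._    // Monoid[Map[K, V]]: merges maps, combining values of shared keys
import cats.syntax.semigroup._ // |+|

// Same foldMap as above, repeated so this block is self-contained
def foldMap[A, B: Monoid](as: Vector[A])(func: A => B): B =
  as.map(func).foldLeft(Monoid[B].empty)(_ |+| _)

// Hypothetical input
val lines = Vector("the cat sat", "the dog sat", "a cat")
val words: Vector[String] = lines.flatMap(_.split(" ").toVector)

// Map step: each word becomes a one-entry Map(word -> 1)
// Reduce step: the Map monoid merges them, summing the counts of repeated words
val wordCounts: Map[String, Int] = foldMap(words)(word => Map(word -> 1))
```

Because the counts for each word are simply added together, any batching of `words` would produce the same `Map`, which is exactly the property the parallel version relies on.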

## Distributing the operation

Our simple map-reduce implementation above works, but it runs on a single thread, so it cannot make full use of a multi-core machine. To use all of the processing power available, we need to divide the data into batches and distribute the work across multiple cores/threads.

![Distribute MapReduce](images/parallelising-foldmap.png)

We can distribute the work using Scala's `Future`s, which run on a thread pool (here, the global `ExecutionContext` imported above). To decide how to divide the work, we want to use the number of available processors rather than hard-coding a value. So let's first check how many processors we have available.

```scala
val processorCount = Runtime.getRuntime.availableProcessors
```

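Now that we have the processor count, we can split our data into roughly one batch per processor. Note that `grouped(n)` takes the *size* of each group rather than the number of groups, so we first derive the group size by dividing the data size by the processor count (rounding up):

```scala
val groupSize = math.ceil(100.0 / processorCount).toInt
(1 to 100).toList.grouped(groupSize).toList
```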
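To see the difference concretely, the sketch below fixes the core count at a hypothetical 8 (so the numbers don't depend on your machine) and compares `grouped(cores)` with grouping by a size derived from the core count.

```scala
// Assumption: a hypothetical machine with 8 cores
val cores = 8
val data = (1 to 100).toList

// grouped(n) makes groups OF SIZE n: 100 items in groups of 8 gives 13 groups
val bySize = data.grouped(cores).toList

// Deriving the size from the core count gives at most `cores` groups
val groupSize = math.ceil(data.size.toDouble / cores).toInt // 13
val byCount = data.grouped(groupSize).toList

println(bySize.size)         // 13
println(byCount.size)        // 8
println(byCount.map(_.size)) // List(13, 13, 13, 13, 13, 13, 13, 9)
```

In both cases no items are lost or reordered; only the number and size of the batches change.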

## Parallel Foldmap

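Let's start our implementation of `parallelFoldMap` by defining its signature. It takes the same arguments as `foldMap`, but returns a `Future[B]` because the result is computed asynchronously.

```scala
def parallelFoldMap[A, B : Monoid](values: Vector[A])(func: A => B): Future[B] = ???
```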

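```scala
def parallelFoldMap[A, B: Monoid](values: Vector[A])(func: A => B): Future[B] = {
  // 1. Split the input into roughly one batch per CPU core
  //    (at least 1, so an empty input does not call grouped(0), which throws)
  val numCores = Runtime.getRuntime.availableProcessors
  val groupSize = math.max(1, (1.0 * values.size / numCores).ceil.toInt)
  val groups: Vector[Vector[A]] = values.grouped(groupSize).toVector

  // 2. Map and reduce each batch in its own Future (i.e. on the thread pool)
  val futures: Vector[Future[B]] = groups.map { group =>
    Future {
      group.foldLeft(Monoid[B].empty)(_ |+| func(_))
    }
  }

  // 3. Once all batches finish, reduce the per-batch results into a single B
  Future.sequence(futures).map { results =>
    results.foldLeft(Monoid[B].empty)(_ |+| _)
  }
}

// Note: the true sum, 500000500000, does not fit in an Int,
// so the result overflows and wraps around to 1784293664
val result: Future[Int] = parallelFoldMap((1 to 1000000).toVector)(identity)
Await.result(result, 1.second)
```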
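Since `Future.sequence` returns the batch results in their original order, the reduction needs to be associative but not commutative. A quick sanity check, sketched below, is to run the sequential and parallel versions with `String` concatenation, which is associative but not commutative, and confirm they agree. Both functions are copied from above (with an explicit lambda in place of the placeholder syntax) so the block runs on its own.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

import cats.Monoid
import cats.instances.string._ // Monoid[String]: concatenation, associative but not commutative
import cats.syntax.semigroup._ // |+|

def foldMap[A, B: Monoid](as: Vector[A])(func: A => B): B =
  as.map(func).foldLeft(Monoid[B].empty)(_ |+| _)

def parallelFoldMap[A, B: Monoid](values: Vector[A])(func: A => B): Future[B] = {
  val numCores = Runtime.getRuntime.availableProcessors
  val groupSize = math.max(1, math.ceil(values.size.toDouble / numCores).toInt)
  val futures: Vector[Future[B]] =
    values.grouped(groupSize).toVector.map { group =>
      Future(group.foldLeft(Monoid[B].empty)((acc, a) => acc |+| func(a)))
    }
  // Future.sequence keeps the batch results in their original order
  Future.sequence(futures).map(_.foldLeft(Monoid[B].empty)(_ |+| _))
}

val letters = ('a' to 'z').toVector
val sequential: String = foldMap(letters)(_.toString)
val parallel: String = Await.result(parallelFoldMap(letters)(_.toString), 5.seconds)
```

Both values come out as the alphabet in order, regardless of how many cores the batches were spread across.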

The book also provides an implementation that uses as much Cats as possible, to demonstrate how concise the solution can be.

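```scala
import cats.instances.vector._ // for Traverse and Foldable on Vector
import cats.syntax.foldable._  // for foldMap and combineAll

def parallelFoldMap[A, B: Monoid](values: Vector[A])(func: A => B): Future[B] = {
  val numCores = Runtime.getRuntime.availableProcessors
  val groupSize = math.max(1, (1.0 * values.size / numCores).ceil.toInt)

  // Batch the input, run foldMap on each batch in its own Future,
  // then combine the per-batch results with the Monoid
  values.grouped(groupSize).toVector
    .traverse(group => Future(group.foldMap(func)))
    .map(_.combineAll)
}

val future: Future[Int] = parallelFoldMap((1 to 1000).toVector)(_ * 1000)
Await.result(future, 1.second) // 500500000
```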
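`traverse` in the version above does two things at once: it maps each batch to a `Future`, then flips the resulting `Vector[Future[B]]` into a single `Future[Vector[B]]`, which is what `Future.sequence` did in the first version. A small sketch with hypothetical batches and a hypothetical `sumBatch` helper shows the two approaches give the same result.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

import cats.instances.future._ // Applicative[Future]
import cats.instances.vector._ // Traverse[Vector]
import cats.syntax.traverse._  // traverse syntax

// Hypothetical batches and per-batch work
val batches = Vector(Vector(1, 2), Vector(3, 4), Vector(5))
def sumBatch(batch: Vector[Int]): Future[Int] = Future(batch.sum)

// One step: map each batch to a Future and collect into Future[Vector[Int]]
val viaTraverse: Future[Vector[Int]] = batches.traverse(batch => sumBatch(batch))

// Two steps with the standard library: map, then Future.sequence
val viaSequence: Future[Vector[Int]] = Future.sequence(batches.map(sumBatch))

val traversed = Await.result(viaTraverse, 5.seconds) // Vector(3, 7, 5)
val sequenced = Await.result(viaSequence, 5.seconds) // Vector(3, 7, 5)
```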