<a rel="license" href="http://creativecommons.org/licenses/by-sa/4.0/"><img alt="Creative Commons License" style="border-width:0" src="https://i.creativecommons.org/l/by-sa/4.0/80x15.png" /></a><div align="center">This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by-sa/4.0/">Creative Commons Attribution-ShareAlike 4.0 International License</a>.</div>

----

# Compute $\pi$ with Spark

This notebook shows how to compute an approximation to the constant "Pi" using a Map/Reduce job.  The original source can be found at https://github.com/apache/spark/blob/master/examples/src/main/python/pi.py

The computation is based on a quasi-Monte Carlo method: we "sample" the area of the unit circle (which is exactly "Pi") by counting the number of randomly-generated coordinate pairs that fall within the unit circle.  A detailed explanation of the method can be found [here](http://mathfaculty.fullerton.edu/mathews/n2003/montecarlopimod.html).

---

## Python-only solution

As a starter, we can formulate a solution to the problem in pure Python using the built-in `map`, `filter`, `reduce` functions.  This leads to a straightforward generalisation for Spark

In [1]:
# number of iterations (constant)
N = 16*1000*1000

### 1. pick $N$ random points in the square $[0,+1] \times [0,+1]$


The following function picks a point in the unit square by generating the $x$ and $y$ coordinates uniformly at random:

In [2]:
# Python's uniform random number generator
from random import random

def point(_):  ## still need 1 argument for `map` below
    """A random point in the unit square."""
    x = random()
    y = random()
    return (x, y)

Now we can get an array of $N$ random points by just applying this function to any list of the correct size (note that `point` still takes a parameter, and ignores it!):

In [3]:
%%time

points = map(point, range(N))

CPU times: user 5.18 s, sys: 998 ms, total: 6.18 s
Wall time: 6.21 s


In [4]:
points[:5]

[(0.42855644154377803, 0.13429609611151172),
 (0.8917763049620622, 0.04446035994226749),
 (0.6524320657951928, 0.983059026998789),
 (0.41781473006532777, 0.8395036432321322),
 (0.8542528152501607, 0.3561265631343048)]

### 2. count the number $P$ of points that fall into the unit circle $\{ (x,y) | x^2+y^2 < 1 \}$

We can apply `filter` and keep only the points that lie withing the unit circle -- the length of the list so obtained will be $P$.

In [5]:
%%time

points_in_unit_circle = filter((lambda (x,y): (x**2 + y**2) < 1), points)    

CPU times: user 4.72 s, sys: 60.6 ms, total: 4.79 s
Wall time: 4.81 s


In [6]:
P = len(points_in_unit_circle)

### 3. for large enough $N$, the ratio $P/N$ approximates the area of a quarter of the unit circle, i.e. $\pi/4$

In [7]:
pi = 4.0 * P / N

print("Pi is roughly %f" % pi)

Pi is roughly 3.141209


---

## Spark translation

Since also Spark RRDs provide `map` and `filter` methods, we can straightforwardly translate the above Python solution.

A constant that is unique to Spark is the number of partitions, which controls the gramularity of parallelism (more partitions, smaller chunks of work sent to the executors).

In [8]:
partitions = 16

In order to process data in Spark, we have to inject it in the system using `SparkContext.parallelize()` and get an RDD back:

In [9]:
%%time

# inject an array of size N into Spark
dummy = sc.parallelize(range(N), partitions)

CPU times: user 863 ms, sys: 140 ms, total: 1 s
Wall time: 1.35 s


RDDs provide a `.map()` method that creates a new RDD by applying the given function elementwise:

In [10]:
%%time

# create collection of random points
points = dummy.map(point)

CPU times: user 326 ms, sys: 4.68 ms, total: 330 ms
Wall time: 333 ms


Likewise, the `.filter()` method creates a new RDD by copying only those values on which the supplied predicate is true:

In [11]:
%%time 

# keep only those in unit circle
points_in_unit_circle = points.filter(lambda (x,y): (x**2 + y**2) < 1)

CPU times: user 483 ms, sys: 68.5 ms, total: 552 ms
Wall time: 554 ms


Finally, the `.count()` method returns the number of elements in a RDD (like Python's built-in `len()`).  

Note that `.count()` is an **action** so it triggers actual computation of the DAG we have been building so far -- almost all the work that Spark does is concentrated in this single line!

In [12]:
%%time

# count the latter
P = points_in_unit_circle.count()

CPU times: user 10.5 ms, sys: 0 ns, total: 10.5 ms
Wall time: 7.86 s


In [13]:
pi = 4.0 * P / N

print("Pi is roughly %f" % pi)

Pi is roughly 3.141718


---

## Original Map/Reduce approach


This algorithm can be realized via a map/reduce scheme by using mappers that take no input and output 0 or 1 depending on whether a single randomly-generated point landed inside the circle or not.  Then the reducers simply add all the results.

This solution does not strictly depend on Spark-only features and could in principle run on any Map/Reduce engine.  On the other hand, all the worker is done on the mappers and the reduce step does not parallelize.

Python's standard [random()](https://docs.python.org/2/library/random.html#module-random) function generates a floating-point number uniformly in the semi-open range [0.0, 1.0).

In [14]:
from random import random

Normally, we would need to initialize a Spark context in order to perform initialization.  However, this is automatically done in this IPython installation, so we skip this part. (Only *one* SparkContext object can be alive at the same time.)

In [None]:
#sc = SparkContext(appName="PythonPi")

The two parameters `partitions` and `n` control the number of mappers and the number of repetitions that Spark is going to execute:

In [15]:
partitions = 16
n = 1000000 * partitions

The mapper function just generates a random point in the square [0, +1] x [0, +1] and then outputs 1 if it fell inside the unit circle and 0 if not:

In [16]:
def randomly_in_unit_circle(_):
    x = random()
    y = random()
    if (x**2 + y**2) < 1:
        return 1
    else: 
        return 0

The *reduce* function just adds all these 0's and 1's:

In [17]:
def add(a,b):
    return a+b

Now the total number of positive samples is found by applying map/reduce to a list of length `n`:

In [18]:
%%time

count = (sc.parallelize(range(1, n + 1), partitions)
         .map(randomly_in_unit_circle)
         .reduce(add))

CPU times: user 830 ms, sys: 137 ms, total: 967 ms
Wall time: 6.7 s


Now the area of the circle is computed as the number of positive samples divided by the area of the enclosing square (all samples land in the square); the factor `4` comes from the fact that we're sampling only the x>0, y>0 sector:

In [19]:
pi = 4.0 * count / n

print("Pi is roughly %f" % pi)

Pi is roughly 3.140334


Normally the SparkContext is stopped when the computations are done.  We don't do it here in order to be able to run more computations:

In [None]:
#sc.stop()