<a rel="license" href="http://creativecommons.org/licenses/by-sa/4.0/"><img alt="Creative Commons License" style="border-width:0" src="https://i.creativecommons.org/l/by-sa/4.0/80x15.png" /></a><div align="center">This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by-sa/4.0/">Creative Commons Attribution-ShareAlike 4.0 International License</a>.</div>

----

Normally, we would need to initialize a Spark context in order to perform initialization.  However, this is automatically done in this IPython installation, so we skip this part. (Only *one* SparkContext object can be alive at the same time.)

In [1]:
#sc = SparkContext(appName="PythonPi")

Evaluate the following cell to confirm that the PySpark kernel has been started correctly and that a SparkContext is already available in variable `sc`:

In [2]:
sc

---

## Exercise 2.A

*How many lines are there in text file `hdfs:///shakespeare.txt.gz?`*

First, let us read the entire (compressed) text file into an RDD:

In [3]:
# read (compressed) text file
rdd = sc.textFile('hdfs:///shakespeare.txt.gz')

We can check that each element in the RDD is a single line of text:

In [4]:
rdd.take(5)

[u'The Project Gutenberg EBook of The Complete Works of William Shakespeare, by',
 u'William Shakespeare',
 u'',
 u'This eBook is for the use of anyone anywhere at no cost and with',
 u'almost no restrictions whatsoever.  You may copy it, give it away or']

So we can just print the count of elements in the RDD:

In [5]:
# each element in the RDD is a line; print their count
print(rdd.count())

124787


----

## Exercise 2.B

*Compute the product of the sequence of numbers `[1, 2, 3, 4, 5]` using PySpark.*

In [6]:
# turn the sequence into an RDD
rdd = sc.parallelize([1,2,3,4,5])

In [7]:
# define a function for multiplying two numbers
def mul(x,y):
    return (x*y)

In [8]:
# use the `mul` function to reduce the RDD to one single value
result = rdd.reduce(mul)

If your solution is correct, evaluating the following cell should print `120`:

In [9]:
print(result)

120


----

## Exercise 2.C

*A very simple technique for checksumming a stream of bytes is [computing the “bit parity”](https://en.wikipedia.org/wiki/Checksum#Parity_byte_or_parity_word) (i.e., number of “1” bits) of each bit position in a byte.*

*Compute the bit parity checksum of the above ASCII text file using PySpark.*

**Hint:** *you can compute parity of two bytes `a` and `b` by
combining them with Python’s `a^b` operator (XOR); it is
a commutative and associative operation. See [here](https://en.wikipedia.org/wiki/Checksum#Parity_byte_or_parity_word) for a more detailed description.*

First, let us read the entire text file into an RDD:

In [10]:
rdd = sc.textFile('hdfs:///shakespeare.txt.gz')

We want to reduce the entire RDD to a single value -- this calls for a folding operation, i.e., `reduce`, `fold`, or `aggregate`.

Now, the issue is that each element of the RDD is a string of characters (corresponding to a line of text), whereas we want to compute a byte-based XOR.  However, Python too has a built-in `reduce()` function.  Since bit parity is additive:

1. a function for computing the checksum of a single line
2. a function for updating the combined checksum of a group of lines, given an additional line (uses 1.)
3. a function to combine two checksums of different groups of lines.

So the right "folding" variant to use is `aggregate` with operations 2. and 3.

In [11]:
# 1. a function for computing the checksum of a single line

def checksum(line):
    "Compute XOR checksum of a line of text"
    return reduce(upd_char_checksum, line, 0)

def upd_char_checksum(val, ch):
    "Update checksum `val` by XOR-ing with ASCII code of character `ch`"
    return (val ^ ord(ch))

# example
checksum('bit parity')

88

In [12]:
# 2. a function for updating the checksum of a group of lines, given one additional line

def update_checksum(val, line):
    return (val ^ checksum(line))

# example
update_checksum(88, 'bit parity')

0

In [13]:
# 3. a function to combine two checksums

def combine_checksums(val1, val2):
    return (val1 ^ val2)

In [14]:
# finally, use PySpark's `aggregate()`

%time cksum = rdd.aggregate(0, update_checksum, combine_checksums)

print("The XOR checksum is: %d" % cksum)

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 2.8 s
The XOR checksum is: 10


----

## Exercise 2.D

*Implement a the “word count” algorithm using PySpark, and use it to count the words in file `hdfs:///shakespeare.txt.gz`*

The “word count” algorithm is comprised of the following steps:

1. Read lines of a text file into an RDD:

In [15]:
# 1. read lines of text file into RDD
rdd1 = sc.textFile('hdfs:///shakespeare.txt.gz')

2. Transform the RDD by splitting each line at blank spaces:

In [16]:
def words(line):
    "Split a line of text into words."
    return (line
           .lower()
           .split())
    
rdd2 = rdd1.flatMap(words)

In [17]:
rdd2.take(5)

[u'the', u'project', u'gutenberg', u'ebook', u'of']

3. Create a key/value RDD by pairing each word with the value 1:

In [18]:
def word_to_pair(word):
    "Map a single `word` into a *(word, 1)* pair"
    return (word, 1)

rdd3 = rdd2.map(word_to_pair)

In [19]:
rdd3.take(5)

[(u'the', 1), (u'project', 1), (u'gutenberg', 1), (u'ebook', 1), (u'of', 1)]

4. Sum the values associated with each word:

In [20]:
from operator import add

rdd4 = rdd3.reduceByKey(add)

In [21]:
rdd4.take(5)

[(u'fawn', 11),
 (u'considered,', 2),
 (u'considered.', 3),
 (u'mustachio', 1),
 (u'fleeces', 1)]

Now we can collect the result on the driver using `rdd4.collectAsMap()` or keep the RDD around for further processing.

*What are the 5 most frequent words?*

We can either sort `rdd4` and then take the first 5 elements, or use `takeOrdered()` which combines the two operation into one.  However, documentation of the latter method warns us that all sorting is done in the Spark driver memory, so it is only suitable for small datasets.

In [22]:
def nr_of_occurrences(pair):
    "Return nr. of occurrences from *(word, count)* pair."
    word, count = pair
    return count

rdd5 = rdd4.sortBy(nr_of_occurrences, ascending=False)

In [23]:
rdd5.take(5)

[(u'the', 27730),
 (u'and', 26099),
 (u'i', 19540),
 (u'to', 18762),
 (u'of', 18126)]

----

## Exercise 2.E

*Compute an approximation of the mathematical constant $\pi$ using the Monte Carlo method of estimating the size of the unit circle.  (See exercise 1.E for more details and a pure-Python solution.)*

Since also Spark RRDs provide `map` and `filter` methods, we can straightforwardly translate the Python solution from *Exercise 1.E*.

Fix the number of iterations; since the algorithm converges very slowly, we want this number to be large:

In [24]:
N = 100000000

A constant that is unique to Spark is the number of partitions, which controls the gramularity of parallelism (more partitions, smaller chunks of work sent to the executors).

In [25]:
partitions = sc.defaultParallelism*4

In order to process data in Spark, we have to inject it in the system using `SparkContext.parallelize()` and get an RDD back:

In [26]:
%%time

# inject an array of size N into Spark
dummy = sc.parallelize(xrange(N), partitions)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 3.33 ms


RDDs provide a `.map()` method that creates a new RDD by applying the given function elementwise:

In [27]:
%%time

import random

def point(_):
    "Return coordinates of a random point in the unit square."
    x = random.random()
    y = random.random()
    return (x,y)

# create collection of random points
points = dummy.map(point)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 37 µs


Likewise, the `.filter()` method creates a new RDD by copying only those values on which the supplied predicate is true:

In [28]:
%%time 

def in_unit_circle(coords):
    x, y = coords
    return (x**2 + y**2) < 1

# keep only those in unit circle
points_in_unit_circle = points.filter(in_unit_circle)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 57.9 µs


Finally, the `.count()` method returns the number of elements in a RDD (like Python's built-in `len()`).  

Note that `.count()` is an **action** so it triggers actual computation of the DAG we have been building so far -- almost all the work that Spark does is concentrated in this single line!

In [29]:
%%time

# count the latter
P = points_in_unit_circle.count()

CPU times: user 0 ns, sys: 12 ms, total: 12 ms
Wall time: 36.8 s


In [30]:
pi = 4.0 * P / N

print("Pi is roughly %f" % pi)

Pi is roughly 3.142004


### Alternative solution

This algorithm can be realized via a Map/Reduce scheme by using mappers that take no input and output 0 or 1 depending on whether a single randomly-generated point landed inside the circle or not.  Then the reducers simply add all the results.

This solution does not strictly depend on Spark-only features and could in principle run on any Map/Reduce engine.  On the other hand, all the worker is done on the mappers and the reduce step does not parallelize.

The two parameters `partitions` and `n` control the number of mappers and the number of repetitions that Spark is going to execute:

In [31]:
partitions = 16
n = 1000000 * partitions

The mapper function just generates a random point in the square [0, +1] x [0, +1] and then outputs 1 if it fell inside the unit circle and 0 if not:

In [32]:
import random

def randomly_in_unit_circle(_):
    x = random.random()
    y = random.random()
    if (x**2 + y**2) < 1:
        return 1
    else: 
        return 0

The *reduce* function just adds all these 0's and 1's:

In [33]:
def add(a,b):
    return a+b

Now the total number of positive samples is found by applying map/reduce to a list of length `n`:

In [34]:
%%time

count = (sc.parallelize(xrange(N), partitions)
         .map(randomly_in_unit_circle)
         .reduce(add))

CPU times: user 8 ms, sys: 4 ms, total: 12 ms
Wall time: 31.1 s


Now the area of the circle is computed as the number of positive samples divided by the area of the enclosing square (all samples land in the square); the factor `4` comes from the fact that we're sampling only the x>0, y>0 sector:

In [35]:
pi = 4.0 * count / n

print("Pi is roughly %f" % pi)

Pi is roughly 19.639090


Normally the SparkContext is stopped when the computations are done.  We don't do it here in order to be able to run more computations:

In [36]:
#sc.stop()