# First steps in PySpark 

In this notebook we will learn the fundamentals of functional programming, as well as the basic abstraction of a distributed object in Spark, the RDD. The notebook has been divided into two parts:

Part 1: map/reduce basics

Part 2: Work with RDD and Pair RDD abstractions 






# Part 1: map/reduce basics

![Hadoop Logo](https://upload.wikimedia.org/wikipedia/commons/thumb/0/0e/Hadoop_logo.svg/220px-Hadoop_logo.svg.png)
# **Apache Hadoop (MapReduce)**

It is an open source software framework written in Java for distributed storage and distributed processing of very large data sets on computer clusters built from commodity hardware. All the modules in Hadoop are designed with a fundamental assumption that hardware failures (of individual machines, or racks of machines) are common and thus should be automatically handled in software by the framework.

The core of Apache Hadoop consists of a storage part (Hadoop Distributed File System (HDFS)) and a processing part (MapReduce). Hadoop splits files into large blocks and distributes them amongst the nodes in the cluster. To process the data, Hadoop MapReduce transfers packaged code for nodes to process in parallel, based on the data each node needs to process. This approach takes advantage of data locality — nodes manipulating the data that they have on hand — to allow the data to be processed faster and more efficiently than it would be in a more conventional supercomputer architecture that relies on a parallel file system where computation and data are connected via high-speed networking.

![caption](http://d152j5tfobgaot.cloudfront.net/wp-content/uploads/2012/07/mapreduce.png)

Since data and computation are distributed, we should avoid shared mutable state, i.e. variables that are modified in place. Thus, in contrast to imperative programming, we shall use the functional approach (lambda calculus).

### The goal of the following exercises is to understand basic lambda calculus with Python.

### (1a) Functional programming in Python

So, what is Functional Programming? From Wikipedia: 

« …a programming paradigm that treats computation as the evaluation of mathematical functions and **avoids changing-state and mutable data**.»

It's based upon lambda calculus, which consists of:
 * Function definition (declaration of expressions)
 * Function application (evaluation of those expressions)
 * Recursion (iteration)

We have already used this in Python!!! :)

Recall the typical `lambda x: x+1` we have been using as the first argument of the map, reduce and filter functions:
 * **map** maps each value in the input collection to a new value. It's just the classical mathematical function we are used to!
 * **reduce** combines the values of the input collection two at a time, each step returning a new value of the same type, until a single value remains. When the work is split across workers, the operation should be associative and commutative, so that the result does not depend on grouping or order.
 * **filter** keeps the elements of the input collection that satisfy a certain (boolean) criterion.
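
The cells in this notebook were run with Python 2, where `map` and `filter` return lists and `reduce` is a built-in. As a minimal sketch, assuming a Python 3 interpreter instead, the same three operations could be written as follows; the only changes are the `functools.reduce` import and the explicit `list(...)` calls, and the variable names are illustrative:

```python
from functools import reduce  # in Python 3, reduce lives in functools

a = range(1, 10)

# map and filter return lazy iterators in Python 3, so wrap them in list()
incremented = list(map(lambda x: x + 1, a))
multiples_of_3 = list(filter(lambda x: x % 3 == 0, a))

# reduce folds the collection into a single value
total = reduce(lambda x, y: x + y, a)
```

The snippet also runs unchanged under Python 2.6+, so it can be used as a portable variant of the cells below.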
 

**Mapping**

![map](https://cosminpupaza.files.wordpress.com/2015/10/map.png?w=505)

In [1]:
a = range(1,10)
map(lambda x: x+1,a)

[2, 3, 4, 5, 6, 7, 8, 9, 10]

In [2]:
# square elements in a
map(lambda y: y**2,a)

[1, 4, 9, 16, 25, 36, 49, 64, 81]

**Filtering**
![filter](https://cosminpupaza.files.wordpress.com/2015/11/filter.png?w=405)

In [3]:
# keep only the elements equal to 2
filter(lambda x: x==2,a)

[2]

In [5]:
# keep only the elements that are multiples of 3
filter(lambda x: x%3==0,a)

[3, 6, 9]

**Reducing** Recall that the operation should be associative and commutative! Think about the importance of this when parallelizing computations.

![](https://cosminpupaza.files.wordpress.com/2015/11/reduce.png?w=500)

In [6]:
# add all elements in the collection
reduce(lambda x,y: x+y,a)

45

In [7]:
# find the maximum in the collection
reduce(lambda x,y: x if x>y else y,a)

9

In [8]:
# subtract all elements in the collection: subtraction is neither commutative nor associative
print reduce(lambda x,y: x-y,a)
print reduce(lambda x,y: y-x,a)

-43
5


In [9]:
# subtract all elements in the collection. We can give an initial value to start with.
print reduce(lambda x,y: x-y,[1,2,3])
print reduce(lambda x,y: x-y,[1,2,3],0)
print reduce(lambda x,y: x-y,[1,2,3],6)

-4
-6
0
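
To see why this matters when parallelizing, here is a minimal sketch that mimics a distributed reduce in plain Python: the data is cut into chunks, each chunk is reduced on its own, and the partial results are reduced again. The helper `chunked_reduce` is a hypothetical name introduced only for this illustration, and the snippet assumes `functools.reduce` so that it also runs on Python 3.

```python
from functools import reduce

def chunked_reduce(f, data, n_chunks):
    """Reduce each chunk separately, then reduce the partial results."""
    size = -(-len(data) // n_chunks)  # ceiling division
    chunks = [data[i:i + size] for i in range(0, len(data), size)]
    partials = [reduce(f, chunk) for chunk in chunks]
    return reduce(f, partials)

data = list(range(1, 10))
add = lambda x, y: x + y
sub = lambda x, y: x - y

sum_serial = reduce(add, data)               # 45
sum_chunked = chunked_reduce(add, data, 3)   # 6 + 15 + 24 = 45

sub_serial = reduce(sub, data)               # -43
sub_chunked = chunked_reduce(sub, data, 3)   # (-4) - (-7) - (-10) = 13
```

Addition gives the same answer however the data is chunked, while subtraction does not, which is why a distributed reduce needs a well-behaved operation.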


## (1b) Exercise: Calculate the mean of a collection of real numbers using map/reduce
Recall:

$$\bar x = \frac{\sum_{i=1}^{N} x_i}{N} $$

It's straightforward to do this with the Python built-ins sum() and len(). However, how would you do that with map/reduce? We have already shown how to sum the elements of an array. Thus, you only have to calculate the length of the array. For this:
 * Create another array of the same size, consisting of 1s.
 * Sum the elements of that array

In [10]:
# mean calculation: we need the sum and the number of elements
a = range(1,10)

ocurrancies = map(lambda x: 1,a)
count = reduce(lambda x,y: x+y,ocurrancies)
# divide by a float: in Python 2, int/int truncates the result
mean = reduce(lambda x,y: x+y,a)/float(count)

assert len(ocurrancies) == len(a)
assert count == len(a)
assert mean == sum(a)/float(len(a))

## (1c) Exercise: Calculate the standard deviation of a collection of real numbers
Recall:

$$\sigma_x^2 = \frac{\sum_{i=1}^{N} (x_i-\bar x)^2}{N-1} =
\frac{\sum_{i=1}^{N}\left(x_i^2+\bar x ^2-2x_i\bar x\right)}{N-1} = 
\frac{\sum_{i=1}^{N} x_i^2-N\bar x^2}{N-1}$$

For this, use the *mean* and *count* variables from the previous exercise.

In [11]:
# std calculation: apart from the mean and count, the sum of the squares is required.
squares = map(lambda x: x**2,a)
sum_squares = reduce(lambda x,y:x+y,squares)
std = pow((sum_squares-count*(mean**2))/float(count-1),0.5)

assert std == 7.5**0.5

# another way: average the squared deviations from the mean, then take the square root
dev = map(lambda x: float((x-mean)**2),a)
std = pow(reduce(lambda i,j: i+j,dev)/(count-1),0.5)

## (1c.bis) Exercise: all at once! 
For the std calculation, we have obtained separately the sum of the elements, the length and the sum of the squared elements. That is, we have swept the array three times! Can you do it in a two-step process (one map, one reduce)? Do you think it might matter at some point?
 * Hint: recall that reduce takes two arguments of the same type, and returns another value of that type. So, instead of using numbers as the elements of our array, use tuples!!

In [12]:
a = range(1,10)

map_tuples = map(lambda j: (1,j,j**2),a)
reduce_tuples = reduce(lambda i,j:(i[0]+j[0],i[1]+j[1],i[2]+j[2]),map_tuples)

print reduce_tuples
count = reduce_tuples[0]
mean = reduce_tuples[1]/float(count)
sum_sq = reduce_tuples[2]

std = pow((sum_sq-count*(mean**2))/float(count-1),0.5)

assert std == 7.5**0.5

(9, 45, 285)


## (1d) The 'word-count' problem: creating histograms
Given a set of keys (e.g. words) in an input collection, calculate the frequency of each key (word). 

In order to understand better how map/reduce works, we will implement this simple calculation in several forms.

For simplicity, we are going to create a list of numbers between 1 and 9, each of which can be repeated a (random) number of times.

In [13]:
# Create array of numbers with repeated elements
import random
a = range(1,10)
a = [[x]*random.randint(1,100) for x in a]
a = [x for y in a for x in y]
random.shuffle(a)
print a

[2, 2, 2, 7, 1, 6, 2, 1, 5, 5, 5, 4, 2, 8, 5, 9, 9, 2, 8, 1, 5, 7, 6, 9, 6, 2, 2, 2, 7, 5, 6, 5, 8, 4, 5, 8, 2, 6, 5, 6, 5, 5, 7, 6, 5, 4, 6, 9, 5, 6, 8, 8, 8, 8, 7, 8, 5, 5, 5, 6, 7, 7, 6, 2, 8, 9, 4, 3, 7, 7, 5, 5, 5, 5, 5, 5, 9, 5, 5, 2, 8, 5, 2, 2, 8, 8, 6, 2, 8, 2, 4, 8, 2, 6, 9, 2, 7, 8, 6, 7, 6, 5, 8, 4, 5, 7, 9, 4, 8, 5, 5, 7, 8, 6, 8, 9, 1, 9, 4, 6, 9, 5, 8, 6, 2, 2, 2, 5, 9, 6, 1, 2, 5, 9, 9, 9, 2, 9, 6, 5, 8, 2, 8, 6, 9, 8, 8, 1, 9, 6, 4, 2, 9, 2, 5, 2, 2, 2, 8, 8, 5, 6, 7, 5, 5, 3, 7, 8, 6, 3, 2, 8, 8, 7, 8, 5, 2, 4, 8, 7, 5, 5, 9, 5, 2, 5, 1, 8, 4, 5, 8, 8, 4, 8, 8, 7, 5, 6, 5, 7, 2, 7, 5, 2, 7, 5, 8, 9, 3, 9, 9, 6, 5, 6, 9, 8, 2, 5, 7, 6, 7, 6, 2, 7, 9, 2, 6, 8, 4, 1, 8, 8, 2, 5, 8, 8, 8, 6, 8, 5, 2, 8, 7, 2, 5, 6, 2, 8, 9, 8, 2, 9, 9, 6, 6, 5, 6, 2, 9, 1, 8, 2, 8, 8, 6, 9, 8, 6, 4, 8, 6, 2, 8, 9, 5, 6, 7, 2, 7, 2, 2, 9, 5, 9, 6, 5, 7, 6, 5, 5, 9, 6, 2, 5, 5, 5, 3, 7, 2, 5, 4, 6, 7, 5, 6, 1, 7, 7, 9, 2, 2, 2, 2, 2, 5, 4, 2, 6, 2, 9, 8, 6, 6, 2, 2, 6, 9, 9, 1, 5, 5, 7, 9, 

### (1d.1) Simple approach

 * Start with an empty dict
 * If a new key is not present in the dict, create it.
 * Otherwise, increase the frequency of the key by one.

In [14]:
def histSimple(lista):
    # create empty dictionary
    hist = {}
    
    for key in lista:
        if key in hist: hist[key] +=1 
        else: hist[key] = 1
    return hist  

histSimple(a)

{1: 15, 2: 73, 3: 6, 4: 18, 5: 78, 6: 60, 7: 37, 8: 72, 9: 48}

### (1d.2) Map/reduce

 * Recall that *reduce* applies an operation to 2 elements of the same type, and returns another element of that type. Thus, the first thing to do is to map our collection to the type of the output. We cannot simply convert the list of (key, 1) pairs with dict(list), as that collapses duplicated keys and loses their counts. We will use lists of tuples instead.
 * Then, we have to define a method in the reducer that combines keys. There are two steps:
   * Obtain the keys in the left list
   * Then, check whether the key in the right list already exists in the left one

In [19]:
# Same histogram with map/reduce
def histMR(l,verbose=False):
    # emit a list of single-element lists of tuples: [[(k1,1)],[(k2,1)],...]
    # (use the argument l, not the global a)
    m = map(lambda i: [(i,1)],l) 
    return reduce(lambda i,j: combineKeys(i,j,verbose), m)

def combineKeys(l1,l2,verbose=False):
    '''
    method to combine keys
    
    inputs: 
        * l1: list of tuples [(k1,v1),(k2,v2),...]        
        * l2: list of single tuple [(kk,vv)]
    
    output: list of tuples [(k1,f1),(k2,f2),...].
    '''
    
    # It is useful to print the process
    if verbose:
        print "reducing"
        print l1
        print l2
        
    # keys in left list
    keys1 = map(lambda (key,value): key,l1)
    # key in right list
    k2 = l2[0][0]
    if k2 in keys1:
        # get the index of the key=k2 in keys1
        index = keys1.index(k2)
        # increase the value of that key accordingly
        l1[index] = (k2,l1[index][1]+l2[0][1])
    else:
        # append the missing (key, value) pair to the left list
        l1 = l1+l2
    return l1

# if you want to see how the reducer works, call histMR with verbose=True, i.e. histMR(a,True)
print histMR(a)

assert histSimple(a) == dict(histMR(a))

[(2, 73), (7, 37), (1, 15), (6, 60), (5, 78), (4, 18), (8, 72), (9, 48), (3, 6)]


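Note the difference with the previous method, based on dictionaries: now, the keys do not come out in ascending order!!

But where did we sort the keys in *histSimple*??? Well, we didn't. A Python *dictionary* is a hash table: it does not sort its keys, and the ascending order shown above is just a side effect of how CPython hashes small integers. What the hash table does give us is a fast key lookup, whereas *combineKeys* has to rebuild and scan a list of keys at every step. See the difference in time...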

In [16]:
print histMR(a)
print dict(histMR(a))
%timeit histSimple(a)
%timeit histMR(a)

[(2, 73), (7, 37), (1, 15), (6, 60), (5, 78), (4, 18), (8, 72), (9, 48), (3, 6)]
{1: 15, 2: 73, 3: 6, 4: 18, 5: 78, 6: 60, 7: 37, 8: 72, 9: 48}
10000 loops, best of 3: 50.7 µs per loop
1000 loops, best of 3: 959 µs per loop


**(1d.3) Map/reduce with pre-sorting**  As shown, the fast key lookup of a dictionary actually speeds up the process. 

However, our *combineKeys* method creates a list of keys and checks whether a new key is already present at every step. This can be avoided by sorting the initial list first: equal keys then sit next to each other, so only the last key of the left list needs to be checked.

In [22]:
# Same histogram with map/reduce and pre-sorting
def histMRwithSort(l):
    # sort the input list in ascending order (use the argument l, not the global a)
    m = map(lambda i: [(i,1)],sorted(l,reverse=False)) 
    return reduce(combineSortedKeys, m)

def combineSortedKeys(l1,l2):
    '''
    method to combine keys
    
    inputs: 
        * l1: list of tuples [(k1,v1),(k2,v2),...]        
        * l2: list of single tuple [(kk,vv)]
    
    output: list of tuples [(k1,f1),(k2,f2),...].
    '''
    
    # last key in left list
    k1 = l1[-1][0]
    # key in right list (there is only one!)
    k2 = l2[0][0]
    if k2 == k1:
        # update the value in the left list corresponding to the last key
        l1[-1] = (k2,l1[-1][1]+l2[0][1])
    else:
        # append the missing (key, value) pair to the left list
        l1 = l1+l2
    return l1

print histMRwithSort(a)
assert histSimple(a) == dict(histMRwithSort(a))

[(1, 15), (2, 73), (3, 6), (4, 18), (5, 78), (6, 60), (7, 37), (8, 72), (9, 48)]


Now, computing times get closer. Still, our map/reduce methods are slower, since we cannot use dictionaries...

In [23]:
print 'Calculating the histogram with the simple approach'
#print histSimple(a)
%timeit histSimple(a)

print 'Calculating the histogram with MR'
#print histMR(a)
%timeit histMR(a)

print 'Calculating the histogram with MR after sorting'
#print histMRwithSort(a)
%timeit histMRwithSort(a)


Calculating the histogram with the simple approach
10000 loops, best of 3: 52.1 µs per loop
Calculating the histogram with MR
1000 loops, best of 3: 962 µs per loop
Calculating the histogram with MR after sorting
1000 loops, best of 3: 298 µs per loop


# Part 2: Spark. Work with RDD and Pair RDD abstractions 

![drawing](https://prateekvjoshi.files.wordpress.com/2015/10/1-main4.png)

# **Apache Spark**

Apache Spark is an open source cluster computing framework originally developed in the AMPLab at the University of California, Berkeley, and later donated to the Apache Software Foundation, where it remains today. In contrast to Hadoop's two-stage disk-based MapReduce paradigm, Spark's multi-stage in-memory primitives provide performance up to 100 times faster for certain applications.

![](http://image.slidesharecdn.com/sparkandshark-120620130508-phpapp01/95/spark-and-shark-8-728.jpg?cb=1340197567)

By allowing user programs to load data into a cluster's memory and query it repeatedly, Spark is well-suited to machine learning algorithms.
![](http://spark.apache.org/images/logistic-regression.png)

Spark comes with a number of components that provide flexibility and generality.

<img src="http://spark.apache.org/images/spark-stack.png" alt="Drawing" style="width: 500px;"/>


## In this part, we keep on working on the word-count example, this time with Spark. The basic abstraction of Spark is the Resilient Distributed Dataset (RDD):

#### «RDDs are fault-tolerant, parallel data structures that let users explicitly persist intermediate results in memory, control their partitioning to optimize data placement, and manipulate them using a rich set of operators.»

 * Read only, partitioned collection of records (immutable).
 * Stores the transformations used to build a dataset (its lineage), instead of the data itself. This property ensures fault tolerance: a lost partition can be recomputed from its lineage.
 * User can control partitioning and persistence (caching).
 * RDDs are statically typed.
 * … and yes, everything is written in Scala ;p. So you had better learn a little bit of it!
 
<img src="http://eng.trueaccord.com/wp-content/uploads/2014/10/scala-logo.png" alt="Drawing" style="width: 200px;"/>

#### We will be trying to understand this abstraction with simple examples, using the [Python API](http://spark.apache.org/docs/latest/api/python/index.html)




### **(2a) Create a base RDD: parallelize, actions and transformations**
We'll start by generating a base RDD by using a Python list and the `sc.parallelize` method.  Then we'll print out the type of the base RDD.

In [24]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList)
# Print out the type of wordsRDD
print type(wordsRDD)

<class 'pyspark.rdd.RDD'>


**Nothing has actually happened!**

`parallelize` tells Spark to distribute the data, but this is not actually done until we perform some action.

Possible actions include counting, collecting, reducing, taking, etc. Take a look at http://spark.apache.org/docs/latest/programming-guide.html#actions


In [25]:
# look at what is going on in the shell. Have a look also at localhost:4040
wordsRDD.collect()

['cat', 'elephant', 'rat', 'rat', 'cat']

In [26]:
wordsRDD.take(2)

['cat', 'elephant']

Apart from actions, we can apply transformations to an RDD. Spark won't do anything until an action is performed.

In [27]:
# Apply a lambda function to our RDD to get the plural of each word
pluralRDD = wordsRDD.map(lambda s: s+'s')
print pluralRDD

PythonRDD[3] at RDD at PythonRDD.scala:43


In [28]:
print pluralRDD.collect()

['cats', 'elephants', 'rats', 'rats', 'cats']


For instance, we can obtain the length of each word

In [29]:
pluralLengths = (pluralRDD
                 .map(lambda w: len(w))
                 .collect())
print pluralLengths
assert pluralLengths == [4,9,4,4,4]

[4, 9, 4, 4, 4]


### **(2b) Persisting and the RDD lineage**

So far, we have seen that Spark RDDs are *lazily evaluated*, i.e. nothing is actually done until an action is performed. The RDD remembers the set of transformations to be applied: this is known as its *lineage*. It has the important consequence of making Spark RDDs *fault tolerant* automatically.

![](http://images.slideplayer.com/14/4499833/slides/slide_10.jpg) 
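
The idea of lazy evaluation plus lineage can be illustrated without Spark. The following is a toy sketch only, not how Spark implements RDDs: the class `ToyLazyCollection` and the helper `plural` are hypothetical names introduced here. Transformations merely append a step to the lineage; the work happens when `collect()` is called.

```python
class ToyLazyCollection(object):
    """Toy stand-in for an RDD: source data plus a lineage of pending steps."""

    def __init__(self, source, lineage=()):
        self._source = source
        self._lineage = tuple(lineage)

    def map(self, f):
        # Transformation: record the step, compute nothing yet
        return ToyLazyCollection(self._source, self._lineage + (("map", f),))

    def filter(self, f):
        # Transformation: record the step, compute nothing yet
        return ToyLazyCollection(self._source, self._lineage + (("filter", f),))

    def collect(self):
        # Action: replay the whole lineage on the source data
        data = list(self._source)
        for kind, f in self._lineage:
            if kind == "map":
                data = [f(x) for x in data]
            else:
                data = [x for x in data if f(x)]
        return data

calls = []

def plural(word):
    calls.append(word)  # lets us observe when the function really runs
    return word + 's'

words = ToyLazyCollection(['cat', 'elephant', 'rat'])
plurals = words.map(plural)
calls_before_collect = len(calls)  # still 0: nothing has been computed
result = plurals.collect()         # now the lineage is replayed
```

Because every result can be rebuilt by replaying the recorded steps on the source data, losing a computed result is not fatal, which is the intuition behind lineage-based fault tolerance.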

It might be interesting to store some intermediate results, though: perhaps because we want to apply several different transformations starting from that point, or because we are going to apply an iterative computation (as is customary in machine learning algorithms). For this, Spark has [several ways of persisting](http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence).

In [30]:
# note that the RDD is not cached until the first action is performed!! 
# Take a look at http://localhost:4040/storage/
wordsCached = wordsRDD.cache()
print wordsCached.is_cached
print wordsCached.getStorageLevel()

True
Memory Serialized 1x Replicated


In [31]:
print wordsCached.map(lambda s: s+'s').collect()
print wordsCached.map(lambda s: s+' is an animal').collect()

['cats', 'elephants', 'rats', 'rats', 'cats']
['cat is an animal', 'elephant is an animal', 'rat is an animal', 'rat is an animal', 'cat is an animal']


In [32]:
# default persisting
from pyspark import StorageLevel  # needed if the shell has not already imported it
wordsCached.unpersist()
wordsCached.persist(StorageLevel.MEMORY_ONLY)
wordsCached.collect()
print wordsCached.getStorageLevel()

Memory Deserialized 1x Replicated


In [33]:
# play with other levels of storage, and see how it changes in http://localhost:4040/storage/
wordsCached.unpersist()
wordsCached.persist(StorageLevel.MEMORY_ONLY_SER)
wordsCached.collect()
print wordsCached.getStorageLevel()

Memory Serialized 1x Replicated


In [34]:
wordsCached.unpersist()

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423

### **(2c) Partitioning**

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the dataset. Typically you want 2-4 partitions for each CPU in your cluster.

To get the number of partitions of an RDD, just use `getNumPartitions()` on your RDD. You can set the number of partitions during RDD creation (with `sc.parallelize(collection, numSlices)` or `sc.textFile(file, minPartitions)`), or change it afterwards with methods like `repartition()`, `coalesce()`, etc.

In [35]:
wordsRDD.getNumPartitions()

4

In [36]:
# change the number of partitions
wordRepartition = wordsRDD.repartition(5)
wordRepartition.getNumPartitions()

# in order to see the effects in the browser, we cache and apply an action
wordRepartition.cache().collect()

['rat', 'cat', 'elephant', 'cat', 'rat']

We can see the partitions using [glom()](http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=glom#pyspark.RDD.glom): it returns an RDD created by coalescing all elements within each partition into a list.

In [37]:
print wordRepartition.glom().collect()
print wordsRDD.glom().collect()
print wordsRDD.coalesce(2).glom().collect()

[['rat'], ['cat', 'elephant', 'cat'], [], [], ['rat']]
[['cat'], ['elephant'], ['rat'], ['rat', 'cat']]
[['cat', 'elephant'], ['rat', 'rat', 'cat']]


Partitions are one of the most powerful concepts in Spark: you can decide how to distribute your data so it can fit in memory, and more importantly, you can perform computations on each partition *before* speaking to other partitions. This can have an enormous impact on performance.

In [38]:
a = sc.parallelize(range(1,10**7),4).cache()
%timeit -n 3 -r 5 a.map(lambda x: x**2).sum()
a.unpersist()

3 loops, best of 5: 1.54 s per loop


ParallelCollectionRDD[15] at parallelize at PythonRDD.scala:423

In [39]:
a = sc.parallelize(xrange(1,10**7),4).cache()
%timeit -n 3 -r 5  a.mapPartitions(lambda iter: [sum(i**2 for i in iter)]).sum()
a.unpersist()

3 loops, best of 5: 951 ms per loop


PythonRDD[32] at RDD at PythonRDD.scala:43

In [40]:
a = sc.parallelize(xrange(1,10**7),8).cache()
%timeit -n 3 -r 5  a.mapPartitions(lambda iter: [sum(i**2 for i in iter)]).sum()
a.unpersist()

3 loops, best of 5: 952 ms per loop


PythonRDD[49] at RDD at PythonRDD.scala:43

### **(2c) Pair RDDs: *grouping* strategies in Spark**

The next step in writing our word counting program is to create a new type of RDD, called a pair RDD. A pair RDD is an RDD where each element is a pair tuple (k, v) where k is the key and v is the value. In this example, we will create a pair consisting of (`word`, 1) for each word element in the RDD, as we did in the map/reduce version of the histogram in Python, section (1d.2).

We can create the pair RDD using the `map()` transformation with a `lambda` function.

In [41]:
wordPairs = wordsRDD.map(lambda w: (w,1))
print wordPairs.collect()
assert wordPairs.collect() == [('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)]

[('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)]


### **(2c.1) `groupByKey()` approach**
An approach you might first consider (we'll see shortly that there are better ways) is based on using the [groupByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey) transformation. As the name implies, the `groupByKey()` transformation groups all the elements of the RDD with the same key into a single list in one of the partitions. There are two problems with using `groupByKey()`:
  + The operation requires a lot of data movement to move all the values into the appropriate partitions.
  + The lists can be very large. Consider a word count of English Wikipedia: the lists for common words (e.g., the, a, etc.) would be huge and could exhaust the available memory in a worker.
 
![](http://blog.cheyo.net/static/file/spark_groupByKey.jpg)

Use `groupByKey()` to generate a pair RDD of type `('word', iterator)`. Next, sum the iterator using a `map()` transformation.  The result should be a pair RDD consisting of (word, count) pairs.

In [42]:
# Note that groupByKey requires no parameters
wordsGrouped = wordPairs.groupByKey()
# Print the key and the values corresponding to that word
for key, value in wordsGrouped.collect():
    print '{0}: {1}'.format(key,list(value))
    
assert ( sorted(wordsGrouped.mapValues(lambda x: list(x)).collect()) ==
        [('cat', [1, 1]), ('elephant', [1]), ('rat', [1, 1])] )

rat: [1, 1]
elephant: [1]
cat: [1, 1]


In [43]:
wordCountsGrouped = wordsGrouped.map(lambda (k,iterator): (k,sum(iterator)))
# also we can mapValues directly:
wordCountsGrouped = wordsGrouped.mapValues(lambda iterator: sum(iterator))
print wordCountsGrouped.collect()

assert sorted(wordCountsGrouped.collect()) == [('cat', 2), ('elephant', 1), ('rat', 2)]

[('rat', 2), ('elephant', 1), ('cat', 2)]


### **(2c.2) `reduceByKey` approach**
A better approach is to start from the pair RDD and then use the [reduceByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey) transformation to create a new pair RDD. 

The `reduceByKey()` transformation gathers together pairs that have the same key and applies the function provided to two values at a time, iteratively reducing all of the values to a single value. `reduceByKey()` operates by applying the function first within each partition on a per-key basis and then across the partitions, allowing it to scale efficiently to large datasets.

![](https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/images/reduce_by.png)
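
The benefit of combining within each partition first can be sketched in plain Python, without Spark. This is an illustration under assumptions, not Spark's implementation: the two-partition layout is the one printed earlier by `coalesce(2).glom()`, and `local_combine` and `merge_partials` are hypothetical helper names.

```python
from collections import defaultdict

# Assumed layout of the words over two partitions
partitions = [['cat', 'elephant'], ['rat', 'rat', 'cat']]

def local_combine(words):
    """Count the words inside one partition (no data leaves the partition)."""
    counts = defaultdict(int)
    for w in words:
        counts[w] += 1
    return dict(counts)

def merge_partials(partials):
    """Merge the per-partition counts into the final result."""
    totals = defaultdict(int)
    for partial in partials:
        for word, count in partial.items():
            totals[word] += count
    return dict(totals)

partials = [local_combine(p) for p in partitions]

# Records that would have to travel between partitions
records_without_combine = sum(len(p) for p in partitions)  # every (word, 1) pair
records_with_combine = sum(len(p) for p in partials)       # one per word per partition

word_counts = merge_partials(partials)
```

On this tiny example the saving is only one record (5 versus 4), but for a large text with many repeated words each partition sends at most one record per distinct word.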

In [44]:
# Note that reduceByKey takes in a function that accepts two values and returns a single value
wordCounts = wordPairs.reduceByKey(lambda a,b:a+b)
print wordCounts.collect()

# with add operator
from operator import add
wordCountsMod = wordPairs.reduceByKey(add)
print wordCountsMod.collect()

assert sorted(wordCounts.collect()) == [('cat', 2), ('elephant', 1), ('rat', 2)]

[('rat', 2), ('elephant', 1), ('cat', 2)]
[('rat', 2), ('elephant', 1), ('cat', 2)]


In [45]:
# All together: create tuples of (word,1), and then apply the reduceByKey method
# to obtain the frequency of each word:
wordCountsCollected = (wordsRDD
                       .map(lambda x: (x,1))
                       .reduceByKey(add)
                       .collect())
print wordCountsCollected

[('rat', 2), ('elephant', 1), ('cat', 2)]


### **(2c.3) `combineByKey` approach: the mother of dragons**

The [combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None)](http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=combinebykey#pyspark.RDD.combineByKey) method is a generic (and powerful!) function to combine the elements for each key using a custom set of aggregation functions.

It turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a “combined type” C. Note that V and C can be different – for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

Users provide three functions:

 * **createCombiner**, which turns a V into a C (e.g., creates a one-element list)
 * **mergeValue**, to merge a V into a C (e.g., adds it to the end of a list)
 * **mergeCombiners**, to combine two C's into a single one.
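
To make the role of the three functions concrete, here is a toy emulation in plain Python. It is a sketch under assumptions, not Spark's code: `toy_combine_by_key` is a hypothetical helper, and the partitions are passed in explicitly as a list of lists of (key, value) pairs. Within a partition, the first value seen for a key goes through `create_combiner` and later values through `merge_value`; the per-partition results are then joined with `merge_combiners`.

```python
def toy_combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    # Step 1: combine inside each partition
    per_partition = []
    for part in partitions:
        combined = {}
        for key, value in part:
            if key in combined:
                combined[key] = merge_value(combined[key], value)
            else:
                combined[key] = create_combiner(value)
        per_partition.append(combined)

    # Step 2: merge the combiners coming from different partitions
    result = {}
    for combined in per_partition:
        for key, combiner in combined.items():
            if key in result:
                result[key] = merge_combiners(result[key], combiner)
            else:
                result[key] = combiner
    return result

# Assumed two-partition layout of (word, word) pairs
pairs = [[('cat', 'cat'), ('elephant', 'elephant')],
         [('rat', 'rat'), ('rat', 'rat'), ('cat', 'cat')]]

count_and_words = toy_combine_by_key(
    pairs,
    lambda w: (1, [w]),                                 # V -> C
    lambda tup, w: (tup[0] + 1, tup[1] + [w]),          # merge a V into a C
    lambda t1, t2: (t1[0] + t2[0], t1[1] + t2[1]))      # merge two C's
```

Here 'rat' is merged with `merge_value` (both occurrences sit in the same partition), whereas 'cat' is merged with `merge_combiners` (one occurrence per partition).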

In [46]:
# let's count the number of words:
wordPairs = wordsRDD.map(lambda i:(i,1))
wordPairs.combineByKey(
    # combiner: in this case we just return the integer
    lambda integ: integ,                   
    # merge value: how to add a new element (integer) to 
    # the combined value (integer again, in this simple case)
    lambda count,integ: count+integ,       
    # merge combiners: adding two combiners
    lambda count1,count2: count1+count2
).collectAsMap()

{'cat': 2, 'elephant': 1, 'rat': 2}

In [47]:
# let's return the count and the length of words:
wordPairs = wordsRDD.map(lambda i: (i,i))
wordPairs.combineByKey(
    # combiner: for each word, we create a tuple of (1,len(word))
    lambda w: (1,len(w)),                   
    # merge value: increase the count; the length is the same for equal words
    lambda tup,w: (tup[0]+1,tup[1]),       
    # merge combiners: add the counts of two combiners
    lambda tup1,tup2: (tup1[0]+tup2[0],tup1[1])
).collectAsMap()

{'cat': (2, 3), 'elephant': (1, 8), 'rat': (2, 3)}

In [48]:
# let's return the count and the list of words:
wordPairs = wordsRDD.map(lambda i:(i,i))
wordPairs.combineByKey(
    lambda w: (1,[w]),
    lambda tup,w: (tup[0]+1,tup[1]+[w]),
    lambda tup1,tup2: (tup1[0]+tup2[0],tup1[1]+tup2[1])
).collectAsMap()

{'cat': (2, ['cat', 'cat']),
 'elephant': (1, ['elephant']),
 'rat': (2, ['rat', 'rat'])}

## (2d) Apply word count to a file

### **(2d.1) `wordCount` function**
First, define a function for word counting. This function should take in an RDD that is a list of words like `wordsRDD` and return a pair RDD that has all of the words and their associated counts.

In [49]:
# TODO: Replace <FILL IN> with appropriate code
def wordCount(wordListRDD):
    """Creates a pair RDD with word counts from an RDD of words.

    Args:
        wordListRDD (RDD of str): An RDD consisting of words.

    Returns:
        RDD of (str, int): An RDD consisting of (word, count) tuples.
    """
    return wordListRDD.map(lambda w: (w,1)).reduceByKey(add)
    
print wordCount(wordsRDD).collect()
assert sorted(wordCount(wordsRDD).collect()) == [('cat', 2), ('elephant', 1), ('rat', 2)]

[('rat', 2), ('elephant', 1), ('cat', 2)]


### **(2d.2) Capitalization and punctuation**
Real world files are more complicated than the data we have been using in this lab. Some of the issues we have to address are:
  + Words should be counted independent of their capitalization (e.g., Spark and spark should be counted as the same word).
  + All punctuation should be removed.
  + Any leading or trailing spaces on a line should be removed.
 
Define the function `removePunctuation` that converts all text to lower case, removes any punctuation, and removes leading and trailing spaces.  Use the Python [re](https://docs.python.org/2/library/re.html) module to remove any text that is not a letter, number, or space. Reading `help(re.sub)` might be useful.

In [50]:
import re
help(re.sub)

Help on function sub in module re:

sub(pattern, repl, string, count=0, flags=0)
    Return the string obtained by replacing the leftmost
    non-overlapping occurrences of the pattern in string by the
    replacement repl.  repl can be either a string or a callable;
    if a string, backslash escapes in it are processed.  If it is
    a callable, it's passed the match object and must return
    a replacement string to be used.



In [51]:
# TODO: Replace <FILL IN> with appropriate code
import re
def removePunctuation(text):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only spaces, letters, and numbers should be retained.  Other characters should be
        eliminated (e.g. it's becomes its).  Leading and trailing spaces should be removed after
        punctuation is removed.

    Args:
        text (str): A string.

    Returns:
        str: The cleaned up string.
    """
    return re.sub('[^a-zA-Z0-9 ]','',text.lower().strip())

assert removePunctuation(" The Elephant's 4 cats. ") == 'the elephants 4 cats'
print removePunctuation('Hi, you!')
print removePunctuation(' No under_score!')
print removePunctuation(" The Elephant's 4 cats. ")

hi you
no underscore
the elephants 4 cats


### **(2d.3) Load a text file**
For the next part of this lab, we will use the [Complete Works of William Shakespeare](http://www.gutenberg.org/ebooks/100) from [Project Gutenberg](http://www.gutenberg.org/wiki/Main_Page). To convert a text file into an RDD, we use the `SparkContext.textFile()` method. We also apply the recently defined `removePunctuation()` function using a `map()` transformation to strip out the punctuation and change all text to lowercase.  Since the file is large we use `take(15)`, so that we only print 15 lines.

In [52]:
# Just run this code
import os.path
baseDir = os.path.join('../data') # wherever you have put the file 'shakespeare.txt'
fileName = os.path.join(baseDir, 'shakespeare.txt')

shakespeareRDD = (sc
                  .textFile(fileName, 8)
                  .map(removePunctuation))
print '\n'.join(shakespeareRDD
                .zipWithIndex()  # to (line, lineNum)
                .map(lambda (l, num): '{0}: {1}'.format(num, l))  # to 'lineNum: line'
                .take(15))

0: 1609
1: 
2: the sonnets
3: 
4: by william shakespeare
5: 
6: 
7: 
8: 1
9: from fairest creatures we desire increase
10: that thereby beautys rose might never die
11: but as the riper should by time decease
12: his tender heir might bear his memory
13: but thou contracted to thine own bright eyes
14: feedst thy lights flame with selfsubstantial fuel


### **(2d.4) Words from lines**
Before we can use the `wordCount()` function, we have to address two issues with the format of the RDD:
  + The first issue is that we need to split each line by its spaces.
  + The second issue is we need to filter out empty lines.
 
Apply a transformation that will split each element of the RDD by its spaces. For each element of the RDD, you should apply Python's string [split()](https://docs.python.org/2/library/string.html#string.split) function. You might think that a `map()` transformation is the way to do this, but think about what the result of the `split()` function will be.
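
The difference the hint points at can be seen in plain Python, without Spark. In this minimal sketch the two sample lines are taken from the output above; a list comprehension plays the role of `map()`, and `itertools.chain.from_iterable` is used here as a local analogue of what `flatMap()` does:

```python
from itertools import chain

lines = ['the sonnets', 'by william shakespeare']

# map-like: one result per line, so we get a list of lists
mapped = [line.split(' ') for line in lines]

# flatMap-like: the per-line lists are flattened into a single list of words
flat_mapped = list(chain.from_iterable(line.split(' ') for line in lines))
```

Since `wordCount()` expects an RDD of words rather than an RDD of lists, the flattened form is the one we need.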

In [53]:
# TODO: Replace <FILL IN> with appropriate code
shakespeareWordsRDD = shakespeareRDD.flatMap(lambda line: line.split(' '))
shakespeareWordCount = shakespeareWordsRDD.count()
print shakespeareWordsRDD.top(5)
print shakespeareWordCount
assert (shakespeareWordCount == 927631 or shakespeareWordCount == 928908)

[u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds']
928908


### **(2d.5) Remove empty elements**
The next step is to filter out the empty elements.  Remove all entries where the word is `''`.

In [54]:
# TODO: Replace <FILL IN> with appropriate code
shakeWordsRDD = shakespeareWordsRDD.filter(lambda w: w !='')
shakeWordCount = shakeWordsRDD.count()
print shakeWordsRDD.take(4)
print shakeWordCount
assert shakeWordCount == 882996

[u'1609', u'the', u'sonnets', u'by']
882996


### **(2d.6) Count the words**
We now have an RDD that is only words.  Next, let's apply the `wordCount()` function to produce a list of word counts. We can view the top 15 words by using the `takeOrdered()` action; however, since the elements of the RDD are pairs, we need a custom sort function that sorts using the value part of the pair.

You'll notice that many of the words are common English words (known as stopwords).

Use the `wordCount()` function and `takeOrdered()` to obtain the fifteen most common words and their counts.
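
The trick of sorting by the negated count can be tried locally first. The sketch below is a pure-Python analogue, assuming a small hand-made list of (word, count) pairs whose counts are copied from the results at the end of this section; `heapq.nsmallest` stands in for `takeOrdered()`, which likewise returns the smallest elements according to the key:

```python
import heapq

# A few (word, count) pairs, deliberately out of order
pairs = [('i', 20681), ('the', 27361), ('to', 19150), ('and', 26028)]

# The smallest negated counts are the largest counts
top2 = heapq.nsmallest(2, pairs, key=lambda x: -x[1])
```

Negating the count turns "smallest first" into "most frequent first" without needing a separate descending option.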

In [55]:
# TODO: Replace <FILL IN> with appropriate code
top15WordsAndCounts = wordCount(shakeWordsRDD).takeOrdered(15,key=lambda x: -x[1])
print '\n'.join(map(lambda (w, c): '{0}: {1}'.format(w, c), top15WordsAndCounts))

the: 27361
and: 26028
i: 20681
to: 19150
of: 17463
a: 14593
you: 13615
my: 12481
in: 10956
that: 10890
is: 9134
not: 8497
with: 7771
me: 7769
it: 7678
