<img style="float: left" src="images/spark.png" />
<img style="float: right" src="images/surfsara.png" />
<hr style="clear: both" />

# Introduction to Apache Spark

In this final notebook we will explore Spark using the Python API, called PySpark.

_You can edit the cells below and execute the code by selecting the cell and pressing Shift-Enter. Code completion is supported by use of the Tab key._

During the exercises you may want to refer to the [PySpark documentation](https://spark.apache.org/docs/1.5.2/api/python/pyspark.html#pyspark.RDD) for more information on possible transformations and actions.

In [11]:
# initialize Spark
from pyspark import SparkContext, SparkConf
if 'sc' not in globals():
    conf = SparkConf().setMaster('local[*]')
    sc = SparkContext(conf=conf)

## Creating an RDD from a list

All parallel work in Spark is done on RDDs, so the first thing we need to do is convert our data (in this case a list) to an RDD. We will use the `parallelize` method on the `SparkContext sc`. It takes two arguments: (1) the collection, and (2) the number of partitions (splits). The second argument is optional.

In [18]:
words_list = ['Dog', 'Cat', 'Rabbit', 'Hare', 'Deer', 'Gull', 'Woodpecker', 'Mole']
words_rdd = sc.parallelize(words_list, 2)

print 'the type of words_list is: ' + str(type(words_list))
print 'the type of words_rdd is: ' + str(type(words_rdd))

the type of words_list is: <type 'list'>
the type of words_rdd is: <class 'pyspark.rdd.RDD'>
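
To build some intuition for what the second argument of `parallelize` does, here is a plain-Python sketch that splits a list into contiguous chunks. The helper `split_into_partitions` is hypothetical and is not how Spark is implemented; in PySpark itself, `words_rdd.glom().collect()` shows the actual contents of each partition.

```python
def split_into_partitions(items, num_partitions):
    """Split `items` into `num_partitions` contiguous, roughly equal chunks.

    Illustration only: this is not Spark's own partitioning code.
    """
    n = len(items)
    # boundaries of the chunks, e.g. [0, 4, 8] for 8 items and 2 partitions
    bounds = [i * n // num_partitions for i in range(num_partitions + 1)]
    return [items[bounds[i]:bounds[i + 1]] for i in range(num_partitions)]


words = ['Dog', 'Cat', 'Rabbit', 'Hare', 'Deer', 'Gull', 'Woodpecker', 'Mole']
partitions = split_into_partitions(words, 2)
print(partitions)
```

Each chunk can then be processed independently, which is what makes parallel execution possible.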


## Map transformation

We now want to change all words in the `words_rdd` to their plural form. We will do this using a [map](https://spark.apache.org/docs/1.5.2/api/python/pyspark.html?highlight=map#pyspark.RDD.map) transformation.
Remember that the map function will apply the function to each element of the RDD. 
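
The same idea exists in plain Python, which may help as a mental model. The sketch below uses Python's built-in `map` on an ordinary list with an illustrative helper called `shout` (not part of the exercises). `RDD.map` applies a function per element in the same way, but the work is spread over the partitions of the RDD.

```python
def shout(word):
    """Return `word` in upper case followed by an exclamation mark."""
    return word.upper() + '!'


animals = ['Dog', 'Cat', 'Rabbit']

# apply `shout` to every element; list() makes the result a plain list
shouted = list(map(shout, animals))
print(shouted)
```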

First, we will write a simple function that takes a single word as argument and returns the word with an 's' added to it. In the next step we will use this function in a map transformation of the `words_rdd`.

Take a look at the function below and fill in the code at the tag &lt;FILL IN&gt;.

In [None]:
# TODO: Replace <FILL IN> with appropriate code
def makePlural(word):
    """Adds an 's' to `word`.

    Note:
        This is a simple function that only adds an 's'.  

    Args:
        word (str): A string.

    Returns:
        str: A string with 's' added to it.
    """
    return <FILL IN>

print makePlural('cat')

Next, we will use the `makePlural` function as input for the map transformation on `words_rdd`.
The action [`collect`](https://spark.apache.org/docs/1.5.2/api/python/pyspark.html?highlight=collect#pyspark.RDD.collect) transfers the content of the RDD to the driver. Note that a large RDD may be scattered over many machines. In such a case calling `collect` can be a very bad idea, because all data has to fit in the memory of the driver.

In [None]:
# TODO: Replace <FILL IN> with appropriate code
plural_rdd = words_rdd.map<FILL IN>
print plural_rdd.collect()

## Using lambda functions

But wait a minute! We can actually achieve the same functionality by using lambda functions. In this case we define `makePlural` as a lambda function.

Hint: The map function needs a function as argument. This function needs one argument, let's call that `x`. The body of the function adds an 's' to the end of `x`.
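
As a reminder of the syntax, the sketch below shows a named function and an equivalent lambda side by side, using plain Python lists and an unrelated example (doubling numbers). The names `double` and `double_lambda` are only illustrative.

```python
def double(x):
    """Named function: return twice the value of `x`."""
    return x * 2


# the same function written as a lambda: one argument `x`, one expression
double_lambda = lambda x: x * 2

numbers = [1, 2, 3]
doubled_named = list(map(double, numbers))
doubled_lambda = list(map(double_lambda, numbers))

# a lambda can also be written inline, without giving it a name
doubled_inline = list(map(lambda x: x * 2, numbers))
print(doubled_inline)
```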

In [None]:
# TODO: Replace <FILL IN> with appropriate code

# A lambda function for adding s at the end of a string
lambda_plural_rdd = words_rdd.map(<FILL IN>)

print lambda_plural_rdd.collect()

Let's do another map transformation. For each word in `words_rdd` determine its length. The Python function `len`  will return the length of a string.

You can do this with a lambda function, but there is another way. 

In [None]:
# TODO: Replace <FILL IN> with appropriate code
word_lengths = (<FILL IN>
                 .collect())
print word_lengths

Test your solution by running the following cell:

In [None]:
from test_helper import Test

Test.assertEquals(sorted(word_lengths), [3, 3, 4, 4, 4, 4, 6, 10], 'incorrect value for word_lengths')

## RDD from a text file

Manipulating a list with eight elements gets boring pretty fast, so let's start processing a file. In this part we will read the file `alice.txt` from HDFS and count the number of occurrences of every word. This 'Word Count' is the ['Hello World'](https://en.wikipedia.org/wiki/%22Hello,_World!%22_program) of data-processing frameworks.

In [None]:
# authenticate for access to the HDFS
!kinit.sh

In [None]:
# create an RDD from alice.txt where every element is a line of the file
alice_rdd = sc.textFile('alice.txt').cache()

## `collect` call not accepted

As we mentioned before, once your data no longer fits on the screen the `collect` method becomes less useful or even problematic. But we still want to have a way to inspect the (intermediate) results. For this we can use one of the following methods as a replacement of `collect`:

- `first()`: return the first element of the RDD
- `take(n)`: return a list of the first `n` elements of the RDD
- `takeOrdered(n, [key=f])`: return the first `n` elements of the RDD in ascending order; the order is defined by the optional function `f`.
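
To get a feeling for what these three actions return, here is a rough plain-Python analogy on an ordinary list. It is only a sketch of the semantics, assuming a small in-memory list called `lines`, and it says nothing about how Spark computes the results.

```python
import heapq

lines = ['delta', 'alpha', 'charlie', 'bravo']

# like first(): the first element
first = lines[0]

# like take(2): the first two elements, in their original order
take_2 = lines[:2]

# like takeOrdered(2): the two smallest elements in ascending order
ordered_2 = heapq.nsmallest(2, lines)

# like takeOrdered(2, key=lambda s: -len(s)): the two longest strings
longest_2 = heapq.nsmallest(2, lines, key=lambda s: -len(s))

print(first)
print(take_2)
print(ordered_2)
print(longest_2)
```

Negating the key is a common trick to get the largest instead of the smallest elements.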

In [None]:
print alice_rdd.first()

print 'first: ' + alice_rdd.first()
print 'take(5): ' + str(alice_rdd.take(5))
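# the lines are strings, so we order by negative length to get the five longest lines
print ('takeOrdered(5, key=lambda x: -len(x)): ' +
       str(alice_rdd.takeOrdered(5, key=lambda x: -len(x))))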

## The one-to-many map: flatMap

We have an RDD of lines, so let's try to convert this to an RDD of words by splitting the lines on whitespace:

In [None]:
alice_words_try = alice_rdd.map(lambda line: line.split())

print alice_words_try.first()

This doesn't look right! We want an RDD of words, but we have created an RDD of lists of words.

We want to map a function that returns a list of values for each input element, but we do not want the output to be nested per input element. This is needed so often that Spark includes a [`flatMap`](https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=map#pyspark.RDD.flatMap) transformation that will _flatten_ the output of the function.

In [None]:
alice_words = alice_rdd.flatMap(lambda line: line.split())

print alice_words.take(5)
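
The difference between the two transformations can also be sketched in plain Python on a small list. The two example lines below are made up for illustration, and `itertools.chain.from_iterable` stands in for the flattening step.

```python
from itertools import chain

lines = ['down the rabbit hole', 'the pool of tears']

# map-like: one output element (a list of words) per input line
mapped = [line.split() for line in lines]

# flatMap-like: the per-line lists are concatenated into one flat sequence
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(mapped)
print(flat_mapped)
```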

## Key-Value Pairs

In order to count words in parallel we are going to use an RDD which consists of simple key-value pairs. We will call this RDD `alice_pairs` and it will be the result of a transformation of `alice_words`. For every word in `alice_words` we want to have a `(word, 1)` tuple.

In [None]:
# TODO: Replace <FILL IN> with appropriate code
alice_pairs = <FILL IN>

print alice_pairs.takeOrdered(10)

In [None]:
Test.assertEquals(set(alice_pairs.takeOrdered(10)),
                  {},
                  'incorrect value for alice_pairs')

## ReduceByKey

Next, we are going to count all words by using [`reduceByKey`](https://spark.apache.org/docs/1.5.2/api/python/pyspark.html?highlight=reducebykey#pyspark.RDD.reduceByKey).

`reduceByKey` expects the RDD to consist of key-value pairs and it will perform a reduce operation per key.
It will need a two-argument function as input that will work on the values only. Remember that a reduce function needs two arguments and will reduce all elements to a single value.
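
The per-key behaviour can be sketched in plain Python with a dictionary. The helper `reduce_by_key` and the temperature data are hypothetical and only mimic the semantics on a single machine; the example uses `max` as the two-argument function, so it keeps the largest value per key.

```python
def reduce_by_key(pairs, func):
    """Combine the values of equal keys with the two-argument function `func`.

    Single-machine illustration only, not Spark's implementation.
    """
    result = {}
    for key, value in pairs:
        if key in result:
            # key seen before: combine the running value with the new one
            result[key] = func(result[key], value)
        else:
            # first value for this key
            result[key] = value
    return result


temperatures = [('mon', 12), ('tue', 9), ('mon', 15), ('tue', 11), ('mon', 7)]
warmest = reduce_by_key(temperatures, max)
print(warmest)
```

Note that `func` never sees the keys, only two values that belong to the same key.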

In [None]:
# TODO: Replace <FILL IN> with appropriate code
# Note that reduceByKey takes in a function that accepts two values and returns a single value
# The function that is input to reduceByKey only works on the values. Spark will execute this function per key

word_counts = alice_pairs.reduceByKey(lambda x,y : <FILL IN>)

top10_words = word_counts.takeOrdered(10, key=lambda p: -p[1])

print top10_words

In [None]:
# TEST Counting using reduceByKey
Test.assertEquals(set(top10_words),
                  {},
                  'incorrect value for word_counts')

## ReduceByKey - background

The `reduceByKey` method is similar to Hadoop's Reduce, but more restrictive. The function you provide to `reduceByKey` needs to be both [commutative](https://en.wikipedia.org/wiki/Commutative_property) and [associative](https://en.wikipedia.org/wiki/Associative_Property).

These restrictions allow Spark to perform additional optimisations: it can perform the operation on each partition of the RDD first and only then combine the partial results, which minimises network traffic. In the example below at most six values need to be transmitted.

![reduceByKey](images/reducebykey.png)
(Picture by DataBricks)
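
Why these two properties matter can be shown with a small plain-Python sketch. The hypothetical helper `reduce_partitioned` first reduces every partition separately and then reduces the partial results, which mimics the two-stage evaluation described above. With addition the outcome does not depend on how the data is split; with subtraction, which is neither commutative nor associative, it does.

```python
from functools import reduce


def reduce_partitioned(partitions, func):
    """Reduce each partition on its own, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions]
    return reduce(func, partials)


# the same six values, split over partitions in two different ways
split_a = [[1, 2, 3], [4, 5, 6]]
split_b = [[1, 2], [3, 4], [5, 6]]

add = lambda x, y: x + y
subtract = lambda x, y: x - y

add_a = reduce_partitioned(split_a, add)       # 21
add_b = reduce_partitioned(split_b, add)       # 21, same result
sub_a = reduce_partitioned(split_a, subtract)  # 3
sub_b = reduce_partitioned(split_b, subtract)  # 1, depends on the split

print(add_a == add_b)
print(sub_a == sub_b)
```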

## Tweets Analysis

As an example of how to analyse a file, we will look at a file with Dutch tweets. We will read in the file by making use of [sc.textFile](https://spark.apache.org/docs/1.5.2/api/python/pyspark.html?highlight=textfile#pyspark.SparkContext.textFile).

We will be using 4 partitions here. Take a look at the line where we filter and then map the data to utf-8 encoding. Note the way transformations are chained together.

Print out the first tweet in the RDD by making use of the [take](https://spark.apache.org/docs/1.5.2/api/python/pyspark.html?highlight=take#pyspark.RDD.take) action. It needs as argument the number of elements in the RDD that it will send to the driver.

In [None]:
# TODO: Replace <FILL IN> with appropriate code
# Use take to print out the first tweet

tweetRDD = (sc.textFile('data/tweets.txt', 4)
            .filter(lambda x : len(x) > 0)
            .map(lambda x: x.encode('utf-8')))

print tweetRDD.<FILL IN>

## **Conversion to JSON**

Next, we are going to parse the tweets, which are stored as JSON strings. For each tweet this returns a dictionary where each key is an attribute of the tweet. Some attributes, like *user*, have sub-attributes.

In the next cell the conversion takes place and the first tweet is shown. 

Just execute the cell, there's nothing to fill in.

In [None]:
import json
import re

jsonTweetRDD = tweetRDD.map(lambda x: json.loads(x))
parsed = jsonTweetRDD.take(1)
print json.dumps(parsed, indent=2)
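
What `json.loads` does to a single line can be tried without Spark. The string below is a made-up, heavily shortened stand-in for a tweet; the field values are hypothetical and real tweets contain many more attributes.

```python
import json

# hypothetical, shortened tweet as it could appear on one line of the file
raw_tweet = ('{"text": "Hallo wereld", '
             '"user": {"screen_name": "example_user", "followers_count": 42}}')

# parse the JSON string into a Python dictionary
tweet = json.loads(raw_tweet)

# top-level attributes are dictionary keys
text = tweet['text']

# sub-attributes are reached through nested dictionaries
screen_name = tweet['user']['screen_name']
followers = tweet['user']['followers_count']

print(text)
print(screen_name)
print(followers)
```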

## **Access to fields in the tweets**

We have made some selections for you to show how to access fields in tweets.

This is pure Python, although the data is contained in an RDD. You probably see what's going on here.

In [None]:
jsonTweetRDDtext = jsonTweetRDD.map(lambda x: [x['text'],
                                               x['created_at'],
                                               x['entities']['hashtags'],
                                               x['user']['name'], 
                                               x['user']['screen_name'], 
                                               x['user']['followers_count'], 
                                               x['user']['description']])
print jsonTweetRDDtext.take(1)

## **RDDs are distributed over workers**

It is important to understand what code is executed on the workers, and what code on the driver. Moving data between the driver and the workers is very expensive.

RDDs are distributed over workers and transformations define a sequence of RDDs. Never try to define an RDD inside an RDD and beware of what code is executed by the driver.

Let's make a quick list of all attributes in a tweet. We'll do it the wrong way first, by doing a map on the RDD.

In [None]:
print jsonTweetRDD.map(lambda x: x.keys()).take(1) 

## **Another attempt**

The previous code does more work than necessary: the map defines a new RDD holding the keys of every tweet, while we only need the keys of one. It would be better to take a single tweet and then compute the keys outside the RDD. Note that the computation of the keys is then done by the driver.

Try to do this in a single statement.

In [None]:
# TODO: Replace <FILL IN> with appropriate code
print jsonTweetRDD.take<FILL IN>

## **Selecting Text**

We select only the text from the tweets and clean it up a bit. Then we [cache](https://spark.apache.org/docs/1.5.2/api/python/pyspark.html?highlight=cache#pyspark.RDD.cache) the new RDD. 

In [None]:
# Select the text, encode it as utf-8 and replace every non-word character by a space
tweetTextRDD = (jsonTweetRDD.map(lambda x: x['text'])
                            .map(lambda x: x.encode('utf-8'))
                            .map(lambda x: re.sub(r'[^\w]',' ', x))                   
                            .cache()
                            )
print tweetTextRDD.take(1)
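
The effect of the regular expression in the cell above can be checked on a single string. The sample text is made up; `[^\w]` matches every character that is not a letter, digit or underscore, and each match is replaced by one space.

```python
import re

raw_text = 'Hello, world! #spark'

# replace every non-word character (punctuation, '#', whitespace) by a space
cleaned = re.sub(r'[^\w]', ' ', raw_text)

# splitting without arguments ignores the repeated spaces left behind
words = cleaned.split()

print(cleaned)
print(words)
```

Because every removed character is replaced by exactly one space, the cleaned string keeps the length of the original.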

## **Filtering**

Use the [filter](https://spark.apache.org/docs/1.5.2/api/python/pyspark.html?highlight=filter#pyspark.RDD.filter) transformation to select only tweets that contain the word 'ik'.
Filter takes a boolean function as argument and returns those elements of the RDD for which this function returns true.
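
In plain Python the built-in `filter` works along the same lines. The sketch below uses an illustrative predicate `is_long` on an ordinary list and has nothing to do with the tweets themselves.

```python
def is_long(word):
    """Boolean function: True for words with more than four characters."""
    return len(word) > 4


animals = ['Dog', 'Cat', 'Rabbit', 'Hare', 'Woodpecker']

# keep only the elements for which the predicate returns True
long_animals = list(filter(is_long, animals))

# counting the remaining elements, comparable to calling count() on an RDD
number_of_long_animals = len(long_animals)

print(long_animals)
print(number_of_long_animals)
```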

Make sure to convert the words in the tweets to lower case, before filtering.
Then count the matching tweets using [count](https://spark.apache.org/docs/1.5.2/api/python/pyspark.html?highlight=count#pyspark.RDD.count) and print the result.

In [None]:
# TODO: Replace <FILL IN> with appropriate code
ikRDD = tweetTextRDD.<FILL IN>
count = <FILL IN>
print count

In [None]:
Test.assertEquals(count, 221, 'Wrong count')

## **Counting words in tweets**
Next count the words in tweets by applying four transformations in a chain on tweetTextRDD.

Note that tweetTextRDD is an RDD which contains lines (strings).

First, split every line in the RDD on a single white space. Instead of `map`, use [flatMap](https://spark.apache.org/docs/1.5.2/api/python/pyspark.html?highlight=flatmap#pyspark.RDD.flatMap) (why?)<br>
Second, we only want to see words with a length larger than 1.<br> 
Third, use a map transformation to convert each word to lower case and create a (word, 1) tuple.<br>
Finally, use reduceByKey to add the result for each word.<br>

We will print the result by making use of the [takeOrdered](https://spark.apache.org/docs/1.5.2/api/python/pyspark.html?highlight=takeordered#pyspark.RDD.takeOrdered.) action.

In [None]:
# TODO: Replace <FILL IN> with appropriate code
# Use flatMap, filter, map and reduceByKey to split the lines, filter out words shorter than 2 characters,
# create (word, 1) tuples and add up the results
aRDD = (tweetTextRDD.<FILL IN>
        .<FILL IN>
        .<FILL IN>
        .<FILL IN>)
print aRDD.takeOrdered(35, lambda x : -x[1])