<a href="https://colab.research.google.com/github/laiafc/ManagementOmics/blob/master/spark_exercises.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### i0u19a - Data Processing - KU Leuven

# Python Spark exercises

###### _Thomas Moerman, Jan Aerts_

![license](https://licensebuttons.net/l/by/3.0/88x31.png)

Hello and welcome to the tutorial on data processing with Spark!

We'll be using a Jupyter notebook (you're looking at it) as a tool to walk you through a few examples. At the VDA-LAB, we like notebooks as a teaching tool because they allow you to experiment with code and data as you work your way through the document.

A few guidelines on the notebook itself:
* A notebook consists of *cells*, which are snippets of either text (markdown) or code (Python in this case).
* Cells can be executed by clicking the `[>]` "play" button, or by hitting shift-enter on the keyboard.
* You can navigate between cells either by clicking or by using the arrow buttons.



In [0]:
!pip install pyspark

In [0]:
!wget http://vda-lab.be/assets/beers.csv

In [0]:
!head beers.csv

## Getting Sparky

The `SparkContext` object is the main entrypoint for every Spark program. 

We instantiate a `SparkContext` object and store it in the variable `sc`. We only need to do this once.

In [0]:
import pyspark

# prevents ValueError from occurring when reloading this cell
def init_sc(prev_sc):  
    try:        
        return pyspark.SparkContext('local[*]')
    except ValueError:
        return prev_sc        

sc = init_sc(locals().get('sc'))

We need the `SparkContext` in order to create RDDs: **Resilient Distributed Datasets.**

RDDs are the main building block in Spark programs. They represent distributed computations, partitioned over a cluster of machines. We won't worry too much about that for now; let's first do something trivial to verify that everything works.

We turn a range of 1000 integers into an RDD with the `parallelize(data)` method. 

In [0]:
rdd = sc.parallelize(range(1000))

As you can see, Spark doesn't compute any results right away. Think of it as storing the "recipe" for the computation in a variable.

To actually launch a computation, we can perform a `collect` or a `take` operation on the rdd. Let's try taking the first 3 elements from the rdd.

In [0]:
rdd.take(3)

That was easy!

Now, why is this approach so convenient? Think about it for a few moments.

Imagine you wrote a complicated computation on a large dataset. This could take a while to compute. When writing and testing a computation, we can go along while only inspecting a small subset of the result by performing for example a `take(5)` operation. This way we can avoid always having to wait for the entire computation to complete.
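
The idea of "store the recipe, compute only what is asked for" can be tried out without Spark. The sketch below is only a plain-Python analogy: a generator plays the role of the rdd and `itertools.islice` plays the role of `take`. The names `slow_square` and `calls` are made up for this illustration.

```python
from itertools import islice

calls = []  # records which inputs were actually processed

def slow_square(x):
    # Stand-in for an expensive computation.
    calls.append(x)
    return x * x

# Building the pipeline does no work yet: a generator is only a "recipe".
recipe = (slow_square(x) for x in range(1000))
assert calls == []

# Roughly what `take(3)` does: pull just enough elements, then stop.
first_three = list(islice(recipe, 3))
print(first_three)  # [0, 1, 4]
print(len(calls))   # 3, not 1000
```

Only three of the thousand inputs were touched, which is why inspecting a small subset is cheap.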

### Mapping

Our boss decided that our sequence should not start at 0 but at 10. Of course we could re-initialize the rdd with the correct range, but let's for the sake of the exercise transform the `rdd` data set.

We need a way to increment each number in the sequence by 10.

That is what the **`map` method** is for: it **takes a function and applies it to every single element of a collection/array**. For example: 'multiply all elements by 2'. This means that the length of the output array will be the same as the length of the input array.

Let's first define the function that adds 10.

In [0]:
def inc10(i):
    return i + 10

We pass the `inc10` function as argument to the `map` method on our rdd.

In [0]:
rdd_inc10 = rdd.map(inc10)

rdd_inc10.take(3)

Excellent. 

Now, instead of passing a named function, we can also pass an anonymous function to the `map` method. Python uses lambda expressions for inline anonymous functions.

In [0]:
rdd_inc10_anon = rdd.map(lambda i: i + 10)

rdd_inc10_anon.take(3)

So that works as well. Anonymous functions are often more convenient, but which style to use is a matter of preference: there's no right or wrong here.
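
If you want to check the "same length in, same length out" property without a `SparkContext`, Python's built-in `map` behaves analogously on a local list. This is a sketch of the concept, not of Spark's distributed execution.

```python
numbers = list(range(5))

def inc10(i):
    return i + 10

# Named function and lambda give the same result.
by_name = list(map(inc10, numbers))
by_lambda = list(map(lambda i: i + 10, numbers))

print(by_name)  # [10, 11, 12, 13, 14]
assert by_name == by_lambda
assert len(by_name) == len(numbers)  # map never changes the number of elements
```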

### Filtering

Let's now make our range of numbers a bit more interesting. We are now only interested in even numbers. In order to express this, we need to `filter` our rdd.

In [0]:
def is_even(x):
    return x % 2 == 0

rdd_even = rdd_inc10.filter(is_even)

The `filter` method takes a function or lambda expression that takes an integer `i` and produces a `boolean` value that decides whether or not to keep the value `i` in the resulting rdd.

Let's test by taking a random sample of 3 items from the rdd.

In [0]:
even_3 = rdd_even.takeSample(False, 3)

for i in even_3:
    assert is_even(i)
    
even_3

Only even numbers, right? Good.
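
The same predicate can be tried locally with Python's built-in `filter`. Again this is just an analogy on an ordinary list (the range 10 to 19 is an arbitrary choice), showing that a filter can only keep or drop elements, never change them.

```python
def is_even(x):
    return x % 2 == 0

numbers = list(range(10, 20))
evens = list(filter(is_even, numbers))

print(evens)  # [10, 12, 14, 16, 18]
assert len(evens) <= len(numbers)          # never more elements than before
assert all(x in numbers for x in evens)    # elements are kept as they are
```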

### Reducing

Let's now perform an aggregation on the data. The workhorse function for simple aggregations is the `reduce(f)` function. Look up the `reduce` function in the [pyspark documentation](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD) page. It says:

**`reduce(f)`**

> Reduces the elements of this RDD using the specified _commutative and associative binary operator_.

A binary operator is a function that takes 2 inputs and produces one output. IMPORTANT: inputs and outputs have the **same type**. Can you come up with some examples of binary operators?

Straightforward examples are mathematical operators like addition (`+`) and multiplication (`*`). Note that these operators are not restricted to numbers: we can also define commutative and associative binary operators on other data types like hashmaps, as we'll see in a later example.
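
Why does the documentation insist on *commutative and associative*? In a distributed setting the data is reduced per partition first, and we do not control how the data is split. The sketch below imitates this locally with a hypothetical helper `reduce_in_partitions` and two hand-picked splits of the same five numbers. It is not how Spark is implemented, only an illustration of the consequence.

```python
from functools import reduce
from operator import add, sub

def reduce_in_partitions(f, partitions):
    # Reduce each partition on its own, then reduce the partial results.
    partials = [reduce(f, part) for part in partitions]
    return reduce(f, partials)

# Two different ways of splitting [1, 2, 3, 4, 5].
split_a = [[1, 2], [3, 4, 5]]
split_b = [[1, 2, 3], [4, 5]]

add_a = reduce_in_partitions(add, split_a)  # 3 + 12
add_b = reduce_in_partitions(add, split_b)  # 6 + 9
sub_a = reduce_in_partitions(sub, split_a)  # (-1) - (-6)
sub_b = reduce_in_partitions(sub, split_b)  # (-4) - (-1)

print(add_a, add_b)  # 15 15
print(sub_a, sub_b)  # 5 -3
```

Addition gives the same answer for both splits, while subtraction (neither commutative nor associative) depends on how the data happened to be partitioned.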

Let's create a simple RDD and try it out:

In [0]:
rdd_tiny = sc.parallelize([1, 2, 3, 4, 5])

rdd_tiny.reduce(lambda a, b: a + b)

The reduction goes as follows: 

* First, we take the first and second element and apply the function `f` to these inputs (remember, `f` is the mathematical addition):

    `f(1, 2) => 3`
    
    
* Next, we take the result `3` and use it in the next calculation:

    `f(3, 3) => 6`
    
    
* And so on until we have consumed all values in the RDD:

```
f(6, 4) => 10
f(10, 5) => 15
```
    
In other words: a reduction is essentially walking through a collection and aggregating as we go along. We can do the same thing with a multiplication instead of an addition.

In [0]:
rdd_tiny = sc.parallelize([1, 2, 3, 4, 5])

rdd_tiny.reduce(lambda a, b: a * b)
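
The intermediate values from the walk-through can be reproduced locally with `itertools.accumulate`, which returns every running result instead of only the last one. This is a single-machine sketch: on a real cluster Spark reduces each partition separately, so the intermediate values may differ even though the final result is the same.

```python
from itertools import accumulate
from operator import add, mul

values = [1, 2, 3, 4, 5]

running_sums = list(accumulate(values, add))
running_products = list(accumulate(values, mul))

print(running_sums)      # [1, 3, 6, 10, 15]
print(running_products)  # [1, 2, 6, 24, 120]
```

The last element of each list is what `reduce` returns.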

Ok, that should make sense now. As an exercise, complete the following reduction on our RDD of even numbers.

In [0]:
sum_even = rdd_even.reduce(FIX_ME) # Complete this

...and check the result (`assert` complains when the answer is wrong).

In [0]:
assert sum_even == 254500

Now let's do something a bit more difficult. We want to calculate the average of the even numbers, using a single reduction. Instead of working with integers, we will work with the [tuple](http://www.tutorialspoint.com/python/python_tuples.htm) data type, representing pairs of (total, count).

So the `sum_pairs` function should have the following shape:

```
f(pair, pair) => pair
```
    
It takes two pairs as input and produces a pair as output. 

Complete the `sum_pairs` function.

In [0]:
def sum_pairs(pair1, pair2):
    #complete this
    return FIX_ME

total1, count1 = rdd_even.map(lambda x: (x, 1)).reduce(sum_pairs)

avg1 = total1 / count1

Check the result.

In [0]:
assert avg1 == 509

The previous exercise is perhaps the most typical example of a **Map/Reduce** application.

We **map** the inputs into a convenient shape, and **reduce** these shapes into the final result.
   
So to recapitulate:
    
* **`map`** applies a function to every single element of a collection, e.g. multiply every item by 2. The number of elements in the collection does not change.
* **`filter`** throws away all elements that do not satisfy a condition. The number of elements is smaller than or equal to that of the original collection, and different elements are **not** combined.
* **`reduce`** combines the different elements of the collection into a single result (in contrast to `filter`, which never combines elements).


### Aggregating

Spark provides several aggregation methods that provide the scaffolding for performing Map/Reduce operations. We now explore one of these methods, called `aggregate`. Let's check out the [documentation](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD) again. 

**`aggregate(zeroValue, seqOp, combOp)`**

Arguably, `aggregate` is the most useful function in the Spark toolbox. It's important to understand how it works.

Remember that the purpose of Spark is to perform distributed calculations. 

In a distributed calculation, the data is first partitioned across different processes on different compute nodes. Next, in each partition, the `zeroValue` and `seqOp` work together to reduce the data to a (partial) aggregation. So in our example that computes the average, we calculate the sum and count per partition with these two functions.

* `zeroValue` represents an empty accumulator `acc`.
* `seqOp` is a function of shape `(acc, entry) -> acc`: it takes an accumulator (e.g. a `(sum, count)` pair) and one entry of the partition, and produces an updated accumulator.

We're not done yet. Next we need to combine the partial results into a final result. That's where the `comboOp` function comes in. 

* `comboOp` (spelled `combOp` in the PySpark signature) is equivalent to the `f` function we pass to `reduce(f)`, as illustrated above. It also has the shape `(acc, acc) -> acc`.



```

                    zeroValue + seqOp
                  +-------------+
          ,-----> | partition 1 | ----.
         /        +-------------+      \
        /                               \        comboOp                       
+------+          +-------------+        `---> +-------------------------+
| Data | -------> | partition 2 | -----------> | [acc 1] [acc 2] [acc 3] | ---> final result
+------+          +-------------+        ,---> +-------------------------+
        \                               / 
         \        +-------------+      / 
          `-----> | partition 3 | ----'
                  +-------------+


```
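
To make the two phases in the diagram concrete, here is a local sketch that imitates them on hand-made partitions. The helper `local_aggregate`, the partition contents and the choice of a `(min, max)` accumulator are all assumptions made for this illustration; Spark decides the partitioning itself and runs phase 1 in parallel.

```python
from functools import reduce

def local_aggregate(partitions, zero_value, seq_op, comb_op):
    # Phase 1: fold every partition into its own accumulator.
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    # Phase 2: merge the per-partition accumulators into the final result.
    return reduce(comb_op, partials, zero_value)

# Empty accumulator: "no minimum and no maximum seen yet".
zero = (float("inf"), float("-inf"))

def seq_op(acc, entry):
    # (acc, entry) -> acc
    return (min(acc[0], entry), max(acc[1], entry))

def comb_op(acc1, acc2):
    # (acc, acc) -> acc
    return (min(acc1[0], acc2[0]), max(acc1[1], acc2[1]))

partitions = [[7, 3, 9], [4, 12], [5, 1, 8]]
lowest, highest = local_aggregate(partitions, zero, seq_op, comb_op)
print(lowest, highest)  # 1 12
```

Note that the accumulator type (a pair) differs from the entry type (a number), which is exactly what `aggregate` allows and plain `reduce` does not.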


The example in the `aggregate` documentation should make sense now. Although the answer is already given, let's do it one more time to train our Spark muscle memory. Show me [wax on, wax off](https://youtu.be/2ynryUjGFt8?t=176)!

In [0]:
zero_value = (0, 0)

def seqOp(acc, entry):
    # Return a tuple, so the accumulator keeps the same type as zero_value.
    return (acc[0] + entry, acc[1] + 1)

total2, count2 = rdd_even.aggregate(zero_value, seqOp, sum_pairs)

avg2 = total2 / count2

The resulting average should be the same.

In [0]:
assert avg2 == 509

Muy bien.

### Aggregating by key

A very common principle in data processing is *aggregation by key*. Before we perform an aggregation by key, we need to introduce another Spark concept, the `PairRDD`. A `PairRDD` is an RDD where each item consists of a pair, or tuple of size 2. In such a tuple, the first element is considered the key, the second element the value.

A handy function to build a `PairRDD` from a normal RDD is `keyBy(f)`. It takes a function `f` that takes an entry and produces the key for that entry; each resulting pair has the shape `(f(entry), entry)`. For example, key this list of names by their length:

In [0]:
names_rdd = sc.parallelize(["Dirac", "Feynman", "Ramanujan"])

names_by_length = names_rdd.keyBy(lambda name: len(name))
                            
names_by_length.collect()
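
If you want to predict what the cell above returns, the following plain-Python stand-in for `keyBy` may help. The helper `key_by` is hypothetical and works on an ordinary list, but the shape of the pairs, `(key, original entry)`, is the same.

```python
names = ["Dirac", "Feynman", "Ramanujan"]

def key_by(f, items):
    # Pair each item with the key computed by f: (f(item), item).
    return [(f(item), item) for item in items]

names_by_length = key_by(len, names)
print(names_by_length)  # [(5, 'Dirac'), (7, 'Feynman'), (9, 'Ramanujan')]
```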

Try it yourself in this simple example. We use the original rdd and transform it into a new shape, where each integer is keyed by a boolean that says whether it is even or not. We can reuse the `is_even(x)` function we defined before.

In [0]:
rdd_by_key = rdd.keyBy(FIX_ME) # Complete this

assert rdd_by_key.take(3) == [(True, 0), (False, 1), (True, 2)]

Let's now perform a reduction by key that returns the sum of even and odd integers.

In [0]:
sum_by_key = rdd_by_key.reduceByKey(FIX_ME).collect() # Complete this

assert sum_by_key == [(False, 250000), (True, 249500)]

The last aggregation method we will look at for now is the `aggregateByKey` method. Again, consult the [documentation](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD).

**`aggregateByKey(zeroValue, seqFunc, combFunc)`**

Use this to calculate the averages by key instead of the sum by key. We break up the calculation into two steps: first we calculate the sum and count by key, next we calculate the average by key, by using the `mapValues` method. 

`mapValues` transforms only the "value" elements (right) in a `PairRDD`, the keys (left) remain intact.
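
As a local illustration of "values change, keys stay", here is a small stand-in for `mapValues` on a list of pairs. The helper `map_values` and the sample pairs are assumptions for this sketch and deliberately unrelated to the exercise.

```python
pairs = [(5, "Dirac"), (7, "Feynman"), (9, "Ramanujan")]

def map_values(f, pairs):
    # Apply f to the value (right element) only; the key (left) is untouched.
    return [(k, f(v)) for k, v in pairs]

shouted = map_values(str.upper, pairs)
print(shouted)  # [(5, 'DIRAC'), (7, 'FEYNMAN'), (9, 'RAMANUJAN')]
```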

In [0]:
sum_count_by_key = rdd_by_key.aggregateByKey(FIX_ME) # Complete this

assert sum_count_by_key.collect() == [(False, (250000, 500)), (True, (249500, 500))]

In [0]:
def sum_count_to_avg(sum_count):
    # Complete this
    return FIX_ME

avg_by_key = sum_count_by_key.mapValues(sum_count_to_avg).collect()

assert avg_by_key == [(False, 500.0), (True, 499.0)]

Excellent.

We now have some useful methods in our Spark toolbox. We know how to `map`, `filter`, `reduce` and `aggregate`, and we know how to aggregate by key.

That made me thirsty, are you thirsty? 

Let's move on to the beer dataset :-)

## Dos cervezas, por favor!

We start with reading in the `beers.csv` data set. For convenience, we remove the header from the dataset.

In [0]:
def drop_header(rdd):    
    def drop_first(idx, it):
        if idx == 0:
            next(it)
        return it

    return rdd.mapPartitionsWithIndex(drop_first)

rdd_beers_header = sc.textFile("beers.csv")

header = rdd_beers_header.first()

rdd_beers_unparsed = drop_header(rdd_beers_header)
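
The `drop_header` helper works because the header is the first line of the first partition (index 0), so skipping one element there is enough. The sketch below replays that logic on two hand-made partitions; the sample lines are hypothetical and the list comprehension only imitates what `mapPartitionsWithIndex` does for us.

```python
def drop_first(idx, it):
    # Same logic as in the cell above: skip one line, but only in partition 0.
    if idx == 0:
        next(it)
    return it

# Hypothetical lines, split into two hypothetical partitions.
partitions = [
    ["id,name", "1,first", "2,second"],
    ["3,third", "4,fourth"],
]

remaining = [
    line
    for idx, part in enumerate(partitions)
    for line in drop_first(idx, iter(part))
]
print(remaining)  # ['1,first', '2,second', '3,third', '4,fourth']
```

One caveat of this approach: if partition 0 were empty, `next(it)` would raise `StopIteration`; `next(it, None)` is a more defensive variant.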

Which columns does our data set contain?

In [0]:
header

This is Dutch for: 

`brand, kind, alcohol percentage, brewery`. 

The first column isn't named but represents record IDs.

In [0]:
rdd_beers_unparsed.take(3)

As you can see, each line in the RDD is just a string for now. This isn't very handy. Let's parse every line into a tuple of size 5: 

**`(ID, brand, kind, alcohol percentage, brewery)`.**

Notice the `parse_pct` function. This function performs necessary *data cleaning* because the data set contains values in the alcohol percentage column that are not floating point numbers ('NA', 'alcoholvrij', etc). 

In [0]:
def only_valid(a):
    return a[3] != 'NA'

def parse_pct(x):    
    if 'alcoholvrij' in x: # alcohol free
        return 0.0
    else:
        return float(x)    
    
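def parse_beer(a):    
    try:
        return (int(a[0]), a[1], a[2], parse_pct(a[3]), a[4])
    except (ValueError, IndexError) as e:
        # ValueError: a field is not a number; IndexError: the row has too few columns.
        # The problem is printed and the row becomes None in the resulting RDD.
        print(e)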

rdd_beers = rdd_beers_unparsed.map(lambda s: s.split(",")).filter(FIX_ME).map(FIX_ME) # Complete this

rdd_beers.take(3)
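
To see why the cleaning step is needed, we can call `parse_pct` on a few individual values outside of Spark. The value `"8.5"` is a made-up example; `'alcoholvrij'` and `'NA'` are the special cases mentioned above.

```python
def parse_pct(x):
    if 'alcoholvrij' in x:  # alcohol free
        return 0.0
    else:
        return float(x)

print(parse_pct("8.5"))          # 8.5
print(parse_pct("alcoholvrij"))  # 0.0

# 'NA' is not a valid float literal, so float() raises a ValueError.
try:
    parse_pct("NA")
    na_failed = False
except ValueError:
    na_failed = True
print(na_failed)  # True
```

This is why rows with `'NA'` have to be dealt with before the percentage is parsed.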

Before we start with the exercises, we define a few accessor functions for our beer tuples.

In [0]:
def ID(beer_tuple):
    return beer_tuple[0]

def brand(beer_tuple):
    return beer_tuple[1]

def kind(beer_tuple):
    return beer_tuple[2]

def alcohol_pct(beer_tuple):
    return beer_tuple[3]

def brewery(beer_tuple):
    return beer_tuple[4]

def key(pair):
    return pair[0]

def value(pair):
    return pair[1]

def add(x, y):
    return x + y

Now is the time to demonstrate your [Spark-fu](https://youtu.be/6vMO3XmNXe4?t=7) skills on the beer data set!

Good luck.

### Exercise 1: number of breweries (easy)

How many different breweries do we have?

In [0]:
distinct_breweries = rdd_beers.map(FIX_ME).FIX_ME # Complete this

nr_breweries = distinct_breweries.FIX_ME # Complete this

assert nr_breweries == 361

### Exercise 2: strongest beer (medium)

Which beer is the strongest one (highest alcohol percentage)? Use `reduce` first.

In [0]:
def max_pct(beer1, beer2):
    return FIX_ME # Complete this

beer_max_pct = rdd_beers.reduce(max_pct) 

# Verify your solution
assert beer_max_pct[3] == 26.0

Another possibility is the `max(f)` method, where you pass a selector function `f` that is used for comparing items.
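
Python's built-in `max` accepts a selector in the same spirit through its `key` argument, which makes for a quick local experiment. The list of names is an arbitrary example, unrelated to the beer data.

```python
names = ["Ramanujan", "Feynman", "Dirac", "Wu"]

# With a selector: compare the items by their length.
longest = max(names, key=len)

# Without a selector: strings are compared alphabetically.
alphabetical_last = max(names)

print(longest)            # Ramanujan
print(alphabetical_last)  # Wu
```

The selector only decides *how* items are compared; the result is still a complete item, not the value the selector returned.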

In [0]:
beer_max_pct_2 = rdd_beers.max(FIX_ME) # Complete this

# Verify your solution
assert beer_max_pct_2[3] == 26.0

### Exercise 3: most common alcohol percentages (medium)

How many beers are there for each alcohol percentage? Which 3 alcohol percentages are the most common?

In [0]:
def beers_per_pct():
    
    zero = FIX_ME # Complete this
    
    def seqOp(acc, entry):
        return FIX_ME # Complete this
    
    def comboOp(acc1, acc2):
        return FIX_ME # Complete this
    
    def selector(tuple):
        return FIX_ME # Complete this
    
    return rdd_beers.keyBy(FIX_ME).aggregateByKey(zero, seqOp, comboOp).sortBy(selector, ascending = False)

# Sanity check
assert rdd_beers.count() == beers_per_pct().map(value).sum()

# Verify your solution
assert beers_per_pct().take(3) == [(8.0, 180), (6.5, 150), (5.0, 131)]

### Exercise 4: brewery with strongest average beer (advanced)

Find the brewery that has the highest average alcohol percentage of all beers it makes.

In [0]:
def strongest_average():
    
    zero = FIX_ME # Complete this
    
    def seqOp(acc, beer): 
        return FIX_ME # Complete this
    
    def comboOp(acc1, acc2): 
        return FIX_ME # Complete this
    
    def avg(tuple):
        return FIX_ME # Complete this
    
    return rdd_beers.keyBy(FIX_ME).aggregateByKey(zero, seqOp, comboOp).mapValues(avg).max(value)

assert strongest_average() == ('Staminee De Garre (Brouwerij Van Steenberge)', 11.5)

### Exercise 5: breweries with most kinds of beers (advanced)

Return the top 3 breweries with the most kinds of beer, together with those kinds.

In [0]:
def diverse_breweries():
    
    zero = FIX_ME # Complete this
    
    def seqOp(acc, entry):
        return FIX_ME # Complete this
    
    def comboOp(acc1, acc2):
        return FIX_ME # Complete this
    
    def selector(tuple):
        return FIX_ME # Complete this
    
    return rdd_beers.keyBy(FIX_ME).aggregateByKey(zero, seqOp, comboOp).sortBy(selector, ascending = False)
    
# Verify your answer
assert diverse_breweries().map(key).take(3) == ['Brouwerij Huyghe', 'Brouwerij Alvinne', 'Brouwerij Bavik']

# Show us the result
diverse_breweries().take(3)

Voila!

This completes your [Spark-fu](https://youtu.be/84VtdVK2a0A?t=219) training, oh mighty dragon warrior.