### i0u19a Data Processing KU Leuven

# Python Spark exercises

###### _Thomas Moerman, Jan Aerts_

First, we instantiate a `SparkContext` object and store it in the variable `sc`.

In [None]:
import pyspark

# SparkContext raises a ValueError if a context is already running (e.g. when
# this cell is re-run); in that case we keep using the existing context
def init_sc(prev_sc):
    try:
        return pyspark.SparkContext('local[*]')
    except ValueError:
        return prev_sc

sc = init_sc(locals().get('sc'))

RDDs (Resilient Distributed Datasets) are the main building block in Spark programs. They represent distributed collections of data, partitioned over a cluster of machines, together with the computations that produce them. For now, don't worry too much about the technical details. We look at examples of increasing difficulty to build up your intuition on how to write Spark programs.

## Getting Sparky

We turn a range of 1000 integers into an RDD with the `parallelize(data)` method.

In [2]:
rdd = sc.parallelize(range(1000))

As you can see, Spark doesn't compute any results right away: RDDs are evaluated lazily. Think of the RDD as storing the "recipe" for the computation in a variable.

To actually launch a computation, we perform an *action* such as `collect` or `take` on the RDD. Let's try taking the first 3 elements from the RDD.

In [3]:
rdd.take(3)

[0, 1, 2]

That was easy!

Now, why is this approach convenient? Think about it for a few moments.

Imagine you wrote a complicated computation on a large dataset. This could take a while to compute. While writing and testing such a computation, we can inspect only a small part of the result, for example with a `take(5)` operation. This way we avoid waiting for the entire computation to complete every time.
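Lazy evaluation has a rough analogue in plain Python: `map` over a range returns a lazy iterator, and `itertools.islice` pulls only the first few elements from it. The sketch below is plain Python, not Spark; the `expensive` function and the `calls` counter are hypothetical, added only to count how much work actually happens.

```python
from itertools import islice

calls = 0  # hypothetical counter: how many elements were actually processed

def expensive(x):
    global calls
    calls += 1
    return x * x

lazy = map(expensive, range(1_000_000))  # builds the "recipe", computes nothing yet
print(calls)  # 0

first_five = list(islice(lazy, 5))  # like take(5): evaluate only what is needed
print(first_five, calls)  # [0, 1, 4, 9, 16] 5
```

Spark's `take` is similar in spirit: it evaluates partitions only until it has collected enough elements.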

### Mapping

Our boss decided that our sequence should not start at 0 but at 10. Of course we could re-initialize the RDD with the correct range, but for the sake of the exercise, let's transform the existing `rdd` data set.

We need an operation that applies a function to every element, so that we can increment each number in the sequence by 10.

That operation is the `map` method, which takes a function as argument. Let's first define that function.

In [4]:
def inc10(i):
    return i + 10

We pass the `inc10` function as argument to the `map` method on our rdd.

In [5]:
rdd_inc10 = rdd.map(inc10)

rdd_inc10.take(3)

[10, 11, 12]

Excellent. 

Instead of passing a named function, we can also pass an anonymous function to the `map` method. Python uses lambda expressions for inline anonymous functions.

In [6]:
rdd_inc10_anon = rdd.map(lambda i: i + 10)

rdd_inc10_anon.take(3)

[10, 11, 12]

So that works as well. Anonymous functions are often more convenient for short, one-off operations, but this is a matter of the programmer's preference; there's no right or wrong here.

### Filtering

Let's now make our range of numbers a bit more interesting. We are now only interested in even numbers. In order to express this, we need to `filter` our rdd.

In [7]:
def is_even(x):
    return x % 2 == 0 # complete this

rdd_even = rdd_inc10.filter(is_even) 

The `filter` method takes a function (or lambda expression) that receives an element `i` and returns a boolean (`True` or `False`) that decides whether or not to keep `i` in the resulting RDD.

Let's test by taking a random sample of 3 items from the rdd.

In [8]:
even_3 = rdd_even.takeSample(False, 3)

for i in even_3:
    assert is_even(i)
    
even_3

[596, 310, 60]

Only even numbers, right? Good.

### Reducing

Let's now perform an aggregation on the data. The workhorse for simple aggregations is the `reduce(f)` method. Look up `reduce` on the [pyspark documentation](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD) page. It says:

**`reduce(f)`**

> Reduces the elements of this RDD using the specified _commutative and associative binary operator_.

A binary operator is a function that takes 2 inputs and produces one output. Inputs and outputs have the same type. Can you come up with some examples of binary operators?

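Straightforward examples are mathematical operators like addition (`+`) and multiplication (`*`). Subtraction (`-`) is also a binary operator, but it is neither commutative nor associative, so it is not a valid argument for `reduce`. Note that these operators are not restricted to numbers: we can also define commutative and associative binary operators on other data types, such as sets or hashmaps, as we'll see in a later example.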
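Why does `reduce` insist on these two properties? Spark reduces each partition separately and then combines the partial results, so the answer must not depend on how the data happens to be partitioned. The plain-Python sketch below mimics that two-level scheme with a hypothetical `partitioned_reduce` helper (it is not Spark's implementation) and shows that addition gives the same answer for any partitioning, whereas subtraction does not.

```python
from functools import reduce

def partitioned_reduce(f, data, num_partitions):
    # hypothetical helper: split data into contiguous chunks ("partitions"),
    # reduce each chunk, then reduce the partial results
    size = -(-len(data) // num_partitions)  # ceiling division
    chunks = [data[i:i + size] for i in range(0, len(data), size)]
    partials = [reduce(f, chunk) for chunk in chunks]
    return reduce(f, partials)

def plus(x, y):
    return x + y

def minus(x, y):
    return x - y

data = list(range(1, 9))

print(partitioned_reduce(plus, data, 1), partitioned_reduce(plus, data, 4))    # 36 36
print(partitioned_reduce(minus, data, 1), partitioned_reduce(minus, data, 4))  # -34 2
```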

Now complete the following reduction with the appropriate lambda function.

In [9]:
sum_even = rdd_even.reduce(lambda x, y: x + y) # complete this

...and check the result (`assert` complains when the answer is wrong).

In [10]:
assert sum_even == 254500

Now let's do something a bit more difficult. We want to calculate the average of the even numbers, using a single reduction. Instead of working with integers, we will work with the [tuple](http://www.tutorialspoint.com/python/python_tuples.htm) data type, representing pairs of (total, count).

Complete the commutative and associative binary operator `sum_pairs`.

In [11]:
def sum_pairs(pair1, pair2):
    # complete this
    x, i = pair1
    y, j = pair2
    return (x + y, i + j)

total1, count1 = rdd_even.map(lambda x: (x, 1)).reduce(sum_pairs)

avg1 = total1 / count1

Check the result.

In [12]:
assert avg1 == 509

The previous exercise is perhaps the most typical example of a **Map/Reduce** application.

We **map** the inputs into a convenient shape, and **reduce** these shapes into the final result.
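The same map-then-reduce pattern can be written with Python's built-in `map` and `functools.reduce`, which may help to see what Spark distributes. This sketch runs on a single machine and rebuilds the even numbers from 10 to 1008 as a plain list; it is an illustration, not a replacement for the RDD version.

```python
from functools import reduce

# the same even numbers as rdd_even: 10, 12, ..., 1008
evens = [x + 10 for x in range(1000) if (x + 10) % 2 == 0]

def sum_pairs(pair1, pair2):
    # commutative and associative: add totals and counts element-wise
    return (pair1[0] + pair2[0], pair1[1] + pair2[1])

# map: each number becomes a (total, count) pair; reduce: add the pairs up
total, count = reduce(sum_pairs, map(lambda x: (x, 1), evens))
print(total, count, total / count)  # 254500 500 509.0
```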

### Aggregating

Spark provides several aggregation methods that provide the scaffolding for Map/Reduce operations. We now explore one of these methods, called `aggregate`. Let's check out the [documentation](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD) again.

**`aggregate(zeroValue, seqOp, combOp)`**

The example in the `aggregate` documentation should make sense now. Although the answer is already given, let's do it one more time to train our Spark muscle memory. Show me [wax on, wax off](https://youtu.be/2ynryUjGFt8?t=176)!

In [13]:
zero_value = (0, 0)

def seqOp(pair, x):
    # complete this
    total, count = pair
    return (total+x, count+1)

total2, count2 = rdd_even.aggregate(zero_value, seqOp, sum_pairs)

avg2 = total2 / count2

The resulting average should be the same.

In [14]:
assert avg2 == 509

Muy bien.
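What does `aggregate` do with its three arguments? The plain-Python sketch below uses a hypothetical `simulated_aggregate` helper over a list of lists standing in for partitions: `seqOp` folds the elements of one partition into an accumulator (whose type may differ from the element type), and `combOp` merges the accumulators of different partitions. It illustrates the semantics only; it is not Spark's code.

```python
from functools import reduce

def simulated_aggregate(partitions, zero_value, seq_op, comb_op):
    # 1) fold the elements of each partition into an accumulator, starting at zero_value
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    # 2) merge the per-partition accumulators, again starting at zero_value
    return reduce(comb_op, partials, zero_value)

def seq_op(pair, x):
    # the accumulator is a (total, count) tuple, the element x is an int
    total, count = pair
    return (total + x, count + 1)

def comb_op(pair1, pair2):
    return (pair1[0] + pair2[0], pair1[1] + pair2[1])

partitions = [[10, 12, 14], [16, 18], []]  # an empty partition just contributes zero_value
total, count = simulated_aggregate(partitions, (0, 0), seq_op, comb_op)
print(total, count, total / count)  # 70 5 14.0
```

Because the zero value is used once per partition and once more during merging, it should be a neutral element for the operators, such as `(0, 0)` here; a non-neutral value would be counted several times.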

### Aggregating by key

A very common operation in data processing is *aggregation by key*. Before we perform an aggregation by key, we need to introduce another Spark concept, the `PairRDD`. A `PairRDD` is an RDD where each item is a pair, i.e. a tuple of size 2. The first element of the tuple is considered the key, the second element the value.

Let's illustrate with a simple example. We use the original rdd and transform it into a new shape, where each integer is keyed by a boolean that says whether it is even or not. We can reuse the `is_even(x)` function we defined before.

In [15]:
rdd_by_key = rdd.keyBy(is_even) # Complete this

assert rdd_by_key.take(3) == [(True, 0), (False, 1), (True, 2)]

Let's now perform a reduction by key that returns the sum of even and odd integers.

In [16]:
sum_by_key = rdd_by_key.reduceByKey(lambda x, y: x + y).collect() # complete this

# the order of keys after a shuffle depends on the partitioning, so sort before comparing
assert sorted(sum_by_key) == [(False, 250000), (True, 249500)]
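In plain Python, `keyBy(f)` corresponds to turning each `x` into `(f(x), x)`, and `reduceByKey` to combining the values of each key in a dictionary. The sketch below uses a hypothetical `reduce_by_key` helper on a single machine; Spark additionally reduces within each partition before shuffling the partial results by key, which is another reason the operator must be commutative and associative.

```python
def is_even(x):
    return x % 2 == 0

def reduce_by_key(pairs, f):
    # hypothetical helper: combine all values that share a key with the binary operator f
    result = {}
    for k, v in pairs:
        result[k] = f(result[k], v) if k in result else v
    return result

keyed = [(is_even(x), x) for x in range(1000)]  # the shape keyBy(is_even) produces
sums = reduce_by_key(keyed, lambda x, y: x + y)
print(sorted(sums.items()))  # [(False, 250000), (True, 249500)]
```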

The last aggregation method we will look at for now is `aggregateByKey`. Again, consult the [documentation](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD).

**`aggregateByKey(zeroValue, seqFunc, combFunc)`**

Use this to calculate the average by key instead of the sum by key. We break up the calculation into two steps: first we calculate the sum and count by key, then we calculate the average by key using the `mapValues` method.

`mapValues` transforms only the "value" elements (right) in a `PairRDD`; the keys (left) remain intact.

In [17]:
sum_count_by_key = rdd_by_key.aggregateByKey((0,0), seqOp, sum_pairs) # Complete this

# sort so the comparison does not depend on the order of the partitions
assert sorted(sum_count_by_key.collect()) == [(False, (250000, 500)), (True, (249500, 500))]

In [18]:
def sum_count_to_avg(sum_count):
    total, count = sum_count # Complete this
    return total / count

avg_by_key = sum_count_by_key.mapValues(sum_count_to_avg).collect() # Complete this

# sort so the comparison does not depend on the order of the partitions
assert sorted(avg_by_key) == [(False, 500.0), (True, 499.0)]
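The two steps can be mirrored in plain Python. The sketch below uses a hypothetical `aggregate_by_key` helper over two hand-made "partitions" of `(is_even(x), x)` pairs: `seq_func` folds values within a partition per key, `comb_func` merges the accumulators of the same key across partitions, and a dictionary comprehension plays the role of `mapValues`. It shows the semantics only, not Spark's implementation.

```python
def aggregate_by_key(partitions, zero_value, seq_func, comb_func):
    # partitions is a list of lists of (key, value) pairs
    # step 1: within each partition, fold the values of each key starting at zero_value
    per_partition = []
    for part in partitions:
        acc = {}
        for k, v in part:
            acc[k] = seq_func(acc.get(k, zero_value), v)
        per_partition.append(acc)
    # step 2: merge the accumulators of the same key coming from different partitions
    merged = {}
    for acc in per_partition:
        for k, a in acc.items():
            merged[k] = comb_func(merged[k], a) if k in merged else a
    return merged

def seq_op(pair, x):
    total, count = pair
    return (total + x, count + 1)

def sum_pairs(pair1, pair2):
    return (pair1[0] + pair2[0], pair1[1] + pair2[1])

partitions = [[(x % 2 == 0, x) for x in range(500)],
              [(x % 2 == 0, x) for x in range(500, 1000)]]
sum_count = aggregate_by_key(partitions, (0, 0), seq_op, sum_pairs)

# the mapValues step: only the values change, the keys stay the same
avg = {k: total / count for k, (total, count) in sum_count.items()}
print(sorted(avg.items()))  # [(False, 500.0), (True, 499.0)]
```

The tuple `(0, 0)` is immutable, so reusing it for every key is safe. A mutable zero value, such as the `set()` used in a later exercise, needs a fresh copy per key, which PySpark's `aggregateByKey` takes care of.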

Excellent.

We now have some useful methods in our Spark toolbox. We know how to `map`, `filter`, `reduce` and `aggregate`, and we know how to aggregate by key.

That made me thirsty. Are you thirsty?

Let's move on to the beer dataset :-)

## Dos cervezas, por favor!

We start by reading in the `beers.csv` data set. For convenience, we remove the header line from the dataset.

In [19]:
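def drop_header(rdd):
    # mapPartitionsWithIndex calls drop_first(partition_index, iterator) once per partition
    def drop_first(idx, it):
        if idx == 0:
            next(it)  # the header is the first line of the first partition: skip it
        return it

    return rdd.mapPartitionsWithIndex(drop_first)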

rdd_beers_header = sc.textFile("/usr/local/data/beer/beers.csv")

header = rdd_beers_header.first()

rdd_beers_unparsed = drop_header(rdd_beers_header)
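The partition trick can be checked in plain Python by passing `(index, iterator)` pairs to the same `drop_first` logic. The two "partitions" below are a hypothetical stand-in built from lines shown in this notebook; the approach relies on the header being the first line of partition 0, which is the case when `textFile` reads a single file.

```python
def drop_first(idx, it):
    # same logic as in drop_header: skip the first line of partition 0 only
    if idx == 0:
        next(it)
    return it

# hypothetical stand-in for two partitions of the text file
partitions = [
    iter([",Merk,Soort,Percentagealcohol,Brouwerij",
          "2,400,blond,5.6,'t Hofbrouwerijke voor Brouwerij Montaigu"]),
    iter(["3,IV Saison,saison,6.5,Brasserie de Jandrain-Jandrenouille"]),
]

# mapPartitionsWithIndex passes (index, iterator) for every partition
lines = [line for idx, part in enumerate(partitions) for line in drop_first(idx, part)]
print(len(lines))  # 2
```

A simpler alternative is `rdd_beers_header.filter(lambda line: line != header)`. It is easier to read, but it checks every line, and it would also drop any data line that happens to equal the header.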

Which columns does our data set contain?

In [20]:
header

',Merk,Soort,Percentagealcohol,Brouwerij'

This is Dutch for `brand, kind, alcohol percentage, brewery`.

The first column isn't named but represents record IDs.

In [21]:
rdd_beers_unparsed.take(3)

["1,3 Schténg,hoge gisting,6,Brasserie Grain d'Orge",
 "2,400,blond,5.6,'t Hofbrouwerijke voor Brouwerij Montaigu",
 '3,IV Saison,saison,6.5,Brasserie de Jandrain-Jandrenouille']

As you can see, each line in the RDD is just a string for now. This isn't very handy. Let's parse every line into a tuple of size 5:

**`(ID, brand, kind, alcohol percentage, brewery)`.**

Notice the `parse_pct` function. It performs necessary *data cleaning*, because the alcohol percentage column contains values that are not floating point numbers (such as 'NA' and 'alcoholvrij'). Unknown percentages ('NA') are mapped to -1.0 and alcohol-free beers to 0.0.

In [22]:
def parse_pct(x):
    if x == 'NA':            # unknown alcohol percentage
        return -1.0
    elif 'alcoholvrij' in x: # alcohol free
        return 0.0
    else:
        return float(x)    

def parse_beer(a):
    try:
        return (int(a[0]), a[1], a[2], parse_pct(a[3]), a[4])
    except (ValueError, IndexError) as e:
        # malformed line: report the problem (this function then returns None)
        print(e)

rdd_beers = rdd_beers_unparsed.map(lambda s: s.split(",")).map(parse_beer) # complete this

rdd_beers.take(3)

[(1, '3 Schténg', 'hoge gisting', 6.0, "Brasserie Grain d'Orge"),
 (2, '400', 'blond', 5.6, "'t Hofbrouwerijke voor Brouwerij Montaigu"),
 (3, 'IV Saison', 'saison', 6.5, 'Brasserie de Jandrain-Jandrenouille')]
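Splitting on `","` works only as long as no field contains a comma itself. We have not checked whether `beers.csv` has such fields; if a CSV writer ever quoted a field like `"Brand, Special"` (a hypothetical example), `split(",")` would cut it in two. Python's standard `csv` module honours the quotes, as this plain-Python sketch shows.

```python
import csv

def parse_pct(x):
    if x == 'NA':             # unknown alcohol percentage
        return -1.0
    elif 'alcoholvrij' in x:  # alcohol free
        return 0.0
    else:
        return float(x)

# hypothetical line whose brand contains a comma, quoted as CSV writers do
line = '4,"Brand, Special",blond,7.0,Some Brewery'

print(len(line.split(",")))        # 6: the quoted brand is split in two
fields = next(csv.reader([line]))  # csv.reader keeps the quoted field intact
beer = (int(fields[0]), fields[1], fields[2], parse_pct(fields[3]), fields[4])
print(beer)  # (4, 'Brand, Special', 'blond', 7.0, 'Some Brewery')
```

In Spark, the same parser could be applied per line, e.g. `rdd_beers_unparsed.map(lambda s: next(csv.reader([s])))`, assuming no field spans multiple lines.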

Before we start with the exercises, we define a few accessor functions for our beer tuples.

In [23]:
def ID(beer_tuple):
    return beer_tuple[0]

def brand(beer_tuple):
    return beer_tuple[1]

def kind(beer_tuple):
    return beer_tuple[2]

def alcohol_pct(beer_tuple):
    return beer_tuple[3]

def brewery(beer_tuple):
    return beer_tuple[4]

def key(pair):
    return pair[0]

def value(pair):
    return pair[1]

def add(x, y):
    return x + y
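Accessor functions like these can also be built with the standard library's `operator.itemgetter`, which returns a function that fetches the item at a given index. The explicit `def` versions above are arguably more readable; this sketch only shows an alternative.

```python
from operator import itemgetter

# equivalent accessors built with operator.itemgetter
brand = itemgetter(1)
alcohol_pct = itemgetter(3)

beer = (2, '400', 'blond', 5.6, "'t Hofbrouwerijke voor Brouwerij Montaigu")
print(brand(beer), alcohol_pct(beer))  # 400 5.6
```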

Now is the time to demonstrate your [Spark-fu](https://youtu.be/6vMO3XmNXe4?t=7) skills on the beer data set!

Good luck.

### Exercise 1: number of breweries (easy)

How many different breweries do we have?

In [24]:
distinct_breweries = rdd_beers.map(brewery).distinct() # complete this

nr_breweries = distinct_breweries.count()

assert nr_breweries == 362

### Exercise 2: strongest beer (medium)

Which beer is the strongest one (highest alcohol percentage)? Use `reduce` first.

In [25]:
def max_pct(beer1, beer2):
    if alcohol_pct(beer1) > alcohol_pct(beer2):
        return beer1
    else:
        return beer2

beer_max_pct = rdd_beers.reduce(max_pct) # complete this

# Verify your solution
assert beer_max_pct[3] == 26.0

Another possibility is the `max(key)` method, where you pass a key function that is used to compare items.

In [26]:
beer_max_pct_2 = rdd_beers.max(alcohol_pct) # complete this

# Verify your solution
assert beer_max_pct_2[3] == 26.0
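Both approaches mirror plain-Python idioms: `functools.reduce` with `max_pct`, and the built-in `max` with a `key` function. This sketch runs them on the three parsed beers shown earlier (a tiny local list, not the RDD).

```python
from functools import reduce

def alcohol_pct(beer_tuple):
    return beer_tuple[3]

def max_pct(beer1, beer2):
    return beer1 if alcohol_pct(beer1) > alcohol_pct(beer2) else beer2

# the three parsed beers shown earlier
beers = [(1, '3 Schténg', 'hoge gisting', 6.0, "Brasserie Grain d'Orge"),
         (2, '400', 'blond', 5.6, "'t Hofbrouwerijke voor Brouwerij Montaigu"),
         (3, 'IV Saison', 'saison', 6.5, 'Brasserie de Jandrain-Jandrenouille')]

print(reduce(max_pct, beers)[1])       # IV Saison
print(max(beers, key=alcohol_pct)[1])  # IV Saison
```

With ties, the two can return different beers: `max_pct` keeps the later of two equal beers, while the built-in `max` returns the first one it encounters.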

### Exercise 3: most common alcohol percentages (medium)

How many beers are there for each alcohol percentage? Which 3 alcohol percentages are the most common?

In [27]:
# Complete this
def beers_per_pct():
    
    zero = 0
    
    def seqOp(acc, entry):
        return acc + 1
    
    def comboOp(acc1, acc2):
        return acc1 + acc2
    
    def selector(pair):
        return value(pair)
    
    return rdd_beers.keyBy(alcohol_pct).aggregateByKey(zero, seqOp, comboOp).sortBy(selector, ascending = False)

# Sanity check
assert rdd_beers.count() == beers_per_pct().map(value).sum()

# Verify your solution
assert beers_per_pct().take(3) == [(8.0, 180), (6.5, 150), (5.0, 131)]
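Counting items per value is common enough that there are shortcuts. In plain Python, `collections.Counter` counts values and `most_common(n)` returns the top `n`; on an RDD, `countByValue()` returns a dictionary of counts to the driver, which is only suitable when the number of distinct values is small. The sketch uses a hypothetical list of percentages, not the real data.

```python
from collections import Counter

pcts = [8.0, 6.5, 8.0, 5.0, 6.5, 8.0]  # hypothetical alcohol percentages
counts = Counter(pcts)
print(counts.most_common(2))  # [(8.0, 3), (6.5, 2)]
```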

### Exercise 4: brewery with strongest average beer (advanced)

Find the brewery that has the highest average alcohol percentage of all beers it makes.

In [28]:
def strongest_average():
    
    zero = (0,0)
    
    def seqOp(sum_count, beer): # complete this
        total, count = sum_count
        return (total + alcohol_pct(beer), count + 1)

    def comboOp(t1, t2): # complete this
        return (t1[0] + t2[0], t1[1] + t2[1])

    def avg(sum_count):
        total, count = sum_count
        return total / count
    
    return rdd_beers.keyBy(brewery).aggregateByKey(zero, seqOp, comboOp).mapValues(avg).max(value)

assert strongest_average() == ('Staminee De Garre (Brouwerij Van Steenberge)', 11.5)

### Exercise 5: breweries with most kinds of beers (advanced)

Return the top 5 breweries with the most kinds of beer, together with those kinds.
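The core of this exercise is grouping values into a set per key. In plain Python that is a `collections.defaultdict(set)`; the sketch below uses hypothetical beer tuples to show the shape of the result, while the Spark version distributes the same idea with `aggregateByKey`, using an empty set as zero value.

```python
from collections import defaultdict

# hypothetical (ID, brand, kind, alcohol percentage, brewery) tuples
beers = [(1, 'A', 'blond', 6.0, 'Brewery X'),
         (2, 'B', 'tripel', 8.0, 'Brewery X'),
         (3, 'C', 'blond', 5.0, 'Brewery X'),
         (4, 'D', 'stout', 7.0, 'Brewery Y')]

kinds_by_brewery = defaultdict(set)
for beer in beers:
    kinds_by_brewery[beer[4]].add(beer[2])  # the set ignores duplicate kinds

# sort breweries by their number of distinct kinds, most diverse first
ranking = sorted(kinds_by_brewery.items(), key=lambda kv: len(kv[1]), reverse=True)
print([(name, len(kinds)) for name, kinds in ranking])  # [('Brewery X', 2), ('Brewery Y', 1)]
```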

In [29]:
def diverse_breweries():
    
    zero = set()
    
    def seqOp(kinds, beer):
        kinds.add(kind(beer))
        return kinds
    
    def comboOp(kinds1, kinds2):
        kinds1.update(kinds2)  # merge the kinds of two partial results
        return kinds1
    
    def selector(pair):
        return len(pair[1])    
    
    return rdd_beers.keyBy(brewery).aggregateByKey(zero, seqOp, comboOp).sortBy(selector, ascending = False)
    
# Verify your answer
assert diverse_breweries().map(key).take(5) == ['Brouwerij Huyghe', 'Brouwerij Alvinne', 'Brouwerij Bavik', 'Brouwerij Strubbe', 'Brouwerij Van Steenberge']

# Show us the result
diverse_breweries().take(5)

[('Brouwerij Huyghe',
  {'Erkend Belgisch Abdijbier',
   'Erkend Belgisch Abdijbier;tripel',
   'Speciale Belge',
   'abdijbier',
   'abdijbier;tripel',
   'blond',
   'bruin',
   'donker;hoge gisting',
   'fruitbier',
   'glutenvrij pilsener',
   'hoge gisting',
   'honingbier',
   'kerstbier',
   'pils',
   'pils;biologisch',
   'roodbruin',
   'sterk blond',
   'sterk donker',
   'tripel',
   'witbier'}),
 ('Brouwerij Alvinne',
  {'Brown ale',
   'IPA',
   'Vlaams oud bruin',
   'amber',
   'blond',
   'donkerblond',
   'donkerbruin',
   'donkerrood',
   'fruitbier;oak aged',
   'hoge gisting',
   'hoge gisting;tripel',
   'imperial stout',
   'lambiek',
   'lichtamber',
   'oak aged',
   'quadrupel;oak aged',
   'stout',
   'tripel',
   'winterbier'}),
 ('Brouwerij Bavik',
  {'aged pale',
   'alcoholarm',
   'blond',
   'blond;hoge gisting',
   'bokbier',
   'bruin',
   'hoge gisting',
   'hoge gisting;blond',
   'oud bruin',
   'pils',
   'speciaalbier',
   'stout',
   'tafelbier'

Voila!

This completes your [Spark-fu](https://youtu.be/84VtdVK2a0A?t=219) training, oh mighty dragon warrior.