<img style="float: left" src="images/spark.png" />
<img style="float: right" src="images/surfsara.png" />
<hr style="clear: both" />

# Aggregates on RDDs in Apache Spark

This notebook provides a short introduction to the _aggregate_ function in Apache Spark. Aggregate is quite a low-level function in that it exposes the partitions of the RDD. (By a partition we mean a part of an RDD that resides on a single worker.)

_This is an optional notebook for those that finish early with the Spark Introduction notebook._


In [None]:
# initialize Spark
from pyspark import SparkContext, SparkConf

if 'sc' not in globals(): # This 'trick' makes sure the SparkContext sc is initialized exactly once
    conf = SparkConf().setMaster('local[*]')
    sc = SparkContext(conf=conf)

First, let us create an RDD nums with a few integers, spread over two partitions, and add them up with reduce.

In [None]:
nums = sc.parallelize([1, 2, 2, 4], 2)
nums.reduce(lambda x, y : x + y)

It's easy in Spark to compute the mean of an RDD of integers by calling the _mean_ method.

In [None]:
nums.mean()

Let's think about how to implement this function ourselves. Note that the records of the RDD, the integers in this case, can be on different nodes/partitions, and we want to keep the data traffic between them as low as possible.

The idea is this. Suppose we have an RDD with two partitions. We first sum all integers in each partition, and also keep track of the number of records. Only these small (sum, count) pairs have to be sent over the network. The mean is the total sum divided by the total number of records.

So in the case of the RDD nums that contains the integers [1, 2, 2, 4] we have two partitions. Let's assume these are [1, 2] and [2, 4]. Adding the integers in these partitions gives us 3 and 6 respectively. Both contain two records, so we end up with (3, 2) and (6, 2). The mean is then (3 + 6)/(2 + 2) = 9/4 = 2.25.
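The arithmetic can be checked with a short pure-Python sketch. This is not Spark code: the two lists simply stand in for the partitions [1, 2] and [2, 4] assumed above.

```python
# Stand-ins for the two assumed partitions of nums
partitions = [[1, 2], [2, 4]]

# Per partition: (sum of the integers, number of records)
partials = [(sum(p), len(p)) for p in partitions]

# Combine the partial results: add the sums and add the counts
total_sum = sum(s for s, _ in partials)
total_count = sum(c for _, c in partials)

mean = total_sum / total_count
print(partials, mean)  # [(3, 2), (6, 2)] 2.25
```

Only the two (sum, count) pairs would have to travel between workers, whatever the size of the partitions.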

Now try to do this with a map and a reduce.

_Hint_: the reduce step should return a tuple, the first element being the sum of sums, the second the number of elements. Also remember that the function passed to reduce must be of type _func(T, T) -> T_. That is, the output must have the same type as the two input values of the function. In this case they are all tuples of two _int_s.

In [None]:
# TODO: Replace <FILL IN> with appropriate code

nums_pair = <FILL IN>
avg = <FILL IN>
print('Average is ' + str(avg[0]/float(avg[1])))

## SeqOp and CombOp

The function passed to reduce is restricted to the type _func(T, T) -> T_, which sometimes makes it hard to work with: to compute the mean, we first had to turn every record into a tuple. The aggregate function offers a way around this. It is similar to a map followed by a reduce, but at first glance it seems more complicated.

Aggregate has two stages, which we will call **SeqOp** and **CombOp**.

In the SeqOp stage, a function (called _seqOp_) is applied to every record of a partition, and the result is tracked in an accumulator for that partition. The stage ends with a list of accumulators (one per partition).

In the CombOp stage, a function (called _combOp_) is applied to the accumulators of the first stage to merge them into a single result.

You can see the map and reduce operations shining through here: the first stage is executed for each partition (like a map), and the second combines (reduces) the accumulated results of all partitions.

Aggregate needs three inputs: a zero (or start) value for the accumulators, and two functions. The first function, seqOp, is applied within each partition to build its accumulator; the second, combOp, merges the accumulators.

Let's see how that works.

In [None]:
inputrdd = sc.parallelize([("maths", 21),("english", 22),("science", 30), ("cs", 12)], 2)

In [None]:
# Note that acc, the first argument of the seqOp function, is the accumulator and is
# initially assigned the zero value. The second argument denotes a record in the RDD. val[1] stands
# for the number in the tuple
seqOp = lambda acc, val : acc + val[1]

# combOp just adds up the accumulators for all partitions
combOp = lambda acc1, acc2 : acc1 + acc2

# We call the aggregate function, using 0 as zero value.
inputrdd.aggregate(0, seqOp, combOp)

This is executed as follows:

First, SeqOp is applied to all partitions, two in this case:

Note that acc is assigned the zero value, 0, the first argument of aggregate.

```
partition 1: acc = 0 (zero value) + 21 (val[1])
             acc = 21 (acc) + 22 (next val[1]) 
             acc = 43
partition 2: acc = 0 (zero value) + 30 (val[1])
             acc = 30 (acc) + 12 (next val[1])
             acc = 42
```

Second, CombOp is applied to the accumulators. In PySpark this step also starts from the zero value, so the zero value is used once per partition in SeqOp and once more in CombOp:

$$ 0 + 43 + 42 = 85 $$
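The two stages can be imitated on plain Python lists to check this trace. This is a sketch, not Spark code: `simulate_aggregate` is a hypothetical helper that follows the order of operations of PySpark's `RDD.aggregate` (fold each partition starting from the zero value, then fold the accumulators starting from the zero value again), and the two-way split is the one traced above.

```python
from functools import reduce

def simulate_aggregate(partitions, zero_value, seq_op, comb_op):
    """Hypothetical helper: mimics the SeqOp and CombOp stages on plain lists."""
    # SeqOp stage: fold every partition, starting from the zero value
    accumulators = [reduce(seq_op, part, zero_value) for part in partitions]
    # CombOp stage: fold the accumulators, again starting from the zero value
    return reduce(comb_op, accumulators, zero_value)

# The split of inputrdd assumed in the trace above
partitions = [[("maths", 21), ("english", 22)],
              [("science", 30), ("cs", 12)]]

seq_op = lambda acc, val: acc + val[1]
comb_op = lambda acc1, acc2: acc1 + acc2

print(simulate_aggregate(partitions, 0, seq_op, comb_op))  # 85
# A non-neutral zero value is counted once per partition plus once in CombOp
print(simulate_aggregate(partitions, 1, seq_op, comb_op))  # 88
```

The second call shows why the zero value should be neutral for both functions (0 for addition): with 1 as zero value and two partitions, the result grows by 3.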

## Computing the average with aggregate

Let us try to compute the average or mean of a field in an RDD. First, we create an RDD with personal details. We then want to know the average age.

In [None]:
people = []
people.append({'name':'Bob', 'age':45,'gender':'M'})
people.append({'name':'Gloria', 'age':43,'gender':'F'})
people.append({'name':'Albert', 'age':28,'gender':'M'})
people.append({'name':'Laura', 'age':33,'gender':'F'})
people.append({'name':'Simone', 'age':18,'gender':'F'})
peopleRdd=sc.parallelize(people, 2)
peopleRdd.collect()

The aggregate function will not give us the average directly, but a tuple (SUM, L), where SUM is the total of the ages and L is the number of people in the RDD. The average is then SUM divided by L.

As in the previous example, we need a zero value and the seqOp and combOp functions.

The zero value must have the same type as the accumulator, which here is a tuple of integers of the form (SUM, L). Initially, each accumulator gets the zero value, which is (0, 0) in this case.

Note that seqOp runs within each partition: for every record, it adds the age to the running sum and increments the record count.
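The same pattern works for other tuple accumulators. As a sketch on plain Python lists rather than an RDD, here is a single pass that tracks the youngest and oldest age. The (min, max) accumulator, its zero value and the two-way split of the people are illustrative assumptions, and the gender field is left out for brevity.

```python
from functools import reduce

# Assumed split of the people over two partitions
partitions = [
    [{'name': 'Bob', 'age': 45}, {'name': 'Gloria', 'age': 43}],
    [{'name': 'Albert', 'age': 28}, {'name': 'Laura', 'age': 33},
     {'name': 'Simone', 'age': 18}],
]

# Zero value with the same shape as the accumulator (min, max);
# +inf and -inf are neutral for min and max respectively
zero = (float("inf"), float("-inf"))

# seqOp: fold one record into a partition's accumulator
seq_op = lambda acc, person: (min(acc[0], person['age']), max(acc[1], person['age']))
# combOp: merge two accumulators
comb_op = lambda a, b: (min(a[0], b[0]), max(a[1], b[1]))

accumulators = [reduce(seq_op, p, zero) for p in partitions]
result = reduce(comb_op, accumulators, zero)
print(accumulators, result)  # [(43, 45), (18, 33)] (18, 45)
```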

In [None]:
# TODO: Replace <FILL IN> with appropriate code

# HINT: x is the accumulator and consists of the tuple (SUM, L) as described above. So x[0] denotes the summed ages of 
# the partition. x[1] the count of records. 

# y represents the record. To get the value of the age field use y['age']

seqOp = <FILL IN>   

The combOp function works on the accumulators of the partitions, so x and y in the code below both stand for accumulators, which here are tuples.

In [None]:
# TODO: Replace <FILL IN> with appropriate code

combOp = <FILL IN>

We then pass these functions to aggregate, together with the zero value (0, 0).

The average is then computed in Python, on the driver. 
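One thing to watch on the driver: if the RDD contains no records, the count L in the aggregated tuple stays 0 and the division raises a ZeroDivisionError. A small guard, sketched here with a hypothetical helper `average_from_agg`, avoids that.

```python
def average_from_agg(agg):
    """Hypothetical helper: turn an (SUM, L) tuple into a mean, or None if L is 0."""
    total, count = agg
    if count == 0:
        return None  # no records, so no meaningful average
    return total / count

print(average_from_agg((167, 5)))  # 33.4 (the five ages above sum to 167)
print(average_from_agg((0, 0)))    # None
```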

In [None]:
agg = peopleRdd.aggregate((0,0), seqOp, combOp)
print('Average is ' + str(agg[0]/float(agg[1])))