# Demo 5 - Accumulators, Aggregations, and Joins in Spark
__`MIDS w261: Machine Learning at Scale | UC Berkeley School of Information | Spring 2019`__


By the end of this demo you should be able to: 
* ... __implement__ a custom accumulator
* ... __explain__ different types of aggregations and how they are implemented in Spark.
* ... __explain__ how different join operations are implemented in Spark
* ... __explain__ the challenges of implementing the A Priori algorithm at scale

### Notebook Set-Up

### Run the next three cells to create your DEMO5 directory 
The code below fetches your username automatically and creates a `demo5` folder under your DBFS home directory, which you will use for files saved from this notebook. Don't worry about understanding this code.

In [None]:
# RUN THIS CELL AS IS
# This code snippet reads the user directory name, and stores it in a python variable.
# Next, it creates a folder inside your home folder, which you will use for files which you save inside this notebook.
username = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
userhome = 'dbfs:/user/' + username
print(userhome)
demo5_path = userhome + "/demo5/" 
demo5_path_open = '/dbfs' + demo5_path.split(':')[-1] # for use with python open()
dbutils.fs.mkdirs(demo5_path)

In [None]:
# RUN THIS CELL AS IS
# Here we'll create a test file, and use databricks utils to make sure everything works as expected.
# You should see a result like: dbfs:/user/<your email>@ischool.berkeley.edu/demo5/test5.txt
dbutils.fs.put(demo5_path+'test5.txt',"hello world",True)
display(dbutils.fs.ls(demo5_path))

path,name,size
dbfs:/user/kylehamilton@ischool.berkeley.edu/demo5/test5.txt,test5.txt,11


In [None]:
# imports
import sys
import numpy as np
import matplotlib.pyplot as plt

In [None]:
sc = spark.sparkContext

# Accumulators
Definitive Guide book, pg. 241

Accumulators are Spark's equivalent of Hadoop counters. Like broadcast variables, they are shared across the nodes in your cluster. Unlike broadcast variables, accumulators are _write-only_ from the executors' point of view: tasks running on executors (where transformations are applied) can only add to them, and only the driver program can read their values. As convenient as this sounds, there are a few common pitfalls to avoid. Let's take a look.

Run the following cell to create a sample data file representing a list of `studentID, courseID, final_grade`...

## Exercise 1

In [None]:
dbutils.fs.put(demo5_path+"grades.csv", 
"""10001,101,98
10001,102,87
10002,101,75
10002,102,55
10002,103,80
10003,102,45
10003,103,75
10004,101,90
10005,101,85
10005,103,60""", True)

Suppose we want to compute the average grade by course and student while also tracking the number of failing grades awarded. We might try something like this:

In [None]:
# function to increment the accumulator as we read in the data
def parse_grades(line, accumulator):
    """Helper function to parse input & track failing grades."""
    student,course,grade = line.split(',')
    grade = int(grade)
    if grade < 65:
        accumulator.add(1)
    return(student,course, grade)

In [None]:
# initialize an accumulator to track failing grades
nFailing = sc.accumulator(0)

In [None]:
# compute averages in spark
nFailing = sc.accumulator(0)
gradesRDD = sc.textFile(demo5_path+'grades.csv')\
              .map(lambda x: parse_grades(x, nFailing))

gradesRDD.cache()

studentAvgs = gradesRDD.map(lambda x: (x[0], (x[2], 1)))\
                       .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))\
                       .mapValues(lambda x: x[0]/x[1])



courseAvgs = gradesRDD.map(lambda x: (x[1], (x[2], 1)))\
                      .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))\
                      .mapValues(lambda x: x[0]/x[1])

In [None]:
# take a look
print("===== average by student =====")
print(studentAvgs.collect())
print("===== average by course =====")
print(courseAvgs.collect())
print("===== number of failing grades awarded =====")
print(nFailing)

In [None]:
print("===== average by student =====")
print(studentAvgs.collect())

In [None]:
print("===== number of failing grades awarded =====")
print(nFailing)

In [None]:
# <--- SOLUTION --->
# compute averages in spark
# initialize an accumulator to track failing grades
nFailing = sc.accumulator(0)

def parse_grades(line):
    """Helper function to parse input & track failing grades."""
    student,course,grade = line.split(',')
    grade = int(grade)
    return(student,course, grade)
  
def accAgg(row):
    grade = row[2]
    if grade < 65:
        nFailing.add(1)

gradesRDD = sc.textFile(demo5_path+'grades.csv')\
              .map(lambda x: parse_grades(x))

gradesRDD.cache()

gradesRDD.foreach(accAgg)


studentAvgs = gradesRDD.map(lambda x: (x[0], (x[2], 1)))\
                       .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))\
                       .mapValues(lambda x: x[0]/x[1])

courseAvgs = gradesRDD.map(lambda x: (x[1], (x[2], 1)))\
                      .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))\
                      .mapValues(lambda x: x[0]/x[1])

> __DISCUSSION QUESTIONS:__
* What is wrong with the results? (__`HINT:`__ _how many failing grades are there really?_)
* Why might this be happening? (__`HINT:`__ _How many actions are there in this code? Which parts of the DAG are recomputed for each of these actions?_)
* What one line could we add to the code to fix this problem?
  * What could go wrong with our "fix"?
* How could we have designed our parser differently to avoid this problem in the first place?
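
One way to reason about the second hint is with a plain-Python analogy that needs no Spark. The `Counter` and `LazyDataset` classes below are hypothetical stand-ins (not Spark APIs) for an accumulator and a lazily evaluated RDD: the parser is re-run on every "action" unless the result has been materialized, so a side effect inside it is counted once per action.

```python
# Plain-Python analogy of lazy recomputation; these classes are illustrative only.
class Counter:
    """Minimal stand-in for a numeric accumulator."""
    def __init__(self):
        self.value = 0

    def add(self, n):
        self.value += n


class LazyDataset:
    """Re-applies `func` to every record each time an action runs, unless cached."""
    def __init__(self, records, func):
        self.records = records
        self.func = func
        self._use_cache = False
        self._cache = None

    def cache(self):
        self._use_cache = True
        return self

    def collect(self):
        if self._use_cache and self._cache is not None:
            return self._cache
        result = [self.func(r) for r in self.records]
        if self._use_cache:
            self._cache = result
        return result


def make_parser(counter):
    def parse(line):
        student, course, grade = line.split(",")
        if int(grade) < 65:
            counter.add(1)  # side effect inside a "transformation"
        return student, course, int(grade)
    return parse


lines = ["10001,101,98", "10002,102,55", "10003,102,45", "10005,103,60"]

uncached_counter = Counter()
uncached = LazyDataset(lines, make_parser(uncached_counter))
uncached.collect()  # first "action"
uncached.collect()  # second "action" re-runs the parser

cached_counter = Counter()
cached = LazyDataset(lines, make_parser(cached_counter)).cache()
cached.collect()
cached.collect()    # served from the cache, parser not re-run

print(uncached_counter.value, cached_counter.value)
```

In Spark itself, caching is only a hint: an evicted partition or a retried task re-runs the function, which is why updating the accumulator inside an action such as `foreach` is the safer design.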

## Custom Accumulators
https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators

While SparkContext supports accumulators for primitive data types like int and float, users can also define accumulators for custom types by providing a custom AccumulatorParam object. 

We may want to use custom accumulators later in the course when we implement graph algorithms such as PageRank or Shortest Path.

In [None]:
from pyspark.accumulators import AccumulatorParam

# Spark only implements Accumulator parameter for numeric types.
# This class extends Accumulator support to the string type.
class StringAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        return value
    def addInPlace(self, val1, val2):
        return val1 +" -> "+ val2

Let's switch gears now for a moment, and look at aggregations. We'll come back to our string accumulator at the end of the next example.
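
Before moving on, here is a rough, Spark-free sketch of how the two methods of the string accumulator are used. `StringParamSketch` and the per-task lists are hypothetical. The sketch assumes that each task starts from `zero(initial)`, folds its own values with `addInPlace`, and that the driver then merges the per-task results into its own value the same way.

```python
from functools import reduce


class StringParamSketch:
    """Same two methods as StringAccumulatorParam, without the PySpark base class."""
    def zero(self, value):
        return value

    def addInPlace(self, val1, val2):
        return val1 + " -> " + val2


param = StringParamSketch()
initial = "==="

# Hypothetical letter grades seen by two separate tasks.
task_values = [["A", "B"], ["F"]]

# Each task starts from zero(initial) and folds in its own values.
task_results = [
    reduce(param.addInPlace, values, param.zero(initial))
    for values in task_values
]

# The driver merges the per-task results into its own value.
driver_value = reduce(param.addInPlace, task_results, initial)
print(driver_value)
```

Because `zero` returns the initial value unchanged, the `===` marker shows up once per task as well as once on the driver. The order in which task results are merged is not something to rely on, so string concatenation is best kept for debugging output.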

# Aggregations

## groupByKey()
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/rdd.html#RDD.groupByKey   
Easy to reason about, because it's very familiar to folks coming from the SQL world. However, for the majority of cases, this is the wrong approach. The fundamental issue here is that each executor must hold all values for a given key in memory before applying the function to them.

In [None]:
displayHTML('<img src="https://github.com/kyleiwaniec/w261_assets/blob/master/images/groupbykey.png?raw=true" width="100%"/>')

## reduceByKey(Func)
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/rdd.html#RDD.reduceByKey    
A much more stable approach to additive problems is reduceByKey. The reduce first happens within each partition, so no executor needs to hold all the values for a key in memory. A shuffle still takes place, but only the partially reduced value for each key in each partition crosses the network before the final reduce.
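
The difference between the two approaches can be sketched in plain Python by counting how many records would have to leave their partition. The helper names `group_by_key` and `reduce_by_key` and the two partitions below are hypothetical; this is an illustration of the idea, not Spark's implementation.

```python
from collections import defaultdict

# Two hypothetical partitions of (course, grade) pairs.
partitions = [
    [("101", 98), ("102", 87), ("101", 75), ("102", 55)],
    [("103", 80), ("102", 45), ("103", 75), ("101", 90)],
]


def group_by_key(parts):
    """Move every record, then collect all values for each key in one place."""
    moved = [record for part in parts for record in part]
    grouped = defaultdict(list)
    for key, value in moved:
        grouped[key].append(value)
    return dict(grouped), len(moved)


def reduce_by_key(parts, func):
    """Combine values inside each partition first, then merge the partial results."""
    partials = []
    for part in parts:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.extend(local.items())
    merged = {}
    for key, value in partials:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged, len(partials)


grouped, n_group = group_by_key(partitions)
sums, n_reduce = reduce_by_key(partitions, lambda a, b: a + b)

print("records moved:", n_group, "vs", n_reduce)
print("same totals:", {k: sum(v) for k, v in grouped.items()} == sums)
```

The saving grows with the number of repeated keys per partition; with all-distinct keys the two approaches move the same number of records.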

### IMPORTANT NOTE ABOUT reduceByKey() and reduce()

`reduceByKey()` is a transformation, whereas `reduce()` includes an action!
Looking at the source code for reduce(), notice the `collect()` 
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/rdd.html#RDD.reduce

In [None]:
displayHTML('<img src="https://github.com/kyleiwaniec/w261_assets/blob/master/images/reducebykey.png?raw=true" width="50%"/>')

## combineByKey(createCombiner, mergeValue, mergeCombiners)
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/rdd.html#RDD.combineByKey   
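Of the three function arguments, `createCombiner` turns the first value seen for a key (within a partition) into a combiner, `mergeValue` specifies how to merge another value into an existing combiner, and `mergeCombiners` specifies how to merge combiners from different partitions. For example, we might want to add values to a list, and subsequently merge the lists.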
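
The list-building example can be sketched without Spark. The `combine_by_key` helper and the two partitions below are hypothetical; the sketch only imitates the order in which the three functions from the signature are applied.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Plain-Python imitation of combineByKey over a list of partitions."""
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            if key in local:
                # key already seen in this partition: fold the value in
                local[key] = merge_value(local[key], value)
            else:
                # first value for this key in this partition
                local[key] = create_combiner(value)
        partials.append(local)

    result = {}
    for local in partials:
        for key, combiner in local.items():
            if key in result:
                result[key] = merge_combiners(result[key], combiner)
            else:
                result[key] = combiner
    return result


partitions = [
    [("10001", 98), ("10001", 87), ("10002", 75)],
    [("10002", 55), ("10002", 80), ("10004", 90)],
]

grades_per_student = combine_by_key(
    partitions,
    create_combiner=lambda v: [v],
    merge_value=lambda acc, v: acc + [v],
    merge_combiners=lambda a, b: a + b,
)
print(grades_per_student)
```

Student 10004 only ever passes through `create_combiner`, so the combiner it returns must already have the final shape.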

In [None]:
displayHTML('<img src="https://github.com/kyleiwaniec/w261_assets/blob/master/images/combinebykey.png?raw=true" width="50%"/>')

## foldByKey(zeroValue, Func)
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/rdd.html#RDD.foldByKey   
Calls combineByKey, but allows us to use a zero value, which can be added to the result an arbitrary number of times and must not change the result (e.g. 0 for addition, 1 for multiplication).
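
To see why the zero value must be neutral, the sketch below imitates foldByKey in plain Python. The `fold_by_key` helper is hypothetical and assumes the zero value is applied once per key in each partition, so a non-neutral zero makes the answer depend on how the data happens to be partitioned.

```python
def fold_by_key(partitions, zero_value, func):
    """Imitate foldByKey: each key starts from zero_value once per partition."""
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local.get(key, zero_value), value)
        partials.append(local)

    result = {}
    for local in partials:
        for key, value in local.items():
            result[key] = func(result[key], value) if key in result else value
    return result


def add(x, y):
    return x + y


# The same records, split in two different ways.
one_partition = [[("a", 1), ("a", 2), ("b", 3)]]
two_partitions = [[("a", 1)], [("a", 2), ("b", 3)]]

neutral = (fold_by_key(one_partition, 0, add), fold_by_key(two_partitions, 0, add))
non_neutral = (fold_by_key(one_partition, 10, add), fold_by_key(two_partitions, 10, add))

print(neutral)
print(non_neutral)
```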

## aggregateByKey(zeroValue, seqOp, combOp)
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/rdd.html#RDD.aggregateByKey

In [None]:
displayHTML('<img src="https://github.com/kyleiwaniec/w261_assets/blob/master/images/aggregatebykey.png?raw=true" width="100%"/>')

## treeAggregate(zeroValue, seqOp, combOp, depth)
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/rdd.html#RDD.treeAggregate    
Same as aggregate except it “pushes down” some of the subaggregations (creating a tree from executor to executor) before performing the final aggregation on the driver.
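
The tree idea can be sketched in plain Python. The `tree_aggregate` helper and its `fan_in` parameter are hypothetical simplifications: Spark derives the shape of the tree from `depth` and the number of partitions, whereas this sketch simply merges partial results in fixed-size groups until one value remains.

```python
from functools import reduce


def tree_aggregate(partitions, zero_value, seq_op, comb_op, fan_in=2):
    """Aggregate each partition, then merge partial results in rounds of `fan_in`."""
    # Step 1: every partition is folded independently.
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    rounds = 0
    # Step 2: merge groups of partial results until a single value is left.
    while len(partials) > 1:
        partials = [
            reduce(comb_op, partials[i:i + fan_in])
            for i in range(0, len(partials), fan_in)
        ]
        rounds += 1
    return partials[0], rounds


partitions = [[1, 2], [3, 4], [5, 6], [7, 8]]


def seq_op(acc, x):
    # Track (sum, count) so the mean can be derived at the end.
    return (acc[0] + x, acc[1] + 1)


def comb_op(a, b):
    return (a[0] + b[0], a[1] + b[1])


(total, count), rounds = tree_aggregate(partitions, (0, 0), seq_op, comb_op)
print(total, count, rounds)
```

With four partitions and a fan-in of two, the final merge combines two values instead of four, which is the point of pushing work down the tree.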

## Back to our example: 
What if we wanted to get a list of letter grades that each student received as well as their average?

__INSTRUCTOR NOTES__   
The following code block is going to fail. The objective is to help students learn debugging. We'll also pay homage to Bob Ross. While designing this exercise, your lecturer, Kyle Hamilton, made some mistakes. But just like Bob, Kyle doesn't believe in mistakes, only happy accidents!!

Some questions to ask:   

Q: What does this error message tell us?   
A: `IndexError: tuple index out of range`   

Q: How can we go about debugging the problem?   
A: Run each function to inspect the output. Is it what we are expecting? Does it explain the index error?    
What's odd about the output from the reduceByKey?   
Look at the data, and observe that student 10004 has only one entry. Spark did not run the reduceByKey function for that key because there was nothing to reduce. That's a double-edged sword! Yes, it's efficient, but oops, it made our code break. Takeaway: the signature of the input to the reducer must match the signature of the output. Where did we see this before?   

Q: What other aggregation functions can we use here to solve this problem?   
A: Some things to try:
1. FoldByKey - we get a default value to start with   
2. CombineByKey - we have more control over how data is combined and reduced. Why didn't this work? reduceByKey calls combineByKey. We're performing the same operation as before, so this doesn't help fix our problem
3. AggregateByKey - allows us to specify a null value, forcing the evaluation of the reduce step. How is this different from foldByKey?
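
The single-entry behavior can be reproduced without Spark using Python's built-in `functools.reduce`, which also skips the function when there is only one value. The `get_counts` function below is a simplified, hypothetical version of `getCounts` that records each call.

```python
from functools import reduce

calls = []


def get_counts(a, b):
    """Same shape change as getCounts: 2-tuples in, 3-tuple out."""
    calls.append((a, b))
    return (a[0] + b[0], a[1] + b[1], "merged")


two_grades = reduce(get_counts, [(98, 1), (87, 1)])
one_grade = reduce(get_counts, [(90, 1)])  # the function is never called

print(two_grades, one_grade, len(calls))

# Downstream code that expects a third element fails only for the single-entry key.
try:
    one_grade[2]
    error_message = None
except IndexError as err:
    error_message = str(err)
print(error_message)
```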

## Exercise 2

In [None]:
def toLetterGrade(x):
    if x > 92.0:
        return "A"
    elif x > 82.0:
        return "B"
    elif x > 72.0:
        return "C"
    elif x > 65.0:
        return "D"
    else:
        return "F"

def getCounts(a,b):
    return (a[0] + b[0], a[1] + b[1], toLetterGrade(a[0])+toLetterGrade(b[0]))
    
studentAvgs = gradesRDD.map(lambda x: (x[0], (x[2], 1)))\
                       .reduceByKey(getCounts)\
                       .mapValues(lambda x: ((x[0]/x[1]),x[2]))\
                       .collect()


## How can we debug this problem?
1. What is the error?
2. Insert as many cells as you need to figure out what happened. Next we'll look at some ways to "fix" it.

In [None]:
displayHTML("<img src='https://github.com/kyleiwaniec/w261_assets/blob/master/images/Bob-Ross-3.jpg?raw=true' width=50%/>")

In [None]:
gradesRDD.map(lambda x: (x[0], (x[2], 1))).collect()

In [None]:
gradesRDD.collect()

In [None]:
gradesRDD.map(lambda x: (x[0], (x[2], 1))).reduceByKey(getCounts).collect()

## Let's look at some alternative implementations:

### foldByKey allows us to specify a zero value

In [None]:
gradesRDD.map(lambda x: (x[0], (x[2], 1))).foldByKey((0,0,""), getCounts).collect()

### <--- SOLUTION --->  
__SOLUTION__   
Student 10004 is assigned an initial 'F' grade, because `toLetterGrade` is applied to the 0 in our zero value. We have not met the foldByKey requirement because our zeroValue changes the result.

### Can we solve this problem using combineByKey, which provides more granular control over the parameters?
https://backtobazics.com/big-data/apache-spark-combinebykey-example/

In [None]:
def createCombiner(a):
    return a

def mergeValues(a,b):
    return (a[0] + b[0], a[1] + b[1], toLetterGrade(a[0])+toLetterGrade(b[0]))

def mergeCombiners(a,b):
    return (a[0] + b[0], a[1] + b[1], toLetterGrade(a[0])+toLetterGrade(b[0]))

studentAvgs = gradesRDD.map(lambda x: (x[0], (x[2], 1)))\
                       .combineByKey(createCombiner,mergeValues,mergeCombiners)\
                       .mapValues(lambda x: ((x[0]/x[1]),x[2]))

In [None]:
#gradesRDD.map(lambda x: (x[0], (x[2], 1))).combineByKey(createCombiner,mergeValues,mergeCombiners).collect()

### <--- SOLUTION --->  
__SOLUTION__   
We know that `reduceByKey` calls `combineByKey` under the hood. We have not changed anything here, so the result is exactly the same as in our first attempt when we used `reduceByKey`

### aggregateByKey requires a zero (start) value as well as two different functions: one to aggregate within partitions, and one to aggregate across partitions
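
As a Spark-free sketch of that contract, the hypothetical `aggregate_by_key` helper below folds raw values into an accumulator with `seq_op` inside each partition and then merges accumulators with `comb_op`. The accumulator shape `(sum, count, grades_seen)` is chosen for illustration; the key point is that `comb_op` must return the same shape it receives.

```python
def aggregate_by_key(partitions, zero_value, seq_op, comb_op):
    """seq_op folds values within a partition; comb_op merges across partitions."""
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = seq_op(local.get(key, zero_value), value)
        partials.append(local)

    result = {}
    for local in partials:
        for key, acc in local.items():
            result[key] = comb_op(result[key], acc) if key in result else acc
    return result


# Values are plain grades; the accumulator is (sum, count, grades_seen).
partitions = [
    [("10002", 75), ("10002", 55)],
    [("10002", 80), ("10004", 90)],
]


def seq_op(acc, grade):
    return (acc[0] + grade, acc[1] + 1, acc[2] + [grade])


def comb_op(a, b):
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2])


totals = aggregate_by_key(partitions, (0, 0, []), seq_op, comb_op)
averages = {k: (s / n, seen) for k, (s, n, seen) in totals.items()}
print(averages)
```

Unlike the reduceByKey attempt, the single-entry student still passes through `seq_op` once, so every key ends up with a three-element accumulator.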

In [None]:
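def seqOp(a,b):
    # fold one value b = (grade, 1, grade) into the running (sum, count, letters) accumulator
    return (a[0] + b[0], a[1] + b[1], a[2] + toLetterGrade(b[2]))

def combOp(a,b):
    # merge two accumulators element-wise; a+b would concatenate the tuples instead
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2])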

In [None]:
gradesRDD.collect()

In [None]:
gradesRDD.map(lambda x: (x[0], (x[2], 1, x[2])))\
         .aggregateByKey((0,0,""),seqOp,combOp)\
         .mapValues(lambda x: ((x[0]/x[1]),x[2]))\
         .collect()


In [None]:
gradesRDD.map(lambda x: (x[0], (x[2], 1, x[2]))).aggregateByKey((0,0,""),seqOp,combOp).collect()

In [None]:
letterAccum = sc.accumulator("===", StringAccumulatorParam())
gradesRDD.foreach(lambda x: letterAccum.add(toLetterGrade(x[2])))
print(letterAccum)

# Joins

* join       
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/rdd.html#RDD.join     
* leftOuterJoin   
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/rdd.html#RDD.leftOuterJoin    
* rightOuterJoin   
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/rdd.html#RDD.rightOuterJoin    
* fullOuterJoin   
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/rdd.html#RDD.fullOuterJoin   
* cartesian   
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/rdd.html#RDD.cartesian
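
The key-based joins in this list differ only in which keys they keep. A minimal plain-Python sketch, assuming lists of `(key, value)` pairs in place of RDDs (the `rdd_style_join` helper and its `how` argument are hypothetical):

```python
def rdd_style_join(left, right, how="inner"):
    """Join two lists of (key, value) pairs, mimicking the RDD join family."""
    left_keys = {k for k, _ in left}
    right_keys = {k for k, _ in right}
    if how == "inner":
        keys = left_keys & right_keys
    elif how == "left":
        keys = left_keys
    elif how == "right":
        keys = right_keys
    elif how == "full":
        keys = left_keys | right_keys
    else:
        raise ValueError(f"unknown join type: {how}")

    rows = []
    for key in keys:
        # A side with no match contributes a single None.
        left_vals = [v for k, v in left if k == key] or [None]
        right_vals = [v for k, v in right if k == key] or [None]
        rows.extend((key, (lv, rv)) for lv in left_vals for rv in right_vals)
    return sorted(rows, key=lambda row: row[0])


x = [("a", 1), ("b", 4)]
y = [("a", 2), ("c", 8)]
for how in ("inner", "left", "right", "full"):
    print(how, rdd_style_join(x, y, how))
```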

In [None]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("c", 8)])
sorted(x.fullOuterJoin(y).collect())
#[('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]

In [None]:
sorted(x.rightOuterJoin(y).collect())

In [None]:
sorted(x.leftOuterJoin(y).collect())

Let's load some data for the code examples.

In [None]:
person = spark.createDataFrame([
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100])])\
  .toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley")])\
  .toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor")])\
  .toDF("id", "status")

In [None]:
# run as is
joinExpression = person["graduate_program"] == graduateProgram['id']

In [None]:
# run as is
wrongJoinExpression = person["name"] == graduateProgram["school"]

In [None]:
# run as is
person.join(graduateProgram, joinExpression).show()

In [None]:
person.join(graduateProgram, wrongJoinExpression).show()

In [None]:
# Spark performs an "inner" join by default. But we can specify this explicitly.
# Try different join types.
joinType = "outer"
#joinType = "left_outer"
#joinType = "right_outer"

In [None]:
person.join(graduateProgram, joinExpression, joinType).show()

### Which keys do outer joins evaluate?

### A departure from traditional joins:

In [None]:
gradProgram2 = graduateProgram.union(spark.createDataFrame([
    (0, "Masters", "Duplicated Row", "Duplicated School")]))
gradProgram2.createOrReplaceTempView("gradProgram2")

In [None]:
gradProgram2.show()

In [None]:
person.show()

In [None]:
# Think of left semi joins as filters on a DataFrame, as opposed to the function of a conventional join
joinType = "left_semi"
gradProgram2.join(person, joinExpression, joinType).show()

In [None]:
gradProgram2.show()

In [None]:
joinType = "left_anti"
gradProgram2.join(person, joinExpression, joinType).show()
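
Semi and anti joins can be thought of as a membership filter on the left side. The sketch below uses plain tuples that mirror the rows of `gradProgram2` and `person` from the cells above; the variable names are hypothetical and no Spark is involved.

```python
# Rows mirror the gradProgram2 and person DataFrames.
grad_program2 = [
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley"),
    (0, "Masters", "Duplicated Row", "Duplicated School"),
]
person = [
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100]),
]

# Keys present on the right side: graduate_program is column index 2 of person.
programs_with_people = {row[2] for row in person}

# left_semi: keep left rows that have a match; no right-side columns are added.
left_semi = [row for row in grad_program2 if row[0] in programs_with_people]

# left_anti: keep left rows that have no match.
left_anti = [row for row in grad_program2 if row[0] not in programs_with_people]

print(len(left_semi), len(left_anti))
```

Duplicate rows on the left survive a semi join, while repeated keys on the right never multiply the output, which is the main difference from an inner join.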

### Natural Joins

__DANGER__: Natural joins make implicit guesses at the columns on which you would like to join. Why is this bad?

### Cross (Cartesian) Joins
Also known as Cartesian products. Cross joins are inner joins that do not specify a predicate. Cross joins will join every single row in the left DataFrame with every single row in the right DataFrame.

In [None]:
1000*1000

In [None]:
joinType = "cross"
graduateProgram.join(person, joinExpression, joinType).show()

In [None]:
person.crossJoin(graduateProgram).show()

__DANGER__: How many rows would we end up with from a cross join if each table had 1000 rows?