This notebook introduces the basic concepts of RDDs and the operations on them visually, by showing the contents of the RDDs as a table.

**Note: If you are looking at this in GitHub, you may not be able to see the HTML tables. Make sure to use the nbviewer link: http://nbviewer.jupyter.org/github/umddb/cmsc424-fall2016/tree/master/**

### Introduction

Apache Spark is a relatively new cluster computing framework, originally developed at UC Berkeley. It significantly generalizes the two-stage MapReduce paradigm (originally proposed by Google and popularized by the open-source Hadoop system); Spark is instead based on the abstraction of **resilient distributed datasets (RDDs)**. An RDD is a distributed collection of items that can be created in a variety of ways. Spark provides a set of operations that transform one or more RDDs into an output RDD, and analysis tasks are written as chains of these operations.

### Display RDD
The following helper class displays the current contents of an RDD, partition by partition. It is best used for small RDDs with a manageable number of partitions.

In [1]:
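class DisplayRDD:
    """Render an RDD as an HTML table in Jupyter, one column per partition."""

    def __init__(self, rdd):
        self.rdd = rdd

    def _repr_html_(self):
        # Jupyter calls _repr_html_ to render the object.
        # Collect (partition index, list of elements) for every partition.
        parts = self.rdd.mapPartitionsWithIndex(lambda i, it: [(i, list(it))]).collect()
        header = "".join("<th>Partition {}</th>".format(i) for (i, _) in parts)
        # Elements are inserted via str() without any HTML escaping.
        cells = "".join(
            '<td valign="bottom" align="left"><ul>{}</ul></td>'.format(
                "".join("<li>{}</li>".format(str(item)) for item in items))
            for (_, items) in parts)
        return '<table style="border: 1px"><tr>{}</tr><tr>{}</tr></table>'.format(header, cells)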

### Importing the packages 

In [2]:
import pyspark
sc = pyspark.SparkContext('local[*]')

### Basics 1
Let's start with some basic operations on a small RDD to visualize what's going on. We will create an RDD of strings from the `states.txt` file, which contains the names of the US states, one per line.

We created a SparkContext in the previous cell, and we can refer to it as `sc`.

We will use `sc.textFile` to create this RDD. This operation reads the file and treats every line as a separate element. We will use `DisplayRDD()` to visualize it. The second argument of `sc.textFile` (`minPartitions`) is the minimum number of partitions; we set it to 10 to get started. Without it, Spark would create only a couple of partitions (by default at most two), since the file is so small.

In [3]:
states_rdd = sc.textFile('states.txt', 10)
DisplayRDD(states_rdd)

| Partition | Elements |
|---|---|
| 0 | Alabama, Hawaii, Massachusetts, New Mexico, South Dakota |
| 1 | Alaska, Idaho, Michigan, New York, Tennessee, Arizona |
| 2 | Illinois, Minnesota, North Carolina, Texas |
| 3 | Arkansas, Indiana, Mississippi, North Dakota, Utah |
| 4 | California, Iowa, Missouri, Ohio, Vermont, Colorado |
| 5 | Kansas, Montana, Oklahoma, Virginia, Connecticut, Kentucky |
| 6 | Nebraska, Oregon, Washington, Delaware, Louisiana |
| 7 | Nevada, Pennsylvania, West Virginia, Florida |
| 8 | Maine, New Hampshire, Rhode Island, Wisconsin, Georgia |
| 9 | Maryland, New Jersey, South Carolina, Wyoming |


The table above shows the contents of each partition as a list, so the first partition has 5 elements ('Alabama', ...). We can `repartition` the RDD into fewer partitions to make it easier to see.

In [4]:
states_rdd = states_rdd.repartition(5)
DisplayRDD(states_rdd)

| Partition | Elements |
|---|---|
| 0 | Arkansas, Indiana, Mississippi, North Dakota, Utah, Maine, New Hampshire, Rhode Island, Wisconsin, Georgia, Maryland, New Jersey, South Carolina, Wyoming |
| 1 | Alabama, Hawaii, Massachusetts, New Mexico, South Dakota, Alaska, Idaho, Michigan, New York, Tennessee, Arizona |
| 2 | Nebraska, Oregon, Washington, Delaware, Louisiana, Nevada, Pennsylvania, West Virginia, Florida |
| 3 | California, Iowa, Missouri, Ohio, Vermont, Colorado, Kansas, Montana, Oklahoma, Virginia, Connecticut, Kentucky |
| 4 | Illinois, Minnesota, North Carolina, Texas |


Let's do a transformation that converts each string into a 2-tuple whose second value is the length of the string. We will use a `map` for this: `map` takes as input a function, which it applies to each element of the RDD.

Below, we define a named function `f1` that takes a string `s` and returns the 2-tuple `(s, len(s))`. The same function could be written inline with the `lambda` keyword as `lambda s: (s, len(s))`, as we do in later cells. See https://pythonconquerstheuniverse.wordpress.com/2011/08/29/lambda_tutorial/ for a tutorial on lambda functions.
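To make the partition-by-partition view concrete, here is a plain-Python model (not Spark code) in which an RDD is just a list of partitions, each a list of elements. The helper `model_map` is hypothetical: it applies a function to every element and leaves the partition boundaries untouched, which is why `map` never needs a shuffle.

```python
def model_map(partitions, f):
    """Plain-Python model of RDD.map: apply f to each element, partition by partition.

    `partitions` is a list of lists. The number and order of partitions is kept,
    so no element moves to a different partition (no shuffle).
    """
    return [[f(x) for x in part] for part in partitions]


def f1(s):
    # Same function as in the notebook: pair each state name with its length.
    return (s, len(s))


partitions = [["Arkansas", "Indiana"], ["Alabama"], ["Illinois", "Texas"]]
print(model_map(partitions, f1))
# [[('Arkansas', 8), ('Indiana', 7)], [('Alabama', 7)], [('Illinois', 8), ('Texas', 5)]]
```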

In [6]:
def f1(s):
    # Pair each state name with its length.
    return (s, len(s))
states1 = states_rdd.map(f1)
DisplayRDD(states1)

| Partition | Elements |
|---|---|
| 0 | ('Arkansas', 8), ('Indiana', 7), ('Mississippi', 11), ('North Dakota', 12), ('Utah', 4), ('Maine', 5), ('New Hampshire', 13), ('Rhode Island', 12), ('Wisconsin', 9), ('Georgia', 7), ('Maryland', 8), ('New Jersey', 10), ('South Carolina', 14), ('Wyoming', 7) |
| 1 | ('Alabama', 7), ('Hawaii', 6), ('Massachusetts', 13), ('New Mexico', 10), ('South Dakota', 12), ('Alaska', 6), ('Idaho', 5), ('Michigan', 8), ('New York', 8), ('Tennessee', 9), ('Arizona', 7) |
| 2 | ('Nebraska', 8), ('Oregon', 6), ('Washington', 10), ('Delaware', 8), ('Louisiana', 9), ('Nevada', 6), ('Pennsylvania', 12), ('West Virginia', 13), ('Florida', 7) |
| 3 | ('California', 10), ('Iowa', 4), ('Missouri', 8), ('Ohio', 4), ('Vermont', 7), ('Colorado', 8), ('Kansas', 6), ('Montana', 7), ('Oklahoma', 8), ('Virginia', 8), ('Connecticut', 11), ('Kentucky', 8) |
| 4 | ('Illinois', 8), ('Minnesota', 9), ('North Carolina', 14), ('Texas', 5) |


Let's collect all the names with the same length together using a group-by operation. The Spark programming guide describes `groupByKey` as follows:
```
groupByKey([numTasks]) 	When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. 
```
This wouldn't work as is, because `states1` uses the name as the key. Let's swap the two values around.

In [7]:
states2 = states1.map(lambda x: (x[1], x[0]))
DisplayRDD(states2)

| Partition | Elements |
|---|---|
| 0 | (8, 'Arkansas'), (7, 'Indiana'), (11, 'Mississippi'), (12, 'North Dakota'), (4, 'Utah'), (5, 'Maine'), (13, 'New Hampshire'), (12, 'Rhode Island'), (9, 'Wisconsin'), (7, 'Georgia'), (8, 'Maryland'), (10, 'New Jersey'), (14, 'South Carolina'), (7, 'Wyoming') |
| 1 | (7, 'Alabama'), (6, 'Hawaii'), (13, 'Massachusetts'), (10, 'New Mexico'), (12, 'South Dakota'), (6, 'Alaska'), (5, 'Idaho'), (8, 'Michigan'), (8, 'New York'), (9, 'Tennessee'), (7, 'Arizona') |
| 2 | (8, 'Nebraska'), (6, 'Oregon'), (10, 'Washington'), (8, 'Delaware'), (9, 'Louisiana'), (6, 'Nevada'), (12, 'Pennsylvania'), (13, 'West Virginia'), (7, 'Florida') |
| 3 | (10, 'California'), (4, 'Iowa'), (8, 'Missouri'), (4, 'Ohio'), (7, 'Vermont'), (8, 'Colorado'), (6, 'Kansas'), (7, 'Montana'), (8, 'Oklahoma'), (8, 'Virginia'), (11, 'Connecticut'), (8, 'Kentucky') |
| 4 | (8, 'Illinois'), (9, 'Minnesota'), (14, 'North Carolina'), (5, 'Texas') |


Note that Spark did not shuffle the data to bring pairs with the same key into the same partition: `map` never performs a shuffle, so every element stays in the partition it came from.

Now we can do a groupByKey. 

In [8]:
states3 = states2.groupByKey()
DisplayRDD(states3)

| Partition | Elements |
|---|---|
| 0 | (5, ), (10, ) |
| 1 | (11, ), (6, ) |
| 2 | (7, ), (12, ) |
| 3 | (8, ), (13, ) |
| 4 | (4, ), (9, ), (14, ) |


That looks odd: Spark seems to have grouped the data, but the groups themselves are missing. The value in each pair is a `pyspark.resultiterable.ResultIterable`, whose default string form looks like `<pyspark.resultiterable.ResultIterable object at 0x...>`. Because DisplayRDD inserts that text into the HTML unescaped, the browser treats it as an unknown tag and displays nothing. We can fix that by converting the values to lists before calling DisplayRDD.

**Note that this is a limitation of the HTML code that we generate above -- you don't have to do this in order to further process this RDD.**
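Here is a minimal plain-Python sketch of why the groups vanish. The class `Opaque` is a hypothetical stand-in for `ResultIterable`; we assume, as the output above suggests, that neither defines a custom `__repr__`, so both fall back to Python's default `<module.Class object at 0x...>` form.

```python
import html


class Opaque:
    """Hypothetical stand-in for ResultIterable: iterable, but with no __repr__."""

    def __init__(self, data):
        self.data = data

    def __iter__(self):
        return iter(self.data)


pair = (5, Opaque(["Maine", "Idaho", "Texas"]))

raw = str(pair)
print(raw)  # something like (5, <__main__.Opaque object at 0x...>)
# Inserted into HTML unescaped, the '<...>' part is parsed as an unknown tag and
# hidden, so only "(5, )" is visible.

# Escaping would make that text visible, but it still would not show the values.
print(html.escape(raw))

# Converting the iterable to a list (what mapValues(list) does per value) exposes the data.
print(str((pair[0], list(pair[1]))))  # (5, ['Maine', 'Idaho', 'Texas'])
```

This is why the notebook converts the values with `mapValues(list)` rather than changing DisplayRDD.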

In [9]:
DisplayRDD(states3.mapValues(list))

| Partition | Elements |
|---|---|
| 0 | (5, ['Maine', 'Idaho', 'Texas']), (10, ['New Jersey', 'New Mexico', 'Washington', 'California']) |
| 1 | (11, ['Mississippi', 'Connecticut']), (6, ['Hawaii', 'Alaska', 'Oregon', 'Nevada', 'Kansas']) |
| 2 | (7, ['Indiana', 'Georgia', 'Wyoming', 'Alabama', 'Arizona', 'Florida', 'Vermont', 'Montana']), (12, ['North Dakota', 'Rhode Island', 'South Dakota', 'Pennsylvania']) |
| 3 | (8, ['Arkansas', 'Maryland', 'Michigan', 'New York', 'Nebraska', 'Delaware', 'Missouri', 'Colorado', 'Oklahoma', 'Virginia', 'Kentucky', 'Illinois']), (13, ['New Hampshire', 'Massachusetts', 'West Virginia']) |
| 4 | (4, ['Utah', 'Iowa', 'Ohio']), (9, ['Wisconsin', 'Tennessee', 'Louisiana', 'Minnesota']), (14, ['South Carolina', 'North Carolina']) |


There it is. The operation grouped the state names by their lengths. This required a `shuffle`, since the names of a given length (say, 10) were originally spread across different partitions.
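A plain-Python model of that shuffle, under one stated assumption: each pair is sent to output partition `hash(key) % num_partitions`, which is what PySpark's default `portable_hash` partitioning amounts to for integer keys (for small integers, Python's `hash` is the integer itself). With five partitions, keys 5 and 10 land in partition 0, 6 and 11 in partition 1, and so on, matching the output above. The helper `model_group_by_key` is hypothetical.

```python
def model_group_by_key(partitions, num_partitions):
    """Plain-Python model of RDD.groupByKey with hash partitioning.

    Every (key, value) pair is sent to output partition hash(key) % num_partitions
    (the shuffle); values for the same key are then collected into one list.
    """
    outputs = [dict() for _ in range(num_partitions)]
    for part in partitions:  # visit input partitions in order
        for key, value in part:
            target = outputs[hash(key) % num_partitions]
            target.setdefault(key, []).append(value)
    # Each output partition becomes a list of (key, [values]) pairs.
    return [list(d.items()) for d in outputs]


partitions = [
    [(8, "Arkansas"), (5, "Maine"), (10, "New Jersey")],
    [(5, "Idaho"), (10, "New Mexico"), (6, "Hawaii")],
]
for i, part in enumerate(model_group_by_key(partitions, 5)):
    print(i, part)
# 0 [(5, ['Maine', 'Idaho']), (10, ['New Jersey', 'New Mexico'])]
# 1 [(6, ['Hawaii'])]
# 2 []
# 3 [(8, ['Arkansas'])]
# 4 []
```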

`groupByKey` keeps every value: each group holds the full list of names. If we only want to count the number of states with a given length (i.e., a `group by count` query), we can use `reduceByKey` instead. However, that requires a `mapValues` first, to replace each name with a 1.

In [10]:
states4 = states2.mapValues(lambda x: 1)
DisplayRDD(states4)

| Partition | Elements |
|---|---|
| 0 | (8, 1), (7, 1), (11, 1), (12, 1), (4, 1), (5, 1), (13, 1), (12, 1), (9, 1), (7, 1), (8, 1), (10, 1), (14, 1), (7, 1) |
| 1 | (7, 1), (6, 1), (13, 1), (10, 1), (12, 1), (6, 1), (5, 1), (8, 1), (8, 1), (9, 1), (7, 1) |
| 2 | (8, 1), (6, 1), (10, 1), (8, 1), (9, 1), (6, 1), (12, 1), (13, 1), (7, 1) |
| 3 | (10, 1), (4, 1), (8, 1), (4, 1), (7, 1), (8, 1), (6, 1), (7, 1), (8, 1), (8, 1), (11, 1), (8, 1) |
| 4 | (8, 1), (9, 1), (14, 1), (5, 1) |


`reduceByKey` takes a single reduce function as input, which tells Spark how to combine any two values for the same key. Here we simply add them.
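Unlike `groupByKey`, `reduceByKey` can combine values inside each partition before the shuffle, so only one partial result per key per partition has to move; this is also why the reduce function should be associative and commutative. A plain-Python model of the two steps follows; the helper `model_reduce_by_key` is hypothetical and, as before, assumes `hash(key) % num_partitions` partitioning.

```python
def model_reduce_by_key(partitions, f, num_partitions):
    """Plain-Python model of RDD.reduceByKey.

    Step 1 (map side): combine the values of each key inside every input partition.
    Step 2 (shuffle): send each partial result to partition hash(key) % num_partitions
    and combine the partial results there with the same function f.
    """
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = f(local[key], value) if key in local else value
        partials.append(local)

    outputs = [dict() for _ in range(num_partitions)]
    for local in partials:
        for key, value in local.items():
            target = outputs[hash(key) % num_partitions]
            target[key] = f(target[key], value) if key in target else value
    return [list(d.items()) for d in outputs]


partitions = [[(8, 1), (7, 1), (8, 1)], [(7, 1), (8, 1)]]
print(model_reduce_by_key(partitions, lambda v1, v2: v1 + v2, 5))
# [[], [], [(7, 2)], [(8, 3)], []]
```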

In [11]:
DisplayRDD(states4.reduceByKey(lambda v1, v2: v1 + v2))

| Partition | Elements |
|---|---|
| 0 | (5, 3), (10, 4) |
| 1 | (11, 2), (6, 5) |
| 2 | (7, 8), (12, 4) |
| 3 | (8, 12), (13, 3) |
| 4 | (4, 3), (9, 4), (14, 2) |


We can skip the separate `mapValues` step by using `aggregateByKey` instead, although its syntax takes some getting used to. `aggregateByKey` takes a zero (starting) value, a function that folds one element's value into a running accumulator, and a second function that combines two accumulators.
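A plain-Python model of those three arguments; the helper `model_aggregate_by_key` is hypothetical and assumes `hash(key) % num_partitions` partitioning. Within each input partition, every key starts from a fresh copy of the zero value (PySpark deep-copies it) and folds in its values with the first function; after the shuffle, accumulators for the same key from different partitions are merged with the second function. The accumulator may even have a different type than the values.

```python
import copy


def model_aggregate_by_key(partitions, zero, seq_op, comb_op, num_partitions):
    """Plain-Python model of RDD.aggregateByKey (a sketch, not Spark's code).

    Inside each input partition, every key starts from a deep copy of `zero`
    and folds in its values with seq_op(acc, value). After the shuffle to
    partition hash(key) % num_partitions, accumulators for the same key coming
    from different input partitions are merged with comb_op(acc1, acc2).
    """
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            acc = local[key] if key in local else copy.deepcopy(zero)
            local[key] = seq_op(acc, value)
        partials.append(local)

    outputs = [dict() for _ in range(num_partitions)]
    for local in partials:
        for key, acc in local.items():
            target = outputs[hash(key) % num_partitions]
            target[key] = comb_op(target[key], acc) if key in target else acc
    return [list(d.items()) for d in outputs]


partitions = [[(5, "Maine"), (10, "New Jersey")], [(5, "Idaho"), (5, "Texas")]]

# Count names per key, like the notebook cell: the accumulator is a running count.
counts = model_aggregate_by_key(partitions, 0, lambda acc, name: acc + 1,
                                lambda a, b: a + b, 5)
print(counts)  # [[(5, 3), (10, 1)], [], [], [], []]

# The accumulator type may differ from the value type: here strings go into lists.
names = model_aggregate_by_key(partitions, [], lambda acc, name: acc + [name],
                               lambda a, b: a + b, 5)
print(names)  # [[(5, ['Maine', 'Idaho', 'Texas']), (10, ['New Jersey'])], [], [], [], []]
```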

In [None]:
DisplayRDD(states2.aggregateByKey(0, lambda acc, name: acc + 1, lambda a, b: a + b))

### Basics 2: FlatMap

Unlike with `map`, the function passed to `flatMap` returns a list (more generally, any iterable), and each element of that list becomes a separate element of the output RDD. This allows different input elements to produce different numbers of outputs. Here is an example where we split each string in `states_rdd` into multiple substrings.

The lambda function below splits a string into chunks of at most 5 characters: for example, 'South Dakota' is split into 'South', ' Dako', and 'ta'. The lambda function itself returns a list; with `map` instead of `flatMap`, each output element would be a whole list of chunks rather than a single chunk.
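The difference between the two can be seen with plain Python lists standing in for partitions (a model, not Spark code; `chunks` is a hypothetical helper doing the same slicing as the lambda below):

```python
def chunks(s, size=5):
    # Split a string into consecutive pieces of at most `size` characters.
    return [s[i:i + size] for i in range(0, len(s), size)]


partitions = [["South Dakota", "Utah"], ["Idaho"]]

# Model of map: exactly one output element (here, a whole list) per input element.
mapped = [[chunks(x) for x in part] for part in partitions]

# Model of flatMap: the returned lists are flattened into the partition.
flat_mapped = [[piece for x in part for piece in chunks(x)] for part in partitions]

print(mapped)       # [[['South', ' Dako', 'ta'], ['Utah']], [['Idaho']]]
print(flat_mapped)  # [['South', ' Dako', 'ta', 'Utah'], ['Idaho']]
```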

In [12]:
DisplayRDD(states_rdd.flatMap(lambda x: [str(x[i:i+5]) for i in range(0, len(x), 5)]))

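| Partition | Elements |
|---|---|
| 0 | 'Arkan', 'sas', 'India', 'na', 'Missi', 'ssipp', 'i', 'North', ' Dako', 'ta', 'Utah', 'Maine', 'New H', 'ampsh', 'ire', 'Rhode', ' Isla', 'nd', 'Wisco', 'nsin', 'Georg', 'ia', 'Maryl', 'and', 'New J', 'ersey', 'South', ' Caro', 'lina', 'Wyomi', 'ng' |
| 1 | 'Alaba', 'ma', 'Hawai', 'i', 'Massa', 'chuse', 'tts', 'New M', 'exico', 'South', ' Dako', 'ta', 'Alask', 'a', 'Idaho', 'Michi', 'gan', 'New Y', 'ork', 'Tenne', 'ssee', 'Arizo', 'na' |
| 2 | 'Nebra', 'ska', 'Orego', 'n', 'Washi', 'ngton', 'Delaw', 'are', 'Louis', 'iana', 'Nevad', 'a', 'Penns', 'ylvan', 'ia', 'West ', 'Virgi', 'nia', 'Flori', 'da' |
| 3 | 'Calif', 'ornia', 'Iowa', 'Misso', 'uri', 'Ohio', 'Vermo', 'nt', 'Color', 'ado', 'Kansa', 's', 'Monta', 'na', 'Oklah', 'oma', 'Virgi', 'nia', 'Conne', 'cticu', 't', 'Kentu', 'cky' |
| 4 | 'Illin', 'ois', 'Minne', 'sota', 'North', ' Caro', 'lina', 'Texas' |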


### Basics 3: Joins

Finally, let's look at an example of joins. We will still use small RDDs, but we now need two of them, which we create with `sc.parallelize`. That function takes a list and creates an RDD from it by splitting the list into partitions that are distributed across the machines of the cluster. The number of partitions is an optional second argument.

Note again that Spark made no attempt to co-locate the objects (i.e., the tuples) with the same key.
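`parallelize` cuts the list into contiguous slices. Here is a plain-Python sketch of that slicing; the helper `contiguous_slices` is hypothetical, and the boundary rule (slice `i` of `n` items covers indices `i * n // num_slices` up to `(i + 1) * n // num_slices`) is our assumption. For these two small lists it gives the same layouts as the outputs displayed below, but PySpark also batches elements before slicing them, so larger lists may be cut differently.

```python
def contiguous_slices(data, num_slices):
    """Split `data` into `num_slices` contiguous slices of (nearly) equal size.

    Slice i covers indices [i * n // num_slices, (i + 1) * n // num_slices).
    """
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


rdd1_data = [('alpha', 1), ('beta', 2), ('gamma', 3), ('alpha', 5), ('beta', 6)]
rdd2_data = [('alpha', 'South Dakota'), ('beta', 'North Dakota'),
             ('zeta', 'Maryland'), ('beta', 'Washington')]
print(contiguous_slices(rdd1_data, 3))
# [[('alpha', 1)], [('beta', 2), ('gamma', 3)], [('alpha', 5), ('beta', 6)]]
print(contiguous_slices(rdd2_data, 3))
# [[('alpha', 'South Dakota')], [('beta', 'North Dakota')], [('zeta', 'Maryland'), ('beta', 'Washington')]]
```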

In [13]:
rdd1 = sc.parallelize([('alpha', 1), ('beta', 2), ('gamma', 3), ('alpha', 5), ('beta', 6)], 3)
DisplayRDD(rdd1)

| Partition | Elements |
|---|---|
| 0 | ('alpha', 1) |
| 1 | ('beta', 2), ('gamma', 3) |
| 2 | ('alpha', 5), ('beta', 6) |


In [14]:
rdd2 = sc.parallelize([('alpha', 'South Dakota'), ('beta', 'North Dakota'), ('zeta', 'Maryland'), ('beta', 'Washington')], 3)
DisplayRDD(rdd2)

| Partition | Elements |
|---|---|
| 0 | ('alpha', 'South Dakota') |
| 1 | ('beta', 'North Dakota') |
| 2 | ('zeta', 'Maryland'), ('beta', 'Washington') |


Here is the definition of `join` from the Spark programming guide:
```
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin. 
```
We want to join on the first element of each tuple, which is already the key, so we can call `join` directly; otherwise we would first need a `map` to make the join attribute the key.
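The "all pairs of elements for each key" rule can be checked on a single machine with a plain-Python model of an inner join (the helper `model_join` is hypothetical; Spark first brings matching keys together with a shuffle, but the set of output pairs is the same):

```python
from collections import defaultdict


def model_join(left, right):
    """Plain-Python model of an inner join on (key, value) pairs.

    For every key present on both sides, emit (key, (v, w)) for every pair of a
    left value v and a right value w.
    """
    right_by_key = defaultdict(list)
    for key, w in right:
        right_by_key[key].append(w)
    return [(key, (v, w)) for key, v in left for w in right_by_key.get(key, [])]


rdd1_data = [('alpha', 1), ('beta', 2), ('gamma', 3), ('alpha', 5), ('beta', 6)]
rdd2_data = [('alpha', 'South Dakota'), ('beta', 'North Dakota'),
             ('zeta', 'Maryland'), ('beta', 'Washington')]
for row in sorted(model_join(rdd1_data, rdd2_data)):
    print(row)
```

It prints six tuples: 2 alpha values times 1 match, plus 2 beta values times 2 matches; `gamma` and `zeta` appear on only one side, so they are dropped.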

In [15]:
rdd3 = rdd1.join(rdd2)
DisplayRDD(rdd3)

| Partition | Elements |
|---|---|
| 0 | (empty) |
| 1 | (empty) |
| 2 | (empty) |
| 3 | (empty) |
| 4 | (empty) |
| 5 | ('alpha', (1, 'South Dakota')), ('alpha', (5, 'South Dakota')), ('beta', (2, 'North Dakota')), ('beta', (2, 'Washington')), ('beta', (6, 'North Dakota')), ('beta', (6, 'Washington')) |


There are a number of empty partitions; we could have controlled the number of output partitions with the optional `numPartitions` argument to `join`. In any case, the output is what we wanted. The outer joins (`leftOuterJoin`, `rightOuterJoin`, and `fullOuterJoin`) behave as you would expect: `fullOuterJoin` adds two extra tuples here, `('gamma', (3, None))` and `('zeta', (None, 'Maryland'))`.
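A plain-Python model of `fullOuterJoin` makes the role of `None` explicit (the helper `model_full_outer_join` is hypothetical): keys present on both sides behave as in an inner join, and a key present on only one side is paired with `None` for the missing side.

```python
from collections import defaultdict


def model_full_outer_join(left, right):
    """Plain-Python model of fullOuterJoin on (key, value) pairs."""
    lefts, rights = defaultdict(list), defaultdict(list)
    for key, v in left:
        lefts[key].append(v)
    for key, w in right:
        rights[key].append(w)
    out = []
    for key in set(lefts) | set(rights):
        # A side with no values for this key contributes a single None.
        for v in lefts.get(key) or [None]:
            for w in rights.get(key) or [None]:
                out.append((key, (v, w)))
    return out


rdd1_data = [('alpha', 1), ('beta', 2), ('gamma', 3), ('alpha', 5), ('beta', 6)]
rdd2_data = [('alpha', 'South Dakota'), ('beta', 'North Dakota'),
             ('zeta', 'Maryland'), ('beta', 'Washington')]
rows = sorted(model_full_outer_join(rdd1_data, rdd2_data), key=lambda r: r[0])
extra = [r for r in rows if None in r[1]]
print(extra)      # [('gamma', (3, None)), ('zeta', (None, 'Maryland'))]
print(len(rows))  # 8
```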

In [None]:
DisplayRDD(rdd1.fullOuterJoin(rdd2))

`cogroup` is a related function: instead of pairing up values, it produces a single record per key whose value is a pair of iterables, one holding that key's values from each RDD. As with `groupByKey`, these values are `ResultIterable` objects, so our DisplayRDD code cannot render them.
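A plain-Python model of the structure `cogroup` produces (the helper `model_cogroup` is hypothetical, and plain lists stand in for the iterables):

```python
def model_cogroup(left, right):
    """Plain-Python model of cogroup on (key, value) pairs (a sketch, not Spark's code).

    Returns one record per distinct key: (key, (values_from_left, values_from_right)).
    Either list may be empty when the key appears on only one side.
    """
    keys = []
    for key, _ in left + right:
        if key not in keys:
            keys.append(key)  # first-seen order, just to make the output readable
    return [(k, ([v for kk, v in left if kk == k], [w for kk, w in right if kk == k]))
            for k in keys]


rdd1_data = [('alpha', 1), ('beta', 2), ('gamma', 3), ('alpha', 5), ('beta', 6)]
rdd2_data = [('alpha', 'South Dakota'), ('beta', 'North Dakota'),
             ('zeta', 'Maryland'), ('beta', 'Washington')]
for record in model_cogroup(rdd1_data, rdd2_data):
    print(record)
# ('alpha', ([1, 5], ['South Dakota']))
# ('beta', ([2, 6], ['North Dakota', 'Washington']))
# ('gamma', ([3], []))
# ('zeta', ([], ['Maryland']))
```

In the notebook itself, converting both iterables to lists before displaying should make the real values visible, e.g. `DisplayRDD(rdd1.cogroup(rdd2).mapValues(lambda p: (list(p[0]), list(p[1]))))`.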

In [None]:
DisplayRDD(rdd1.cogroup(rdd2))

### Basics 4

Here we show a `word count` example using another file `play.txt`. You can use the DisplayRDD function here, but the output is rather large.

In [None]:
textFile = sc.textFile("play.txt", 10)

In [None]:
textFile.count()

In [None]:
textFile.take(5)

The following command does a word count: it splits each line into words with `flatMap`, pairs each word with a 1 using `map`, and then sums the counts for each word with `reduceByKey`.

In [None]:
counts = textFile.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
DisplayRDD(counts)

The single line above chains a sequence of Spark operations. Here is how it would look if each step were written as a separate named function:
```
def split(line):
    return line.split(" ")

def generate_one(word):
    return (word, 1)

def add(a, b):
    # Named `add` rather than `sum` to avoid shadowing Python's built-in sum().
    return a + b

counts = textFile.flatMap(split).map(generate_one).reduceByKey(add)
```
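For a small input, the same pipeline can be checked on a single machine with plain Python. The function `word_count` and the sample lines below are made up for illustration (they are not `play.txt`); `collections.Counter` plays the role of `reduceByKey`.

```python
from collections import Counter


def word_count(lines):
    """Single-machine equivalent of
    textFile.flatMap(split).map(generate_one).reduceByKey(add)."""
    counts = Counter()
    for line in lines:                # flatMap: each line yields several words
        for word in line.split(" "):
            counts[word] += 1         # map to (word, 1), then sum per word
    return dict(counts)


sample = ["to be or not to be", "that is the question"]
print(word_count(sample)["to"])  # 2
```

One design note: `line.split(" ")` produces empty strings when a line contains consecutive spaces, so `''` can show up as a "word" in the counts; `line.split()` with no argument splits on runs of whitespace and avoids that.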

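The next cells build a chain of operations on a file that does not exist. No error appears until `collect()` is called: creating an RDD with `sc.textFile` and applying `map` and `filter` are lazy, and Spark reads the input only when an action needs the result.

In [16]: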
t1 = sc.textFile("lkjsdflkjsdf")

In [17]:
t2 = t1.map(lambda x: (x, 1))

In [18]:
t3 = t2.filter(lambda x: True)

In [19]:
t3.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/jovyan/notebooks/lkjsdflkjsdf
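A toy model in plain Python of the lazy behavior in these cells. The class `LazyPipeline` and the helper `read_missing_file` are hypothetical and not part of Spark: transformations only record a step, and the input is read when `collect()` runs, so the missing-file error surfaces there, just as it does in `t3.collect()`.

```python
class LazyPipeline:
    """Hypothetical toy model of Spark's lazy evaluation (not Spark's API).

    map() and filter() only record a step; nothing runs until collect() is called.
    """

    def __init__(self, source, steps=()):
        self.source = source      # zero-argument function producing the input list
        self.steps = list(steps)

    def map(self, f):
        return LazyPipeline(self.source, self.steps + [("map", f)])

    def filter(self, pred):
        return LazyPipeline(self.source, self.steps + [("filter", pred)])

    def collect(self):
        data = self.source()      # the input is read only here
        for kind, fn in self.steps:
            if kind == "map":
                data = [fn(x) for x in data]
            else:
                data = [x for x in data if fn(x)]
        return data


def read_missing_file():
    with open("lkjsdflkjsdf") as f:  # nonexistent path, as in the cells above
        return f.read().splitlines()


# Building the pipeline succeeds even though the file does not exist.
t3 = LazyPipeline(read_missing_file).map(lambda x: (x, 1)).filter(lambda kv: True)
try:
    t3.collect()
except FileNotFoundError as e:
    print("error raised only at collect():", e)
```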
