# Introduction to Spark Notebook

The following code sets up the "Spark Context", which is how we interact with Spark from Python.

Useful Spark Python documentation can be found here: 

https://spark.apache.org/docs/latest/rdd-programming-guide.html
	
https://spark.apache.org/docs/latest/api/python/


In [None]:
import pyspark

sc = pyspark.SparkContext()

Don't forget to stop the context when you are done. 

In [19]:
#sc.stop() #commented out so that you don't stop your context by mistake

Let's create an RDD using the ``parallelize`` function.

In [None]:
data = [1, 2, 3, 4, 5]
myRDD = sc.parallelize (data)

What happened? Nothing was displayed: ``parallelize`` gave us back an RDD object, not the data itself.

We have to ask Spark to give us back the processed data.

One function to do this is **collect**

In [None]:
myData = myRDD.collect()
myData

What type of object did we get back?

In [None]:
type(myData)

Another function is **first**

In [None]:
myData = myRDD.first()
myData

In [None]:
type(myData)

**take** returns the first _n_ elements

In [None]:
myData = myRDD.take(2)
myData

Yet another is **top**, which returns the _n_ largest elements in descending order

In [None]:
myData = myRDD.top(3)
myData

In [None]:
type(myData)
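For intuition, here is a rough plain-Python analogy of what each of these actions hands back, using an ordinary list in place of the RDD (an illustrative assumption; no Spark is involved). Note that ``top`` sorts in descending order, while ``take`` keeps the RDD's order:

```python
# Plain-Python stand-in for the RDD's contents (illustrative; no Spark involved)
local_data = [1, 2, 3, 4, 5]

collected = list(local_data)                      # like collect(): every element, as a list
first_item = local_data[0]                        # like first(): one element, not a list
taken = local_data[:2]                            # like take(2): the first 2 elements, in order
top_three = sorted(local_data, reverse=True)[:3]  # like top(3): the 3 largest, descending

print(collected, first_item, taken, top_three)    # [1, 2, 3, 4, 5] 1 [1, 2] [5, 4, 3]
```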

Let's create an RDD from a range

In [None]:
myRDDrange = sc.parallelize (range (20000))

How many elements are in the RDD?

In [None]:
myRDDrange.count()

In [None]:
myRDDrange.top(5)

We can also create an RDD from a file

In [None]:
!pwd


In [None]:
md = sc.textFile ('./data/Moby-Dick.txt')

Did it work?

In [None]:
md.count()

Let's check using one of our command-line tools, ``wc`` (word count); the ``-l`` flag makes it count lines.

In [None]:
!wc -l ./data/Moby-Dick.txt
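If the two counts ever differ by one, the last line is a likely suspect: ``wc -l`` counts newline characters, so a final line with no trailing newline is not counted, while a line reader still returns it (we assume here that Spark's ``textFile`` behaves like Python's file iteration in this respect). A small sketch on a hypothetical sample file, not the book:

```python
import os
import tempfile

def wc_l(path):
    """Mimic `wc -l`: count newline characters in the file."""
    with open(path, "rb") as f:
        return f.read().count(b"\n")

def count_lines(path):
    """Count lines the way Python's file iteration does: a final unterminated line still counts."""
    with open(path, "r", encoding="utf-8") as f:
        return sum(1 for _ in f)

# Hypothetical sample file: three lines, and the last one has no trailing newline
with tempfile.NamedTemporaryFile("w", delete=False, suffix=".txt", encoding="utf-8") as tmp:
    tmp.write("first line\nsecond line\nthird line, no newline")
    sample_path = tmp.name

n_wc, n_lines = wc_l(sample_path), count_lines(sample_path)
os.remove(sample_path)
print(n_wc, n_lines)  # 2 3
```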

In [None]:
md.take(50)

In [None]:
md.top(10)

We can also create an RDD from another RDD. The classic example of this is word count:

In [None]:
def countWords (fileName):
     lines = sc.textFile (fileName)
     tokens = lines.flatMap (lambda line: line.split(" "))
     instances = tokens.map (lambda word: (word, 1))
     aggCounts = instances.reduceByKey (lambda a, b: a + b)
     return aggCounts.top (200, key=lambda p: p[1])
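For comparison, here is a single-machine sketch of the same computation in plain Python, with no Spark. ``count_words_local`` is a hypothetical name, and it takes text rather than a file name; ``Counter`` plays the role of the map and reduceByKey steps. Ties among equal counts may be ordered differently than Spark's ``top``:

```python
from collections import Counter

def count_words_local(text, n=200):
    """Single-machine analogue of countWords: split, count, keep the n most frequent."""
    # flatMap: every line becomes zero or more tokens
    tokens = (tok for line in text.splitlines() for tok in line.split(" "))
    # map + reduceByKey: Counter pairs each token with a running total
    counts = Counter(tokens)
    # top(n, key=lambda p: p[1]): the n (token, count) pairs with the highest counts
    return counts.most_common(n)

sample = "the whale\nthe ship and the whale"
print(count_words_local(sample, 2))  # [('the', 3), ('whale', 2)]
```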


This function uses three transformations:

**flatMap**

**map**

**reduceByKey**

We will walk through each of them. But first, a discussion about lambdas.

## Lambdas

* Basically, a function that we can pass around like a variable
* Key ability: can **capture** variables from its surroundings; Python looks up their current values when the lambda is called, not when it is created
* Can also accept parameters


In [None]:
def addTwelveToResult (myLambda):
     return myLambda (3) + 12

Let's set up some variables and a lambda to use:

In [None]:
a = 23 # this is being captured
aCoolLambda = lambda x : x + a
a


In [None]:
type(aCoolLambda)

What should we get when we call ``addTwelveToResult``?

In [None]:
addTwelveToResult (aCoolLambda) # prints 38

38? Why not 35? I'm adding 12 to ``a``, right?

In [None]:
38-23


Looks like we added 15 instead. Why?

The key here is that the function ``addTwelveToResult`` adds 12 to the value returned by calling the specified lambda with the value 3.

So, basically, we're calling:

    myLambda (3) + 12
    (x + a) + 12
    (3 + a) + 12
    (3 + 23) + 12 

which equals 38

**The parentheses matter!**

To be sure we understand, let's do another example, but first set ``a = 45``. 

What should we get?

Type your guess in the next cell

Now let's try it and see

In [None]:
a=45
addTwelveToResult (aCoolLambda) # prints ???


Okay. Can we add another parameter to the function, so we can use something besides 3?

In [None]:
def addTwelveToResultB (myLambda,b):
     return myLambda (b) + 12

I'm going to pass in 5 this time. Since ``a`` is still 45, I should get 5 + 45 + 12 = 62.

Let's check the value of **a** first.

In [None]:
a

In [None]:
addTwelveToResultB (aCoolLambda, 5)


Hopefully, we got 62.

Any time we see ``myLambda``, we can mentally replace it with the body of the lambda.

It's kind of like a database VIEW.
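The VIEW analogy also explains the ``a = 45`` result: a Python lambda keeps a reference to the variable, not a copy of its value, so every call sees whatever the variable holds at that moment. A small sketch with illustrative names (``offset`` rather than ``a``, so it doesn't disturb the cells above), including the common default-argument trick for freezing a value:

```python
offset = 23
add_offset = lambda x: x + offset      # refers to the variable offset, not the number 23
before = add_offset(3)                 # offset is looked up now, at call time: 26

offset = 45
after = add_offset(3)                  # same lambda, new value of offset: 48

# To freeze the value at creation time, bind it as a default argument instead
offset = 23
add_frozen = lambda x, offset=offset: x + offset   # default is evaluated once, right here
offset = 45
frozen = add_frozen(3)                 # still 26

print(before, after, frozen)           # 26 48 26
```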



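* Lambdas can return many items (for example, a tuple or a generator)
* Lambdas MUST return something: the body is a single expression, and its value is what the lambda returns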
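In Python, a lambda's body is a single expression, and the lambda returns that expression's value. Returning "many items" therefore means returning a container or a generator, and a lambda whose expression produces nothing useful still returns ``None``. A short sketch with illustrative names:

```python
pair_up = lambda x: (x, x * x)                       # one tuple holding two items
squares_up_to = lambda n: (i * i for i in range(n))  # a generator of many items
shout = lambda s: print(s.upper())                   # print() returns None, so this returns None

print(pair_up(3))               # (3, 9)
print(list(squares_up_to(4)))   # [0, 1, 4, 9]
print(shout("hi") is None)      # prints HI, then True
```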

What must the lambda passed to ``sumThem`` return?

In [None]:
def sumThem (myLambda):
     tot = 0
     for a in myLambda ():
          tot = tot + a
     return tot


Let's run it

In [None]:
import numpy as np

In [None]:
x = np.array([1, 2, 3, 4, 5])
iter = lambda : (j for j in x)
sumThem (iter) # prints 15


What if we want to sum the squares of the elements of the array instead?

In [None]:
1 + 4 + 9 + 16 + 25

In [None]:
iter = lambda : (j * j for j in x)
sumThem (iter) # prints 55


``sumThem`` will only work on items that are 'summable'. What happens if we try it on an array of strings?

In [None]:
x = np.array(['a', 'b', 'c'])
iter = lambda : (j for j in x)
sumThem (iter) # error?

I didn't think that would work, but now I know. 

How could we fix this?

In [None]:
def sumThem (myLambda):
     tot = ''
     for a in myLambda ():
          tot = tot + a
     return tot

In [None]:
sumThem (iter) # works?
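Instead of writing one ``sumThem`` per type, we could let the caller supply the starting value: 0 for numbers, '' for strings. This is only a sketch: ``sum_them_from`` is a hypothetical name, and it leans on ``functools.reduce``, whose optional third argument is the starting value. The same 'zero' idea comes back in Spark's ``aggregate``:

```python
import operator
from functools import reduce

def sum_them_from(start, my_lambda):
    """Hypothetical generalization of sumThem: the caller supplies the starting value."""
    return reduce(operator.add, my_lambda(), start)

nums = [1, 2, 3, 4, 5]
letters = ["a", "b", "c"]

print(sum_them_from(0, lambda: (j for j in nums)))      # 15
print(sum_them_from("", lambda: (j for j in letters)))  # abc
```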

Let's go back to the code that motivated the discussion about lambdas

In [None]:
def countWords (fileName):
     lines = sc.textFile (fileName)
     tokens = lines.flatMap (lambda line: line.split(" "))
     instances = tokens.map (lambda word: (word, 1))
     aggCounts = instances.reduceByKey (lambda a, b: a + b)
     return aggCounts.top (200, key=lambda p: p[1])


Consider just the first few lines

In [None]:
def countWords (fileName):
     lines = sc.textFile (fileName)
     tokens = lines.flatMap (lambda line: line.split(" "))


What does flatMap do?
* Processes every data item in the RDD
* Applies the lambda to it
* The lambda returns zero or more results (as a list or other iterable)
* Can therefore omit, combine, or create elements
* Each result is added to the resulting RDD
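A plain-Python sketch of flatMap's behavior (``flat_map`` is a hypothetical helper, not part of PySpark). It also shows a side effect of splitting on a single space: an empty line produces one empty-string token, whereas ``split()`` with no argument produces zero tokens:

```python
def flat_map(f, items):
    """Plain-Python sketch of flatMap: f returns an iterable; all results are concatenated."""
    return [result for item in items for result in f(item)]

sample_lines = ["Call me Ishmael.", "", "Some years ago"]

# split(" ") on an empty line yields [''], so it contributes one empty-string token
print(flat_map(lambda line: line.split(" "), sample_lines))
# ['Call', 'me', 'Ishmael.', '', 'Some', 'years', 'ago']

# split() with no argument yields [] for an empty line: zero results
print(flat_map(lambda line: line.split(), sample_lines))
# ['Call', 'me', 'Ishmael.', 'Some', 'years', 'ago']
```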


(Figure: an RDD transformation that is not one-to-one.)

Let's run flatMap on our text file. Note the use of the **split** method, which breaks each line apart at spaces, tokenizing it. What do we expect to get back?

We are breaking lines of the book Moby Dick at spaces.

In [None]:
tokens = md.flatMap (lambda line: line.split(" "))

In [None]:
tokens.top(3)

The next line of our function is a call to **map**.
Map:
* Processes every data item in the RDD
* Applies the lambda to it
* The lambda must return exactly one result
* The returned RDD has exactly one element for each original element: the lambda applied to that element
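In plain Python, map corresponds to a comprehension with one output per input. The contrast at the end shows why the word count splits lines with flatMap rather than map: map would leave one list per line instead of one token per element. The data here is illustrative:

```python
words = ["the", "whale", "the"]

# map: exactly one output per input, so the result has the same length as the input
word_pairs = [(word, 1) for word in words]
print(word_pairs)  # [('the', 1), ('whale', 1), ('the', 1)]

# Contrast: mapping split over lines keeps one element per line, and each element is a list
sample_lines = ["a b", "c"]
print([line.split(" ") for line in sample_lines])  # [['a', 'b'], ['c']]
```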

What do we think it will do?

(Figure: an RDD transformation that is one-to-one.)

In [None]:
instances = tokens.map (lambda word: (word, 1))

In [None]:
instances.top(3)

In [None]:
def countWords (fileName):
     lines = sc.textFile (fileName)
     tokens = lines.flatMap (lambda line: line.split(" "))
     instances = tokens.map (lambda word: (word, 1))
     aggCounts = instances.reduceByKey (lambda a, b: a + b)
     return aggCounts.top (200, key=lambda p: p[1])


In [None]:
instances.top(10)

Great! We are getting closer to our goal of getting a count of all the tokens or 'words' in our book.

Next, we want to group matching tokens and sum their counts.

We do this with the **reduceByKey** function, which also takes a lambda. No surprise there. This time, our lambda adds two values together.

Our next line is a call to **reduceByKey**
* Data must be $(Key, Value)$ pairs
* Shuffle so that all $(K, V)$ pairs with the same $K$ end up on the same machine
* Organize into $(K, (V_1, V_2, ..., V_n))$ pairs
* Use the lambda to **reduce** the list to a single value
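A plain-Python sketch of those steps, where ``reduce_by_key`` is a hypothetical helper and a dictionary of lists stands in for the shuffle. Real Spark also merges values locally on each machine before shuffling, which is a big part of why ``reduceByKey`` tends to be cheaper than grouping first:

```python
from collections import defaultdict
from functools import reduce

def reduce_by_key(f, pairs):
    """Plain-Python sketch of reduceByKey: group values by key, then reduce each group with f."""
    groups = defaultdict(list)          # stands in for the shuffle: same key, same place
    for key, value in pairs:
        groups[key].append(value)       # builds (K, (V1, V2, ..., Vn))
    return {key: reduce(f, values) for key, values in groups.items()}

pairs_in = [("the", 1), ("whale", 1), ("the", 1), ("the", 1)]
print(reduce_by_key(lambda a, b: a + b, pairs_in))  # {'the': 3, 'whale': 1}
```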

This is similar to our aggregate functions in SQL!

What do we think the result of this function will be?

In [None]:
aggCounts = instances.reduceByKey (lambda a, b: a + b)

Did it work? We might have to look at more items to be sure

In [None]:
aggCounts.top(10)

As an aside, note that we are counting 'tokens', not words. We could have removed punctuation and converted everything to lowercase, which might have given us a more accurate count. It depends on what we want to count.
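One possible normalization, sketched in plain Python (``normalize`` is a hypothetical helper, and it only treats ASCII letters as word characters): lowercase each token, strip leading and trailing punctuation, and drop tokens that end up empty:

```python
import re
from collections import Counter

def normalize(token):
    """One possible normalization: lowercase, then strip leading/trailing non-letters."""
    return re.sub(r"^[^a-z]+|[^a-z]+$", "", token.lower())

raw = ["Whale", "whale,", "WHALE!", "'whale'", "--"]
cleaned = [t for t in (normalize(tok) for tok in raw) if t]   # drop tokens that became empty
print(Counter(cleaned))  # Counter({'whale': 4})
```

In the Spark version, the same function could be slotted in before the ``(word, 1)`` step, for example ``tokens.map(normalize).filter(lambda t: t != '')``.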

Since we now know that all the pieces work, we can put them together in a function.

Finally, our **countWords** function calls **top** with ``key=lambda p: p[1]``, so it returns the 200 (token, count) pairs with the highest counts.

What do you think the most common word(s) in the book will be?

In [None]:
aggCounts.top (200, key=lambda p: p[1])

### Note:
Spark uses lazy evaluation...
If we run this code
```
lines = sc.textFile (fileName)
tokens = lines.flatMap (lambda line: line.split(" "))
instances = tokens.map (lambda word: (word, 1))
aggCounts = instances.reduceByKey (lambda a, b: a + b)
```

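Nothing happens! (Other than Spark remembering the operations.)
* Spark does not execute anything until an action, such as **collect()**, **count()**, or **top()**, asks for results
* When we hit **top()**, all of these operations are executed

Why does Spark do this?
* By waiting until the last possible moment, Spark can **pipeline** operations: each line flows through flatMap and map in a single pass, without storing the intermediate RDDs
* Only operations that require a shuffle, such as **reduceByKey**, can't be pipelined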

Remember our terminal pipe operator, ``|``? The idea is similar: each stage hands its output directly to the next.
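Python generators are lazy in a similar way, which makes a handy single-machine analogy (only an analogy; Spark's scheduler does much more). In this sketch, building the pipeline does no work; each line flows through both steps only when the result is consumed, much like an action triggering an RDD's transformations:

```python
calls = []   # records each time the tokenizing step actually runs

def tokenize(line):
    calls.append(line)
    return line.split(" ")

lines_in = ["a b", "c"]

# Building the pipeline runs nothing yet, like defining flatMap and map on an RDD
pipeline = ((word, 1) for line in lines_in for word in tokenize(line))
print(len(calls))            # 0

# Consuming it, like calling an action, pushes each line through both steps
result = list(pipeline)
print(len(calls), result)    # 2 [('a', 1), ('b', 1), ('c', 1)]
```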


Let's try out the whole function:

In [None]:
countWords('./data/Moby-Dick.txt')

These results seem reasonable. There are a lot of articles, which are common in English. Moby Dick, if you didn't know, is about a whale, so the word "whale" appears as well. It's also about a ship captain named Ahab. The very famous first line of the book is "Call me Ishmael.", and the story is told from Ishmael's perspective.

Let's go through some other useful PySpark functions.

Let's define some new, small RDDs to use:


In [None]:
rddOne = [('red', 9), ('blue', 7), ('red', 12), ('green', 4)]
myRddOne = sc.parallelize (rddOne)
myRddOne.collect()

In [None]:
rddTwo = [('blue', 'up'), ('green', 'down'), ('green', 'behind')]
myRddTwo = sc.parallelize (rddTwo)
myRddTwo.collect()


## groupByKey()
* Data must be $(Key, Value)$ pairs
* Shuffle so that all $(K, V)$ pairs with the same $K$ end up on the same machine
* Organize into $(K, \langle V_1, V_2, ..., V_n \rangle)$ pairs
* Store each list as a ``ResultIterable`` for future processing
* Like ``reduceByKey()`` but without the reduce


``groupByKey`` returns pairs where the key is a key from the original RDD and the value is an iterable (a ``ResultIterable``) of all the values with that key.

To see the results, we need to ``collect`` the list of values into something we can read. One possibility is to apply a ``map`` where we return a pair with the key and a list of values.

In [None]:
myGroupBy = myRddOne.groupByKey().map(lambda x : (x[0], list(x[1]))).collect()
myGroupBy

Another useful function is ``join``, which matches up pairs with the same key in two RDDs:

In [None]:
myJoin = myRddOne.join(myRddTwo)
myJoin.collect()
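``join`` behaves like an inner join: for every key present in both RDDs, it emits ``(key, (value_from_first, value_from_second))`` for each matching combination, and keys that appear in only one RDD (here, ``'red'``) are dropped. A plain-Python sketch on the same data, where ``inner_join`` is a hypothetical helper; Spark's output order may differ:

```python
def inner_join(left, right):
    """Plain-Python sketch of an inner join on (key, value) pairs: emit (k, (v, w)) per match."""
    right_by_key = {}
    for key, w in right:
        right_by_key.setdefault(key, []).append(w)
    return [(key, (v, w)) for key, v in left for w in right_by_key.get(key, [])]

one = [("red", 9), ("blue", 7), ("red", 12), ("green", 4)]
two = [("blue", "up"), ("green", "down"), ("green", "behind")]
print(inner_join(one, two))
# [('blue', (7, 'up')), ('green', (4, 'down')), ('green', (4, 'behind'))]
```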

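Let's look at the ``reduce`` function.

This function is like ``top``: it returns a result back to Python.

It repeatedly applies a two-argument lambda, combining items of the RDD until a single result remains. The lambda should be commutative and associative, since Spark combines items in no fixed order.

Here, we use it to sum all of the items in an RDD.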

In [None]:
myData = sc.parallelize (range(20000))
myData.reduce (lambda a, b: a + b)
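We can double-check the result with ``functools.reduce`` in plain Python. The second half sketches why the lambda should be associative: it simulates splitting the data into two partitions (an illustrative assumption about how Spark might divide the work) and shows that subtraction gives a different answer once the grouping changes:

```python
from functools import reduce

total = reduce(lambda a, b: a + b, range(20000))
print(total, 19999 * 20000 // 2)   # 199990000 199990000

# Subtraction is not associative, so the grouping changes the answer
sub = lambda a, b: a - b
whole = reduce(sub, [1, 2, 3, 4])                          # ((1-2)-3)-4 = -8
two_parts = sub(reduce(sub, [1, 2]), reduce(sub, [3, 4]))  # (1-2)-(3-4) = 0
print(whole, two_parts)            # -8 0
```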

### Aggregate

One last aggregation example: the ``aggregate`` function.

``aggregate`` takes 3 arguments: a 'zero' value to initialize the aggregation, and two lambdas.

Lambda 1: takes two arguments, $X_1$ and $X_2$, and aggregates them, where $X_1$ is already aggregated and $X_2$ is not (it is a single element of the RDD).

Lambda 2: takes $X_1$ and $X_2$ and aggregates them, where both are already aggregated (for example, partial results from different partitions).
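A plain-Python sketch of how the zero value and the two lambdas fit together, assuming the data has already been split into two partitions. ``aggregate_local`` is a hypothetical helper, and this example uses a (sum, count) pair rather than a dictionary so it can compute a mean:

```python
from functools import reduce

def aggregate_local(partitions, zero, seq_op, comb_op):
    """Plain-Python sketch of aggregate: fold each partition with seq_op, then merge with comb_op."""
    partials = [reduce(seq_op, part, zero) for part in partitions]  # Lambda 1, per partition
    return reduce(comb_op, partials, zero)                          # Lambda 2, across partitions

# Hypothetical data, already split into two partitions
parts = [[9, 7], [12, 4]]

# The accumulator is (running sum, running count)
seq_op = lambda acc, x: (acc[0] + x, acc[1] + 1)       # X1 aggregated, X2 a raw element
comb_op = lambda p, q: (p[0] + q[0], p[1] + q[1])      # both already aggregated

sum_total, n = aggregate_local(parts, (0, 0), seq_op, comb_op)
print(sum_total, n, sum_total / n)   # 32 4 8.0
```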


### Lambda 1: Define ``add``, which takes a dictionary and a tuple and returns a new dictionary containing the starting dictionary plus the new tuple

* (1) Initialize the result dictionary
* (2) For each item in the dictionary we are given:
    * (3) Create an entry in our result dictionary with that item's value
* (4) If we've seen the tuple's key before:
    * (5) Accumulate the value
* (6) If this is the first time we see the tuple's key:
    * (7) Create an entry for it and initialize it with the value
* (8) Return our result dictionary


In [None]:
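def add (counts, pair):
  # Lambda 1: fold one (key, value) tuple into a dictionary of running totals
  result = {}
  for key in counts:               # copy the existing totals
    result[key] = counts[key]
  if (pair[0] in result):          # seen this key before: accumulate
    result[pair[0]] += pair[1]
  else:                            # first time: start a new total
    result[pair[0]] = pair[1]
  return result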


### Lambda 2: Define ``combine``, which takes 2 dictionaries and returns a new dictionary that contains all the keys in both dictionaries, with the total counts

* (1) Initialize the result dictionary
* (2) For each item in dict1 that we are given:
    * (3) Create an entry in our result dictionary with dict1's value
* (4) For each key, value in dict2 that we are given:
    * (5) If we've seen the key before:
        * (6) Accumulate the value
    * (7) If this is the first time we see the key:
        * (8) Create an entry for it and initialize it with the value
* (9) Return our result dictionary


In [None]:
def combine (dict1, dict2):
  result = {}
  for key in dict1:
    result[key] = dict1[key]
  for key in dict2:
    if (key in result):
      result[key] += dict2[key]
    else:
      result[key] = dict2[key]
  return result


In [None]:
myRdd = sc.parallelize ([('red', 9), ('blue', 7),  ('red', 12), ('green', 4)])


In [None]:
# zero value {}, then Lambda 1 (add) and Lambda 2 (combine)
myRdd.aggregate ({}, lambda x, y: add (x, y), lambda x, y: combine (x, y))
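The dictionary that ``aggregate`` returns should hold the total for each color (key order may vary). A quick single-machine cross-check with ``collections.Counter``, with no Spark involved:

```python
from collections import Counter

pairs = [("red", 9), ("blue", 7), ("red", 12), ("green", 4)]
expected = Counter()
for color, value in pairs:
    expected[color] += value        # the same per-key totals that add/combine build up
print(dict(expected))  # {'red': 21, 'blue': 7, 'green': 4}
```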

## Closing thoughts
* When is Spark/MapReduce a better option than HPC? 
    * When your pipeline is heavily data-oriented
    * When your compute is (relatively) loosely coupled
* Key benefits compared to HPC 
    * Built-in fault tolerance
    * Better support for BIG data
    * Much higher programmer productivity
* Will continue to take market share from HPC
    * You see academic papers with both MPI and Spark implementations
    * But not everything can move to Spark

### How can we use what we learned today?

### What do we know now that we didn't know before?

# Don't forget to stop your Spark context!

Copyright ©2019 Christopher M Jermaine (cmj4@rice.edu), and Risa B Myers  (rbm2@rice.edu)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.