In [2]:
%matplotlib inline
import matplotlib
import seaborn as sns
matplotlib.rcParams['savefig.dpi'] = 2 * matplotlib.rcParams['savefig.dpi']

# PySpark Intro
Like _Hadoop_, __Spark__ is a low-level system for distributed computation on a cluster.  It has two major advantages over Hadoop MapReduce:
 
 - It can cache data in memory between stages, whereas Hadoop MapReduce writes intermediate results to disk.  This improves performance.  It also makes Spark suitable for classes of algorithms that would otherwise be too slow (e.g. iterative algorithms, like the training step in many machine learning algorithms).
 
 - It has a more flexible execution model (i.e. not just MapReduce).
 
There are also minor advantages: the default API is much nicer than writing raw MapReduce code. The favored interface is Scala, but there is also good support for Python.

## How does Spark relate to Hadoop?

__Answer 1__: It's a replacement for it.  You can manage and run your own Spark cluster, independent of Hadoop.  You'll have to figure out the filesystem layer from scratch though.   (In this context, _Spark SQL_ is the replacement for _Hive_, i.e. for SQL-like operation.)

__Answer 2__: It's a complement to it.  You can run Spark on top of a Hadoop cluster, and still leverage HDFS and YARN -- then Spark is just replacing MapReduce.  (In this context, _Spark SQL_ can be used as a drop-in replacement for _Hive_.)

## The Spark API
### Nouns
The two main abstractions in Spark are:
1. Resilient distributed datasets (RDDs):
  This is like a (smartly) distributed list, which is partitioned across the nodes of the cluster and can be operated on in parallel.  The operations take the form of the usual functional list operations (`map`, `filter`, `reduce`, and so on).  There are two types of operations: Transformations and Actions.
1. Shared variables used in parallel operations: When Spark runs a function in parallel across nodes, it ships a copy of each variable needed by that function to each node.  There are two types of shared variables:
 - Broadcast variables: These are used to store a value in memory across all nodes (i.e. their values are "broadcast" from the driver node)
 - Accumulator variables: These are variables which are only added to across nodes, for implementing counters and sums.
 
### Verbs
- **Transformations:** These create a new RDD from an old one: for instance `map` or `flatMap`.  Transformations in Spark are __lazy__, which means that they do not compute the answer right away, only when an action needs it.  Until then they merely represent the "idea" of the transformation applied to the base dataset, which lets Spark chain several of them together.  In particular, intermediate results do not necessarily get created and stored: if the output of your `map` goes straight into a `reduce`, Spark can pipeline the two and never store the output of the map.  Another consequence of this design is that the results of transformations are, by default, recomputed each time they are needed; to keep them around you must explicitly call `cache`.
    
    [Transformations](https://spark.apache.org/docs/latest/programming-guide.html#transformations) typically *transform* the data and return another RDD.  Common examples include `map`, `filter`, `flatMap`, and also `join`, `cogroup`, `groupByKey`, `reduceByKey`, and `sample`.  (Despite its name, `countByKey` is an action: it returns a dictionary of counts to the driver.)

- **Actions:** These actually return a value as a result of a computation.  For instance, `reduce` is an action that combines all elements of an RDD using some function and then returns the final result.  (Note that `reduceByKey` actually returns an RDD.)
    
    [Actions](https://spark.apache.org/docs/latest/programming-guide.html#actions) typically either generate *small* outputs (e.g. `reduce`, `count`, `first`, `take`, `takeSample`, `foreach`, `collect`) or persist to disk (e.g. `saveAsTextFile`, `saveAsSequenceFile`, etc.).
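
The lazy behaviour of transformations can be mimicked in plain Python with generators. This is only an analogy and not Spark code: `lazy_map`, `square`, and `calls` are hypothetical names, the generator plays the role of a transformation, and `sum` plays the role of an action.

```python
# Plain-Python analogy for lazy transformations (not Spark code).
calls = []  # records every time the mapped function actually runs

def square(x):
    calls.append(x)
    return x * x

def lazy_map(func, iterable):
    # A generator: nothing is computed until something consumes it.
    for item in iterable:
        yield func(item)

pipeline = lazy_map(square, [1, 2, 3, 4])  # "transformation": builds a recipe only
calls_before_action = len(calls)           # no work has been done yet

total = sum(pipeline)                      # "action": forces the computation
calls_after_action = len(calls)

print("%d %d %d" % (calls_before_action, calls_after_action, total))
```

Building `pipeline` runs `square` zero times; only the call to `sum` triggers the four evaluations.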


**Questions:**
1. What's the difference between `map` and `foreach` in (non-Spark) Scala?  Why is `map` a transformation but `foreach` an action in Spark?
1. Why is `reduceByKey` a transformation but `reduce` an action?

In [3]:
import random, re, time
from datetime import datetime

## Creating a Spark Context

In [4]:
from pyspark import SparkContext
sc = SparkContext("local[*]", "temp")
print sc.version

1.5.1


## Spark examples

In [5]:
import os
def localpath(path):
    return 'file://' + str(os.path.abspath(os.path.curdir)) + '/' + path

In [None]:
# Basic wordcount
lines = sc.textFile(localpath('small_data/gutenberg/'))
totalLines = lines.count()
print "total lines: %d" % totalLines

lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word.lower(), 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .sortByKey() \
    .saveAsTextFile("/tmp/output_gutenberg" + str(time.time()))

**Exercise:** There are a lot of special symbols in the output.  How would you count only tokens made up purely of word characters (letters, digits, and underscores)?  Hint: in Python, you can test a string against a regular expression this way:

In [None]:
matches = re.search("^\\w+$", "abc")
matches.group(0)
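
Keep in mind that `re.search` returns `None` when nothing matches, so calling `.group(0)` on a non-match raises an `AttributeError`. A small predicate avoids that. The sketch below assumes that "word" means one or more `\w` characters; `is_word` and `WORD_RE` are hypothetical names, not part of the exercise.

```python
import re

WORD_RE = re.compile(r"^\w+$")  # one or more letters, digits, or underscores

def is_word(token):
    # match() returns None on failure, so turn the result into a bool.
    return WORD_RE.match(token) is not None

tokens = ["abc", "don't", "--", "x1", ""]
words = [t for t in tokens if is_word(t)]
print(words)
```

A predicate of this shape is the kind of function you could hand to `filter`.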

### Example: a simple simulation for $\pi$

In [None]:
import random
numSamples = 100000

def generate_point(_):  # the sample index supplied by map is ignored
    x = random.random()
    y = random.random()
    if x * x + y * y < 1:
        return 1
    else:
        return 0

count = sc.parallelize(xrange(1, numSamples + 1)) \
    .map(generate_point) \
    .reduce(lambda x, y: x + y)
    
print "Pi is roughly " + str(4.0 * count / numSamples)
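
Why the factor of 4? A short derivation, assuming the points are independent and uniformly distributed on the unit square, with $N$ standing for `numSamples`:

$$
P(x^2 + y^2 < 1) = \frac{\text{area of the quarter disc}}{\text{area of the unit square}} = \frac{\pi}{4},
\qquad\text{so}\qquad
\hat{\pi} = \frac{4 \cdot \text{count}}{N}.
$$

Each sample is a Bernoulli trial with success probability $p = \pi/4$, so the estimate has standard deviation

$$
\operatorname{sd}(\hat{\pi}) = 4\sqrt{\frac{p(1-p)}{N}} = \sqrt{\frac{\pi(4-\pi)}{N}} \approx 0.005 \quad\text{for } N = 10^5,
$$

which means the estimate above is typically good to about two decimal places.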

## ETL in Spark

#### Premise
The first steps of any data science project usually involve ETL'ing and performing exploratory analytics on a dataset.
Here we'll acquaint ourselves both with the Spark shell and some Spark functions that we can use to perform such tasks.

#### The dataset
From https://archive.ics.uci.edu/ml/datasets/EEG+Database:  
> "The first four lines are header information. Line 1 contains the subject identifier and indicates if the subject was an alcoholic (a) or control (c) subject by the fourth letter. Line 4 identifies the matching conditions: a single object shown (S1 obj), object 2 shown in a matching condition (S2 match), and object 2 shown in a non matching condition (S2 nomatch). Line 5 identifies the start of the data from sensor FP1. The four columns of data are: the trial number, sensor position, sample number (0-255), and sensor value (in micro volts)."

There are 16,452 rows in this file including the header. Here's a preview of the first 10 lines and last 10 lines:
```
# co2a0000364.rd
# 120 trials, 64 chans, 416 samples 368 post_stim samples
# 3.906000 msecs uV
# S1 obj , trial 0
# FP1 chan 0
0 FP1 0 -8.921
0 FP1 1 -8.433
0 FP1 2 -2.574
0 FP1 3 5.239
0 FP1 4 11.587
    ...
0 Y 246 24.150
0 Y 247 20.243
0 Y 248 11.454
0 Y 249 4.618
0 Y 250 3.153
0 Y 251 6.571
0 Y 252 12.431
0 Y 253 15.849
0 Y 254 16.337
0 Y 255 14.872
```


#### Summary stats on the dataset

1. There is an action for `RDD[Double]` (in Python, an RDD of numbers) called `stats` that will provide us with summary statistics about the values in the RDD. How can we get summary stats of the `reading` column across the entire dataset?

1. How can we do the same but for only the `posn` "FP1"?

1. Let's make sure there are 256 samples per `posn` in this dataset. How can we do this? 

In [6]:
def isHeader(line):
    # Header lines begin with '#'; test the prefix rather than any occurrence.
    return line.startswith("#")

class Record(object):
    def __init__(self, trial, posn, sample, reading):
        self.trial = trial
        self.posn = posn
        self.sample = sample
        self.reading = reading

def parse(line):
    tokens = line.split()
    trial = int(tokens[0])
    posn = tokens[1]
    sample = int(tokens[2])
    reading = float(tokens[3])
    return Record(trial, posn, sample, reading)

data = sc.textFile(localpath("small_data/eeg/*")) \
    .filter(lambda x: not isHeader(x)) \
    .map(parse)
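
Before running the pipeline on the cluster, it can help to check the parsing logic locally. The sketch below does that in plain Python on a few lines copied from the preview above; `Reading` and `parse_line` are hypothetical stand-ins for the `Record` class and `parse` function, using a `namedtuple` so that records print readably.

```python
from collections import namedtuple

# Hypothetical lightweight stand-in for the Record class.
Reading = namedtuple("Reading", ["trial", "posn", "sample", "reading"])

def parse_line(line):
    # Split on whitespace and convert each field to its type.
    trial, posn, sample, reading = line.split()
    return Reading(int(trial), posn, int(sample), float(reading))

# Lines copied from the dataset preview: two headers, two data rows.
preview = [
    "# co2a0000364.rd",
    "# FP1 chan 0",
    "0 FP1 0 -8.921",
    "0 Y 255 14.872",
]

records = [parse_line(l) for l in preview if not l.startswith("#")]
print(records[0])
```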

In [7]:
print data.map(lambda x: x.reading).stats()

(count: 491520, mean: -0.0793460876465, stdev: 6.19831990956, max: 48.472, min: -49.072)


In [None]:
print data.filter(lambda x: x.posn == "FP1").map(lambda x: x.reading).stats()

In [None]:
samples = data.map(lambda x: ((x.trial, x.posn), 1)).reduceByKey(lambda x, y: x + y)
posns = samples.map(lambda (x, y): (x[1], y)).distinct()

assert posns.take(1)[0][1] == 256
assert posns.count() == posns.filter(lambda (x, y): y == 256).count()

*Question:* How do you make sure the sample numbers are all correct?

## Joins in Spark

As in MapReduce, a join is accomplished by emitting key-value pairs `(k, v)` and matching up pairs with equal keys.  It is accomplished by roughly the same mechanism (sending equal keys to the same *node*), although in Spark those key-value pairs may not (necessarily) be persisted to disk.
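
To make the mechanism concrete, here is a single-machine sketch of an inner join on key-value pairs in plain Python. It is an illustration rather than Spark's implementation: `local_join` is a hypothetical name, and the sample data is made up. The output has the same `(key, (left_value, right_value))` shape that an RDD `join` produces.

```python
from collections import defaultdict

def local_join(left, right):
    """Inner join of two lists of (key, value) pairs on one machine."""
    # Stand-in for the shuffle: bucket the right-hand values by key.
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    # Pair every left value with every right value that shares its key.
    return [(key, (lv, rv))
            for key, lv in left
            for rv in right_by_key.get(key, [])]

amounts = [("u1", 10), ("u2", 5), ("u1", 7), ("u3", 1)]
countries = [("u1", "US"), ("u2", "FR")]

joined = local_join(amounts, countries)
print(joined)
```

Keys that appear on only one side (`"u3"` here) drop out of the result, as in any inner join.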

In [None]:
class Transaction(object):
    def __init__(self, transactionId, productId, userId, amount):
        self.transactionId = transactionId
        self.productId = productId
        self.userId = userId
        self.amount = int(amount)

class User(object):
    def __init__(self, userId, email, language, country):
        self.userId = userId
        self.email = email
        self.language = language
        self.country = country

def construct_user(line):
    data = line.split(",")
    return User(data[1], data[2], data[3], data[4])

def construct_transaction(line):
    data = line.split(",")
    if data[0] == "sales":
        return Transaction(data[1], data[2], data[3], data[4])
    else:
        return None

users = sc.textFile(localpath("small_data/employee/users.csv")) \
    .map(construct_user)

transactions = sc.textFile(localpath("small_data/employee/sales.csv")) \
    .map(construct_transaction)

totalSales = transactions.filter(lambda x: x is not None).map(lambda x: x.amount).sum()
usersByCountryCount = users.map(lambda u: (u.country, u)).countByKey()

print "total sales %f" % totalSales
print "Users by country"
for key, val in usersByCountryCount.items():
    print key, val

# After the join, each element is (userId, (amount, country))
salesByCountry = transactions.filter(lambda x: x is not None) \
                .map(lambda t: (t.userId, t.amount)) \
                .join(users.map(lambda u: (u.userId, u.country))) \
                .map(lambda (id, (amount, country)): (country, amount)) \
                .reduceByKey(lambda x, y: x + y) \
                .collect()

print "Transactions by country"
for country in salesByCountry:
    print country[0], country[1]

### A warning about traversables

The transformations `mapPartitions`, `groupByKey`, `mapPartitionsWithIndex`, and `cogroup` work with iterators rather than lists.  Treat them like any Python iterator: assume you can traverse them only once, and do not rely on `len`.  Here's a common anti-pattern:

In [None]:
# Anti-example

def mean(iterable):
    _size = len(iterable)
    _sum = float(sum(iterable))
    return _sum / _size

listMean = mean(range(1,6))
print "list mean: %f" % listMean

# iterMean = mean(iter(range(1, 6)))
# print "iterator mean: %f" % iterMean
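
One way around the problem, sketched in plain Python, is to gather the sum and the count in a single pass so that the data is read only once. `mean_single_pass` is a hypothetical name; the last few lines show why a second traversal of an iterator cannot work.

```python
def mean_single_pass(iterable):
    # Accumulate the sum and the count together: one traversal only.
    total = 0.0
    count = 0
    for value in iterable:
        total += value
        count += 1
    if count == 0:
        raise ValueError("mean of an empty iterable is undefined")
    return total / count

list_mean = mean_single_pass(range(1, 6))
iter_mean = mean_single_pass(iter(range(1, 6)))
print("%f %f" % (list_mean, iter_mean))

# An iterator is exhausted after one pass.
it = iter([1, 2, 3])
first_pass = sum(it)
second_pass = sum(it)  # nothing left to add
```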

### RDD Persistence
When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster.

The argument passed to the `persist` method determines how the RDD will be stored. In general, you may choose
* Deserialized or serialized formats (serialized Java objects are more space-efficient but more CPU-intensive to read)
* In-memory only or allow spilling to disk
* Replication of each partition on a second node, for *fast* fault recovery

See the [full documentation](http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence) for the complete list of storage levels.


*Serialization*: the process of translating data structures or object states into a format (i.e. a series of bytes) that can be stored and reconstructed later. Note that in Python, stored objects are always serialized with Pickle, so it does not matter whether you choose a serialized or a deserialized storage level.

1. The cache method is a shortcut for the default behavior: in memory and deserialized:
```python
myRdd.cache()
```
2. Otherwise, specify the desired behavior:
```python
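from pyspark import StorageLevel  # storage levels live on this class
myRdd.persist(StorageLevel.MEMORY_ONLY_SER)  # level name as of the Spark 1.x API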
```
3. Cleaning up cached data is important for memory management. During shuffles, some intermediate data is automatically persisted, so it can be worthwhile to manually unpersist RDDs instead of waiting for garbage collection:
```python
myRdd.unpersist()
```
**Question:** Caching is a key tool for iterative algorithms and fast interactive use.  Why might you not always do it?

In [None]:
def randomStream():
    return sc.parallelize(xrange(1,10000001)).map(lambda x: random.random())

uncachedLines = randomStream()
cachedLines = randomStream().cache()

In [None]:
%timeit -n1 -r1 add = uncachedLines.reduce(lambda x, y: x + y)
%timeit -n1 -r1 sub = uncachedLines.reduce(lambda x, y: x - y)
%timeit -n1 -r1 mult = uncachedLines.reduce(lambda x, y: x * y)

In [None]:
%timeit -n1 -r1 add = cachedLines.reduce(lambda x, y: x + y)
%timeit -n1 -r1 sub = cachedLines.reduce(lambda x, y: x - y)
%timeit -n1 -r1 mult = cachedLines.reduce(lambda x, y: x * y)
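
The difference between the two timings comes from recomputation. The following plain-Python analogy (not Spark code; `expensive`, `uncached`, and `compute_calls` are hypothetical names) counts how often the underlying function runs when a lazy pipeline is consumed twice versus when its results are materialized once and reused.

```python
compute_calls = [0]  # mutable counter of how often the expensive step runs

def expensive(x):
    compute_calls[0] += 1
    return x * 2

def uncached():
    # A fresh generator each time: the work is redone for every "action".
    return (expensive(x) for x in range(5))

uncached_sum = sum(uncached())
uncached_max = max(uncached())
calls_without_cache = compute_calls[0]  # five elements, computed twice

compute_calls[0] = 0
cached = list(uncached())               # materialize once, loosely like cache()
cached_sum = sum(cached)
cached_max = max(cached)
calls_with_cache = compute_calls[0]     # five elements, computed once

print("%d %d" % (calls_without_cache, calls_with_cache))
```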

## Accumulators

We'll want to implement counters (like we had in `mrjob` and Hadoop in general).  As usual, here is a first anti-example:

In [None]:
# Anti-example

class Counter(object):
    def __init__(self):
        self.counter = 0

    def increment(self, amt=1):
        self.counter += amt

counter = Counter()

sc.parallelize(xrange(1,11)).foreach(lambda x: counter.increment(1))

print counter.counter  # what's happening here?
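
One way to see what goes wrong: Spark serializes the function, together with the `counter` object it refers to, and sends a copy to each worker, so the increments land on the copies and never reach the driver. The sketch below imitates this in plain Python, with `copy.deepcopy` standing in for the serialization round trip. It is an analogy under that assumption, not Spark's actual mechanism, and `SimpleCounter` and `run_on_worker` are hypothetical names.

```python
import copy

class SimpleCounter(object):
    def __init__(self):
        self.counter = 0

    def increment(self, amt=1):
        self.counter += amt

def run_on_worker(counter, items):
    # Stand-in for shipping the closure: the worker gets its own copy.
    worker_counter = copy.deepcopy(counter)
    for _ in items:
        worker_counter.increment(1)
    return worker_counter.counter

driver_counter = SimpleCounter()
worker_total = run_on_worker(driver_counter, range(1, 11))

print("worker copy: %d, driver original: %d"
      % (worker_total, driver_counter.counter))
```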

Here's the correct way to implement this using [Accumulators](https://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.Accumulator).  We are just using them for integer addition but you can use them for any [monoid](https://en.wikipedia.org/wiki/Monoid).  You'll hear about these a lot in Scala.

In [None]:
# Example

accumCounter = sc.accumulator(0)

sc.parallelize(xrange(1,11)).foreach(lambda x: accumCounter.add(1))

print accumCounter
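
The monoid requirement (an associative operation with an identity element) is what makes it safe to combine partial results from separate partitions in any grouping. A plain-Python sketch of the idea, with the hypothetical helper `combine_partitions` and made-up data:

```python
from functools import reduce

def combine_partitions(partitions, op, identity):
    # Reduce each partition on its own, then merge the partial results.
    partials = [reduce(op, part, identity) for part in partitions]
    return reduce(op, partials, identity)

data = [3, 1, 4, 1, 5, 9, 2, 6]
# Arbitrary split into partitions, including an empty one.
partitions = [data[:3], data[3:5], data[5:], []]

total = combine_partitions(partitions, lambda a, b: a + b, 0)
largest = combine_partitions(partitions, max, float("-inf"))

print("%d %s" % (total, largest))
```

The identity element is what lets the empty partition contribute a harmless partial result; associativity is what makes the answer independent of how the data was split.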

In [None]:
# Lazy example

def update(x):
    accumCounter.add(1)
    return x

accumCounter = sc.accumulator(0)

myRdd = sc.parallelize(xrange(1, 11)).map(update)

print "counter: ", accumCounter

print myRdd.reduce(lambda x, y: x + y)

print "counter: ", accumCounter

myRdd.take(1)

print "counter: ", accumCounter

## Broadcast variables

Broadcast variables are the Spark equivalent of Hadoop's distributed cache: they keep a read-only copy of the data cached on each machine rather than shipping a copy of it with every task.

In [None]:
# Basic usage:

iBV = sc.broadcast(10)
assert(iBV.value == 10)

Here's a more realistic example to show how you might implement this:

In [None]:
class Sale(object):
    def __init__(self, state, sales):
        self.state = state
        self.sales = sales
        
def constructSale(tup):
    return Sale(tup[0], tup[1])

In [None]:
states = {
    0: "AL",
    1: "KY",
    2: "VT",
}

statesBV = sc.broadcast(states)

data = sc.parallelize([(0, 30.), (1, 40.), (2, 20.), (2, 10.)]).map(constructSale)

In [None]:
%%timeit -r1 -n100
# This works, but `states` travels inside the closure, so it is sent to the nodes again every time this runs
results1 = data.map(lambda s: (states[s.state], s.sales)).collect()

In [None]:
%%timeit -r1 -n100
# broadcast to each node once, and can be used freely inside map
results2 = data.map(lambda s: (statesBV.value[s.state], s.sales)).collect()

In [None]:
results1 = data.map(lambda s: (states[s.state], s.sales)).collect()
results2 = data.map(lambda s: (statesBV.value[s.state], s.sales)).collect()
assert(results1 == results2)

**Two caveats:**
1. After the broadcast variable is created, use it instead of the original value so that the value is not shipped to the nodes more than once.
2. Do not modify the broadcast variable after it is broadcast, so that all nodes see the same value.

**Question:** How does the memory footprint of broadcasting scale with the number of tasks? With the number of executors?

**Question:** The above example is called a map-side join.  
1. Why is it called this?
1. What are the strengths and weaknesses of this versus a full join (i.e. emitting key-value pairs to common keys)?

In [None]:
sc.stop()

*Copyright &copy; 2015 The Data Incubator.  All rights reserved.*