<img src='https://www.rc.colorado.edu/sites/all/themes/research/logo.png'>

# Introduction to Spark

Many examples courtesy of Monte Lunacek.

## Landscape of Distributed Computing

How do you process hundreds of gigabytes of data?

- Filtering unstructured data
- Aggregation
- Large-scale machine learning
- Graph analysis

## Outline

- Functional programming in Python
- Spark's programming model
- As many examples as we can get through!

## Functional Python

<blockquote>
Python acquired lambda, reduce, filter and map, courtesy of a Lisp hacker who missed them and submitted working patches. -Guido van Rossum
</blockquote>

- `map` 
- `reduce`
- `filter`
- `lambda`
- And more: [itertools](https://docs.python.org/2/library/itertools.html), [pytoolz](https://github.com/pytoolz/toolz/)

We will use these concepts (and more) in Spark.

### The `map` abstraction

```python
def square(x):
    return x*x

numbers = [1, 2, 3]

def map_squares(nums):
    # Explicit loop: apply square to each element and collect the results
    res = []
    for x in nums:
        res.append(square(x))
    return res
```

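Or, equivalently, with the built-in `map`:

```python
# list() is needed on Python 3, where map returns a lazy iterator
results = list(map(square, numbers))
```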

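`map` is a key abstraction for parallel computing in Python: each element is processed independently, so the work can be divided among workers. For example, `multiprocessing.Pool` provides a parallel `map`: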

```python
from multiprocessing import Pool

pool = Pool(5)                       # 5 worker processes
results = pool.map(square, numbers)  # same result as map, computed in parallel
pool.close()
pool.join()
```

### `lambda`

An anonymous function: a function defined inline, without a name.

```python
lambda_square = lambda x: x*x
list(map(lambda_square, range(10)))
```

```
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

```python
list(map(lambda x: x*x, range(10)))
```

```
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

```python
res = list(map(lambda x: x*x, range(10)))
print(res)
```

```
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```
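A lambda is limited to a single expression but otherwise behaves like a function defined with `def`. The sketch below compares the two (the only visible difference is the missing name) and shows a common use: passing a short lambda as the `key` argument of `sorted`. The word list is illustrative only.

```python
def square(x):
    return x * x

lambda_square = lambda x: x * x

# Both give the same values for every input we try
same = all(square(n) == lambda_square(n) for n in range(10))

# The def-function has a name; the lambda is anonymous
print(square.__name__, lambda_square.__name__)  # square <lambda>

# Lambdas are handy as throwaway arguments, e.g. a sort key (sorted is stable)
words = ["spark", "map", "reduce", "filter"]
by_length = sorted(words, key=lambda w: len(w))
print(same, by_length)  # True ['map', 'spark', 'reduce', 'filter']
```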


### `reduce`

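Apply a function of **two** arguments cumulatively to the items of a sequence, from left to right, reducing the sequence to a single value. In Python 3, `reduce` must be imported from `functools`.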

```python
from functools import reduce  # built in on Python 2, in functools on Python 3

def add_num(x1, x2):
    return x1 + x2

print(reduce(add_num, res))
```

```
285
```


```python
print(reduce(lambda x, y: x + y, res))
```

```
285
```
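To make "cumulatively, from left to right" concrete, here is a plain-loop sketch of what `reduce` does. `my_reduce` is a hypothetical helper written for illustration, not the standard library's implementation; it mimics the optional initial value and the error on an empty input with no initial value.

```python
from functools import reduce

_MISSING = object()  # sentinel so that None can still be used as an initial value

def my_reduce(func, items, initial=_MISSING):
    """Hypothetical helper: a plain-loop sketch of reduce."""
    it = iter(items)
    if initial is _MISSING:
        try:
            acc = next(it)  # start from the first item
        except StopIteration:
            raise TypeError("my_reduce() of empty sequence with no initial value")
    else:
        acc = initial
    for x in it:
        acc = func(acc, x)  # combine the running result with the next item
    return acc

squares = [x * x for x in range(10)]
print(my_reduce(lambda x, y: x + y, squares))  # 285
print(reduce(lambda x, y: x + y, squares))     # 285

# The parentheses show the left-to-right order of application
print(my_reduce(lambda x, y: "(" + x + y + ")", "abcd"))  # (((ab)c)d)
```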


### `filter`

Construct a new list from the items for which the applied function returns `True` (on Python 3, `filter` returns an iterator; wrap it in `list()` to get a list).

```python
def greater_than_ten(x):
    return x > 10

list(filter(greater_than_ten, res))
```

```
[16, 25, 36, 49, 64, 81]
```

```python
list(filter(lambda x: x > 10, res))
```

```
[16, 25, 36, 49, 64, 81]
```
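A short sketch relating `filter` to an equivalent list comprehension, and showing how `map`-style data, `filter` and `reduce` compose into a small pipeline (the same style we will use with Spark). The data is the same list of squares as above, rebuilt so the block runs on its own.

```python
from functools import reduce

squares = [x * x for x in range(10)]

# On Python 3, filter is lazy; list() materializes the result
big = list(filter(lambda x: x > 10, squares))

# The equivalent list comprehension
big_comp = [x for x in squares if x > 10]

print(big)              # [16, 25, 36, 49, 64, 81]
print(big == big_comp)  # True

# Composing filter and reduce: sum of the squares greater than 10
total = reduce(lambda a, b: a + b, filter(lambda x: x > 10, squares))
print(total)  # 271
```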

## Spark Programming Model

Everything starts with a `SparkContext`, the entry point that connects your program to a Spark cluster.

```python
import findspark
findspark.init()  # locate the Spark installation and add pyspark to sys.path; run before importing pyspark

import pyspark
```

```python
sc = pyspark.SparkContext()
```

This [gist](http://nbviewer.ipython.org/gist/fperez/6384491/00-Setup-IPython-PySpark.ipynb) by Fernando Perez explains how to set the `CLUSTER_URL` when IPython starts up. The cluster URL can be:

- `local`, to run Spark on the local machine
- the URL of a distributed cluster
    - e.g. `spark://node1239:7077`

### Create RDDs

[RDD Documentation](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD)

The `parallelize` method is a utility for initializing RDDs.

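- It is not efficient for large data: the collection is serialized on the driver, written to a file, and read back in. Use it for small data sets and experiments.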

```python
import numpy as np

# distribute the integers 0..19 across 5 partitions
rdd = sc.parallelize(np.arange(20), numSlices=5)
```

### Transformations and Actions

**Actions** return values to the driver program; beware of memory limitations (for example, `collect` pulls every element of the RDD into driver memory).

- `collect`
- `reduce`
- `take`
- `count`

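**Transformations** define a new RDD, adding an edge to a new vertex in the DAG of computations. They are lazy: nothing is computed until an action is called.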

- `map`, `flatMap`
- `reduceByKey`
- `filter`
- `glom`
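The difference between lazy transformations and eager actions can be illustrated with a toy, single-machine model. `ToyRDD` is hypothetical and is not how Spark is implemented: it only records the functions passed to `map` and `filter` (a stand-in for the lineage DAG) and replays them when an action such as `collect` or `count` runs.

```python
class ToyRDD(object):
    """Hypothetical toy model of lazy transformations; not Spark's implementation."""

    def __init__(self, data, steps=()):
        self._data = data
        self._steps = tuple(steps)  # recorded transformations (the "lineage")

    # Transformations: return a new ToyRDD and compute nothing yet
    def map(self, f):
        return ToyRDD(self._data, self._steps + (("map", f),))

    def filter(self, f):
        return ToyRDD(self._data, self._steps + (("filter", f),))

    # Actions: replay every recorded step and return a plain Python value
    def collect(self):
        out = list(self._data)
        for kind, f in self._steps:
            if kind == "map":
                out = [f(x) for x in out]
            else:
                out = [x for x in out if f(x)]
        return out

    def count(self):
        # Recomputes the whole lineage, as an uncached RDD would
        return len(self.collect())

calls = []

def traced_square(x):
    calls.append(x)  # record each time the function actually runs
    return x * x

rdd = ToyRDD(range(5)).map(traced_square).filter(lambda x: x > 3)
print(calls)          # [] (nothing has run yet)
print(rdd.collect())  # [4, 9, 16]
print(calls)          # [0, 1, 2, 3, 4]
```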

What does partitioning look like?

- `glom`: Returns an RDD created by coalescing all elements within each partition into a list.
- `collect`: Returns a list that contains all of the elements of an RDD.
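As a mental model, the partitions can be pictured as a list of lists: `glom` exposes that outer structure, while `collect` concatenates the partitions in order. The helpers below are hypothetical, pure-Python stand-ins; `split_into_partitions` assumes contiguous, near-equal slices, which matches the outputs shown below for 20 elements, but it is not PySpark's code.

```python
def split_into_partitions(data, num_slices):
    """Hypothetical helper: split data into num_slices contiguous, near-equal chunks."""
    data = list(data)
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

def glom(partitions):
    # One list per partition
    return [list(p) for p in partitions]

def collect(partitions):
    # All elements, partitions concatenated in order
    return [x for p in partitions for x in p]

parts = split_into_partitions(range(20), 5)
for chunk in glom(parts):
    print(chunk)  # [0, 1, 2, 3] then [4, 5, 6, 7] ... [16, 17, 18, 19]
print(collect(parts) == list(range(20)))  # True
```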

```python
for x in rdd.glom().collect():
    print(x)
```

```
[0, 1, 2, 3]
[4, 5, 6, 7]
[8, 9, 10, 11]
[12, 13, 14, 15]
[16, 17, 18, 19]
```


```python
# the same data split across 10 partitions
rdd = sc.parallelize(np.arange(20), numSlices=10)
for x in rdd.glom().collect():
    print(x)
```

```
[0, 1]
[2, 3]
[4, 5]
[6, 7]
[8, 9]
[10, 11]
[12, 13]
[14, 15]
[16, 17]
[18, 19]
```


### `map` and `flatMap`

`map` returns a new RDD by applying a function to each element. `flatMap` returns a new RDD by first applying a function to each element and then flattening the results.

```python
rdd = sc.parallelize([[2, 3, 4], [0, 1], [5, 6, 7, 8]])
rdd.collect()
```

```
[[2, 3, 4], [0, 1], [5, 6, 7, 8]]
```

```python
# list() keeps the output a list of lists on Python 3, where range is lazy
rdd.map(lambda x: list(range(len(x)))).collect()
```

```
[[0, 1, 2], [0, 1], [0, 1, 2, 3]]
```

Or flatten the results with `flatMap`:

```python
rdd.flatMap(lambda x: range(len(x))).collect()
```

```
[0, 1, 2, 0, 1, 0, 1, 2, 3]
```

Or flatten the original lists by returning each one unchanged:

```python
rdd.flatMap(lambda x: x).collect()
```

```
[2, 3, 4, 0, 1, 5, 6, 7, 8]
```
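On a single machine, the same map-then-flatten behavior can be sketched with `itertools.chain.from_iterable`. `flat_map` is a hypothetical helper for illustration; the last line shows the common text-processing use of splitting lines into words.

```python
from itertools import chain

def flat_map(f, items):
    """Hypothetical helper: apply f to each item, then flatten one level."""
    return list(chain.from_iterable(f(x) for x in items))

data = [[2, 3, 4], [0, 1], [5, 6, 7, 8]]

print([list(range(len(x))) for x in data])      # map:     [[0, 1, 2], [0, 1], [0, 1, 2, 3]]
print(flat_map(lambda x: range(len(x)), data))  # flatMap: [0, 1, 2, 0, 1, 0, 1, 2, 3]
print(flat_map(lambda x: x, data))              # [2, 3, 4, 0, 1, 5, 6, 7, 8]

# Typical use: turn lines of text into a flat list of words
print(flat_map(lambda line: line.split(), ["to be", "or not to be"]))
# ['to', 'be', 'or', 'not', 'to', 'be']
```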

### Reduction

Flatten the nested lists, then sum all elements with `reduce`:

```python
rdd.flatMap(lambda x: x).reduce(lambda x, y: x + y)
```

```
36
```

Key-value RDDs hold `(key, value)` tuples:

```python
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
rdd.collect()
```

```
[('a', 1), ('b', 1), ('a', 2)]
```

`reduceByKey` merges the values for each key using the given function:

```python
rdd.reduceByKey(lambda x, y: x + y).collect()
```

```
[('a', 3), ('b', 1)]
```
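The semantics of `reduceByKey` can be sketched in plain Python with a dictionary. Spark also merges values locally within each partition before combining results across partitions (like a MapReduce combiner), which is why the function should be associative and commutative. Both helpers below are hypothetical single-machine illustrations, not Spark's implementation; results are sorted only to make the output deterministic.

```python
def reduce_by_key(func, pairs):
    """Hypothetical helper: combine the values that share a key with func."""
    result = {}
    for key, value in pairs:
        if key in result:
            result[key] = func(result[key], value)
        else:
            result[key] = value
    return sorted(result.items())

def reduce_by_key_partitioned(func, partitions):
    """Hypothetical two-stage version: combine within partitions, then across them."""
    # Stage 1: local combine inside each partition
    local = [reduce_by_key(func, p) for p in partitions]
    # Stage 2: merge the per-partition results by key
    return reduce_by_key(func, [kv for part in local for kv in part])

pairs = [("a", 1), ("b", 1), ("a", 2)]
print(reduce_by_key(lambda x, y: x + y, pairs))  # [('a', 3), ('b', 1)]

# Same answer when the pairs are split across two "partitions"
print(reduce_by_key_partitioned(lambda x, y: x + y, [pairs[:2], pairs[2:]]))  # [('a', 3), ('b', 1)]
```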

```python
rdd = sc.parallelize([("hamlet", 1), ("claudius", 1), ("hamlet", 1)])
```

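`countByKey` is an action: it counts the number of elements for each key and returns the counts to the driver as a dictionary.

```python
rdd.countByKey()
```

```
defaultdict(int, {'claudius': 1, 'hamlet': 2})
```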
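Note that `countByKey` counts elements per key and ignores the values, unlike summing the values with `reduceByKey`. A minimal pure-Python sketch using `collections.Counter`; `count_by_key` is a hypothetical helper for illustration.

```python
from collections import Counter

def count_by_key(pairs):
    """Hypothetical helper: count how many pairs carry each key (values are ignored)."""
    return Counter(key for key, _ in pairs)

pairs = [("hamlet", 1), ("claudius", 1), ("hamlet", 1)]
counts = count_by_key(pairs)
print(counts["hamlet"], counts["claudius"])  # 2 1

# The values do not matter: two elements with key "hamlet" give a count of 2
print(count_by_key([("hamlet", 5), ("hamlet", 7)])["hamlet"])  # 2
```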

### Reading HDF5 with PySpark

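```python
import h5py as h5
```

```python
h5file_path = '../data/hdf5_ex.h5'

def readchunk(v):
    # Each task opens the file itself, so the path must be readable from every worker
    with h5.File(h5file_path, 'r') as f:
        return f['/chunked'][v, :]  # row v, read into memory as a NumPy array

# one task per row index 0..9
chunks = sc.parallelize(range(0, 10)).map(readchunk)
chunks.take(2)
```

```
[array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
 array([10, 11, 12, 13, 14, 15, 16, 17, 18, 19])]
```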