# Apache Spark

![Logo](images/apache_spark_logo.png)

- [Apache Spark](https://spark.apache.org) began in 2009 as a research project at UC Berkeley's AMPLab, was open-sourced in 2010, and reached version 1.0 in 2014.
- It was originally developed by [Matei Zaharia](http://people.csail.mit.edu/matei) as a class project, and later a PhD dissertation, at the University of California, Berkeley.
- Spark is written in [Scala](https://www.scala-lang.org).

- Apache Spark is a fast and general-purpose cluster computing system. 
- It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs.
- Spark can manage "big data" collections with a small set of high-level primitives like `map`, `filter`, `groupBy`, and `join`. With these common patterns we can often handle computations that are more complex than a plain `map` but are still structured.
- It also supports a rich set of higher-level tools including [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) for SQL and structured data processing, [MLlib](https://spark.apache.org/docs/latest/ml-guide.html) for machine learning, [GraphX](https://spark.apache.org/docs/latest/graphx-programming-guide.html) for graph processing, and Spark Streaming.

# Resilient distributed datasets

- The fundamental abstraction of Apache Spark is a read-only, parallel, distributed, fault-tolerant collection called a resilient distributed dataset (RDD).
- RDDs behave a bit like Python collections (e.g. lists).
- When working with Apache Spark we iteratively apply functions to every item of these collections in parallel to produce *new* RDDs.
- The data is distributed across nodes in a cluster of computers.
- Functions implemented in Spark can work in parallel across elements of the collection.
- The Spark framework allocates data and processing to different nodes, without any intervention from the programmer.
- RDDs are automatically rebuilt on machine failure.
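
To make the idea of partitioned, element-wise processing concrete, here is a plain-Python sketch that involves no Spark at all. The helper names `split_into_partitions` and `parallel_map` are hypothetical, and a thread pool merely stands in for the worker nodes of a cluster; Spark's real scheduling and fault recovery are far more elaborate.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(items, num_partitions):
    """Split a list into num_partitions roughly equal contiguous chunks."""
    size, extra = divmod(len(items), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)
        partitions.append(items[start:end])
        start = end
    return partitions


def parallel_map(func, partitions):
    """Apply func to every element, running one task per partition.

    Returns new partitions; the input partitions are left untouched.
    """
    def process(partition):
        return [func(x) for x in partition]

    with ThreadPoolExecutor(max_workers=len(partitions)) as pool:
        return list(pool.map(process, partitions))


data = list(range(10))
partitions = split_into_partitions(data, 3)
squared = parallel_map(lambda x: x ** 2, partitions)
print(partitions)
print(squared)
```

The original `data` and `partitions` are never modified: each step produces a new collection, which mirrors the read-only nature of RDDs.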


# Lifecycle of a Spark Program
1. Create some input RDDs from external data or parallelize a collection in your driver program.
2. Lazily transform them to define new RDDs using transformations like `filter()` or `map()`.
3. Ask Spark to `cache()` any intermediate RDDs that will need to be reused.
4. Launch actions such as `count()` and `collect()` to kick off a parallel computation, which is then optimized and executed by Spark.

# Operations on Distributed Data
- Two types of operations: **transformations** and **actions**
- Transformations are *lazy* (not computed immediately) 
- Transformations are executed when an action is run
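
The difference between lazy transformations and actions can be illustrated with a toy class in plain Python. `LazyCollection` is a hypothetical stand-in written for this sketch, not part of Spark: its `map` and `filter` only record what should be done, while `collect` and `reduce` actually run the recorded steps.

```python
from functools import reduce
from operator import add


class LazyCollection:
    """Toy stand-in for an RDD: records transformations, runs them on demand."""

    def __init__(self, source, steps=()):
        self._source = source      # the original data (never modified)
        self._steps = steps        # tuple of pending transformations

    # Transformations: return a new object and compute nothing.
    def map(self, func):
        return LazyCollection(self._source, self._steps + (("map", func),))

    def filter(self, pred):
        return LazyCollection(self._source, self._steps + (("filter", pred),))

    # Actions: run the recorded steps and return a plain Python value.
    def _run(self):
        items = iter(self._source)
        for kind, func in self._steps:
            items = map(func, items) if kind == "map" else filter(func, items)
        return items

    def collect(self):
        return list(self._run())

    def reduce(self, func):
        return reduce(func, self._run())


calls = []


def traced_square(x):
    calls.append(x)          # record that the function really ran
    return x ** 2


lazy = LazyCollection(range(8)).map(traced_square).filter(lambda x: x % 2 == 0)
calls_before_action = len(calls)   # nothing has been computed yet
total = lazy.reduce(add)           # the action triggers the computation
print(calls_before_action, len(calls), total)
```

Because `traced_square` logs every call, the counter shows that no work happens until the `reduce` action runs.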


# [Transformations](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations) (lazy)
```spark
map() flatMap()
filter() 
mapPartitions() mapPartitionsWithIndex() 
sample()
union() intersection() distinct()
groupBy() groupByKey()
reduceByKey()
sortBy() sortByKey()
join()
cogroup()
cartesian()
pipe()
coalesce()
repartition()
partitionBy()
...
```

# [Actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)

```
reduce()
collect()
count()
first()
take()
takeSample()
saveToCassandra()
takeOrdered()
saveAsTextFile()
saveAsSequenceFile()
saveAsObjectFile()
countByKey()
foreach()
```
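
For readers who know the Python standard library, several of these actions have close single-machine analogues. The sketch below is plain Python on made-up data, with no Spark involved; the comments name the RDD action each line resembles.

```python
from collections import Counter
from functools import reduce
from heapq import nsmallest
from itertools import islice
from operator import add

data = [5, 3, 8, 1, 9, 2]                   # made-up values
pairs = [("a", 1), ("b", 2), ("a", 3)]      # made-up key/value pairs

total = reduce(add, data)                   # like rdd.reduce(add)
first_three = list(islice(data, 3))         # like rdd.take(3)
smallest_two = nsmallest(2, data)           # like rdd.takeOrdered(2)
key_counts = Counter(k for k, _ in pairs)   # like pair_rdd.countByKey()

print(total, first_three, smallest_two, dict(key_counts))
```

In each case the result is an ordinary Python value on the driver, which is what distinguishes an action from a transformation.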

# PySpark


PySpark uses Py4J, which enables Python programs to dynamically access Java objects.

![PySpark Internals](http://i.imgur.com/YlI8AqEl.png)

## The `SparkContext` class

- When working with Apache Spark we invoke methods on an object which is an instance of the `pyspark.SparkContext` class.

- Typically, an instance of this object will be created automatically for you and assigned to the variable `sc`.

- The `parallelize` method in `SparkContext` can be used to turn any ordinary Python collection into an RDD;
    - normally we would create an RDD from a large file or an HBase table. 

## First example

PySpark isn't on `sys.path` by default, but that doesn't mean it can't be used as a regular library. You can address this by either symlinking pyspark into your site-packages, or adding pyspark to `sys.path` at runtime. [findspark](https://github.com/minrk/findspark) does the latter.

In [1]:
import findspark, pyspark

findspark.init(spark_home="/opt/spark-2.3.1-bin-hadoop2.7/")

sc = pyspark.SparkContext(master="local[8]", appName="FirstExample")

In [12]:
# If you get an error, fix the path to Spark and/or Python in the cell above,
# then uncomment and run the command below before re-running that cell
# sc.stop()

We now have a Spark context `sc` backed by a tiny local Spark cluster with 8 worker threads (`local[8]`), which works just fine on a multicore machine.

In [2]:
print(sc) # similar in spirit to a process-pool executor

<SparkContext master=local[8] appName=FirstExample>


# Create your first RDD

In [3]:
rdd = sc.parallelize(list(range(8))) # create collection

In [4]:
rdd

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:184

In [5]:
rdd.collect()  # Gather results back to local process

[0, 1, 2, 3, 4, 5, 6, 7]

In [6]:
square = rdd.map(lambda x: x ** 2) # Square each element

In [7]:
square.collect() # Square each element and collect results

[0, 1, 4, 9, 16, 25, 36, 49]

# Map-Reduce operation 

In [8]:
from operator import add
rdd.map(lambda x: x ** 2).reduce(add) # reduce is an action!

140
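
A distributed `reduce` combines values in two stages: each partition is reduced locally, then the partial results are combined. This is why the function passed to `reduce` should be associative and commutative. The plain-Python sketch below mimics the two stages; the split into three partitions is an arbitrary choice made for illustration.

```python
from functools import reduce
from operator import add

squares = [x ** 2 for x in range(8)]

# Pretend the data lives in three partitions on different workers.
partitions = [squares[0:3], squares[3:6], squares[6:8]]

# Stage 1: each worker reduces its own partition.
partial_sums = [reduce(add, part) for part in partitions]

# Stage 2: the partial results are combined into the final answer.
total = reduce(add, partial_sums)

print(partial_sums, total)
```

The final total matches the value returned by the Spark cell above, whatever way the data is partitioned.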

# filter

In [9]:
# Select only the even elements
rdd.filter(lambda x: x % 2 == 0).collect()

[0, 2, 4, 6]

In [10]:
# Cartesian product of each pair of elements in two sequences 
# (or the same sequence in this case)
rdd.cartesian(rdd).collect()

[(0, 0),
 (0, 1),
 (0, 2),
 (0, 3),
 (0, 4),
 (0, 5),
 (0, 6),
 (0, 7),
 (1, 0),
 (1, 1),
 (1, 2),
 (1, 3),
 (1, 4),
 (1, 5),
 (1, 6),
 (1, 7),
 (2, 0),
 (2, 1),
 (2, 2),
 (2, 3),
 (2, 4),
 (2, 5),
 (2, 6),
 (2, 7),
 (3, 0),
 (3, 1),
 (3, 2),
 (3, 3),
 (3, 4),
 (3, 5),
 (3, 6),
 (3, 7),
 (4, 0),
 (4, 1),
 (4, 2),
 (4, 3),
 (4, 4),
 (4, 5),
 (4, 6),
 (4, 7),
 (5, 0),
 (5, 1),
 (5, 2),
 (5, 3),
 (5, 4),
 (5, 5),
 (5, 6),
 (5, 7),
 (6, 0),
 (6, 1),
 (6, 2),
 (6, 3),
 (6, 4),
 (6, 5),
 (6, 6),
 (6, 7),
 (7, 0),
 (7, 1),
 (7, 2),
 (7, 3),
 (7, 4),
 (7, 5),
 (7, 6),
 (7, 7)]

In [11]:
# Chain operations to construct more complex computations
(rdd.map(lambda x: x ** 2)
    .cartesian(rdd)
    .filter(lambda tup: tup[0] % 2 == 0)
    .collect())

[(0, 0),
 (0, 1),
 (0, 2),
 (0, 3),
 (0, 4),
 (0, 5),
 (0, 6),
 (0, 7),
 (4, 0),
 (4, 1),
 (4, 2),
 (4, 3),
 (4, 4),
 (4, 5),
 (4, 6),
 (4, 7),
 (16, 0),
 (16, 1),
 (16, 2),
 (16, 3),
 (16, 4),
 (16, 5),
 (16, 6),
 (16, 7),
 (36, 0),
 (36, 1),
 (36, 2),
 (36, 3),
 (36, 4),
 (36, 5),
 (36, 6),
 (36, 7)]

In [12]:
# Stop the local spark cluster
sc.stop()

### Exercise 9.1 Word-count in Apache Spark

- Write the sample text file

In [13]:
from lorem import text
with open('sample.txt','w') as f:
    f.write(text())


- Create the RDD with the `SparkContext.textFile` method.
- Lowercase the text, remove periods, and split it into words using `rdd.flatMap`.
- Use `rdd.map` to create key/value pairs of the form (word, 1).
- Use `rdd.reduceByKey` to count the occurrences of each word.
- Use `rdd.takeOrdered` to get the word frequencies in sorted order.

All documentation is available [here](https://spark.apache.org/docs/2.1.0/api/python/pyspark.html?highlight=textfile#pyspark.SparkContext) for textFile and [here](https://spark.apache.org/docs/2.1.0/api/python/pyspark.html?highlight=textfile#pyspark.RDD) for RDD. 

For a global overview see the Transformations section of the [programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
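
When checking a Spark solution, it can help to have a single-machine reference that follows the same steps. The sketch below is plain Python on a short made-up sentence rather than `sample.txt`, and the tie-breaking rule (alphabetical order among equally frequent words) is an assumption of this example.

```python
from collections import Counter

text = "The cat sat. The dog sat. A bird flew."   # made-up sample text

# flatMap step: lowercase, drop periods, split every line into words
words = [w for line in text.splitlines()
         for w in line.lower().replace(".", "").split()]

# map step: one (word, 1) pair per word
pairs = [(w, 1) for w in words]

# reduceByKey step: add up the 1s for each distinct word
counts = Counter()
for word, one in pairs:
    counts[word] += one

# takeOrdered step: most frequent first, ties broken alphabetically
top3 = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:3]
print(top3)
```

Running the same text through the Spark pipeline should give the same counts, although the order of tied words depends on the ordering key you pass to `takeOrdered`.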

# SparkSession

Since Spark 2.0.0, `SparkSession` provides a single point
of entry to interact with Spark functionality and
allows programming Spark with the DataFrame and Dataset APIs.

### $\pi$ computation example

- We can estimate an approximate value for $\pi$ using the following Monte-Carlo method:

1.    Inscribe a circle in a square
2.    Randomly generate points in the square
3.    Determine the number of points in the square that are also in the circle
4.    Let $r$ be the number of points in the circle divided by the number of points in the square, then $\pi \approx 4 r$.
    
- Note that the more points are generated, the better the approximation.

See [this tutorial](https://computing.llnl.gov/tutorials/parallel_comp/#ExamplesPI).
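
The factor of 4 in step 4 follows from comparing areas. Writing $R$ for the radius of the circle (a symbol introduced here, distinct from the ratio $r$ above), the square has side $2R$, so a uniformly random point of the square lands inside the circle with probability

$$
\frac{\text{area of circle}}{\text{area of square}} = \frac{\pi R^2}{(2R)^2} = \frac{\pi}{4}.
$$

The observed ratio $r$ estimates this probability, hence $r \approx \pi / 4$ and $\pi \approx 4 r$. With $N$ random points, the statistical error of the estimate shrinks in proportion to $1/\sqrt{N}$.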

In [1]:
from random import random
from operator import add

from pyspark.sql import SparkSession

spark = (SparkSession.builder.master("local[4]")
         .appName("PythonPi")
         .config("spark.executor.instances", "4")
         .getOrCreate())

partitions = 8
n = 1000000 * partitions

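def f(_):
    # Draw a random point in the square [-1, 1] x [-1, 1]
    x = random() * 2 - 1
    y = random() * 2 - 1
    # Return 1 if the point falls inside the unit circle, else 0
    return 1 if x ** 2 + y ** 2 <= 1 else 0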

count = spark.sparkContext.parallelize(range(1, n+1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))

spark.stop()

Pi is roughly 3.142976


### Exercise 9.2

Using the same method as in the $\pi$ computation example, compute the integral
$$
I = \int_0^1 \exp(-x^2) dx
$$
You can check your result with NumPy and SciPy.

In [2]:
# NumPy approximates the integral with the trapezoidal rule,
# using the function values at 1000 evenly spaced points
import numpy as np
x = np.linspace(0,1,1000)
np.trapz(np.exp(-x*x),x)

0.7468240713763741

In [3]:
# SciPy's quad integrates numerically with adaptive quadrature
# rather than on a fixed grid of sample points
import numpy as np
from scipy.integrate import quad
quad(lambda x: np.exp(-x*x), 0, 1)
# note: quad returns a tuple (estimate, estimated absolute error)

(0.7468241328124271, 8.291413475940725e-15)
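
For comparison, the hit-or-miss idea from the $\pi$ example can be written in plain Python before distributing it with Spark. This is a minimal sequential sketch, not the Spark solution to the exercise: the function name `hit_or_miss_integral`, the fixed seed and the number of points are all choices made for this illustration, and the method as written assumes the integrand stays between 0 and 1 on the interval.

```python
import math
import random


def hit_or_miss_integral(func, n_points, seed=0):
    """Estimate the integral of func over [0, 1], assuming 0 <= func(x) <= 1."""
    rng = random.Random(seed)               # seeded so the run is reproducible
    hits = 0
    for _ in range(n_points):
        x, y = rng.random(), rng.random()   # random point in the unit square
        if y <= func(x):                    # the point lies under the curve
            hits += 1
    return hits / n_points                  # the square has area 1


estimate = hit_or_miss_integral(lambda x: math.exp(-x * x), 200_000)
print(estimate)
```

The estimate should agree with the NumPy and SciPy values above to about two decimal places; as in the $\pi$ example, more points give a better approximation.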

### Correlation between daily stock prices

- Data preparation

In [19]:
from glob import glob
import os, json
import pandas as pd

filenames = sorted(glob(os.path.join('..','data','daily-stock', '*.json')))
filenames

['../data/daily-stock/aet.json',
 '../data/daily-stock/afl.json',
 '../data/daily-stock/aig.json',
 '../data/daily-stock/al.json',
 '../data/daily-stock/amgn.json',
 '../data/daily-stock/avy.json',
 '../data/daily-stock/b.json',
 '../data/daily-stock/bwa.json',
 '../data/daily-stock/ge.json',
 '../data/daily-stock/hal.json',
 '../data/daily-stock/hp.json',
 '../data/daily-stock/hpq.json',
 '../data/daily-stock/ibm.json',
 '../data/daily-stock/jbl.json',
 '../data/daily-stock/jpm.json',
 '../data/daily-stock/luv.json',
 '../data/daily-stock/met.json',
 '../data/daily-stock/pcg.json',
 '../data/daily-stock/tgt.json',
 '../data/daily-stock/usb.json',
 '../data/daily-stock/xom.json']

In [20]:
for fn in filenames:
    with open(fn) as f:
        data = [json.loads(line) for line in f]
        
    df = pd.DataFrame(data)
    
    out_filename = fn[:-5] + '.h5'
    df.to_hdf(out_filename, key='/data')
    print("Finished : %s" % out_filename.split(os.path.sep)[-1])

filenames = sorted(glob(os.path.join('..','data', 'daily-stock', '*.h5')))  # ../data/daily-stock/*.h5
filenames

Finished : aet.h5
Finished : afl.h5
Finished : aig.h5
Finished : al.h5
Finished : amgn.h5
Finished : avy.h5
Finished : b.h5
Finished : bwa.h5
Finished : ge.h5
Finished : hal.h5
Finished : hp.h5
Finished : hpq.h5
Finished : ibm.h5
Finished : jbl.h5
Finished : jpm.h5
Finished : luv.h5
Finished : met.h5
Finished : pcg.h5
Finished : tgt.h5
Finished : usb.h5
Finished : xom.h5


['../data/daily-stock/aet.h5',
 '../data/daily-stock/afl.h5',
 '../data/daily-stock/aig.h5',
 '../data/daily-stock/al.h5',
 '../data/daily-stock/amgn.h5',
 '../data/daily-stock/avy.h5',
 '../data/daily-stock/b.h5',
 '../data/daily-stock/bwa.h5',
 '../data/daily-stock/ge.h5',
 '../data/daily-stock/hal.h5',
 '../data/daily-stock/hp.h5',
 '../data/daily-stock/hpq.h5',
 '../data/daily-stock/ibm.h5',
 '../data/daily-stock/jbl.h5',
 '../data/daily-stock/jpm.h5',
 '../data/daily-stock/luv.h5',
 '../data/daily-stock/met.h5',
 '../data/daily-stock/pcg.h5',
 '../data/daily-stock/tgt.h5',
 '../data/daily-stock/usb.h5',
 '../data/daily-stock/xom.h5']

### Sequential code

In [21]:
%%time

series = []
for fn in filenames:   # Simple map over filenames
    series.append(pd.read_hdf(fn)['close'])

results = []

for a in series:    # Doubly nested loop over the same collection
    for b in series:  
        if not (a == b).all():     # Filter out comparisons of the same series 
            results.append(a.corr(b))  # Apply function

result = max(results)

CPU times: user 4.35 s, sys: 121 ms, total: 4.47 s
Wall time: 1.72 s


### Exercise 9.3

Parallelize the code above with Apache Spark.

The Spark version is slower than the sequential code: starting the workers and setting up the job takes time, a lot of data is communicated between processes, and each correlation is too cheap a computation to offset that overhead.
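
One way to approach the parallel version is to first rewrite the doubly nested loop as a chain of cartesian product, filter, map and max, since each of these has a direct RDD counterpart. The sketch below does this in plain Python on three made-up series (not the stock data); the `pearson` helper is written out by hand so the example needs neither pandas nor Spark.

```python
from itertools import product
from math import sqrt


def pearson(a, b):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(a)
    mean_a, mean_b = sum(a) / n, sum(b) / n
    cov = sum((x - mean_a) * (y - mean_b) for x, y in zip(a, b))
    var_a = sum((x - mean_a) ** 2 for x in a)
    var_b = sum((y - mean_b) ** 2 for y in b)
    return cov / sqrt(var_a * var_b)


# Made-up toy series standing in for the 'close' columns.
series = [
    [1.0, 2.0, 3.0, 4.0],
    [2.0, 4.0, 6.5, 8.0],
    [4.0, 3.0, 2.0, 1.0],
]

pairs = product(series, series)                         # like cartesian
distinct = filter(lambda ab: ab[0] != ab[1], pairs)     # like filter
correlations = map(lambda ab: pearson(*ab), distinct)   # like map
result = max(correlations)                              # like the max action
print(result)
```

Like RDD transformations, `product`, `filter` and `map` are lazy here: nothing is computed until `max` consumes the chain.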

### Exercise 9.4 Fasta file example

In this exercise we build an RDD from a FASTA file, use it to calculate the frequencies of sequences of five bases, and then sort the sequences in descending order of frequency.
First we will define some functions to split the bases into sequences of a certain size:
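
A minimal sketch of such a helper is shown here in plain Python. The name `split_into_sequences` is hypothetical, and cutting each line into consecutive, non-overlapping pieces is an assumption of this sketch; a sliding window would be an equally plausible reading. The input lines are made up and are not taken from `nucleotide-sample.txt`.

```python
from collections import Counter


def split_into_sequences(bases, size=5):
    """Cut a string of bases into consecutive, non-overlapping pieces.

    A trailing piece shorter than `size` is dropped.
    """
    return [bases[i:i + size] for i in range(0, len(bases) - size + 1, size)]


lines = ["ACGTTACGTTGG", "ACGTTTTTTT"]   # made-up lines, not real FASTA data

# A per-line map gives a list of lists; flattening it mimics flatMap.
nested = [split_into_sequences(line) for line in lines]
flat = [seq for piece in nested for seq in piece]

counts = Counter(flat)
print(nested)
print(counts.most_common(2))
```

The nested result shows why a plain `map` is not enough for counting: the per-line lists have to be flattened into a single list of sequences first.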

- Create an RDD from the FASTA file `nucleotide-sample.txt` in the data directory.
- Use `groupBy` to process every sequence present in the file.
- Transform the original text RDD into an RDD containing key-value pairs where the key is the sequence and the value is 1, as in the word-count example.
- Notice that if we simply `map` each line of text, we will obtain multi-dimensional data.
- Use `flatMap` to flatten this data in order to turn it into a list of base sequences.
- Compute frequencies by using the RDD methods `map`, `flatMap`, `reduceByKey`, `sortByKey` and `take`.
- To rank each sequence according to its count, the key (first element) of each tuple should be the count, so reverse the tuples.
- Now we can sort the RDD in descending order of key: