<img src="images/sads-logo.jpeg" align="left" width="22%">
<img src="images/spark-logo-hd.png" align="right" width="28%">
<img src="images/python-logo-notext.png" align="right" width="19%">

<h1 align='center'>Intro to PySpark Workshop</h1>
<h3 align='center'>Meghann Agarwal<br/>September 14, 2017</h3>



### Bottlenecks and Blockers
#### CPU Bound
- Optimize algorithms
- Optimize compilation
- Parallelize code
#### Storage Bound
- Store data elsewhere and transfer it to the compute machine when needed
- Spread data across multiple machines and run computations on all of them
#### Network I/O Bound
- Decrease network transfer time
    - Compress data for faster transfer
    - Move computation to the data
- Consider multithreading
    - Unblock code by letting data transfer in a separate thread
#### Disk I/O Bound
- Cache data in RAM for faster access
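For the network I/O case, here is a minimal sketch of moving slow transfers onto separate threads with Python's standard `concurrent.futures` module. The `fake_download` function and the URLs are hypothetical: the function stands in for a network call by sleeping, so the threads overlap their waiting time instead of computing anything.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_download(url):
    """Hypothetical stand-in for a network transfer: wait, then return a payload."""
    time.sleep(0.2)  # simulated network latency; other threads run while this one waits
    return f"payload from {url}"

urls = [f"http://example.com/file{i}" for i in range(5)]

start = time.perf_counter()
# Each download runs in its own worker thread, so the five waits overlap
with ThreadPoolExecutor(max_workers=5) as pool:
    results = list(pool.map(fake_download, urls))  # results keep the input order
elapsed = time.perf_counter() - start

print(results[0])
print(f"fetched {len(results)} files in {elapsed:.2f}s")  # near one download's latency, not five
```

Threads help here because the program spends its time waiting on I/O; for CPU-bound pure-Python code the GIL limits the gain, and processes or a cluster framework are a better fit.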

# MapReduce

## What is MapReduce?
*Material in the slides on MapReduce is taken from Chapter 24: MapReduce in "Data Science from Scratch" by J. Grus*

The basic version of the MapReduce algorithm:
1. Use a `mapper` function to turn each item into zero or more key-value pairs.
2. Collect together all the pairs with identical keys.
3. Use a `reducer` function on each collection of grouped values to produce output values for the corresponding key.
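The three steps can be sketched in plain Python. This is an illustration, not Grus's code (which appears below): it makes step 2 explicit by sorting the mapped pairs by key and grouping them with `itertools.groupby`, roughly the way a sort-based shuffle brings identical keys together. The `mapper`, `reducer` and `map_reduce_sorted` names are hypothetical.

```python
from itertools import groupby
from operator import itemgetter

def mapper(line):
    """Step 1: turn one input item into zero or more (key, value) pairs."""
    for word in line.lower().split():
        yield (word, 1)

def reducer(key, values):
    """Step 3: combine all values that share a key into one output value."""
    return (key, sum(values))

def map_reduce_sorted(items, mapper, reducer):
    # Step 1: map every item to (key, value) pairs
    pairs = [pair for item in items for pair in mapper(item)]
    # Step 2: sort by key so identical keys are adjacent, then group them
    pairs.sort(key=itemgetter(0))
    grouped = groupby(pairs, key=itemgetter(0))
    # Step 3: reduce each group of values (each group is consumed before groupby advances)
    return [reducer(key, (value for _, value in group)) for key, group in grouped]

print(map_reduce_sorted(["the cat", "the dog", "a cat"], mapper, reducer))
# [('a', 1), ('cat', 2), ('dog', 1), ('the', 2)]
```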

MapReduce paper: <a href="https://research.google.com/archive/mapreduce.html" target="_blank">https://research.google.com/archive/mapreduce.html</a>

## Why MapReduce?

>"The primary benefit of MapReduce is that it allows us to distribute computations by moving the processing to the data."  
-- J. Grus, *Data Science from Scratch*  

"The key contributions of the MapReduce framework are not the actual map and reduce functions (which, for example, resemble the 1995 Message Passing Interface standard's reduce and scatter operations), but the scalability and fault-tolerance achieved for a variety of applications by optimizing the execution engine."  
-- Wikipedia

### Example: Word Count
"Data Science from Scratch", p. 289

Code: <a href="https://github.com/joelgrus/data-science-from-scratch/blob/master/code/mapreduce.py#L1-L59" target="_blank">https://github.com/joelgrus/data-science-from-scratch/blob/master/code/mapreduce.py#L1-L59</a>

```python
# Load a document; each line of the file is treated as a separate input "document"
with open('./data/The Jungle Book, by Rudyard Kipling.htm') as f:
    doctext = f.readlines()
```

```python
import re

def tokenize(message):
    """returns a set of the distinct words contained in `message`"""
    message = message.lower()                       # convert to lowercase
    all_words = re.findall("[a-z0-9']+", message)   # extract the words
    return set(all_words)                           # remove duplicates
```

```python
def wc_mapper(document):
    """for each distinct word in document, emit (word, 1)"""
    for word in tokenize(document):
        yield (word, 1)

def wc_reducer(word, counts):
    """sum up the counts for a word"""
    yield (word, sum(counts))
```

```python
from collections import defaultdict

def map_reduce(inputs, mapper, reducer):
    """runs MapReduce on the inputs using mapper and reducer"""
    collector = defaultdict(list)

    # Map step: emit key-value pairs and group the values by key
    for item in inputs:
        for key, value in mapper(item):
            collector[key].append(value)

    # Reduce step: turn each key's list of values into output values
    return [output
            for key, values in collector.items()
            for output in reducer(key, values)]
```

```python
word_counts = map_reduce(doctext, wc_mapper, wc_reducer)
```

```python
# Sort by count, most frequent first
word_counts.sort(key=lambda tup: tup[1], reverse=True)

# Print the first `num_lines_to_print` word counts
num_lines_to_print = 20
for pair in word_counts[:num_lines_to_print]:
    print(pair)
```

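```
('the', 2534)
('and', 1832)
('p', 1831)
('of', 1185)
('to', 1123)
('a', 1059)
('rdquo', 1027)
('he', 914)
('ldquo', 819)
('in', 704)
('that', 650)
('his', 577)
('rsquo', 548)
('i', 534)
('was', 487)
('for', 446)
('said', 430)
('is', 417)
('with', 410)
('it', 378)
```

Because `tokenize` returns a set of distinct words, each count is the number of lines that contain the word, not its total number of occurrences. Tokens such as `p`, `ldquo`, `rdquo` and `rsquo` are left over from the HTML markup (`<p>` tags and the `&ldquo;`, `&rdquo;`, `&rsquo;` quote entities) in the `.htm` source file.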


### Example: Analyzing Status Updates
"Data Science from Scratch", p. 293

Code: <a href="https://github.com/joelgrus/data-science-from-scratch/blob/master/code/mapreduce.py#L61-L112" target="_blank">https://github.com/joelgrus/data-science-from-scratch/blob/master/code/mapreduce.py#L61-L112</a>

### Example: Matrix Multiplier
"Data Science from Scratch", p. 294

Code: <a href="https://github.com/joelgrus/data-science-from-scratch/blob/master/code/mapreduce.py#L115-L187" target="_blank">https://github.com/joelgrus/data-science-from-scratch/blob/master/code/mapreduce.py#L115-L187</a>

# Hadoop

## What is Hadoop?
Hadoop is a framework for storing and processing large datasets across a cluster of machines. It consists of the following modules:

- **Hadoop Common** – contains libraries and utilities needed by other Hadoop modules;
- **Hadoop Distributed File System (HDFS)** – a distributed file-system that stores data on commodity machines, providing very high aggregate bandwidth across the cluster;
- **Hadoop YARN** – a platform responsible for managing computing resources in clusters and using them for scheduling users' applications; and
- **Hadoop MapReduce** – an implementation of the MapReduce programming model for large-scale data processing.

Hadoop Cluster = Hadoop Distributed File System (HDFS) + Yet Another Resource Negotiator (YARN)

## Why Hadoop?
History of Hadoop: <a href="https://medium.com/@markobonaci/the-history-of-hadoop-68984a11704" target="_blank">https://medium.com/@markobonaci/the-history-of-hadoop-68984a11704</a>

- [MapReduce](https://en.wikipedia.org/wiki/MapReduce) --> [Hadoop](http://hadoop.apache.org/): heavily used in business computing
- [Message Passing Interface (MPI)](https://en.wikipedia.org/wiki/Message_Passing_Interface) --> [MVAPICH](http://mvapich.cse.ohio-state.edu/): heavily used in scientific computing

- [Spark](http://spark.apache.org/): complement to Hadoop, faster for iterative applications, rich set of APIs in Scala, Python, and Java, and an interactive shell

# PySpark

## Installation

### Databricks Community Edition
1. Use the Google Chrome browser (Firefox should also work; avoid Internet Explorer, Safari, etc.)
2. Sign up for a free Community Edition account here: https://databricks.com/try-databricks

### Local on Mac OS X
1. Download http://apache.spinellicreations.com/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
2. Extract the Spark package, move it to `/opt/spark`, and set `SPARK_HOME` (adding its `bin` directory to your `PATH`):
```
tar -xvzf spark-2.2.0-bin-hadoop2.7.tgz
sudo mv spark-2.2.0-bin-hadoop2.7 /opt/spark
export SPARK_HOME=/opt/spark
export PATH=$SPARK_HOME/bin:$PATH
```
3. Run the included Pi Estimator example by executing the following command:
```
$SPARK_HOME/bin/run-example SparkPi 10
```
Expect to see something like:
```
Pi is roughly 3.140576
```
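`SparkPi` estimates π by Monte Carlo sampling: it throws random points at the square [-1, 1] × [-1, 1] and counts how many land inside the unit circle, whose area is π/4 of the square's. That is why the result is only "roughly" π and changes from run to run; the argument `10` sets how many partitions the samples are spread over. Below is a serial plain-Python sketch of the same map-then-reduce computation (not the example's source); the sample count and the seed are arbitrary choices.

```python
import random
from functools import reduce

def estimate_pi(num_samples, seed=42):
    rng = random.Random(seed)  # seeded so a run is reproducible

    def inside(_):
        # "map" step: one random point in [-1, 1] x [-1, 1] -> 1 if inside the unit circle, else 0
        x, y = rng.uniform(-1, 1), rng.uniform(-1, 1)
        return 1 if x * x + y * y <= 1 else 0

    # "reduce" step: add up the hits
    hits = reduce(lambda a, b: a + b, map(inside, range(num_samples)))
    # circle area / square area = pi / 4
    return 4.0 * hits / num_samples

print(estimate_pi(1_000_000))  # close to 3.14, but not exact
```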

### Amazon Web Services (AWS)
1. Deploy Spark on an Elastic Compute Cloud (EC2) cluster
2. Deploy Spark on an Elastic MapReduce (EMR) cluster

## What is Spark?
Spark is a fast and expressive cluster computing system for doing Big Data computation. It's good for:
- iterative tasks
- doing big batch processing
- interactive data exploration

It's compatible with Hadoop-supported file systems and data formats (HDFS, S3, SequenceFile, ...), so if you've been using Hadoop you can use it with your existing data and deploy it on your existing clusters.

It achieves fault tolerance through *lineage*: each RDD records the *transformations* used to build it from its parent data, so if a partition (chunk) of data is lost, Spark recomputes just that partition by replaying those transformations. This contrasts with distributed shared memory systems, which have to checkpoint to disk and roll back on failure.
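A toy model of lineage in plain Python, assuming partitions are lists and every transformation works partition by partition. The `ToyRDD` class is hypothetical and is not Spark's implementation: each derived dataset stores only its parent and the function that produced it, so a lost in-memory partition can be rebuilt by replaying those functions on the source data. Unlike Spark, which keeps partitions in memory only when you call `cache()` or `persist()`, this toy keeps every partition it computes.

```python
class ToyRDD:
    """Hypothetical toy dataset that remembers how it was derived (its lineage)."""

    def __init__(self, source=None, parent=None, func=None):
        self.source = source  # list of partitions for a base dataset (stable storage)
        self.parent = parent  # the dataset this one was derived from
        self.func = func      # per-partition function applied to the parent's partition
        self.cache = {}       # partition index -> data computed and held "in memory"

    def map(self, f):
        # Records how to compute the new data; nothing is computed yet
        return ToyRDD(parent=self, func=lambda part: [f(x) for x in part])

    def filter(self, pred):
        return ToyRDD(parent=self, func=lambda part: [x for x in part if pred(x)])

    def num_partitions(self):
        return len(self.source) if self.parent is None else self.parent.num_partitions()

    def partition(self, i):
        if self.parent is None:
            return self.source[i]
        if i not in self.cache:
            # Never computed, or lost: rebuild from the parent's partition i
            self.cache[i] = self.func(self.parent.partition(i))
        return self.cache[i]

    def collect(self):
        return [x for i in range(self.num_partitions()) for x in self.partition(i)]

base = ToyRDD(source=[[1, 2, 3], [4, 5, 6]])
even_squares = base.map(lambda x: x * x).filter(lambda x: x % 2 == 0)
print(even_squares.collect())  # [4, 16, 36]

# Simulate losing partition 1 everywhere it was held in memory (e.g. a worker died)
even_squares.cache.pop(1)
even_squares.parent.cache.pop(1)
print(even_squares.collect())  # [4, 16, 36] again, rebuilt by replaying map and filter
```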

## Why Spark?
https://spark.apache.org/

<img src="images/speed.png" align="left" width="100%">

<img src="images/ease-of-use.png" align="left" width="100%">

<img src="images/generality.png" align="left" width="100%">

<img src="images/runs-everywhere.png" align="left" width="100%">


>"Although current frameworks provide numerous abstractions for accessing a cluster’s computational resources, they <font color='red'> lack abstractions for leveraging distributed memory</font>. This makes them inefficient for an important class of emerging applications: those that reuse intermediate results across multiple computations. <font color='blue'>Data reuse is common in many iterative machine learning and graph algorithms</font>, including PageRank, K-means clustering, and logistic regression. <font color='blue'>Another compelling use case is interactive data mining</font>, where a user runs multiple ad-hoc queries on the same subset of the data. Unfortunately, in most current frameworks, the only way to reuse data between computations (e.g., between two MapReduce jobs) is to <font color='red'>write it to an external stable storage system</font>, e.g., a distributed file system. This incurs substantial overheads due to data replication, disk I/O, and serialization, which can dominate application execution times."  
-- Zaharia et al., "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing," *In NSDI '12*, April 2012

https://cs.stanford.edu/~matei/papers/2012/nsdi_spark.pdf

## Spark Architecture
- Spark Driver and Workers
- Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program).

<img src="images/cluster-overview.png" align="center" width="75%">

<a href="https://spark.apache.org/docs/latest/cluster-overview.html" target="_blank">https://spark.apache.org/docs/latest/cluster-overview.html</a>

Cluster managers supported by Spark 2.2:
- Standalone
- Hadoop YARN
- Apache Mesos


## Spark Programming Concepts
- ****SparkContext****: main entry point to Spark functionality
- ****SparkSession****: main entry point for DataFrame and SQL functionality
- ****Resilient Distributed Datasets (RDDs)****:
    - Immutable, distributed collections of objects
    - Can be cached in memory for fast reuse
- ****Operations on RDDs****:
    - *Transformations*: define a new RDD (map, join, ...)
    - *Actions*: return or output a result (count, save, ...)
- ****Two ways to create RDDs****:
    1. By parallelizing an existing collection in your driver program:  
        ```
        data = [1, 2, 3, 4, 5]  
        dist_data = sc.parallelize(data)
        ```
    2. Or by referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat:  
        ```
        dist_file = sc.textFile("data.txt")
        ```
<h4 align='right'>http://spark.apache.org/docs/latest/programming-guide.html</h4>

## Spark Data Interfaces

- [RDD API](https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds)
- [DataFrame API](https://spark.apache.org/docs/latest/sql-programming-guide.html#dataframes)
- [Machine Learning API](https://spark.apache.org/docs/latest/mllib-guide.html)


****RDD (Resilient Distributed Dataset)****
-   The basic abstraction in Spark. Essentially, it is an interface to a sequence of data objects, of one or more types, that are located across many machines in a cluster. RDDs can be created in a variety of ways and are the "lowest-level" API available to the user.

****DataFrame****
-   The DataFrame is a collection of distributed `Row` types. DataFrames provide a flexible interface and are similar in concept to the DataFrames you may be familiar with in Python (pandas) and in R.

*"Introduction to Apache Spark on Databricks" notebook*

## What is PySpark?
- The Python API for Spark
- Run interactive jobs in the shell
- Supports numpy, pandas and other Python libraries

## Why use PySpark?
- If you already know Python
- Can use Spark in tandem with your favorite Python libraries
- If you don't need Python libraries, maybe just write code in Scala

### PySpark's Core Classes
- ****pyspark.SparkContext****  
Main entry point for Spark functionality.
- ****pyspark.RDD****  
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
- ****pyspark.streaming.StreamingContext****  
Main entry point for Spark Streaming functionality.
- ****pyspark.streaming.DStream****  
A Discretized Stream (DStream), the basic abstraction in Spark Streaming.
- ****pyspark.sql.SQLContext****  
Main entry point for DataFrame and SQL functionality.
- ****pyspark.sql.DataFrame****  
A distributed collection of data grouped into named columns.


## Transformations
- Transform one RDD into another, **new** RDD (RDDs are immutable)

| Transformation | Description | Dependency |
| :------:  | :-----------: | :-----: |
| `map(func)`     | Apply a function over each element | Narrow |
| `flatMap(func)` | Map then flatten output | Narrow |
| `filter(func)`  | Keep only elements for which the function returns `True` | Narrow |
| `sample(withReplacement, fraction, seed)` | Return a sampled subset of this RDD | Narrow |
| `groupByKey()` | Group the values for each key in the RDD into a single sequence | Wide |
| `reduceByKey(func)` | Merge the values for each key using an associative reduce function | Wide |


<img src="images/narrow_wide_transformations.png" align="center" width="90%">

<h4 align='right'>https://dzone.com/articles/big-data-processing-spark</h4>
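The narrow/wide distinction can be made concrete with a toy model in plain Python, under simplifying assumptions and not how Spark is implemented: partitions are lists, a narrow `map` works on each partition independently, and a wide `reduceByKey` has to shuffle records so that all values for a key end up in the same output partition. The helper names are hypothetical. The local combine before the shuffle mirrors what `reduceByKey` does; `groupByKey` skips it and ships every individual value across the network, which is why `reduceByKey` is usually preferred when values can be merged with an associative function.

```python
# Toy model, not Spark's implementation: a dataset is a list of partitions,
# and each partition is a list of (key, value) records.
partitions = [[("a", 1), ("b", 1)], [("a", 1), ("a", 1)], [("b", 1), ("c", 1)]]

def narrow_map(parts, f):
    """Narrow: output partition i is built only from input partition i, so no data moves."""
    return [[f(record) for record in part] for part in parts]

def merge_into(target, key, value, func):
    """Fold `value` into `target[key]` with `func`, starting a new entry if needed."""
    target[key] = func(target[key], value) if key in target else value

def wide_reduce_by_key(parts, func, num_out=2):
    """Wide: an output partition can need records from every input partition."""
    # 1) Map-side combine: reduce each input partition locally before shuffling
    combined = []
    for part in parts:
        local = {}
        for key, value in part:
            merge_into(local, key, value, func)
        combined.append(local)
    # 2) Shuffle: route every key to an output partition chosen by hashing the key
    outputs = [{} for _ in range(num_out)]
    for local in combined:
        for key, value in local.items():
            merge_into(outputs[hash(key) % num_out], key, value, func)
    return [sorted(out.items()) for out in outputs]

scaled = narrow_map(partitions, lambda kv: (kv[0], kv[1] * 10))
totals = wide_reduce_by_key(scaled, lambda a, b: a + b)
# Python randomizes str hashes per process, so which output partition holds a key
# can change between runs; the per-key totals do not.
print(sorted(kv for part in totals for kv in part))  # [('a', 30), ('b', 20), ('c', 10)]
```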

## Actions
- Return or output a result

| Action | Description | Try it Out\*|
| :------:  | :-----------:| :---: |
| `collect()`     | Return a list that contains all of the elements in this RDD | `sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()` |
| `count()`  | Return the number of elements | `sc.parallelize([2, 3, 4]).count()` |
| `saveAsTextFile(path)` | Save as a text file (written as a directory of part files), using string representations of elements | `sc.parallelize(['foo', '-', 'bar', '!']).saveAsTextFile("/FileStore/foo-bar.txt")`|
| `first()`    | Return the first element | `sc.parallelize([2, 3, 4]).first()` |
| `take(num)`    | Return the first `num` elements | `sc.parallelize([2, 3, 4, 5, 6]).take(2)` |

### \* Try it Out:
1. Go to your Databricks workspace and create a new directory called "2017-09-14-sads-pyspark" within your Users directory
2. Create a notebook called "0-Introduction" within this directory
3. Type or copy/paste lines of code into separate cells and run them (you will be prompted to launch a cluster)

Here are some cheat sheets you might find helpful:
- <a href="https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_Cheat_Sheet_Python.pdf" target="_blank">PySpark_Cheat_Sheet_Python.pdf</a>
- <a href="https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf" target="_blank">PySpark_SQL_Cheat_Sheet_Python.pdf</a>

When using Databricks the `SparkContext` is created for you automatically as `sc`.

In the Databricks Community Edition there are no worker nodes: the driver executes all of the code.

### More examples to try:

```python
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(sorted(rdd.groupByKey().mapValues(len).collect()))   # [('a', 2), ('b', 1)]
print(sorted(rdd.groupByKey().mapValues(list).collect()))  # [('a', [1, 1]), ('b', [1])]
```

```python
from operator import add

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.reduceByKey(add).collect())  # [('a', 2), ('b', 1)]
```

Complete documentation of available *transformations* and *actions*: <a href="http://spark.apache.org/docs/latest/api/python/pyspark.html" target="_blank">http://spark.apache.org/docs/latest/api/python/pyspark.html</a>

### Log Mining Code (Scala)

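```scala
// `sc` is the SparkContext (predefined in spark-shell)
val lines = sc.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
// Assumes tab-separated lines whose third field is the message
val messages = errors.map(_.split('\t')(2))

messages.filter(_.contains("foo")).count()
```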

Transformations are lazy: `textFile`, `filter` and `map` only record the steps, and nothing is actually read or computed until the `count` action at the end is called.
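To see what each step computes, the same pipeline can be run on a small in-memory list in plain Python. Generator expressions give a rough analogue of Spark's laziness: building them does no work, and the lines are only scanned when `sum` (playing the role of the `count` action) pulls values through. The log lines are made up and, like the Scala code, assume tab-separated lines whose third field is the message.

```python
# Hypothetical tab-separated log lines: level, timestamp, message
log_lines = [
    "ERROR\t2017-09-14 10:00\tfoo service unreachable",
    "INFO\t2017-09-14 10:01\tfoo service restarted",
    "ERROR\t2017-09-14 10:02\tdisk full",
    "ERROR\t2017-09-14 10:03\tfoo timeout",
]

# "Transformations": generator expressions only describe the work; nothing runs yet
errors = (line for line in log_lines if line.startswith("ERROR"))
messages = (line.split("\t")[2] for line in errors)
foo_messages = (msg for msg in messages if "foo" in msg)

# "Action": consuming the generator finally scans the lines and counts the matches
foo_count = sum(1 for _ in foo_messages)
print(foo_count)  # 2
```

Unlike an RDD, a generator can be consumed only once; Spark can re-run an RDD's lineage for every action, and `cache()` keeps its partitions in memory for reuse.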

# Exercises

## Exercise 1: Word Count
Create a few transformations to build a dataset of (String, Int) pairs called `counts`, and then save it to a file. For input text, you can upload "The Jungle Book, by Rudyard Kipling.htm".

1. Create a notebook in "2017-09-14-sads-pyspark" called "1-WordCount"
2. Try to implement the following Word Count example:

http://spark.apache.org/examples.html

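```python
# Read the input; each RDD element is one line of text
text_file = sc.textFile("hdfs://...")
# flatMap: lines -> words; map: word -> (word, 1); reduceByKey: sum the 1s per word
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
# The action: write the (word, count) pairs out as a directory of text part files
counts.saveAsTextFile("hdfs://...")
```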
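For checking results on a small sample, the same flatMap → map → reduceByKey pipeline can be mirrored in plain Python. Unlike the earlier `tokenize`-based example, it counts every occurrence and, because it only splits on spaces, treats "The" and "the" (or "jungle," and "jungle") as different words. The sample lines and the `reduce_by_key` helper are hypothetical.

```python
from itertools import chain

sample_lines = ["the jungle book", "the law of the jungle"]

# flatMap: one line -> many words, flattened into a single stream
words = chain.from_iterable(line.split(" ") for line in sample_lines)
# map: word -> (word, 1)
pairs = ((word, 1) for word in words)

def reduce_by_key(pairs, func):
    """Merge the values of each key with `func`, like reduceByKey on one machine."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

counts = reduce_by_key(pairs, lambda a, b: a + b)
print(sorted(counts.items()))
# [('book', 1), ('jungle', 2), ('law', 1), ('of', 1), ('the', 3)]
```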

## Exercise 2: Clickstream

***Switch over to Databricks*** and import the notebook "2-Clickstream" into "2017-09-14-sads-pyspark":

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/8253322025282036/502654908621835/2929138047951629/latest.html


***Data Source: February 2015 English Wikipedia Clickstream dataset***
- https://meta.wikimedia.org/wiki/Research:Wikipedia_clickstream
- http://datahub.io/dataset/wikipedia-clickstream/resource/be85cc68-d1e6-4134-804a-fd36b94dbb82. 

>"The data contains counts of (referer, resource) pairs extracted from the request logs of English Wikipedia. When a client requests a resource by following a link or performing a search, the URI of the webpage that linked to the resource is included with the request in an HTTP header called the "referer". This data captures 22 million (referer, resource) pairs from a total of 3.2 billion requests collected during the month of February 2015."

The data is approximately 1.2GB and it is hosted in the following Databricks file: `/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed`

*Notebook translated from the Databricks "Quick Start DataFrames" tutorial in Scala, which was based on a lab developed by [Sameer Farooqui](https://www.linkedin.com/in/blueplastic).*

## Exercise 3: Logistic Regression

1. Create a notebook in "2017-09-14-sads-pyspark" called "3-LogisticRegression"
2. Try to implement one of the following Logistic Regression examples:
    - http://spark.apache.org/examples.html (Prediction with Logistic Regression)
    - https://github.com/apache/spark/blob/master/examples/src/main/python/mllib/logistic_regression.py
    - https://github.com/apache/spark/blob/master/examples/src/main/python/logistic_regression.py

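```python
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

# Hypothetical toy training data: (label, features) pairs
data = [(1.0, Vectors.dense([0.0, 1.1, 0.1])),
        (0.0, Vectors.dense([2.0, 1.0, -1.0])),
        (0.0, Vectors.dense([2.0, 1.3, 1.0])),
        (1.0, Vectors.dense([0.0, 1.2, -0.5]))]

# Every record of this DataFrame contains the label and
# features represented by a vector (`spark` is the SparkSession Databricks creates for you).
df = spark.createDataFrame(data, ["label", "features"])

# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = LogisticRegression(maxIter=10)

# Fit the model to the data.
model = lr.fit(df)

# Given a dataset, predict each point's label, and show the results.
model.transform(df).show()
```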

# Resources and References
- ****MOOCs****:
    - "Introduction to Apache Spark": https://www.edx.org/course/introduction-apache-spark-uc-berkeleyx-cs105x
    - "Hadoop Platform and Application Framework" (week 5 covers Spark): https://www.coursera.org/learn/hadoop/home/week/5
- ****Spark/PySpark Docs****:
    - https://spark.apache.org/docs/latest/
    - http://spark.apache.org/research.html
    - http://spark.apache.org/examples.html
- ****Other****:
    - Spark 2.0 Webinar (2016): http://go.databricks.com/apache-spark-2.0-presented-by-databricks-co-founder-reynold-xin
    - PySpark Talk (J. Rosen, 2013): https://www.youtube.com/watch?v=xc7Lc8RA8wE
    - "Apache Spark in 24 Hours": https://www.amazon.com/Apache-Spark-Hours-Teach-Yourself/dp/0672338513

# Thanks for Coming!

Ways to get in touch:
- <agarwal.meghann@gmail.com>
- https://www.linkedin.com/in/meghann-agarwal-7a044b4/