<img src=images/widsatx-logo-outline.png align="left" width="18%">

<h1 align='center'>MapReduce, etc.</h1>
<h3 align='center'>June 15, 2017</h3>
<h3 align='center'>Women in Data Science - ATX</h3>
<h3 align='center'>Data Science from Scratch Workshop</h3>

## Question: What sorts of computational blockers have you come across?

## Some Solutions
#### CPU Bound
    - Optimize algorithms
    - Parallelize code
#### I/O Bound
    - Decrease network transfer time
        - Compress data for faster transfer
        - Move computation to the data
    - Consider multithreading
        - Unblock code by allowing data to transfer in a separate thread
    - Cache data for faster access
    
#### Disk Capacity
    - Store data elsewhere and transfer chunk(s) to compute machine when needed
    - Spread data across multiple machines and run computations on all machines
#### Memory Capacity
    - Read chunks into available memory or get more memory
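As a minimal sketch of the "read chunks into available memory" idea, the generator below yields fixed-size batches of lines so that only one batch is held in memory at a time. The name `read_in_chunks`, the chunk size, and the in-memory stand-in file are illustrative assumptions, not part of the workshop code.

```python
import io
from itertools import islice

def read_in_chunks(file_obj, lines_per_chunk=1000):
    """Yield lists of at most `lines_per_chunk` lines from an open file object."""
    while True:
        chunk = list(islice(file_obj, lines_per_chunk))
        if not chunk:
            return
        yield chunk

# Hypothetical in-memory "file" standing in for a large file on disk.
fake_file = io.StringIO("a\nb\nc\nd\ne\n")
chunk_sizes = [len(chunk) for chunk in read_in_chunks(fake_file, lines_per_chunk=2)]
print(chunk_sizes)
```

With a real file, the same generator can wrap the object returned by `open(...)`, and each chunk can be processed and discarded before the next one is read.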


# Overview
- **MapReduce:** What and Why
    - Examples:
        - Word Count
        - Analyzing Status Updates
        - Matrix Multiplier
    - References and Resources
- **Hadoop:** What and Why
    - Example:
        - Word Count
- **(Py)Spark:** What and Why
    - Examples:
        - Word Count
        - Logistic Regression
        - Clickstream
    - References and Resources

# MapReduce: What and Why

## What is MapReduce?

(Material in the slides on MapReduce was taken from Chapter 24: MapReduce in "Data Science from Scratch" by J. Grus)

Basic version of the `MapReduce` algorithm:
1. Use a `mapper` function to turn each item into zero or more key-value pairs.
2. Collect together all the pairs with identical keys.
3. Use a `reducer` function on each collection of grouped values to produce output values for the corresponding key.

MapReduce paper: https://research.google.com/archive/mapreduce.html

## Why MapReduce?

"The primary benefit of MapReduce is that it allows us to distribute computations by moving the processing to the data."
-- J. Grus

### Example: Word Count
"Data Science from Scratch", p. 289

Code: https://github.com/joelgrus/data-science-from-scratch/blob/master/code/mapreduce.py#L1-L59

In [1]:
# Load a document as a list of lines
with open('./input/The Jungle Book, by Rudyard Kipling.htm') as document:
    doctext = document.readlines()

In [2]:
import re

def tokenize(message):
    """returns the set of distinct words contained in `message`"""
    message = message.lower()                       # convert to lowercase
    all_words = re.findall("[a-z0-9']+", message)   # extract the words
    return set(all_words)                           # remove duplicates

In [3]:
def wc_mapper(document):
    """for each word in document, emit (word,1)"""
    for word in tokenize(document):
        yield (word, 1)
        
def wc_reducer(word, counts):
    """sum up the counts for a word"""
    yield (word, sum(counts))

In [6]:
from collections import defaultdict

def map_reduce(inputs, mapper, reducer):
    """runs MapReduce on the inputs using mapper and reducer"""
    collector = defaultdict(list)
    
    for input in inputs:
        for key, value in mapper(input):
            collector[key].append(value)
            
    return [output
           for key, values in collector.items()
           for output in reducer(key, values)]

In [10]:
word_counts = map_reduce(doctext, wc_mapper, wc_reducer)

In [11]:
word_counts.sort(key=lambda tup: tup[1], reverse=True)
for word_count in word_counts:
    print(word_count)

('the', 2534)
('and', 1832)
('p', 1831)
('of', 1185)
('to', 1123)
('a', 1059)
('rdquo', 1027)
('he', 914)
('ldquo', 819)
('in', 704)
('that', 650)
('his', 577)
('rsquo', 548)
('i', 534)
('was', 487)
('for', 446)
('said', 430)
('is', 417)
('with', 410)
('it', 378)
('s', 374)
('on', 355)
('they', 344)
('all', 338)
('but', 335)
('him', 329)
('as', 327)
('at', 298)
('you', 297)
('mdash', 295)
('not', 291)
('had', 269)
('up', 259)
('have', 228)
('little', 228)
('or', 225)
('mowgli', 215)
('when', 210)
('out', 203)
('be', 199)
('there', 193)
('by', 191)
('are', 181)
('down', 181)
('we', 178)
('my', 175)
('then', 172)
('one', 168)
('will', 168)
('this', 167)
('were', 166)
('from', 164)
('man', 163)
('would', 156)
('me', 152)
('no', 148)
('what', 147)
('if', 146)
('jungle', 146)
('their', 142)
('very', 138)
('who', 137)
('head', 135)
('them', 134)
('into', 132)
('so', 128)
('bagheera', 128)
('do', 125)
('could', 125)
('back', 120)
('go', 113)
('can', 113)
('where', 112)
('never', 111)
('big', 


### Example: Analyzing Status Updates
"Data Science from Scratch", p. 293

Code: https://github.com/joelgrus/data-science-from-scratch/blob/master/code/mapreduce.py#L61-L112
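The linked code is not reproduced here, so the following is only a rough sketch of the idea: count, per day of the week, how many status updates mention "data science". The sample updates, usernames, and field names are made up for illustration, and `map_reduce` is repeated from the Word Count example so the block runs on its own.

```python
from collections import defaultdict
from datetime import datetime

def map_reduce(inputs, mapper, reducer):
    """Same three-step MapReduce as in the Word Count example."""
    collector = defaultdict(list)
    for item in inputs:
        for key, value in mapper(item):
            collector[key].append(value)
    return [output
            for key, values in collector.items()
            for output in reducer(key, values)]

# Hypothetical status updates (field names are assumptions).
status_updates = [
    {"username": "user_a", "text": "Anyone reading a data science book?",
     "created_at": datetime(2013, 12, 21, 11, 47)},
    {"username": "user_b", "text": "Data Science is fun",
     "created_at": datetime(2013, 12, 23, 9, 0)},
    {"username": "user_c", "text": "Lunch time",
     "created_at": datetime(2013, 12, 23, 12, 0)},
]

def data_science_day_mapper(status_update):
    """Emit (day_of_week, 1) if the update mentions 'data science'."""
    if "data science" in status_update["text"].lower():
        yield (status_update["created_at"].weekday(), 1)

def sum_reducer(key, values):
    """Add up all the values emitted for a key."""
    yield (key, sum(values))

data_science_days = map_reduce(status_updates, data_science_day_mapper, sum_reducer)
print(data_science_days)
```

Only the mapper changes relative to Word Count; the grouping step and the summing reducer are reused as-is, which is the point of separating the three steps.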

### Example: Matrix Multiplier
"Data Science from Scratch", p. 294

Code: https://github.com/joelgrus/data-science-from-scratch/blob/master/code/mapreduce.py#L115-L187
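As a sketch of how matrix multiplication fits the same pattern (written for this note, not copied from the linked file): each matrix is stored sparsely as `(name, i, j, value)` tuples, the mapper sends every entry to each output cell that needs it, and the reducer pairs up entries that share the inner index and sums their products. The explicit `n_rows_a` and `n_cols_b` parameters are an assumption of this sketch.

```python
from collections import defaultdict
from functools import partial

def map_reduce(inputs, mapper, reducer):
    """Same three-step MapReduce as in the Word Count example."""
    collector = defaultdict(list)
    for item in inputs:
        for key, value in mapper(item):
            collector[key].append(value)
    return [output
            for key, values in collector.items()
            for output in reducer(key, values)]

def matrix_multiply_mapper(n_rows_a, n_cols_b, element):
    """element is (matrix_name, i, j, value); keys are cells of C = A * B."""
    name, i, j, value = element
    if name == "A":
        # A[i][j] contributes to C[i][k] for every column k of B
        for k in range(n_cols_b):
            yield ((i, k), (j, value))
    else:
        # B[i][j] contributes to C[k][j] for every row k of A
        for k in range(n_rows_a):
            yield ((k, j), (i, value))

def matrix_multiply_reducer(key, indexed_values):
    """Multiply the A and B entries that share an inner index, then sum."""
    by_index = defaultdict(list)
    for index, value in indexed_values:
        by_index[index].append(value)
    total = sum(values[0] * values[1]
                for values in by_index.values()
                if len(values) == 2)
    if total != 0:
        yield (key, total)

# A = [[3, 2, 0], [0, 0, 0]] and B = [[4, -1, 0], [10, 0, 0], [0, 0, 0]], nonzero entries only
entries = [("A", 0, 0, 3), ("A", 0, 1, 2),
           ("B", 0, 0, 4), ("B", 0, 1, -1), ("B", 1, 0, 10)]
mapper = partial(matrix_multiply_mapper, 2, 3)
product = map_reduce(entries, mapper, matrix_multiply_reducer)
print(sorted(product))
```

Here `C[0][0] = 3*4 + 2*10 = 32` and `C[0][1] = 3*(-1) = -3`; all other cells are zero and are therefore not emitted.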

# Resources and References (MapReduce)
- "Data Science From Scratch" - J. Grus

# Hadoop: What and Why

## What is Hadoop?
Hadoop is a system (framework, infrastructure) consisting of the Hadoop Distributed File System (HDFS), an API for writing a MapReduce program, etc.

## Why Hadoop?
History of Hadoop: https://medium.com/@markobonaci/the-history-of-hadoop-68984a11704

### Some Other Tools
- MapReduce Systems:
    - Hadoop (http://hadoop.apache.org) + your own cluster
    - Amazon Web Services (AWS) Elastic MapReduce (EMR) (http://aws.amazon.com/elasticmapreduce/)

- "Real-time" Analytics Systems:
    - Spark (http://spark.apache.org/)
    - Storm (http://storm.incubator.apache.org/)

### Example: Word Count (Hadoop Streaming)

https://github.com/jddavis-100/Statistics-and-Machine-Learning/blob/gh-pages/MapReduce_Word_Count
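Hadoop Streaming runs the mapper and reducer as separate programs that read lines from standard input and write tab-separated key-value lines to standard output, with the framework sorting the mapper output by key before the reducer sees it. The sketch below is not the linked code; it simulates that pipeline locally, with hypothetical function names that take any iterable of lines so they can be tried without a cluster.

```python
from itertools import groupby

def streaming_mapper(lines):
    """Emit one 'word<TAB>1' line per word, as a streaming mapper would print."""
    for line in lines:
        for word in line.strip().lower().split():
            yield f"{word}\t1"

def streaming_reducer(sorted_lines):
    """Sum the counts of consecutive lines that share a key (input sorted by key)."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in sorted_lines)
    for word, group in groupby(pairs, key=lambda pair: pair[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"

# Local simulation of map -> sort (shuffle) -> reduce.
sample_lines = ["the jungle book", "the law of the jungle"]
reduced = list(streaming_reducer(sorted(streaming_mapper(sample_lines))))
print(reduced)
```

In an actual streaming job each function would live in its own script, iterate over `sys.stdin`, and `print` each output line.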

# Spark (PySpark)

# Environment Setup
We'll use the free Databricks Community Edition platform to run our Spark jobs: 
1. Use Google Chrome browser (Firefox should also work, but not Internet Explorer, Safari, etc.)
2. Sign up for the Community Edition account here: https://databricks.com/try-databricks

Or, feel free to use a local installation of Spark, etc. 

If Spark isn't already installed on the machine you're working on, you can download and build the source code (or use a pre-built version):
1. Download: http://spark.apache.org/downloads.html
2. Open: `/path/to/spark2.1.1/README.md`
3. Build: `/path/to/spark2.1.1/build/mvn -DskipTests clean package`

# Spark: What and Why

## What is Spark?
Spark is a fast and expressive cluster computing system for doing Big Data computation. It's good for iterative tasks, for doing big batch processing, and for interactive data exploration.

It's compatible with Hadoop-supported file systems and data formats (HDFS, S3, SequenceFile, ...), so if you've been using Hadoop you can use it with your existing data and deploy it on your existing clusters.

It achieves fault tolerance through *lineage*: if you lose a partition (chunk) of data you can reconstruct it through a set of *transformations* that act on data stored in memory. This is in contrast to distributed shared memory systems where you have to write to disk and roll back.
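The lineage idea can be illustrated with a toy model in plain Python. This is emphatically not how Spark is implemented; `ToyRDD` and its methods are hypothetical names invented for this sketch. Each derived dataset remembers its parent and the function that produced it, so a lost partition can be rebuilt by replaying that function on the parent's matching partition.

```python
class ToyRDD:
    """Toy stand-in for an RDD: a list of partitions plus its lineage."""

    def __init__(self, partitions, parent=None, transform=None):
        self.partitions = partitions  # list of lists; None marks a lost partition
        self.parent = parent          # dataset this one was derived from
        self.transform = transform    # element-wise function used to derive it

    def map(self, func):
        """Derive a new dataset and record how it was derived."""
        new_partitions = [[func(x) for x in part] for part in self.partitions]
        return ToyRDD(new_partitions, parent=self, transform=func)

    def recompute(self, index):
        """Rebuild one lost partition from the parent's matching partition."""
        source = self.parent.partitions[index]
        self.partitions[index] = [self.transform(x) for x in source]
        return self.partitions[index]

base = ToyRDD([[1, 2], [3, 4]])
squared = base.map(lambda x: x * x)
squared.partitions[1] = None       # simulate losing one partition
recovered = squared.recompute(1)   # replay the transformation on that partition only
print(squared.partitions)
```

Only the lost partition is recomputed; the surviving partition `[1, 4]` is left untouched.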

## Why use Spark?
- Speed
- Ease of Use
- Generality
- Runs Everywhere


>"Although current frameworks provide numerous abstractions for accessing a cluster’s computational resources, they lack abstractions for leveraging distributed memory. This makes them inefficient for an important class of emerging applications: those that reuse intermediate results across multiple computations. Data reuse is common in many <font color='blue'>iterative</font> machine learning and graph algorithms, including PageRank, K-means clustering, and logistic regression. Another compelling use case is <font color='blue'>interactive</font> data mining, where a user runs multiple ad-hoc queries on the same subset of the data. Unfortunately, in most current frameworks, the only way to reuse data between computations (e.g., between two MapReduce jobs) is to write it to an external stable storage system, e.g., a distributed file system. This incurs substantial overheads due to data replication, disk I/O, and serialization, which can dominate application execution times."

- Zaharia et al., "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing," *In NSDI '12*, April 2012

## Spark vs MapReduce vs MPI vs ...
- [MapReduce](https://en.wikipedia.org/wiki/MapReduce) --> [Hadoop](http://hadoop.apache.org/): heavily used in business computing
- [Message Passing Interface (MPI)](https://en.wikipedia.org/wiki/Message_Passing_Interface) --> [MVAPICH](http://mvapich.cse.ohio-state.edu/): heavily used in scientific computing
- [Spark](http://spark.apache.org/): complement to Hadoop, faster for iterative applications, rich set of APIs in Scala, Python, and Java, and an interactive shell

"The key contributions of the MapReduce framework are not the actual map and reduce functions (which, for example, resemble the 1995 Message Passing Interface standard's reduce and scatter operations), but the scalability and fault-tolerance achieved for a variety of applications by optimizing the execution engine."
-- Wikipedia

## Spark Architecture
- Spark Driver and Workers
- SparkContext (in version 2.X, SparkSession is the main entry point and wraps a SparkContext)
- Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program).
- SparkContext can connect to several types of cluster managers (either Spark’s own standalone cluster manager, Mesos or YARN)

<img src=images/cluster-overview.png align="center" width="75%">

<h4 align='right'>https://spark.apache.org/docs/1.1.0/cluster-overview.html</h4>

## Spark Programming Concepts (version 1.X)
- ****SparkContext****: entry point to Spark functions
- ****Resilient Distributed Datasets (RDDs)****:
    - Immutable, distributed collections of objects
    - Can be cached in memory for fast reuse
- ****Operations on RDDs****:
    - *Transformations*: define a new RDD (map, join, ...)
    - *Actions*: return or output a result (count, save, ...)
- ****Two ways to create RDDs****:
    1. By parallelizing an existing collection in your driver program:  
        `data = [1, 2, 3, 4, 5]  
        distData = sc.parallelize(data)`  
    2. Or by referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat:  
        `distFile = sc.textFile("data.txt")`       
<h4 align='right'>http://spark.apache.org/docs/latest/programming-guide.html</h4>

## Spark Data Interfaces (versions 1.X and 2.X)

There are several key interfaces that you should understand when you go to use Spark.

-   ****The Dataset****
    -   The Dataset is Apache Spark's newest distributed collection and can be considered a combination of DataFrames and RDDs. It provides the typed interface that is available in RDDs while providing a lot of conveniences of DataFrames. It will be the core abstraction going forward.
-   ****The DataFrame****
    -   The DataFrame is a collection of distributed `Row` types. These provide a flexible interface and are similar in concept to the DataFrames you may be familiar with in Python (pandas) as well as in the R language.
-   ****The RDD (Resilient Distributed Dataset)****
    -   Apache Spark's first abstraction was the RDD or Resilient Distributed Dataset. Essentially it is an interface to a sequence of data objects that consist of one or more types that are located across a variety of machines in a cluster. RDDs can be created in a variety of ways and are the "lowest level" API available to the user. While this is the original data structure made available, new users should focus on Datasets as those will be supersets of the current RDD functionality.

*(slide taken from "Introduction to Apache Spark on Databricks" notebook)*

## What is PySpark?
- The Python API for Spark
- Run interactive jobs in the shell
- Supports `numpy`, `pandas` and other Python libraries

## Why use PySpark?
- If you already know Python
- Can use Spark in tandem with your favorite Python libraries
- If you don't need Python libraries, maybe just write code in Scala

### PySpark's core classes:
- ****pyspark.SparkContext****  
Main entry point for Spark functionality.
- ****pyspark.RDD****  
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
- ****pyspark.streaming.StreamingContext****  
Main entry point for Spark Streaming functionality.
- ****pyspark.streaming.DStream****  
A Discretized Stream (DStream), the basic abstraction in Spark Streaming.
- ****pyspark.sql.SQLContext****  
Main entry point for DataFrame and SQL functionality.
- ****pyspark.sql.DataFrame****  
A distributed collection of data grouped into named columns.

http://spark.apache.org/docs/latest/api/python/

## Transformations
- Transform one RDD to another, **new** RDD (immutable)

| Transformation | Description | Type |
| :------:  | :-----------: | :-----: |
| `map(func)`     | Apply a function over each element | Narrow |
| `flatMap(func)` | Map then flatten output | Narrow |
| `filter(func)`  | Keep only elements where function is `True` | Narrow |
| `sample(withReplacement, fraction, seed)` | Return a sampled subset of this RDD | Narrow |
| `groupByKey()` | Group the values for each key in the RDD into a single sequence | Wide |
| `reduceByKey(func)` | Merge the values for each key using an associative reduce function | Wide |


<img src=images/narrow_wide_transformations.png align="center" width="100%">

<h4 align='right'>https://dzone.com/articles/big-data-processing-spark</h4>
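To make the narrow/wide distinction in the table concrete, here is a plain-Python sketch (no Spark required) of what four of the transformations compute. `reduce_by_key` is a hypothetical local helper, not a PySpark function. The first three work one element at a time, which is why Spark can run them per partition (narrow); the last must bring together every value for a key, which on a cluster means moving data between machines (wide).

```python
from collections import defaultdict
from functools import reduce
from itertools import chain

lines = ["the jungle book", "the law"]

# map: exactly one output element per input element
lengths = list(map(len, lines))

# flatMap: map each element to a sequence, then flatten the sequences
words = list(chain.from_iterable(line.split(" ") for line in lines))

# filter: keep only the elements for which the predicate is True
long_words = [word for word in words if len(word) > 3]

def reduce_by_key(pairs, func):
    """Local analogue of reduceByKey: merge all values of each key with func."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}

counts = reduce_by_key(((word, 1) for word in words), lambda a, b: a + b)
print(lengths, words, long_words, counts)
```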

## Actions
- Return or output a result

| Action | Description | Try it Out\*|
| :------:  | :-----------:| :---: |
| `collect()`     | Return a list that contains all of the elements in this RDD | `sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()` |
| `count()`  | Return the number of elements | `sc.parallelize([2, 3, 4]).count()` |
| `saveAsTextFile(path)` | Save as a text file, using string representations of elements | `sc.parallelize(['foo', '-', 'bar', '!']).saveAsTextFile("/FileStore/foo-bar.txt")`|
| `first()`    | Return the first element | `sc.parallelize([2, 3, 4]).first()` |
| `take(num)`    | Take the first num elements | `sc.parallelize([2, 3, 4, 5, 6]).take(2)` |

### \* Try it Out:
1. Go to your Databricks Workspace and create a new directory within your Users directory called "2017-06-15-widsatx" 
2. Create a notebook called "0-Introduction"  within this directory
3. Type or copy/paste lines of code into separate cells and run them (you will be prompted to launch a cluster) 

**Notes:**

When using Databricks the `SparkContext` is created for you automatically as `sc`.

In the Databricks Community Edition there are no Worker Nodes - the Driver Program (Master) executes the entire code.

Docs: https://docs.databricks.com/

### Try a couple more examples with transformations and actions:

In [None]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.groupByKey().mapValues(len).collect())
sorted(rdd.groupByKey().mapValues(list).collect())

In [None]:
from operator import add

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.reduceByKey(add).collect())

We've shown only a subset of possible *transformations* and *actions*. Check out others for your application in the docs: http://spark.apache.org/docs/latest/api/python/pyspark.html

### Example: Log Mining (in Scala)

In [None]:
val lines = sc.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split('\t')(2))

messages.filter(_.contains("foo")).count

The computation is expressed declaratively: `filter` and `map` are lazy transformations that only describe new RDDs, so nothing actually runs until the `count` action is called at the end.
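The same "describe now, run later" behavior can be imitated in plain Python with generators. This is only an analogy for Spark's lazy evaluation, using made-up log lines and a hypothetical `read_lines` helper that records which lines have actually been read.

```python
evaluated = []  # records each line at the moment it is actually read

def read_lines(raw_lines):
    """Generator that yields lines lazily and logs every line it touches."""
    for line in raw_lines:
        evaluated.append(line)
        yield line

# Hypothetical tab-separated log records: level, component, message.
raw = ["ERROR\tdb\tfoo failed",
       "INFO\tweb\tok",
       "ERROR\tweb\tbar failed",
       "ERROR\tdb\tfoo timeout"]

# "Transformations": each step only wraps the previous one.
lines = read_lines(raw)
errors = (line for line in lines if line.startswith("ERROR"))
messages = (line.split("\t")[2] for line in errors)
foo_messages = (message for message in messages if "foo" in message)

touched_before = len(evaluated)            # no line has been read yet
foo_count = sum(1 for _ in foo_messages)   # the "action" drives the whole pipeline
touched_after = len(evaluated)
print(touched_before, foo_count, touched_after)
```

Building the pipeline reads nothing; only the final count pulls every line through all three steps.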

# Exercises

### Exercise 1: Word Count
Create a few transformations to build a dataset of (String, Int) pairs called `counts` and then save it to a file.

1. Create a notebook in "2017-06-15-widsatx" called "1-WordCount"
2. Upload the file "The Jungle Book, by Rudyard Kipling.htm" to DBFS (https://docs.databricks.com/user-guide/importing-data.html)
3. Try to implement the following Word Count example:

http://spark.apache.org/examples.html

In [None]:
text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")

### Exercise 2: Logistic Regression

1. Create a notebook in "2017-06-15-widsatx" called "2-LogisticRegression"
2. Try to implement one of the following Logistic Regression examples:
    - http://spark.apache.org/examples.html (Prediction with Logistic Regression)
    - https://github.com/apache/spark/blob/master/examples/src/main/python/mllib/logistic_regression.py
    - https://github.com/apache/spark/blob/master/examples/src/main/python/logistic_regression.py

In [None]:
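from pyspark.ml.classification import LogisticRegression

# Every record of this DataFrame contains the label and
# features represented by a vector.
# `data` is assumed to be an existing collection of (label, features) rows.
df = sqlContext.createDataFrame(data, ["label", "features"])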

# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = LogisticRegression(maxIter=10)

# Fit the model to the data.
model = lr.fit(df)

# Given a dataset, predict each point's label, and show the results.
model.transform(df).show()
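To see why logistic regression counts as an iterative workload, here is a from-scratch sketch of batch gradient descent in plain Python. It is not the algorithm Spark's `LogisticRegression` uses internally; the function names, learning rate, and tiny dataset are assumptions made for illustration. Every iteration makes a full pass over the same data, which is the reuse pattern that in-memory caching speeds up.

```python
import math

def sigmoid(z):
    """Logistic function, mapping any real number into (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predict_probability(weights, x):
    """Probability that point x has label 1 under the current weights."""
    return sigmoid(sum(w * xi for w, xi in zip(weights, x)))

def fit_logistic(points, labels, max_iter=10, learning_rate=0.5):
    """Batch gradient descent: each iteration scans the whole dataset once."""
    weights = [0.0] * len(points[0])
    for _ in range(max_iter):
        gradient = [0.0] * len(weights)
        for x, y in zip(points, labels):
            error = predict_probability(weights, x) - y
            for j, xi in enumerate(x):
                gradient[j] += error * xi
        weights = [w - learning_rate * g / len(points)
                   for w, g in zip(weights, gradient)]
    return weights

# Hypothetical data: the first feature is a constant 1.0 acting as the bias term.
points = [[1.0, -2.0], [1.0, -1.0], [1.0, 1.0], [1.0, 2.0]]
labels = [0, 0, 1, 1]

weights = fit_logistic(points, labels, max_iter=10)
predictions = [1 if predict_probability(weights, x) >= 0.5 else 0 for x in points]
print(weights, predictions)
```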

### Exercise 3: Clickstream

1. Create a notebook in "2017-06-15-widsatx" called "3-Clickstream"
2. Implement the Clickstream example from the Databricks "Quick Start DataFrames" notebook in Python
3. Or just run through the example in Scala

# Resources and References (Spark)
- ****MOOCs****:
    - "Introduction to Apache Spark": https://www.edx.org/course/introduction-apache-spark-uc-berkeleyx-cs105x
    - "Hadoop Platform and Application Framework" (week 5 covers Spark): https://www.coursera.org/learn/hadoop/home/week/5
- ****Spark/PySpark Docs****:
    - (v1.6.1 - latest) http://spark.apache.org/docs/latest/
        - http://spark.apache.org/docs/latest/api/python/
    - (v2.0.0) http://spark.apache.org/docs/2.0.0-preview/
    - http://spark.apache.org/research.html
    - http://spark.apache.org/examples.html
- ****Other****:
    - Spark 2.0 Webinar (2016): http://go.databricks.com/apache-spark-2.0-presented-by-databricks-co-founder-reynold-xin
    - PySpark Talk (J. Rosen, 2013): https://www.youtube.com/watch?v=xc7Lc8RA8wE

# Thanks for Coming!