# Introduction to PySpark
A Novice's Guide and Tips for Setting Up Spark Locally and Running PySpark on an AWS Elastic MapReduce (EMR) Cluster
  
  
  
  
Tom Marthaler  
@tmarthal  
DesertPy *2017-06-28*  
https://github.com/tmarthal/notebooks/blob/master/Introduction%20to%20PySpark.ipynb


# Contents

* Introduction to Spark 
* Spark architecture, data structures and operations
* Setting up a local Spark development environment (findspark)
* Sample PySpark commands / usages
* Testing PySpark with pysparkling
* Deploying PySpark applications 
* Command line interface to spin up an EMR cluster

(It was hard to decide what to cover: the Spark ecosystem is huge.)

# Introduction to Spark 

Overview ([source](http://spark.apache.org/docs/latest/))

> Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs.

> It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.



# Spark Background

> Spark, leverages in-memory primitives to improve performance over MapReduce for certain applications. It is well-suited to machine learning algorithms and interactive analytics. ([source](http://www.cio.com/article/3101842/analytics/databricks-unveils-commercial-support-for-apache-spark-2-0.html))

* Originally developed at UC Berkeley's AMPLab in 2009.
* Open-sourced under a BSD license in 2010.
* Donated to the Apache Software Foundation in 2013.
* Became an Apache Top-Level Project in 2014.
* 1.0.0 released in May 2014.
* 2.0.0 released in July 2016.
* Currently on 2.1.1 (released May 2, 2017).
* Commercial support, certifications and backing by [Databricks](https://databricks.com)
    - Databricks seems to be pushing Spark SQL/DataFrames more than PySpark lately...

# PySpark Explained

Apache Spark is a cluster computing platform designed to be fast and general-purpose. PySpark is its Python API: the Python driver talks to the JVM-based Spark engine (via Py4J), which distributes the work to the cluster machines.

Spark’s primary abstraction is a distributed collection of items called a __Resilient Distributed Dataset__ (RDD). 


![alt text](http://i.imgur.com/YlI8AqEl.png "spark architecture")
([source](https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals))




> At its core, Spark is a “computational engine” that is responsible for scheduling, distributing, and monitoring applications consisting of many computational tasks across many worker machines, or a computing cluster. Because the core engine of Spark is both fast and general-purpose, it powers multiple higher-level components specialized for various workloads, such as SQL or machine learning. These components are designed to inter-operate closely, letting you combine them like libraries in a software project.

> At a high level, a Spark application consists of a driver program that launches various parallel operations on a cluster. The driver program contains the main function of your application and defines the operations that are then distributed to the cluster members for execution. The SparkContext object is used by the driver program to access the computing cluster. In the interactive shell, the SparkContext is available by default as the `sc` variable.



> A very important concept in Spark is the RDD: the resilient distributed dataset. This is an immutable collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster. An RDD can contain any type of Python, Java, Scala or R object, including user-defined classes. RDDs can be created in two ways: by loading an external dataset or by distributing a collection of objects, such as a list or set.

- Paraphrased from Chapter 1 of *__Learning Spark__* ([source](https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch01.html))

## Spark Operations

After an RDD is created there are two types of operations:

1. __Transformations__ –  construct a new RDD from an existing one
1. __Actions__ – compute a result based on an RDD

Transformations on RDDs are computed lazily: Spark records the chain of transformations and only runs it when an action needs a result, which lets it optimize the memory and compute required. As a corollary, by default an RDD is recomputed each time an action is run on it. Spark provides a `persist()` method (and `cache()`) to keep an RDD around for computing multiple actions.
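To make the lazy-evaluation point concrete without a cluster, here is a toy, pure-Python model. The `LazyDataset` class is hypothetical and is not how PySpark is implemented: transformations only record steps, actions replay the recorded chain, and `persist()` keeps the first computed result for later actions.

```python
class LazyDataset:
    """Toy, pure-Python stand-in for an RDD (NOT PySpark itself).

    Transformations only record a step in the lineage; actions replay the whole
    lineage from the source, unless the dataset was persisted.
    """

    def __init__(self, source, steps=()):
        self._source = list(source)  # the "input RDD"
        self._steps = tuple(steps)   # recorded transformations (the lineage)
        self._persisted = False
        self._cached = None

    # Transformations: return a new LazyDataset and compute nothing.
    def map(self, fn):
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, pred):
        return LazyDataset(self._source, self._steps + (("filter", pred),))

    def persist(self):
        self._persisted = True  # the next action computes and keeps the result
        return self

    def _compute(self):
        if self._cached is not None:
            return self._cached
        items = self._source
        for kind, fn in self._steps:
            if kind == "map":
                items = [fn(x) for x in items]
            else:
                items = [x for x in items if fn(x)]
        if self._persisted:
            self._cached = items
        return items

    # Actions: trigger the computation and return plain Python values.
    def collect(self):
        return list(self._compute())

    def count(self):
        return len(self._compute())


calls = []


def square(x):
    calls.append(x)  # record each time the transformation actually runs
    return x * x


squares = LazyDataset([1, 2, 3]).map(square)
calls_before_action = len(calls)    # map() alone computed nothing
squares.count()
squares.collect()
calls_without_persist = len(calls)  # both actions replayed the map: 3 + 3 calls

calls.clear()
cached = LazyDataset([1, 2, 3]).map(square).persist()
cached.count()
result = cached.collect()
calls_with_persist = len(calls)     # the second action reused the cached result
print(calls_before_action, calls_without_persist, calls_with_persist, result)
```

In real Spark the cached partitions live in executor memory (or on disk, depending on the storage level), but the accounting is the same: without `persist()`, every action pays for the whole chain again.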



# Spark Programming Flow

* create input RDDs
* transform them using transformations
* persist RDDs if needed for reuse
* launch actions to start parallel computation
* save the contents of the RDD
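Those five steps map onto a handful of RDD calls. A minimal sketch, assuming an existing SparkContext `sc`; the function names `normalize` and `run_pipeline`, and whatever paths you pass in, are hypothetical:

```python
def normalize(line):
    """Transformation helper: trim whitespace and lower-case one line of text."""
    return line.strip().lower()


def run_pipeline(sc, input_path, output_path):
    """The programming flow above, assuming `sc` is an existing SparkContext."""
    lines = sc.textFile(input_path)                # 1. create an input RDD
    cleaned = lines.map(normalize).filter(bool)    # 2. transformations (lazy, nothing runs yet)
    cleaned.persist()                              # 3. persist: two actions use it below
    print("non-empty lines:", cleaned.count())     # 4. an action starts the parallel computation
    cleaned.saveAsTextFile(output_path)            # 5. save the contents (also an action)
    cleaned.unpersist()                            # release the cached partitions
```

Keeping `normalize` as a named, Spark-free function means it can be unit tested on its own.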

# Spark - Usage

__Q:__ Do you need to use spark? 

__A:__ ¯\\\_(ツ)\_/¯ - but you'll most likely know when a single database can no longer store your data or a single machine can no longer process it.


Spark is more infrastructure than application; if you don't have a team to support a cluster installation, you can use a hosted service such as AWS EMR or Google Cloud Dataproc.

The tools exist; you just have to give them a shot.


# PySpark - Requirements

* Installation and Setup (the hardest part)
    * Zipfian's Installation instructions (OSX/Linux) for help: https://github.com/zipfian/spark-install
* AWS Account / AWS Keys (for running on Elastic Map Reduce) 


After (or during!) this talk go through the following pages:

* Spark Quick Start Guide - https://spark.apache.org/docs/latest/quick-start.html 
* Spark Programming Guide - https://spark.apache.org/docs/2.1.1/programming-guide.html 


This talk is a summary of those two guides, plus assorted notes from implementing a PySpark system on EMR.


# PySpark 

Important things __not__ covered in this talk:

* RDDs versus DataFrames
    * Datasets (the typed API) are not available in Python (only in Java/Scala)
* Batch versus stream processing
* RDD persistence
* The Spark shell, `./bin/pyspark`
    > note: the SparkContext is injected as `sc` by default, which makes it easy to run examples.
* Spark SQL (using a SQL dialect for transformations, if that's your thing) - http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=udf


The Spark ecosystem is *huge*.

# Running Locally

After installing Spark and its command-line tools (`pyspark` and `spark-submit`), you also need to make the `pyspark` package importable from a regular Python interpreter (part of the installation headaches).

```
PySpark isn't on sys.path by default, but that doesn't mean it can't be used as a regular library. You can address this by either symlinking pyspark into your site-packages, or adding pyspark to sys.path at runtime. findspark does the latter.
```
Solution: use [`findspark`](https://github.com/minrk/findspark)

In [None]:
try:
    # this is going to fail
    from pyspark import SparkContext
except ImportError as e:
    print(e)

In [None]:
# Use `findspark` package for easy path setup
import findspark

findspark.init() # add pyspark module to sys.path
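Under the hood, `findspark.init()` does roughly the following: it locates the Spark installation and puts `$SPARK_HOME/python` plus the bundled py4j zip on `sys.path`. This is a simplified sketch of the "add pyspark to sys.path at runtime" approach, not findspark's actual code; the helper name `add_pyspark_to_path` is hypothetical.

```python
import glob
import os
import sys


def add_pyspark_to_path(spark_home=None):
    """Prepend $SPARK_HOME/python and the bundled py4j zip to sys.path (if not already there).

    Returns the list of paths that were considered, in priority order.
    """
    spark_home = spark_home or os.environ.get("SPARK_HOME")
    if not spark_home:
        raise ValueError("SPARK_HOME is not set; pass spark_home explicitly")
    python_dir = os.path.join(spark_home, "python")
    # Spark ships py4j as e.g. python/lib/py4j-<version>-src.zip
    py4j_zips = sorted(glob.glob(os.path.join(python_dir, "lib", "py4j-*.zip")))
    paths = [python_dir] + py4j_zips
    for path in reversed(paths):  # insert in reverse so python_dir ends up first
        if path not in sys.path:
            sys.path.insert(0, path)
    return paths


# Usage, assuming SPARK_HOME points at an unpacked Spark distribution:
#   add_pyspark_to_path()
#   from pyspark import SparkContext
```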

In [None]:
# 👌
from pyspark import SparkContext

In [None]:
# Get a context variable. When running `pyspark` from the command line,
#  this `sc` variable is already injected into the REPL
# 🙅 sc = SparkContext() -- don't construct directly: only one SparkContext can be active at a time
sc = SparkContext.getOrCreate() # always get or create

# RDD Examples
RDDs can be created from Python objects, from files in various input formats, or by transforming other RDDs.

## RDD from Python Objects

[parallelize(c, numSlices=None):](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.SparkContext.parallelize)


```
Distribute a local Python collection to form an RDD. Using xrange is recommended if the input represents a range for performance.
```

In [None]:
# Sum the squares of the integers from 1 to 10.
# parallelize creates an RDD, map creates an RDD, sum returns a number
sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) \
    .map(lambda x: x**2) \
    .sum()
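The squares cell above should return 385. Its `parallelize` call uses the default number of partitions; the `numSlices` argument sets it explicitly. As a simplified pure-Python model (the `split_into_slices` helper is hypothetical, not PySpark code), a list is cut into contiguous, near-equal slices. PySpark actually slices pickled batches of elements, so boundaries can differ; on a live context, check the real layout with `sc.parallelize(data, 3).glom().collect()`.

```python
def split_into_slices(items, num_slices):
    """Cut a list into num_slices contiguous, near-equal slices (a model of numSlices)."""
    n = len(items)
    return [items[(i * n) // num_slices:((i + 1) * n) // num_slices]
            for i in range(num_slices)]


print(split_into_slices(list(range(1, 11)), 3))
print(sum(x ** 2 for x in range(1, 11)))  # same value the Spark cell computes
```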

## RDDs from Files  

[textFile(name, minPartitions=None, use_unicode=True)](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.SparkContext.textFile)
```
Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
```

(From the API docs: http://spark.apache.org/docs/2.1.1/api/python/pyspark.html#pyspark.SparkContext.textFile)



In [None]:
# Download a small CSV data set (the UCI zoo data) for later
test_filename = '/opt/tmarthal/zoo-data.csv'
!curl http://mlr.cs.umass.edu/ml/machine-learning-databases/zoo/zoo.data > {test_filename}

In [None]:
# Data definition http://mlr.cs.umass.edu/ml/machine-learning-databases/zoo/zoo.names
!head -10 {test_filename}

In [None]:
# create an RDD from the local csv file with (at least) 3 partitions
textFile = sc.textFile(test_filename, 3)

Data definition http://mlr.cs.umass.edu/ml/machine-learning-databases/zoo/zoo.names

```
Attribute Information: (name of attribute and type of value domain)
   1. animal name:      Unique for each instance
   2. hair		Boolean
   3. feathers		Boolean
   4. eggs		Boolean
   5. milk		Boolean
   6. airborne		Boolean
   7. aquatic		Boolean
   8. predator		Boolean
   9. toothed		Boolean
  10. backbone		Boolean
  11. breathes		Boolean
  12. venomous		Boolean
  13. fins		Boolean
  14. legs		Numeric (set of values: {0,2,4,5,6,8})
  15. tail		Boolean
  16. domestic		Boolean
  17. catsize		Boolean
  18. type		Numeric (integer values in range [1,7])
```
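Given that attribute list, a named parser can turn each line into a dict, which makes the shape of the RDD explicit. A minimal sketch: the `ZOO_FIELDS` names and `parse_zoo_record` are hypothetical, derived from the attribute information above, and the example line is illustrative. On the cluster it would be used as `textFile.map(parse_zoo_record)`.

```python
ZOO_FIELDS = [
    "animal_name", "hair", "feathers", "eggs", "milk", "airborne", "aquatic",
    "predator", "toothed", "backbone", "breathes", "venomous", "fins", "legs",
    "tail", "domestic", "catsize", "type",
]


def parse_zoo_record(line):
    """Turn one comma-separated zoo.data line into a dict keyed by attribute name.

    Every attribute after the name is stored as an int (Booleans become 0/1).
    """
    values = line.strip().split(",")
    if len(values) != len(ZOO_FIELDS):
        raise ValueError("expected %d fields, got %d" % (len(ZOO_FIELDS), len(values)))
    record = {"animal_name": values[0]}
    record.update(zip(ZOO_FIELDS[1:], (int(v) for v in values[1:])))
    return record


example = parse_zoo_record("aardvark,1,0,0,1,0,0,1,1,1,1,0,0,4,0,0,1,1")
print(example["animal_name"], example["hair"], example["legs"], example["type"])
```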

### textFile RDDs

`textFile()` also accepts a comma-separated list of locations, directories and file globs (e.g. `/path/*.txt`).


* `sc.textFile` splits the input into lines, one RDD element per line (so keep line sizes reasonable)
* ideally one record per line, e.g. a JSON object or a feature vector
* use Spark transformations to clean bad/malformed values
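One way to clean malformed values is to parse inside `flatMap` and return zero or one results per line, so bad lines are dropped in the same pass that parses the good ones. A sketch assuming JSON-lines input; the helper `parse_json_line` is hypothetical. With Spark it would be used as `sc.textFile(path).flatMap(parse_json_line)`.

```python
import json


def parse_json_line(line):
    """Parse one JSON record per line; return [] for blank or malformed lines.

    Returning a list of zero or one items lets flatMap drop bad lines.
    """
    line = line.strip()
    if not line:
        return []
    try:
        return [json.loads(line)]
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return []


# Plain-Python check of what flatMap would produce for a few sample lines:
sample = ['{"id": 1, "label": "ok"}', "not json", "", '{"id": 2}']
parsed = [record for line in sample for record in parse_json_line(line)]
print(parsed)
```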

In [None]:
# Project Gutenberg raw text (.txt) files
!ls /opt/tmarthal/example

In [None]:
# both RDDs contain the same lines, assuming these are the only two .txt files in the folder
# (the second assignment simply replaces the first)
books = sc.textFile('/opt/tmarthal/example/*.txt')
books = sc.textFile("/opt/tmarthal/example/beowulf.txt,/opt/tmarthal/example/embroidery.txt")


# Can also read gzip-compressed files and operate on them directly (not tested here)
#  gzip is not splittable, so each .gz file is read into a single partition;
#  a .tar.gz would also include the tar headers in the lines
books_gz = sc.textFile("/opt/tmarthal/*.gz")

### Other Data Formats

* If you control the data format, a columnar format like *Parquet* can reduce I/O, since readers only load the columns they need
* __Spark can also read directly from S3__ (e.g. `s3n://` or `s3a://` URIs)
* `textFile()` can also read directly from `hdfs://` and other Hadoop-supported storage systems


## Operators In Use
RDDs have **actions**, which return values, and **transformations**, which return new (lazily evaluated) RDDs.

There are a lot of functional operators beyond just `map()` and `reduce()`. Check the PySpark API for the full list: http://spark.apache.org/docs/2.1.0/api/python/pyspark.html

[pyspark-pictures](http://nbviewer.jupyter.org/github/jkthompson/pyspark-pictures/blob/master/pyspark-pictures.ipynb)
- neat conceptual view of the PySpark operators

### PySpark Transformations 

All the normal functional transforms:

* `filter` 
* `flatMap`
* `groupBy`
* `map`
* `sample`
* `sortBy`




`flatMap` visualization example:

![flatmap](https://github.com/jkthompson/pyspark-pictures/raw/master/images/readme-example.png "flatmap")


In [None]:
# flatMap
x = sc.parallelize([1,2,3])
y = x.flatMap(lambda x: (x, 100*x, x**2))
print(x.collect())
print(y.collect())
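The difference between `map` and `flatMap` can be checked in plain Python. The `flat_map` helper below is a hypothetical equivalent, not PySpark code. For the cell above, `x.collect()` gives `[1, 2, 3]` and `y.collect()` should give `[1, 100, 1, 2, 200, 4, 3, 300, 9]`.

```python
from itertools import chain


def flat_map(fn, items):
    """Pure-Python equivalent of rdd.flatMap(fn): apply fn, then flatten one level."""
    return list(chain.from_iterable(fn(item) for item in items))


def expand(v):
    return (v, 100 * v, v ** 2)


print([expand(v) for v in [1, 2, 3]])  # what map() produces: one tuple per element
print(flat_map(expand, [1, 2, 3]))     # what flatMap() produces: the tuples' items, flattened
```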

### PySpark Actions
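These methods trigger the RDD processing:

* count
* countApprox
* countByKey
* reduce
* sum

(`reduceByKey`, despite the name, is a transformation: it returns a new RDD of key/value pairs.)

Some actions also return the RDD's contents to the driver as Python data types:

* collect
* first
* top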


In [None]:
textFile.first() # action

In [None]:
textFile.count() # action

In [None]:
# RDD -> RDD -> RDD -> RDD -> python list
animalsWithHairList = textFile \
    .map(lambda line: line.split(',')) \
    .filter(lambda line: line[1] == '1') \
    .map(lambda line: line[0]) \
    .collect() 
print(animalsWithHairList, end=" ") 

## Save RDD

* `.saveAsTextFile` writes the transformed RDD to a given location
    * the location can be a local path, S3 or any other Hadoop-supported file system
    * the output is a directory with one `part-NNNNN` file per partition; the line format is up to the developer


* There are also `saveAsHadoopFile`, `saveAsSequenceFile` and other output formats.

In [None]:
# clear the output location - spark will not overwrite contents (org.apache.hadoop.mapred.FileAlreadyExistsException)
!rm -Rf /opt/tmarthal/animals_with_hair

In [None]:
animalsWithHair = textFile \
    .map(lambda line: line.split(',')) \
    .filter(lambda line: line[1] == '1') \
    .map(lambda line: line[0])
    
# textFile was read with (at least) 3 partitions, so expect one part-0000N file per partition
animalsWithHair.saveAsTextFile('/opt/tmarthal/animals_with_hair/')



In [None]:
!ls /opt/tmarthal/animals_with_hair/

In [None]:
!cat /opt/tmarthal/animals_with_hair/part-00001

# Testing PySpark Applications

Do's & Don'ts: 

* make sure to write testable Python (Spark jobs tend towards a single file/function)
   - inject the SparkContext into a callable function (no logic in the `__main__` block or at module level)
   - move common code to modules/packages


* encapsulate transforms into named functions; only use lambdas for simple operations
   - the hardest thing to understand about some Spark code is knowing the shape of the transformed RDD
   - PySpark RDDs are dynamically typed, so you definitely need to test their structure/contents


* you can use `unittest` just like in other Python codebases
   - `unittest` and `mock` the named functions directly for >95% coverage


* `pysparkling` for integration tests: it provides an in-memory, pure-Python `pysparkling.Context`
   - it was developed for running Spark applications on small data, with no JVM/Hadoop: https://github.com/svenkreiss/pysparkling
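Putting these tips together: a sketch of the notebook's animals-with-hair chain rewritten with named functions, plus `unittest` cases that need no SparkContext (a `MagicMock` stands in for the RDD to check which functions get chained). The function names and sample lines are illustrative assumptions, not the talk's code.

```python
import unittest
from unittest import mock


def split_fields(line):
    """zoo.data line -> list of string fields."""
    return line.split(",")


def has_hair(fields):
    """True when the 'hair' attribute (second column) is 1."""
    return fields[1] == "1"


def animal_name(fields):
    return fields[0]


def animals_with_hair(lines_rdd):
    """Same chain as the notebook's lambdas, built from named, testable functions."""
    return lines_rdd.map(split_fields).filter(has_hair).map(animal_name)


class NamedTransformTests(unittest.TestCase):
    # The helpers are plain Python, so no SparkContext is needed to test them.
    def test_hairy_animal(self):
        fields = split_fields("bear,1,0,0,1,0,0,1,1,1,1,0,0,4,0,0,1,1")
        self.assertTrue(has_hair(fields))
        self.assertEqual(animal_name(fields), "bear")

    def test_hairless_animal(self):
        self.assertFalse(has_hair(split_fields("bass,0,0,1,0,0,1,1,1,1,0,0,1,0,1,0,0,4")))

    def test_chain_uses_named_functions(self):
        rdd = mock.MagicMock()  # stands in for an RDD
        animals_with_hair(rdd)
        rdd.map.assert_called_once_with(split_fields)
        rdd.map.return_value.filter.assert_called_once_with(has_hair)
        rdd.map.return_value.filter.return_value.map.assert_called_once_with(animal_name)


suite = unittest.defaultTestLoader.loadTestsFromTestCase(NamedTransformTests)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

In a real project the test class would live in its own `test_*.py` module and be run with `python -m unittest`.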


In [None]:
# Testable integration with pysparkling
from operator import add
from random import random

from pysparkling import Context


def pi(context, n, partitions):
    """Count how many of n random points in the [-1, 1] x [-1, 1] square land in the unit circle."""
    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0
    return context.parallelize(range(1, n + 1), partitions).map(f).reduce(add)


# "integration test" code
partitions = 2
n = 100000 * partitions
c = pi(Context(), n, partitions)
pi_estimate = 4.0 * c / n  # don't reuse the name `pi`: that would shadow the function

# then assert as normal in tests
assert 3 < pi_estimate < 3.2  # e.g. self.assertGreater(pi_estimate, 3) in a TestCase
pi_estimate

# Deploying PySpark Applications

When running a spark application from a command line, the syntax is as follows:

```
> spark-submit --master=local[4] /opt/tmarthal/wordcount-example.py /opt/tmarthal/example/beowulf.txt
               ^^ master URL     ^^ application script              ^^ script argument(s)
```


Packaging / dependency management is a huge headache. There are two main ways to package dependencies:

1. Use the `--py-files` argument to the `spark-submit` command
   * Example: `spark-submit --py-files dependency.py,library.zip main.py argument1 argument2`
   * Does not work with compiled Python libraries (C extensions), e.g. numpy
   * More dynamic, but may require changes to import statements, since the modules are imported from the shipped .py/.zip files

   
2. Install the files into the python path on the cluster machines
   * More dev-ops/operations management than application development
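Since `spark-submit` options must come before the application script (everything after the script is passed to it as arguments), it can help to build the command in one place. A minimal sketch; the `spark_submit_command` helper is hypothetical:

```python
def spark_submit_command(main_script, py_files=(), app_args=(), master="local[4]"):
    """Build the argv for spark-submit; options go before the script, app args after it."""
    cmd = ["spark-submit", "--master", master]
    if py_files:
        # --py-files takes a single comma-separated list of .py/.zip/.egg files
        cmd += ["--py-files", ",".join(py_files)]
    cmd.append(main_script)  # the application itself
    cmd.extend(app_args)     # everything after the script reaches it via sys.argv
    return cmd


cmd = spark_submit_command("main.py", ["dependency.py", "library.zip"], ["argument1", "argument2"])
print(" ".join(cmd))
# subprocess.run(cmd, check=True) would launch it, assuming spark-submit is on the PATH
```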

# Deploying to EMR

The `spark-submit` call on an EMR cluster is the same as the one tested locally, but your local file system is not available there: all configuration and code must be read from S3 or set up directly on the EMR cluster.


The idea behind this sample analysis is to count the top 100 words in the book files in a `data` folder.

> `spark-submit --master=local wordcount.py 's3n://tmarthal-spark/data/*.txt' s3n://tmarthal-spark/top-words` 




For purposes of illustration, presume that the s3 bucket `tmarthal-spark` has the following files:

```
tmarthal-spark/
  - wordcount.py
  - tmarthal-1.0.1.tar.gz
  - emr-bootstrap.sh
  - requirements.txt
  - spark-env.json
  - data/
    - beowulf.txt
    - embroidery.txt
    ...
```

In [None]:
!aws s3 ls tmarthal-spark --recursive --profile tmarthal


* `wordcount.py` - a PySpark application for counting words; it takes two arguments: the location of the text files and the location to save the output words

* `tmarthal-1.0.1.tar.gz` - the source distribution of shared Python code
> e.g. built with `python3.4 setup.py sdist`

* `emr-bootstrap.sh` - a bash script that runs on each cluster machine to install the Python dependencies

* `requirements.txt` - a typical pip requirements file

* `spark-env.json` - the EMR environment configuration to use Python 3.4 (instead of the default 2.7)



In [None]:
#!/bin/bash
# spark cluster bootstrap file to install the python requirements

# pip can't install from s3:// URLs, so we presign an https URL for each file in S3 and install
# from the temporary, authorized URL(s)
# Note: if a URL returns 403 Forbidden, the bucket permissions need to be modified

# The requirements
aws s3 presign tmarthal-spark/requirements.txt | xargs sudo pip-3.4 install -r

# any distributed source modules
aws s3 presign tmarthal-spark/tmarthal-1.0.1.tar.gz --expires-in 30 | xargs sudo pip-3.4 install

In [None]:
boto3==1.4.4
botocore==1.5.29
docutils==0.13.1
findspark==1.1.0
jmespath==0.9.2
numpy==1.11.3
psutil==5.2.0
python-dateutil==2.6.0
s3transfer==0.1.10
scikit-learn==0.18.1
scipy==0.17.0
six==1.10.0

In [None]:
[
    {
        "Classification": "spark-env",
        "Properties": {},
        "Configurations": [
            {
                "Classification": "export",
                "Properties": {
                    "PYSPARK_PYTHON": "python34"
                },
                "Configurations": []
            }
        ]
    }
]

## EMR Cluster Creation

Example AWS CLI command that creates an EMR cluster and runs a single Spark step:

```
> aws emr create-cluster \
  --name "Gutenburg Top Words" \
  --release-label emr-5.4.0 \
  --applications Name=Spark \
  --instance-type m3.xlarge \
  --instance-count 1 \
  --steps Type=spark,Name=FilterHouseholds,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,--num-executors,2,s3://tmarthal-spark/wordcount.py,s3n://tmarthal-spark/data/*.txt,s3n://tmarthal-spark/top-words],ActionOnFailure=TERMINATE_CLUSTER \
  --use-default-roles \
  --configurations https://s3.amazonaws.com/tmarthal-spark/spark-env.json \
  --bootstrap-action Path=s3://tmarthal-spark/emr-bootstrap.sh,Args=[] \
  --region us-east-1 \
  --log-uri 's3://tmarthal-spark/example/logs' \
  --auto-terminate
```

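This launches a single-node cluster (`--instance-count 1`) and runs the word count as one Spark step with 2 executors (`--num-executors 2`), processing all of the .txt files. `--auto-terminate` shuts the cluster down once the step has finished.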

For a full explanation of the EMR api, see http://docs.aws.amazon.com/cli/latest/reference/emr/create-cluster.html 

> My personal account's security groups are not set up to run this properly at the moment. :(
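If you'd rather launch from Python, the same cluster can be described as keyword arguments for boto3's EMR `run_job_flow` call. A sketch, assuming boto3 is installed and AWS credentials are configured; the helper names, step name and bootstrap-action name are arbitrary, and a CLI step of `Type=spark` corresponds to running `spark-submit` through `command-runner.jar`.

```python
def word_count_cluster_request(bucket="tmarthal-spark", release_label="emr-5.4.0"):
    """Keyword arguments for boto3's EMR run_job_flow, mirroring the CLI call above."""
    spark_step_args = [
        "spark-submit", "--deploy-mode", "cluster", "--master", "yarn",
        "--conf", "spark.yarn.submit.waitAppCompletion=true",
        "--num-executors", "2",
        "s3://%s/wordcount.py" % bucket,
        "s3n://%s/data/*.txt" % bucket,
        "s3n://%s/top-words" % bucket,
    ]
    return {
        "Name": "Gutenberg Top Words",
        "ReleaseLabel": release_label,
        "Applications": [{"Name": "Spark"}],
        "LogUri": "s3://%s/example/logs" % bucket,
        "Instances": {
            "MasterInstanceType": "m3.xlarge",
            "SlaveInstanceType": "m3.xlarge",
            "InstanceCount": 1,
            # like --auto-terminate: shut down once there are no more steps
            "KeepJobFlowAliveWhenNoSteps": False,
        },
        "Steps": [{
            "Name": "WordCount",
            "ActionOnFailure": "TERMINATE_CLUSTER",
            "HadoopJarStep": {"Jar": "command-runner.jar", "Args": spark_step_args},
        }],
        "BootstrapActions": [{
            "Name": "install python requirements",
            "ScriptBootstrapAction": {"Path": "s3://%s/emr-bootstrap.sh" % bucket, "Args": []},
        }],
        "Configurations": [{
            "Classification": "spark-env",
            "Properties": {},
            "Configurations": [{
                "Classification": "export",
                "Properties": {"PYSPARK_PYTHON": "python34"},
            }],
        }],
        # the roles that --use-default-roles refers to
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "ServiceRole": "EMR_DefaultRole",
    }


def launch(request, region="us-east-1"):
    """Submit the request; boto3 is imported here so the request can be built without it."""
    import boto3
    return boto3.client("emr", region_name=region).run_job_flow(**request)["JobFlowId"]
```

Building the request separately from `launch()` makes the cluster definition easy to inspect or test before spending money on it.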


The `wordcount.py` that reads in the text files and writes out the top words is left to the reader.
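For completeness, here is one possible minimal `wordcount.py`, sketched under assumptions: the tokenizing regex, the tab-separated output format and the function names are hypothetical, not the talk's code. It takes the input and output locations as its two arguments, and `pyspark` is only imported inside `main()` so the helpers can be tested without Spark.

```python
import re
from operator import add

# Hypothetical tokenizer: lower-case words made of letters and apostrophes.
WORD_RE = re.compile(r"[a-z']+")


def tokenize(line):
    """Split one line of text into lower-case words."""
    return WORD_RE.findall(line.lower())


def top_words(lines_rdd, n=100):
    """Return the n most frequent (word, count) pairs, most frequent first."""
    return (lines_rdd
            .flatMap(tokenize)                           # one element per word
            .map(lambda word: (word, 1))
            .reduceByKey(add)                            # transformation: (word, total)
            .takeOrdered(n, key=lambda pair: -pair[1]))  # action: back to a Python list


def main(argv):
    """argv: [input location (e.g. a *.txt glob), output location]."""
    from pyspark import SparkContext

    input_path, output_path = argv
    sc = SparkContext.getOrCreate()
    counts = top_words(sc.textFile(input_path))
    # write the result as a single tab-separated part file
    sc.parallelize(counts, 1).map(lambda pair: "%s\t%d" % pair).saveAsTextFile(output_path)


# wordcount.py would end with (plus `import sys` at the top):
#   if __name__ == "__main__":
#       main(sys.argv[1:])
```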


...

## References

* [Main] Tim Hopper's PySpark Presentation - https://github.com/tdhopper/rta-pyspark-presentation/blob/master/slides.ipynb
* Best Practices Writing Production PySpark Jobs - https://developerzen.com/best-practices-writing-production-grade-pyspark-jobs-cb688ac4d20f
* Using Amazon Elastic MapReduce (EMR) with Spark and Python 3.4 - http://blog.thehumangeo.com/amazon-emr-spark-python3.html


## Questions / Commentary ?