# Spark Tutorial

Author: Matthew K. MacLeod


### Tutorial goals:

* background
* configuration
* introduction
  * transformations
  * joins
  * accumulators
  * dataframes
* HQL
* machine learning
* streaming

## Background

Spark is an open source project created in 2009 by the UC Berkeley RAD lab.

Spark is a distributed computing framework that applies the MapReduce paradigm in memory, and much more.

http://spark.apache.org/docs/latest/api/python/pyspark.html

Spark supports:
* interactive queries (Spark SQL and Hive)
* stream processing
* data analytics (MLlib)
* graph processing (GraphX)


### Directed Acyclic Graph Scheduler

Spark uses a DAG scheduler to figure out how to execute the data analysis pipeline.

The DAG records the dependencies between transformations. This lineage lets Spark recover lost data partitions by recomputing them.

RDDs (Spark's core data objects, more on these below) are the nodes, and the directed edges are the transformations between them; together they form the execution graph.

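The DAG scheduler also uses this graph to optimize parallel execution, for example by pipelining consecutive narrow transformations into a single stage and splitting stages at shuffle boundaries.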
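To make the stage idea concrete, here is a toy model in plain Python (no Spark needed). It assumes a simple linear pipeline in which each step is hand-tagged "narrow" or "wide"; split_into_stages is a hypothetical helper, not Spark's scheduler, which works on a full graph of RDD dependencies.

```python
# Toy model only: Spark's real DAGScheduler walks a graph of RDD dependencies.
# Here a pipeline is a list of (step_name, kind) with kind "narrow" or "wide".
def split_into_stages(pipeline):
    """Group consecutive narrow steps into stages; a wide step (shuffle) starts a new stage."""
    stages = [[]]
    for name, kind in pipeline:
        if kind == "wide" and stages[-1]:
            # shuffle boundary: data must be redistributed before this step runs
            stages.append([])
        stages[-1].append(name)
    return stages


# the word count pipeline used in the introduction, annotated by hand
word_count = [
    ("read", "narrow"),
    ("flatMap", "narrow"),
    ("map", "narrow"),
    ("reduceByKey", "wide"),
]
print(split_into_stages(word_count))
# [['read', 'flatMap', 'map'], ['reduceByKey']]
```

The three narrow steps run back to back on each partition without writing intermediate results, which is the pipelining the scheduler exploits.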


### Caching

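Caching is another important aspect of Spark, since in-memory operation is what sets Spark apart.

In general:

   * speedups of 10x to 100x over disk-based processing are typical
   * caching is gradual (lazy): partitions are stored as they are first computed by an action
   * caching is fault tolerant, like the rest of Spark: lost cached partitions are recomputed from their lineage
   * cache the cleaned data set that later steps will reuse
   * heavy calculations may benefit from a storage level that uses both memory and disk (e.g. MEMORY_AND_DISK)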
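The caching points above can be illustrated with a toy model in plain Python (no Spark needed). ToyCachedDataset is a hypothetical class assumed for illustration: partitions are computed only on first use, then kept in a cache, and a lost partition is rebuilt from its lineage (the source data plus the transformation). In Spark itself this behaviour comes from rdd.cache() or rdd.persist(StorageLevel.MEMORY_AND_DISK); the class below only mimics it.

```python
class ToyCachedDataset:
    """Toy model of a cached RDD: lazy, cached, and rebuilt from lineage when lost."""

    def __init__(self, source_partitions, transform):
        # the lineage: the source data plus the function that derives this dataset
        self.source_partitions = source_partitions
        self.transform = transform
        self.cache = {}          # partition index -> computed records
        self.compute_count = 0   # how many partitions have been (re)computed

    def partition(self, i):
        if i not in self.cache:
            # computed only when first needed (lazy), then kept in memory
            self.compute_count += 1
            self.cache[i] = [self.transform(x) for x in self.source_partitions[i]]
        return self.cache[i]

    def lose_partition(self, i):
        # simulate a node failure that drops one cached partition
        self.cache.pop(i, None)

    def collect(self):
        return [x for i in range(len(self.source_partitions)) for x in self.partition(i)]


ds = ToyCachedDataset([[1, 2], [3, 4]], lambda x: x * 10)
print(ds.compute_count)   # 0: nothing computed yet
print(ds.collect())       # [10, 20, 30, 40]
print(ds.compute_count)   # 2: both partitions computed once
ds.collect()
print(ds.compute_count)   # 2: second pass served from the cache
ds.lose_partition(1)
print(ds.collect())       # [10, 20, 30, 40]: lost partition rebuilt from lineage
print(ds.compute_count)   # 3: only the lost partition was recomputed
```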

## Configuration

Python prerequisites:

    pip install py4j

Download Spark and build it with Hive support:
    
    cd spark-1.6.0
    
    ./build/sbt -Phive assembly
    
I do this to build with beeline (the Hive Thrift server) as well:

    ./build/sbt -Phive -Phive-thriftserver clean assembly/assembly

Set the environment variables:

    export SPARK_HOME="$HOME/programs/spark/spark-1.6.0"
    
    export PYSPARK_SUBMIT_ARGS="--master local[4] pyspark-shell"


In [1]:
# double check env 
!echo $SPARK_HOME

/home/matej/programs/spark/spark-1.6.0


We load the configuration inside the notebook, so there is no need to configure an IPython profile. Start the notebook with:

    ipython notebook

In [2]:
import os
import sys

In [3]:
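# spark configuration
import glob

spark_home = os.environ.get('SPARK_HOME')
if spark_home is None:
    raise EnvironmentError("SPARK_HOME is not set")

# PYSPARK_SUBMIT_ARGS must contain "pyspark-shell" (Spark 1.6);
# set it *before* shell.py starts the JVM gateway
spark_release_file = os.path.join(spark_home, "RELEASE")
if os.path.exists(spark_release_file):
    with open(spark_release_file) as release_fh:
        release = release_fh.read()
    if "Spark 1.6" in release:
        pyspark_submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "")
        if "pyspark-shell" not in pyspark_submit_args:
            pyspark_submit_args += " pyspark-shell"
            os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

sys.path.insert(0, os.path.join(spark_home, "python"))
# the bundled py4j zip is versioned (py4j-0.9 for Spark 1.6.0), so glob for it
for py4j_zip in glob.glob(os.path.join(spark_home, "python/lib/py4j-*-src.zip")):
    sys.path.insert(0, py4j_zip)

# run the PySpark shell startup script, which creates sc and sqlContext
filename = os.path.join(spark_home, "python/pyspark/shell.py")
with open(filename, "rb") as shell_fh:
    exec(compile(shell_fh.read(), filename, "exec"))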


Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Python version 3.4.3 (default, Oct 19 2015 21:52:17)
SparkContext available as sc, HiveContext available as sqlContext.


### A small note on setup

It may have been quicker to simply use


     IPYTHON_OPTS="notebook --pylab inline" ./bin/pyspark
    
to start the Jupyter notebook, but I want to run with my own environment.

# Introduction to Spark

  * starting Spark, initialization
  * RDD transformations
  * RDD actions

We will run Spark interactively in Jupyter; outside a notebook, a PySpark script is normally run with:

    bin/spark-submit ps.py

In [4]:
from pyspark import SparkContext

In [5]:
# the standard spark 'hello world' example:
spark_home = os.environ.get('SPARK_HOME')

text_file = sc.textFile(spark_home + "/README.md")

word_counts = text_file \
    .flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)

word_counts.take(5)

[('guide,', 1), ('APIs', 1), ('optimized', 1), ('name', 1), ('Scala,', 1)]

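**take(n)** returns the first n elements of the RDD, while **collect()** returns a list containing all of its elements (use collect with care on large RDDs, since everything is sent to the driver).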

### RDDs

An RDD (_resilient distributed dataset_) is an **immutable**, distributed collection of objects. Each RDD is split into partitions that are spread across the nodes of a cluster.

Resilience comes from tracking how each partition was built (its lineage) and re-running that history to rebuild any partition that is lost.

RDDs can be created from many sources, including local text files, Amazon S3, JSON, HDFS, HBase, Cassandra and others.

RDD operations consist of transformations and actions.

**transformations**, in Spark-speak, operate on RDDs and *return new RDDs*. They are evaluated **lazily**: nothing is computed until an action needs the result. For example, map() and filter() are transformations.

Because RDDs are immutable, a functional style of transforming the data is essential.
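Lazy evaluation and immutability can be sketched without Spark. ToyLazyRDD below is a hypothetical, single-machine stand-in, not PySpark's RDD class: map and filter only record a step and return a new object, and nothing runs until the collect action is called.

```python
class ToyLazyRDD:
    """Records transformations; runs them only when an action is called."""

    def __init__(self, data, steps=()):
        self._data = list(data)
        self._steps = tuple(steps)  # the recorded lineage of transformations

    def map(self, f):
        # transformation: returns a NEW object, nothing is computed yet
        return ToyLazyRDD(self._data, self._steps + (("map", f),))

    def filter(self, predicate):
        return ToyLazyRDD(self._data, self._steps + (("filter", predicate),))

    def collect(self):
        # action: only now are the recorded steps executed, element by element
        results = []
        for item in self._data:
            keep = True
            for kind, fn in self._steps:
                if kind == "map":
                    item = fn(item)
                elif not fn(item):
                    keep = False
                    break
            if keep:
                results.append(item)
        return results


calls = []

def square(x):
    calls.append(x)  # record when the function actually runs
    return x * x

base = ToyLazyRDD([1, 2, 3, 4])
evens_squared = base.map(square).filter(lambda x: x % 2 == 0)
print(calls)                    # []: nothing has run yet
print(evens_squared.collect())  # [4, 16]
print(base.collect())           # [1, 2, 3, 4]: the original is unchanged
```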


### Transformations on one (pair) RDD

* map
* filter
* reduceByKey
* groupByKey
* combineByKey
* mapValues
* flatMapValues
* keys
* values
* sortBy
* sortByKey
* sample(withReplacement, fraction, seed)
* coalesce(numPartitions): reduce the number of partitions
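For intuition, here is what several of these pair transformations compute, written as plain-Python list comprehensions on a small hypothetical list of pairs. This ignores partitioning; in Spark the same records are spread across partitions and their order is not guaranteed except after a sort.

```python
pairs = [("b", 2), ("a", 1), ("b", 3)]

# mapValues(lambda v: v * 10): transform the value, keep the key
map_values = [(k, v * 10) for k, v in pairs]

# flatMapValues(lambda v: range(v)): zero or more values per pair, key repeated
flat_map_values = [(k, u) for k, v in pairs for u in range(v)]

# keys() and values()
keys = [k for k, _ in pairs]
values = [v for _, v in pairs]

# sortByKey(): order pairs by key
sorted_by_key = sorted(pairs, key=lambda kv: kv[0])

print(map_values)       # [('b', 20), ('a', 10), ('b', 30)]
print(flat_map_values)  # [('b', 0), ('b', 1), ('a', 0), ('b', 0), ('b', 1), ('b', 2)]
print(sorted_by_key)    # [('a', 1), ('b', 2), ('b', 3)]
```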


### Locality

Another important distinction is whether a transformation can be computed locally, within each partition, or needs data from other partitions (and hence other nodes). These are referred to as narrow and wide transformations, respectively.

**Narrow transformations** (no transfer over the network)
* map
* flatMap
* filter
* coalesce (generally local)

**Wide transformations** (can be expensive, involve shuffles)
* groupByKey
* repartition
* distinct
* subtract
* intersection
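The narrow/wide distinction can be modelled in plain Python by representing an RDD as a list of partitions (lists). In this toy sketch, a narrow map touches each partition on its own, while a shuffle (what groupByKey or repartition need) may route any record to any output partition. Python's built-in hash stands in for Spark's partitioner, and both helper names are hypothetical.

```python
def narrow_map(partitions, f):
    """Narrow: output partition i depends only on input partition i (no network transfer)."""
    return [[f(x) for x in part] for part in partitions]


def shuffle_by_key(partitions, num_partitions):
    """Wide: records are routed by key, so any input partition may feed any output partition."""
    out = [[] for _ in range(num_partitions)]
    for part in partitions:
        for key, value in part:
            # Python's hash() stands in for Spark's hash partitioner here
            out[hash(key) % num_partitions].append((key, value))
    return out


pair_partitions = [[(1, "a"), (2, "b")], [(3, "c"), (1, "d")]]
print(narrow_map(pair_partitions, lambda kv: (kv[0], kv[1].upper())))
# [[(1, 'A'), (2, 'B')], [(3, 'C'), (1, 'D')]]
print(shuffle_by_key(pair_partitions, 2))
# [[(2, 'b')], [(1, 'a'), (3, 'c'), (1, 'd')]]: both records with key 1 now share a partition
```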


### NB 

    reduceByKey() and foldByKey() 
          
automatically combine values _locally_ on each machine before computing the global totals for each key.
        
    combineByKey() 
  
lets you customize the combining behavior; use it instead of groupByKey, which sends every single value across the network.
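The local-combining idea can be sketched in plain Python. The hypothetical combine_by_key helper below takes the same three functions as PySpark's combineByKey(createCombiner, mergeValue, mergeCombiners): values are first combined inside each partition (map side), so only one combiner per key per partition would need to cross the network before being merged. The example computes a per-key average from (sum, count) pairs on hard-coded partitions.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    # Step 1 (map side): combine values locally within each partition
    per_partition = []
    for part in partitions:
        combiners = {}
        for key, value in part:
            if key in combiners:
                combiners[key] = merge_value(combiners[key], value)
            else:
                combiners[key] = create_combiner(value)
        per_partition.append(combiners)

    # Step 2 (after the shuffle): merge one combiner per key per partition
    merged = {}
    for combiners in per_partition:
        for key, combiner in combiners.items():
            if key in merged:
                merged[key] = merge_combiners(merged[key], combiner)
            else:
                merged[key] = combiner
    return merged


partitions = [[("a", 1), ("b", 4), ("a", 3)], [("a", 5), ("b", 6)]]
sum_count = combine_by_key(
    partitions,
    lambda v: (v, 1),                          # createCombiner: first value seen for a key
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # mergeValue: add a value within a partition
    lambda a, b: (a[0] + b[0], a[1] + b[1]),   # mergeCombiners: merge across partitions
)
averages = {k: s / c for k, (s, c) in sum_count.items()}
print(sum_count)   # {'a': (9, 3), 'b': (10, 2)}
print(averages)    # {'a': 3.0, 'b': 5.0}
```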


### Transformations on two pair RDDs
* subtractByKey
* join
* rightOuterJoin
* leftOuterJoin
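The semantics of these two-RDD operations can be written in plain Python on small lists of pairs. The helpers below (hypothetical names) show what each operation returns, including the None that outer joins use for a missing side, and that a key matching several records yields every combination. Spark computes the same result across partitions, usually in a different order.

```python
from collections import defaultdict


def _group(pairs):
    """Collect all values per key: {key: [values...]}."""
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return groups


def join(left, right):
    r = _group(right)
    return [(k, (v, w)) for k, v in left for w in r.get(k, [])]


def left_outer_join(left, right):
    r = _group(right)
    return [(k, (v, w)) for k, v in left for w in r.get(k, [None])]


def right_outer_join(left, right):
    lg = _group(left)
    return [(k, (v, w)) for k, w in right for v in lg.get(k, [None])]


def subtract_by_key(left, right):
    right_keys = {k for k, _ in right}
    return [(k, v) for k, v in left if k not in right_keys]


left = [("x", 1), ("y", 2)]
right = [("x", "A"), ("x", "B"), ("z", "C")]
print(join(left, right))              # [('x', (1, 'A')), ('x', (1, 'B'))]
print(left_outer_join(left, right))   # [('x', (1, 'A')), ('x', (1, 'B')), ('y', (2, None))]
print(right_outer_join(left, right))  # [('x', (1, 'A')), ('x', (1, 'B')), ('z', (None, 'C'))]
print(subtract_by_key(left, right))   # [('y', 2)]
```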


In [5]:
lines = sc.textFile(spark_home +"/README.md")
lines.count()

98

In [7]:
pythonlines = lines.filter(lambda line: "Python" in line)
pythonlines.first()

'high-level APIs in Scala, Java, Python, and R, and an optimized engine that'

In [8]:
pairs = lines.map(lambda x: (x.split(" ")[0], x))

In [9]:
result = pairs.filter(lambda keyValue: len(keyValue[1]) < 20)

In [10]:
rdd = sc.textFile(spark_home +"/README.md")
words = rdd.flatMap(lambda x: x.split(" "))

In [11]:
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
result.take(5)

[('', 67), ('guide,', 1), ('APIs', 1), ('name', 1), ('It', 2)]

**parallelize** distributes a local Python collection to form a partitioned RDD. For performance, using range (xrange in Python 2) is recommended when the input represents a range.
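How a local collection is cut into partitions can be sketched as an even, contiguous split. The hypothetical slice_collection below uses the start/end formula Spark applies when slicing a range (partition i covers indices i*n//k up to (i+1)*n//k); for ordinary lists PySpark first groups elements into serialized batches, so the split can differ slightly. Treat it as an approximation; it does reproduce the partition layouts seen with glom in this notebook.

```python
def slice_collection(data, num_slices):
    """Split data into num_slices contiguous pieces: piece i is data[i*n//k : (i+1)*n//k]."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]


print(slice_collection(list(range(10)), 4))           # [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
print(slice_collection([(1, 2), (3, 6), (3, 4)], 3))  # [[(1, 2)], [(3, 6)], [(3, 4)]]
```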

In [12]:
data = [("a", 3), ("b", 4), ("a", 1)]
sc.parallelize(data).reduceByKey(lambda x, y: x + y)      # Default parallelism
sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10)  # Custom parallelism

PythonRDD[28] at RDD at PythonRDD.scala:43

In [13]:
#rdd.sortByKey(ascending=True, numPartitions=None, keyfunc = lambda x: str(x))

In [14]:
# the 3 at the end means we split the list into 3 partitions
rdd = sc.parallelize([(1, 2), (3, 6), (3, 4)], 3)
# groupByKey returns a *new* RDD; rdd itself is immutable, so collect shows the original data
rdd.groupByKey()
rdd.collect()

[(1, 2), (3, 6), (3, 4)]

In [15]:
# to see partitions use glom, useful also for debugging
rdd.glom().collect()

[[(1, 2)], [(3, 6)], [(3, 4)]]

In [16]:
rdd = sc.parallelize([(1, 2), (3, 4), (3, 6)], 2)
# reduceByKey also returns a new RDD; collecting rdd shows the unchanged input
rdd.reduceByKey(lambda x, y: x + y)
rdd.collect()

[(1, 2), (3, 4), (3, 6)]

In [17]:
print(rdd.keys().collect())
print(rdd.values().collect())
print(rdd.sortByKey().collect())

[1, 3, 3]
[2, 4, 6]
[(1, 2), (3, 4), (3, 6)]


### flatMap() vs map()

* **map** produces exactly one output element for each input element. It is a completely **local** (narrow, in Spark-speak) operation.

* **flatMap** produces zero or more output elements for each input element, so the output partitions may differ in size from the input. It is also narrow.
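The same contrast in plain Python, on the lines used in the next cell: map keeps one output per input (here a list of words per line), while flatMap flattens those lists into a single sequence of words.

```python
lines = ["this is the first line", "hello second line", "third guy"]

# map: exactly one output per input line (here, a list of words)
mapped = [line.split(" ") for line in lines]

# flatMap: each line may yield several words, all flattened into one sequence
flat_mapped = [word for line in lines for word in line.split(" ")]

print(len(mapped))       # 3
print(len(flat_mapped))  # 10
```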


In [18]:
lines = sc.parallelize(["this is the first line", "hello second line", "third guy"])
words = lines.flatMap(lambda line: line.split(" "))
words.collect()

['this',
 'is',
 'the',
 'first',
 'line',
 'hello',
 'second',
 'line',
 'third',
 'guy']

In [19]:
# named functions work too: set up word count key/value pairs
def split_words(line):
    return line.split()

def create_pair(word):
    return (word,1)

# equivalent to reduceByKey(lambda x, y: x + y)
def sum_counts(a,b):
    return a + b

def starts_with_vowel(pair):
    vowels = ["a","e","i","o","u"]
    word = pair[0]
    first_letter = word[0]
    return first_letter.lower() in vowels

In [20]:
pairs_rdd = text_file.flatMap(split_words).map(create_pair)
pairs_rdd.take(5)

[('#', 1), ('Apache', 1), ('Spark', 1), ('Spark', 1), ('is', 1)]

In [21]:
# reduce example
wordcounts_rdd = pairs_rdd.reduceByKey(sum_counts)
wordcounts_rdd.take(5)

[('guide,', 1), ('APIs', 1), ('optimized', 1), ('name', 1), ('Scala,', 1)]

In [22]:
# filter
vs = pairs_rdd.filter(starts_with_vowel).reduceByKey(sum_counts)
vs.take(5)

[('APIs', 1), ('Once', 1), ('only', 1), ('overview', 1), ('examples', 2)]

In [23]:
# group by key example (values are lazy ResultIterable objects; .mapValues(list) materializes them)
pairs_rdd.groupByKey().take(5)

[('guide,', <pyspark.resultiterable.ResultIterable at 0x7fb2190fcbe0>),
 ('APIs', <pyspark.resultiterable.ResultIterable at 0x7fb2190fcda0>),
 ('optimized', <pyspark.resultiterable.ResultIterable at 0x7fb2190fce80>),
 ('name', <pyspark.resultiterable.ResultIterable at 0x7fb2190fce10>),
 ('Scala,', <pyspark.resultiterable.ResultIterable at 0x7fb2190fcf60>)]

In [24]:
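# distinct: number of unique words
pairs_rdd.keys().distinct().count()

In [25]:
# sample with replacement: roughly 10% of the pairs, fixed seed for reproducibility
pairs_rdd.sample(True, 0.1, 42).take(5)


In [26]:
# sample without replacement
pairs_rdd.sample(False, 0.1, 42).take(5)
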

In [27]:
# coalesce example
sc.parallelize(range(10),4).coalesce(2).glom().collect()

[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
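coalesce avoids a shuffle by merging whole existing partitions. A toy sketch of that idea in plain Python, assuming adjacent partitions are merged (Spark's actual grouping also takes data locality into account), reproduces the output above from the four partitions of range(10); toy_coalesce is a hypothetical helper.

```python
def toy_coalesce(partitions, num_partitions):
    """Merge adjacent partitions into num_partitions groups; records never move between groups."""
    n = len(partitions)
    num_partitions = min(num_partitions, n)  # coalesce without a shuffle cannot add partitions
    merged = []
    for i in range(num_partitions):
        start, end = i * n // num_partitions, (i + 1) * n // num_partitions
        merged.append([x for part in partitions[start:end] for x in part])
    return merged


four_partitions = [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]  # range(10) in 4 partitions
print(toy_coalesce(four_partitions, 2))  # [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
```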

### RDD Actions

**actions**, in Spark-speak, compute results from RDDs. Actions return non-RDD objects and values, and they are what make the lazy transformations actually run, so they typically sit at the end of a data analysis pipeline. Examples:

* take()
* top()
* first()
* count()
* reduce()
* aggregate()
* fold()
* collect()
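fold() and aggregate() are easiest to understand as a two-level reduction: first within each partition, then across the partial results. The plain-Python toy_aggregate below (a hypothetical helper) mirrors that, including a detail Spark's documentation for fold points out: the zero value is used once per partition and again when combining results, so it should normally be a neutral element (0 for a sum, for example).

```python
from functools import reduce
from operator import add


def toy_aggregate(partitions, zero, seq_op, comb_op):
    # level 1: reduce each partition separately, starting from the zero value
    partials = [reduce(seq_op, part, zero) for part in partitions]
    # level 2: combine the partial results, starting from the zero value again
    return reduce(comb_op, partials, zero)


partitions = [[1, 2], [3, 4, 5]]

# aggregate with a (sum, count) accumulator gives the mean in one pass
total, count = toy_aggregate(
    partitions,
    (0, 0),
    lambda acc, x: (acc[0] + x, acc[1] + 1),   # seqOp: fold one element into the accumulator
    lambda a, b: (a[0] + b[0], a[1] + b[1]),   # combOp: merge two accumulators
)
print(total, count, total / count)  # 15 5 3.0

# fold is aggregate with seqOp == combOp; a non-neutral zero is counted 3 times here
print(toy_aggregate([[1, 2], [3]], 0, add, add))  # 6
print(toy_aggregate([[1, 2], [3]], 1, add, add))  # 9, not 7
```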

In [28]:
rdd = sc.parallelize([(1, 2), (3, 4), (3, 6)],2)
rdd.countByKey()

defaultdict(int, {1: 1, 3: 2})

#### Numeric RDD operations
* count()
* mean()
* sum()
* max()
* min()
* variance()
* sampleVariance()
* stdev()
* sampleStdev()
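Note the two variance flavours: in Spark, variance() and stdev() divide by n (population statistics), while sampleVariance() and sampleStdev() divide by n - 1. Python's statistics module makes the same distinction (pvariance vs variance), which gives a quick way to check the arithmetic by hand; in the notebook, sc.parallelize([1.0, 2.0, 3.0, 4.0]).variance() should agree with the population value below.

```python
import math
import statistics

data = [1.0, 2.0, 3.0, 4.0]
n = len(data)
mean = sum(data) / n                          # 2.5
m2 = sum((x - mean) ** 2 for x in data)       # sum of squared deviations: 5.0

population_variance = m2 / n                  # like rdd.variance(): 1.25
sample_variance = m2 / (n - 1)                # like rdd.sampleVariance(): 5/3

assert math.isclose(population_variance, statistics.pvariance(data))
assert math.isclose(sample_variance, statistics.variance(data))
print(population_variance, math.sqrt(population_variance))  # variance and stdev
print(sample_variance, math.sqrt(sample_variance))          # sampleVariance and sampleStdev
```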

# Advanced Spark

**joins**: basic joins are simple in Spark

**broadcast variables**: read-only values sent once to every node; useful for large configuration data or lookup tables

**accumulators**: variables that tasks across the cluster can add to concurrently; only the driver reads the final value
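Broadcast variables are often used for a map-side join: instead of joining two RDDs (which shuffles both), a small lookup table is shipped once to every node and each partition looks keys up locally. In PySpark this would be lookup = sc.broadcast(table), with lookup.value read inside the map function. The plain-Python sketch below only mimics that pattern; its table reuses a few show/channel rows from the join example that follows (Hot_Show appears under two channels there) and is otherwise hypothetical.

```python
# stands in for broadcast_table.value: a small read-only dict every partition can read
show_to_channels = {
    "Hourly_Sports": ["DEF"],
    "Hot_Show": ["ABC", "XYZ"],
}


def map_side_join(partition, lookup):
    """Join (show, views) records against the lookup table without any shuffle."""
    return [
        (channel, views)
        for show, views in partition
        for channel in lookup.get(show, [])  # shows missing from the table are dropped (inner join)
    ]


# two partitions of (show, views) records; views already converted to int
view_partitions = [
    [("Hot_Show", 631), ("PostModern_Show", 38)],
    [("Hourly_Sports", 21)],
]
joined = [map_side_join(part, show_to_channels) for part in view_partitions]
print(joined)  # [[('ABC', 631), ('XYZ', 631)], [('DEF', 21)]]
```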




###  Join example

Here we will combine two data sets and find the total number of views on each station (channel).

In [6]:
channels = sc.textFile("./data/channels.txt")
channels.take(10)

['Hourly_Sports,DEF',
 'Baked_News,BAT',
 'PostModern_Talking,XYZ',
 'Loud_News,CNO',
 'Almost_Show,ABC',
 'Hot_Talking,DEF',
 'Dumb_Show,BAT',
 'Surreal_Show,XYZ',
 'Cold_Talking,CNO',
 'Hourly_Cooking,ABC']

In [30]:
show_channel = channels.map(lambda x: x.split(","))
show_channel.take(5)

[['Hourly_Sports', 'DEF'],
 ['Baked_News', 'BAT'],
 ['PostModern_Talking', 'XYZ'],
 ['Loud_News', 'CNO'],
 ['Almost_Show', 'ABC']]

In [31]:
views = sc.textFile("./data/views.txt")
views.take(10)

['Hourly_Sports,21',
 'PostModern_Show,38',
 'Surreal_News,73',
 'Dumb_Cooking,144',
 'Cold_Talking,287',
 'Almost_Talking,574',
 'Loud_News,113',
 'Hot_Talking,228',
 'Baked_Games,459',
 'Hourly_Talking,922']

In [32]:
show_view = views.map(lambda x: x.split(","))
show_view.take(5)

[['Hourly_Sports', '21'],
 ['PostModern_Show', '38'],
 ['Surreal_News', '73'],
 ['Dumb_Cooking', '144'],
 ['Cold_Talking', '287']]

In [33]:
# spark will join on common key, here the show
joined_data = show_view.join(show_channel)
joined_data.take(5)

[('Hot_Show', ('631', 'ABC')),
 ('Hot_Show', ('631', 'XYZ')),
 ('Hot_Show', ('120', 'ABC')),
 ('Hot_Show', ('120', 'XYZ')),
 ('Hot_Show', ('477', 'ABC'))]

In [34]:
# drop the show name and keep (channel, views) pairs
def extract_channel_views(show_views_channel):
    ''' returns a (channel, views) tuple '''
    show, (views, channel) = show_views_channel
    return (channel, views)

In [35]:
channel_views = joined_data.map(extract_channel_views)
channel_views.take(5)

[('ABC', '631'),
 ('XYZ', '631'),
 ('ABC', '120'),
 ('XYZ', '120'),
 ('ABC', '477')]

In [36]:
# side note: swaps are easy with map
views_channel = channel_views.map(lambda x: (x[1], x[0]))
views_channel.take(5)

[('631', 'ABC'),
 ('631', 'XYZ'),
 ('120', 'ABC'),
 ('120', 'XYZ'),
 ('477', 'ABC')]

In [37]:
# convert the view counts to int, then sum views for each station
channel_views.mapValues(int).reduceByKey(lambda x, y: x + y).collect()

[('ABC', 192745),
 ('XYZ', 224945),
 ('CNO', 233893),
 ('DEF', 252827),
 ('BAT', 195990)]

### Accumulators

In [38]:
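def test_accum(x):
    # runs inside the tasks; Spark merges each task's additions on the driver
    accum.add(x)

In [None]:
# test the famous Gauss summation of 1 to 100
accum = sc.accumulator(0)
sc.parallelize(range(1, 101)).foreach(test_accum)
accum.value  # expect 5050 = 100 * 101 / 2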
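What happens behind accum.add can be sketched in plain Python: each task updates its own local copy of the accumulator, and the driver merges those partial values. This toy version (a hypothetical helper with four hard-coded partitions, not Spark's implementation) reproduces the Gauss sum 1 + 2 + ... + 100 = 100 * 101 / 2 = 5050.

```python
def run_with_accumulator(partitions):
    # each "task" adds into its own local copy of the accumulator
    task_values = []
    for part in partitions:
        local = 0
        for x in part:
            local += x
        task_values.append(local)
    # the driver merges the per-task values into the final result
    return sum(task_values)


numbers = list(range(1, 101))
partitions = [numbers[i * 25:(i + 1) * 25] for i in range(4)]
print(run_with_accumulator(partitions))  # 5050
```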

# Spark DataFrames

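DataFrames organize data into named columns, like a table in a relational database, which makes them great for real-world data science.

Note that PySpark DataFrame operations are planned and executed by Spark's engine on the JVM, so they typically run about as fast as the Scala or Java equivalents (Python UDFs are the main exception).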

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame



In [7]:
sqlCtx = sqlContext

In [8]:
# test --should return: DataFrame[_1: string, _2: bigint]
sqlCtx.createDataFrame([("key", 1)])

DataFrame[_1: string, _2: bigint]

#### Why DataFrames are useful

First, illustrate the alternative without DataFrames, using some data on working hours:

https://en.wikipedia.org/wiki/Working_time

https://stats.oecd.org/Index.aspx?DataSetCode=ANHRS


In [9]:
# small sampling from
# Average usual weekly hours worked on the main job, 2014
# tabular data, but not a data frame:
hours = sc.parallelize([
        [8, "Poland",39.9],
        [1,"Mexico",44.7],
        [34, "France",36.1],
        [4, "Greece",38.8],
        [17,"United States",38.6],
        [38, "Germany",34.5]
    ])

In [15]:
def extract_hours(row):
    return row[2]

def extract_country_weekly(row):
    return (row[1], row[2])

In [11]:
hours.map(extract_hours).mean()

38.766666666666666

In [12]:
country_weekly_RDD = hours.map(extract_country_weekly)
country_weekly_RDD.collect()

[('Poland', 39.9),
 ('Mexico', 44.7),
 ('France', 36.1),
 ('Greece', 38.8),
 ('United States', 38.6),
 ('Germany', 34.5)]

In [13]:
# sort the results 
country_weekly_RDD.takeOrdered(10, key=lambda x: -x[1])

[('Mexico', 44.7),
 ('Poland', 39.9),
 ('Greece', 38.8),
 ('United States', 38.6),
 ('France', 36.1),
 ('Germany', 34.5)]

In [14]:
# or this way
country_weekly_RDD.sortBy(lambda x: -x[1]).collect()

[('Mexico', 44.7),
 ('Poland', 39.9),
 ('Greece', 38.8),
 ('United States', 38.6),
 ('France', 36.1),
 ('Germany', 34.5)]

#### Now do the same via DataFrames

This should be a little less tedious.

In [26]:
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType

In [27]:
# hours are fractional (e.g. 39.9), so use DoubleType; declared as LongType they come back as null
hours_schema = StructType([
        StructField("rank", LongType(), True),
        StructField("name", StringType(), True),
        StructField("hours", DoubleType(), True),
    ])

In [28]:
hours_df = sqlContext.createDataFrame(hours, hours_schema)
hours_df.printSchema()

root
 |-- rank: long (nullable = true)
 |-- name: string (nullable = true)
 |-- hours: double (nullable = true)



In [29]:
hours_df.show()

+----+-------------+-----+
|rank|         name|hours|
+----+-------------+-----+
|   8|       Poland| 39.9|
|   1|       Mexico| 44.7|
|  34|       France| 36.1|
|   4|       Greece| 38.8|
|  17|United States| 38.6|
|  38|      Germany| 34.5|
+----+-------------+-----+



# Spark SQL

For reference:

http://spark.apache.org/docs/latest/sql-programming-guide.html#sql


Note that Spark comes with beeline if it is built with the Hive Thrift server:

    ./build/sbt -Phive -Phive-thriftserver clean assembly/assembly

Activate it via:

    spark$ ./bin/beeline -u jdbc:hive2://
    
and use the standard beeline Hive interface.

In [8]:
print(spark_home)

/home/matej/programs/spark/spark-1.6.0


In [9]:
# check example from docs
df = sqlContext.read.json(spark_home + "/examples/src/main/resources/people.json")

In [10]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [16]:
df.select(df['name'], df['age'] + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



# Machine Learning in Spark:  MLlib


reference: 

http://spark.apache.org/docs/latest/mllib-guide.html



In [None]:
from pyspark.mllib.feature import HashingTF

In [None]:
sentence = "hello there world of spark"
words = sentence.split()
tf = HashingTF(1000)
tf.transform(words)
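HashingTF maps each term to a column index by hashing it modulo the number of features (1000 above) and counts occurrences, giving a sparse term-frequency vector without storing a vocabulary. A plain-Python sketch of this hashing trick follows; hashing_tf is a hypothetical helper that uses zlib.crc32 as a deterministic stand-in hash, whereas PySpark's HashingTF uses a different hash function, so the actual indices will differ.

```python
import zlib
from collections import Counter


def hashing_tf(words, num_features=1000):
    """Term frequencies keyed by hashed index: {index: count}, i.e. a sparse vector."""
    counts = Counter()
    for word in words:
        # crc32 is a deterministic stand-in; PySpark's HashingTF uses a different hash
        index = zlib.crc32(word.encode("utf-8")) % num_features
        counts[index] += 1
    return dict(counts)


vector = hashing_tf("hello there world of spark".split())
print(vector)  # five words; indices may collide, so the counts always sum to 5
```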

# Streaming in Spark

Note: see the file mkm_notebooks/license.txt for the license of this notebook.