<img src="images/spark-logo.png">

Apache Spark
============

Spark Intro
-----------

What is Spark?

- Spark is a framework for distributed processing.

- It is a streamlined alternative to Map-Reduce.

- Spark applications can be written in Python, Scala, or Java.

Why Spark
---------

Why learn Spark?

- Spark enables you to analyze petabytes of data.

- Spark skills are in high demand--<http://indeed.com/salary>.

- Spark is significantly faster than MapReduce.

- Paradoxically, Spark's API is simpler than the MapReduce API.

Goals
-----

By the end of this lecture, you will be able to:

- Create RDDs to distribute data across a cluster

- Use the Spark shell to compose and execute Spark commands

- Use Spark to analyze stock market data

Spark Version History
---------------------

Date                Version         Changes
----                -------         -------
May 30, 2014        Spark 1.0.0     APIs stabilized 
September 11, 2014  Spark 1.1.0     New functions in MLlib, Spark SQL
December 18, 2014   Spark 1.2.0     Python Streaming API and better streaming fault tolerance
March 13, 2015      Spark 1.3.0     DataFrame API, Kafka integration in Streaming
April 17, 2015      Spark 1.3.1     Bug fixes, minor changes

Matei Zaharia
-------------

<img style="width:50%" src="images/matei.jpg">

Essence of Spark
----------------

What is the basic idea of Spark?

- Spark takes the Map-Reduce paradigm and changes it in some critical
  ways.

- Instead of writing single Map-Reduce jobs, a Spark job consists of a
  series of map and reduce functions.

- However, the intermediate data is kept in memory instead of being
  written to disk or written to HDFS.

Pop Quiz
--------

<details><summary>
Q: Since Spark keeps intermediate data in memory to get speed, what
does it make us give up? Where's the catch?
</summary>
1. Spark trades memory for performance.
<br>
2. While Spark apps are faster, they also consume more memory.
<br>
3. Spark outshines Map-Reduce in iterative algorithms where the
   overhead of saving the results of each step to HDFS slows down
   Map-Reduce.
<br>
4. For non-iterative algorithms Spark is comparable to Map-Reduce.
</details>

Spark Logging
-------------

Q: How can I make Spark logging less verbose?

- By default Spark logs messages at the `INFO` level.

- Here are the steps to make it print only errors (use `WARN` instead
  of `ERROR` if you also want to see warnings):

      cd $SPARK_HOME/conf
      cp log4j.properties.template log4j.properties
      sed -i.bak -e 's/rootCategory=INFO/rootCategory=ERROR/' log4j.properties

Spark Fundamentals
==================

Spark Execution
---------------

<img src="images/spark-cluster.png">


Spark Terminology
-----------------

Term                   |Meaning
----                   |-------
Driver                 |Process that contains the Spark Context
Executor               |Process that executes one or more Spark tasks
Master                 |Process which manages applications across the cluster (e.g. Spark Master)
Worker                 |Process which manages executors on a particular worker node (e.g. Spark Worker)

Spark Job
---------

Q: Flip a coin a million times using Python's `random()` function. What
fraction of the time do you get heads?

- Initialize Spark.

In [None]:
#create the SparkContext only once per session; creating a second one raises an error. - drxt
from pyspark import SparkContext
sc = SparkContext()

- Import `random`, flip the coin a million times, and count the heads.

In [None]:
import random

flips = 1000000
#parallelize distributes the flips across the executors
heads = sc.parallelize(xrange(flips)) \
    .map(lambda i: random.random()) \
    .filter(lambda r: r < 0.5) \
    .count()  #keep only the draws below 0.5 (heads) and count them

ratio = float(heads)/float(flips)

print(heads)
print(ratio)

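#DRXT in-lecture edits: inspecting an RDD
rdd = sc.parallelize(xrange(10)).map(lambda i: random.random())
print rdd            #prints only a description of the RDD; transformations alone do no work
print rdd.count()    #an action: number of elements in the RDD
print rdd.take(5)    #an action: first five elements of the RDD
print rdd.collect()  #an action: ALL elements, pulled into the driver. Be careful with large RDDs!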
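For comparison, here is a minimal plain-Python sketch of the same coin-flip estimate on a single machine, with no Spark involved. The function name `estimate_heads_ratio`, the 100,000 flips, and the fixed seed are illustrative assumptions, not part of the lecture code; the Spark job above spreads the same map, filter, and count steps across the executors.

```python
import random


def estimate_heads_ratio(flips, seed=0):
    """Local, single-machine version of the Spark coin-flip job (hypothetical helper)."""
    rng = random.Random(seed)  # seeded so the result is reproducible
    # map: one uniform draw per flip; filter: keep draws below 0.5 (heads); count them
    heads = sum(1 for _ in range(flips) if rng.random() < 0.5)
    return heads, heads / float(flips)


heads, ratio = estimate_heads_ratio(100000)
print(heads, ratio)
```

With enough flips the ratio settles close to 0.5; the Spark version does the same work, but each executor only draws the numbers for its own partitions.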

In [None]:
def is_prime(number):
    factor_min = 2
    factor_max = int(number**0.5)+1
    for factor in xrange(factor_min,factor_max):
        if number % factor == 0:
            return False
    return True

- Use this to filter out non-primes.

In [None]:
numbers = xrange(2,100)
primes = sc.parallelize(numbers)\
    .filter(is_prime)\
    .collect()
print primes

Pop Quiz
--------

<img src="images/spark-cluster.png">

<details><summary>
Q: Where does `is_prime` execute?
</summary>
On the executors.
</details>

<details><summary>
Q: Where does the RDD code execute?
</summary>
On the driver.
</details>

Transformations and Actions
===========================

Common RDD Constructors
-----------------------

Expression                               |Meaning
----------                               |-------
`sc.parallelize(list1)`                  |Create RDD of elements of list
`sc.textFile(path)`                      |Create RDD of lines from file

Common Transformations
----------------------

Expression                               |Meaning
----------                               |-------
`filter(lambda x: x % 2 == 0)`           |Discard non-even elements
`map(lambda x: x * 2)`                   |Multiply each RDD element by `2`
`map(lambda x: x.split())`               |Split each string into words
`flatMap(lambda x: x.split())`           |Split each string into words and flatten sequence
`sample(True, 0.25)`                     |Create sample of roughly 25% of elements with replacement
`union(rdd)`                             |Append `rdd` to existing RDD
`distinct()`                             |Remove duplicates in RDD
`sortBy(lambda x: x, ascending=False)`   |Sort elements in descending order


Common Actions
--------------

Expression                             |Meaning
----------                             |-------
`collect()`                            |Convert RDD to in-memory list 
`take(3)`                              |First 3 elements of RDD 
`top(3)`                               |Largest 3 elements of RDD, in descending order
`takeSample(True, 3)`                  |Sample of 3 elements with replacement, returned as a list
`sum()`                                |Find element sum (assumes numeric elements)
`mean()`                               |Find element mean (assumes numeric elements)
`stdev()`                              |Find element standard deviation (assumes numeric elements)
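Most entries in the tables above have a familiar single-machine analogue. The plain-Python sketch below (no Spark involved) mirrors a few of them on a small local list, which can help when predicting what a Spark job will return. One assumption worth flagging: Spark's `stdev()` computes the population standard deviation (`sampleStdev()` is the n-1 version), so `statistics.pstdev` is the matching local function.

```python
import statistics

data = [1, 3, 2, 2, 1]
lines = ["hello world", "another line"]

# Transformations, computed eagerly on local lists
evens = [x for x in data if x % 2 == 0]                  # filter(lambda x: x % 2 == 0)
doubled = [x * 2 for x in data]                          # map(lambda x: x * 2)
words = [w for line in lines for w in line.split()]      # flatMap(lambda x: x.split())
unique = sorted(set(data))                               # distinct(); sorted here because Spark gives no order
desc = sorted(data, reverse=True)                        # sortBy(lambda x: x, ascending=False)

# Actions
first3 = data[:3]                                        # take(3)
top3 = sorted(data, reverse=True)[:3]                    # top(3)
total = sum(data)                                        # sum()
avg = statistics.mean(data)                              # mean()
sd = statistics.pstdev(data)                             # stdev(): population standard deviation

print(evens, doubled, words, unique, desc)
print(first3, top3, total, avg, sd)
```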

Pop Quiz
--------

Q: What will this output?

In [None]:
sc.parallelize([1,3,2,2,1]).distinct().collect()  #e.g. [2, 1, 3]; the order is not guaranteed

Q: What will this output?

In [None]:
sc.parallelize([1,3,2,2,1]).sortBy(lambda x: x).collect() # [1,1,2,2,3]

#lecture example 2
sc.parallelize([('WA', 200), ('CA',200), ('CA', 100)]).sortBy(lambda x: x, ascending = False).collect()
#[('WA', 200), ('CA', 200), ('CA', 100)]: tuples compare by state first, then by amount

Q: What will this output?

- Create this input file.

- Note (drxt): reading the data from a file is a better approach than `sc.parallelize`. You never want all the data to pass through the driver; keep it at arm's length on the executors.

In [None]:
%%writefile input.txt
hello world
another line
yet another line
yet another another line

- What do you get when you run this code?

In [None]:
#RDD of lines
sc.textFile('input.txt') \
    .count()  #4

#RDD of word lists: still 4 elements, but each element is now a list of words
sc.textFile('input.txt') \
    .map(lambda x: x.split()) \
    .count()  #4

- What about this?

In [None]:
sc.textFile('input.txt') \
    .flatMap(lambda x: x.split()) \
    .count()
#2 + 2 + 3 + 4 = 11
#flatMap flattens the per-line word lists into one RDD of words;
#with .collect() instead of .count() you would see
#['hello', 'world', 'another', 'line', ...]

Map vs FlatMap
--------------

- Here's the difference between `map` and `flatMap`.

- Map:

In [None]:
sc.textFile('input.txt') \
    .map(lambda x: x.split()) \
    .collect()

- FlatMap:

In [None]:
sc.textFile('input.txt') \
    .flatMap(lambda x: x.split()) \
    .collect()

In [None]:
#in class example 2
sc.textFile('input.txt') \
    .flatMap(lambda x: x.split()) \
    .flatMap(lambda x: x) \
    .count()  #56
#the second flatMap treats each word (a string) as a sequence of characters and flattens it further;
#with .collect() you would see ['h', 'e', 'l', ...]
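The difference between `map` and `flatMap` is easy to reproduce with plain Python list comprehensions on the same four lines of `input.txt`. This is only a local sketch of the element counts Spark reports, not Spark code.

```python
lines = ["hello world", "another line", "yet another line", "yet another another line"]

mapped = [line.split() for line in lines]                  # like map: one list per line
flat = [word for line in lines for word in line.split()]   # like flatMap: one element per word
chars = [ch for word in flat for ch in word]               # like a second flatMap(lambda x: x): one element per character

print(len(mapped), len(flat), len(chars))
```

The three counts match the Spark cells above: 4 lines, 11 words, and 56 characters.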

RDD Statistics
--------------

Q: How would you calculate the mean, variance, and standard deviation of a sample
produced by Python's `random()` function?

- Create an RDD and apply the statistical actions to it.

In [None]:
count = 1000
list = [random.random() for _ in xrange(count)]
rdd = sc.parallelize(list)  #parallel the list. This is distributed across executers.
#each eceduter gets a result of the mean, variance
#and then the driver puts them all together.
#so a bunch of local answers, and then the driver adds up all the answers
print rdd.mean()
print rdd.variance()
print rdd.stdev()
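The comments in the cell above describe a two-level computation: each executor summarizes its partitions, and the driver merges the summaries. Below is a plain-Python sketch of that idea for mean and variance, using the standard pairwise update on (count, mean, sum of squared deviations). The helpers `summarize` and `merge` are hypothetical names and this is not Spark's source, although Spark's `StatCounter` keeps a similar running summary.

```python
import math


def summarize(values):
    """Per-partition summary: (count, mean, sum of squared deviations). Assumes a non-empty partition."""
    n = len(values)
    mean = sum(values) / float(n)
    m2 = sum((x - mean) ** 2 for x in values)
    return n, mean, m2


def merge(a, b):
    """Combine two partition summaries without revisiting the raw data."""
    n_a, mean_a, m2_a = a
    n_b, mean_b, m2_b = b
    n = n_a + n_b
    delta = mean_b - mean_a
    mean = mean_a + delta * n_b / float(n)
    m2 = m2_a + m2_b + delta ** 2 * n_a * n_b / float(n)
    return n, mean, m2


partitions = [[1.0, 2.0, 3.0], [4.0, 5.0], [6.0, 7.0, 8.0, 9.0]]
summaries = [summarize(p) for p in partitions]  # this part would run on the executors
n, mean, m2 = summaries[0]
for s in summaries[1:]:                         # this part would run on the driver
    n, mean, m2 = merge((n, mean, m2), s)

variance = m2 / n            # population variance, like rdd.variance()
stdev = math.sqrt(variance)  # like rdd.stdev()
print(n, mean, variance, stdev)
```

The merged result equals the mean and variance of all nine values computed in one place, which is why the work can be split across machines.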

Pop Quiz
--------

<details><summary>
Q: What requirement does an RDD have to satisfy before you can apply
these statistical actions to it? 
</summary>
The RDD must consist of numeric elements.
</details>

<details><summary>
Q: What is the advantage of using Spark vs Numpy to calculate mean or standard deviation?
</summary>
The calculation is distributed across different machines and will be
more scalable. (drxt: once your data is big, Spark gives you a way to scale up.)
</details>

RDD Laziness
------------

- Q: What is this Spark job doing?

In [None]:
n = 100000
%time sc.parallelize(xrange(n)).map(lambda x:x+1).count()
#go through all the numbers and add 1 to each one.
#for ten million numbers this takes about 1 second.

- Q: How is the following job different from the previous one? How
  long do you expect it to take?

In [None]:
%time sc.parallelize(xrange(n)).map(lambda x:x+1)

#this time it took only about 3 ms. What is going on?
#Spark did not actually transform anything. It only recorded what to do,
#and it waits to do the work until an action forces it to produce a result.


Pop Quiz
--------

<details><summary>
Q: Why did the second job complete so much faster?
</summary>
1. Because Spark is lazy. 
<br>
2. Transformations produce new RDDs and do no operations on the data.
<br>
3. Nothing happens until an action is applied to an RDD.
<br>
4. An RDD is the *recipe* for a transformation, rather than the
   *result* of the transformation.
</details>

<details><summary>
Q: What is the benefit of keeping the recipe instead of the result of
the action?
</summary>
1. It saves memory.
<br>
2. It produces *resilience*. 
<br>
3. If an RDD loses data on a machine, it always knows how to recompute it.
</details>
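A toy model can make "an RDD is a recipe" concrete. The class `LazyPipeline` below is a hypothetical teaching sketch, not Spark's implementation: `map` only appends a step to the recipe, and nothing runs until the `count` action. Because the recipe is kept rather than the results, calling `count()` again simply recomputes, which is the property that lets Spark rebuild lost partitions.

```python
class LazyPipeline(object):
    """Toy stand-in for an RDD: stores a recipe and runs it only on an action."""

    def __init__(self, source, steps=()):
        self.source = source       # a callable that produces the input data
        self.steps = tuple(steps)  # the recipe: functions applied in order

    def map(self, f):
        # Transformation: return a NEW pipeline with one more step; no data is touched
        return LazyPipeline(self.source, self.steps + (f,))

    def count(self):
        # Action: only now is the recipe actually executed
        n = 0
        for x in self.source():
            for f in self.steps:
                x = f(x)
            n += 1
        return n


calls = []


def add_one(x):
    calls.append(x)  # record every time the function really runs
    return x + 1


p = LazyPipeline(lambda: range(5)).map(add_one)
print(len(calls))  # 0: defining the transformation did no work
print(p.count())   # 5: the action triggers the work
print(len(calls))  # 5
```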

Writing Data
------------

Besides reading data, Spark can also write data out to a file system.

Q: Calculate the squares of the integers from 0 to 9 and write them out
to `squares.txt`.

- Make sure `squares.txt` does not exist.

In [None]:
!if [ -e squares.txt ] ; then rm -rf squares.txt ; fi

- Create the RDD and then save it to `squares.txt`.

In [None]:
rdd1 = sc.parallelize(xrange(10))
rdd2 = rdd1.map(lambda x: x*x)
rdd2.saveAsTextFile('squares.txt')

- Now look at the output.

In [None]:
!cat squares.txt

- Looks like the output is a directory.

In [None]:
!ls -l squares.txt

- Let's take a look at the files.

In [None]:
!for i in squares.txt/part-*; do echo $i; cat $i; done

Pop Quiz
--------

<details><summary>
Q: What's going on? Why are there two files in the output directory?
</summary>
1. There were two threads that were processing the RDD.
<br>
2. The RDD was split into two partitions (the default in this setup).
<br>
3. Each partition was processed in a different task.
</details>
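To see which squares end up in which `part-*` file, here is a plain-Python sketch that cuts a list into contiguous, nearly equal slices. This approximates how `sc.parallelize` splits a local collection; the helper `slice_evenly` is a hypothetical name, not part of the Spark API.

```python
def slice_evenly(data, num_slices):
    """Split a list into num_slices contiguous, nearly equal chunks."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


squares = [x * x for x in range(10)]
for i, part in enumerate(slice_evenly(squares, 2)):
    print("part-%05d" % i, part)
```

With 2 slices each file gets five squares; with 5 slices each file gets two.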

Partitions
----------

Q: Can we control the number of partitions/tasks that Spark uses for
processing data? Solve the same problem as above but this time with 5
tasks.

- Make sure `squares.txt` does not exist.

In [None]:
!if [ -e squares.txt ] ; then rm -rf squares.txt ; fi

- Create the RDD and then save it to `squares.txt`.

In [None]:
partitions = 5  #instead of the 2 partitions we got by default last time
rdd1 = sc.parallelize(xrange(10), partitions)
rdd2 = rdd1.map(lambda x: x*x)
rdd2.saveAsTextFile('squares.txt')

- Now look at the output.

In [None]:
!ls -l squares.txt
!for i in squares.txt/part-*; do echo $i; cat $i; done

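- There are now five `part-*` files, each with two elements: `part-00000`
  holds 0 and 1, `part-00001` holds 4 and 9, and so on. The second argument
  to `parallelize` sets the number of partitions to divide your data into.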

Pop Quiz
--------

<details><summary>
Q: How many partitions does Spark use by default?
</summary>
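1. For `parallelize`, Spark by default uses `sc.defaultParallelism`
   partitions, which is based on the cores available unless
   `spark.default.parallelism` is set (2 in this setup).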
<br>
2. If you read an HDFS file into an RDD Spark uses one partition per
   block.
<br>
3. If you read a file into an RDD from S3 or some other source Spark
   uses 1 partition per 32 MB of data.
</details>

<details><summary>
Q: If I read a file that is 200 MB into an RDD, how many partitions will that have?
</summary>
1. If the file is on HDFS with the default 128 MB block size, it will
   produce 2 partitions (one per block: 128 MB and 72 MB).
<br>
2. If the file is on S3 or some other file system it will produce 7
   partitions.
<br>
3. You can also control the number of partitions by passing in an
   additional argument into `textFile`.
</details>
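The answers above are ceiling divisions of file size by split size. A minimal sketch of that arithmetic follows; `estimate_partitions` is a hypothetical helper, and the model deliberately ignores details such as compression or an explicit minimum number of partitions passed to `textFile`.

```python
import math


def estimate_partitions(file_mb, split_mb):
    """Rough partition count: one partition per split of split_mb megabytes (simplified model)."""
    return int(math.ceil(file_mb / float(split_mb)))


print(estimate_partitions(200, 128))  # HDFS with 128 MB blocks
print(estimate_partitions(200, 32))   # 32 MB splits, as described above for S3 and other sources
```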

Spark Terminology
-----------------

<img src="images/spark-cluster.png">

Term                   |Meaning
----                   |-------
Task                   |Unit of work that processes one partition, run by a single thread in an executor
Partition              |Data processed by a single task
Record                 |A single element of an RDD; records make up a partition

Notes
-----

- Every Spark application gets executors when you create a new `SparkContext`.

- You can specify how many cores to assign to each executor.

- A core is equivalent to a thread.

- The number of cores determines how many tasks can run concurrently on
  an executor.

- Each task corresponds to one partition.

Pop Quiz
--------

<details><summary>
Q: Suppose you have 2 executors, each with 2 cores--so a total of 4
cores. And you start a Spark job with 8 partitions. How many tasks
will run concurrently?
</summary>
4 tasks will execute concurrently.
</details>

<details><summary>
Q: What happens to the other partitions?
</summary>
1. The other partitions wait in queue until a task thread becomes
available.
<br>
2. Think of cores as turnstile gates at a train station, and
   partitions as people.
<br>
3. The number of turnstiles determines how many people can get through
   at once.
</details>
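The turnstile picture reduces to simple arithmetic. This sketch assumes each task uses one core and all tasks take about the same time; `task_waves` is a hypothetical helper that returns how many tasks run at once and how many rounds ("waves") are needed to get through all partitions.

```python
import math


def task_waves(num_partitions, executors, cores_per_executor):
    """Return (tasks running concurrently, number of waves), assuming one core per task."""
    slots = executors * cores_per_executor  # total turnstiles
    concurrent = min(slots, num_partitions)
    waves = int(math.ceil(num_partitions / float(slots)))
    return concurrent, waves


print(task_waves(8, 2, 2))  # the quiz: 2 executors x 2 cores, 8 partitions
```

For the quiz, 4 tasks run at a time, and the 8 partitions go through in 2 waves.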

<details><summary>
Q: How many Spark jobs can you have in a Spark application?
</summary>
As many as you want.
</details>

<details><summary>
Q: How many Spark applications and Spark jobs are in this IPython Notebook?
</summary>
1. There is one Spark application because there is one `SparkContext`.
<br>
2. There are as many Spark jobs as we have invoked actions on RDDs.
</details>

Stock Quotes
------------

Q: Find the date on which AAPL's stock price was the highest.

Suppose you have stock market data from Yahoo! for AAPL from
<http://finance.yahoo.com/q/hp?s=AAPL+Historical+Prices>. The data is
in CSV format and has these values.

Date        |Open    |High    |Low     |Close   |Volume      |Adj Close
----        |----    |----    |---     |-----   |------      |---------
11-18-2014  |113.94  |115.69  |113.89  |115.47  |44,200,300  |115.47
11-17-2014  |114.27  |117.28  |113.30  |113.99  |46,746,700  |113.99

Here is what the CSV looks like:

In [None]:
csv = [
  "#Date,Open,High,Low,Close,Volume,Adj Close\n",
  "2014-11-18,113.94,115.69,113.89,115.47,44200300,115.47\n",
  "2014-11-17,114.27,117.28,113.30,113.99,46746700,113.99\n",
]

Let's find the date on which the price was the highest.


<details><summary>
Q: What two fields do we need to extract? 
</summary>
1. *Date* and *Adj Close*.
<br>
2. We want to use *Adj Close* instead of *High* so our calculation is
   not affected by stock splits.
</details>

<details><summary>
Q: What field should we sort on?
</summary>
*Adj Close*
</details>

<details><summary>
Q: What sequence of operations would we need to perform?
</summary>
1. Use `filter` to remove the header line.
<br>
2. Use `map` to split each row into fields.
<br>
3. Use `map` to extract *Adj Close* and *Date*.
<br>
4. Use `sortBy` to sort descending on *Adj Close*.
<br>
5. Use `take(1)` to get the highest value.
</details>

- Here is the full source.

In [None]:
csv = [
  "#Date,Open,High,Low,Close,Volume,Adj Close\n",
  "2014-11-18,113.94,115.69,113.89,115.47,44200300,115.47\n",
  "2014-11-17,114.27,117.28,113.30,113.99,46746700,113.99\n",
]
sc.parallelize(csv) \
  .filter(lambda line: not line.startswith("#")) \
  .map(lambda line: line.split(",")) \
  .map(lambda fields: (float(fields[-1]),fields[0])) \
  .sortBy(lambda (close, date): close, ascending=False) \
  .take(1)

#sortBy is a wide transformation: records are shuffled between executors so they can be sorted globally.
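Sorting every row just to keep the first one is more work than the question needs. Here is a local plain-Python sketch of the same steps that finishes with a single `max` instead of a sort. In PySpark, the `top(1)` action listed earlier plays the same role on the `(close, date)` tuples; that substitution is a suggestion, not the lecture's code.

```python
csv = [
    "#Date,Open,High,Low,Close,Volume,Adj Close\n",
    "2014-11-18,113.94,115.69,113.89,115.47,44200300,115.47\n",
    "2014-11-17,114.27,117.28,113.30,113.99,46746700,113.99\n",
]

rows = (line.strip().split(",") for line in csv if not line.startswith("#"))  # filter + map
pairs = [(float(fields[-1]), fields[0]) for fields in rows]                    # (Adj Close, Date)
best = max(pairs)  # same answer as sortBy(..., ascending=False).take(1)[0]
print(best)
```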

- Here is a program that finds the highest adjusted close of any stock.
  Note that it downloads the whole CSV into the driver's memory first.

In [None]:
import urllib2
import re

def get_stock_high(symbol):
  url = 'http://real-chart.finance.yahoo.com' + \
    '/table.csv?s='+symbol+'&g=d&ignore=.csv'
  csv = urllib2.urlopen(url).read()
  csv_lines = csv.split('\n')
  stock_rdd = sc.parallelize(csv_lines) \
    .filter(lambda line: re.match(r'\d', line)) \
    .map(lambda line: line.split(",")) \
    .map(lambda fields: (float(fields[-1]),fields[0])) \
    .sortBy(lambda (close, date): close, ascending=False)
  return stock_rdd.take(1)

get_stock_high('AAPL')

#[(75, '2015-06-18')]

#for larger data, avoid downloading everything on the driver: in the worst case you could
#open boto connections inside the tasks so that each task reads its own subset of the data

Notes
-----

- Spark is high-level like Hive and Pig.

- At the same time it does not invent a new language.

- This allows it to leverage the ecosystem of tools that Python,
  Scala, and Java provide.

Key Value Pairs
===============

PairRDD
-------

At this point we know how to aggregate values across an RDD. If we
have an RDD containing sales transactions we can find the total
revenue across all transactions.

Q: Using the following sales data find the total revenue across all
transactions.

In [None]:
%%writefile sales.txt
#ID    Date           Store   State  Product    Amount
101    11/13/2014     100     WA     331        300.00
104    11/18/2014     700     OR     329        450.00
102    11/15/2014     203     CA     321        200.00
106    11/19/2014     202     CA     331        330.00
103    11/17/2014     101     WA     373        750.00
105    11/19/2014     202     CA     321        200.00

- Read the file.

In [None]:
sc.textFile('sales.txt')\
    .take(2)
    
#build the job incrementally, so you can catch any issues immediately.

- Split the lines.

In [None]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .take(2)

- Remove `#`.

In [None]:
#meant to take out the header, but this filter KEEPS only the header line
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: x[0].startswith('#'))\
    .take(2)

- Try again.

In [None]:
#the fix: negate the condition so the header is dropped
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .take(2)

- Pick off the last field.

In [None]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: x[-1])\
    .take(2)
    
#great for development: always move incrementally towards your answer

- Convert to float and then sum.

In [None]:
#we want the amount, which is the last field, so extract that
#and then apply float to convert it to a number
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: float(x[-1]))\
    .sum()
    
#2230

ReduceByKey
-----------

Q: Calculate the revenue per state.

- Instead of creating a sequence of revenue numbers we can create
  tuples of states and revenue.

In [None]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: (x[-3],float(x[-1])))\
    .collect()
#[('WA', 300.0),
#    ('OR', 450.0),
#    etc.]

- Now use `reduceByKey` to add them up.

Note (drxt): `reduceByKey` needs each element to be a 2-tuple of (key, value). If the tuples contain three elements, or one, it won't work.


In [None]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: (x[-3],float(x[-1])))\
    .reduceByKey(lambda amount1,amount2: amount1+amount2)\
    .collect()

Q: Find the state with the highest total revenue.

- You can either use the action `top` or the transformation `sortBy`.

In [None]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: (x[-3],float(x[-1])))\
    .reduceByKey(lambda amount1,amount2: amount1+amount2)\
    .sortBy(lambda state_amount:state_amount[1],ascending=False) \
    .collect()

Pop Quiz
--------

<details><summary>
Q: What does `reduceByKey` do?
</summary>
1. It is like a reducer.
<br>
2. If the RDD is made up of key-value pairs, it combines the values
   across all tuples with the same key by using the function we pass
   to it.
<br>
3. It only works on RDDs made up of key-value pairs or 2-tuples.
</details>

Notes
-----

- `reduceByKey` only works on RDDs made up of 2-tuples.

- `reduceByKey` works as both a reducer and a combiner.

- It requires that the operation is associative and commutative, since
  partial results are combined in no particular order.
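The notes above say `reduceByKey` acts as both a combiner and a reducer. A plain-Python sketch of that two-stage idea on the `sales.txt` amounts: each partition first reduces its own records per key, then the partial results are merged per key. The helpers `combine_partition` and `reduce_by_key` are hypothetical and skip Spark's actual shuffle machinery.

```python
sales = [("WA", 300.0), ("OR", 450.0), ("CA", 200.0),
         ("CA", 330.0), ("WA", 750.0), ("CA", 200.0)]


def combine_partition(records, func):
    """Map-side combine: reduce the values per key within one partition."""
    out = {}
    for key, value in records:
        out[key] = func(out[key], value) if key in out else value
    return out


def reduce_by_key(partitions, func):
    """Sketch of reduceByKey: combine locally, then merge the partial results per key."""
    partials = [combine_partition(p, func) for p in partitions]  # runs per partition
    merged = {}
    for partial in partials:                                     # after the shuffle
        for key, value in partial.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged


result = reduce_by_key([sales[:3], sales[3:]], lambda a, b: a + b)
print(sorted(result.items()))
```

Because addition is associative and commutative, splitting the records into different partitions gives the same per-state totals; an operation without those properties would not.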

Word Count
----------

Q: Implement word count in Spark.

- Create some input.

In [None]:
%%writefile input.txt
hello world
another line
yet another line
yet another another line

- Count the words.

In [None]:
sc.textFile('input.txt')\
    .flatMap(lambda line: line.split())\
    .map(lambda word: (word,1))\
    .reduceByKey(lambda count1,count2: count1+count2)\
    .collect()
    
#e.g. [(u'line', 3), (u'another', 4), ...]; the order is not guaranteed

Making List Indexing Readable
-----------------------------

- While this code looks reasonable, the list indexes are cryptic and
  hard to read.

In [None]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: (x[-3],float(x[-1])))\
    .reduceByKey(lambda amount1,amount2: amount1+amount2)\
    .sortBy(lambda state_amount:state_amount[1],ascending=False) \
    .collect()

- We can make this more readable using Python's argument unpacking
  feature.

Argument Unpacking
------------------

Q: Which version of `getCity` is more readable and why?

- Consider this code.

In [None]:
client = ('Dmitri','Smith','SF')

def getCity1(client):
    return client[2]

def getCity2((first,last,city)):
    return city

print getCity1(client)

print getCity2(client)

- What is the difference between `getCity1` and `getCity2`?

- Which is more readable?

- What is the essence of argument unpacking?

Pop Quiz
--------
<details><summary>
Q: Can argument unpacking work for deeper nested structures?
</summary>
Yes. It can work for arbitrarily nested tuples and lists.
</details>

<details><summary>
Q: How would you write `getCity` given 
`client = ('Dmitri','Smith',('123 Eddy','SF','CA'))`
</summary>
`def getCity((first,last,(street,city,state))): return city`
</details>

Argument Unpacking
------------------

- Let's test this out.

In [1]:
client = ('Dmitri','Smith',('123 Eddy','SF','CA'))

def getCity((first,last,(street,city,state))):
    return city

getCity(client)

'SF'

- Whenever you find yourself indexing into a tuple consider using
  argument unpacking to make it more readable.

- Here is what `getCity` looks like with tuple indexing.

In [4]:
def badGetCity(client):
    return client[2][1]

badGetCity(client)

'SF'
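One caveat: unpacking in the parameter list, as in `def getCity((first,last,(street,city,state)))` or `lambda (state,amount): ...`, is Python 2 only; it was removed in Python 3 by PEP 3113. If you run PySpark under Python 3, a sketch of the same readability gain is to unpack in the function body instead.

```python
client = ('Dmitri', 'Smith', ('123 Eddy', 'SF', 'CA'))


def getCity(client):
    first, last, (street, city, state) = client  # nested unpacking in the body (Python 3 compatible)
    return city


print(getCity(client))
```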

Argument Unpacking In Spark
---------------------------

Q: Rewrite the last Spark job using argument unpacking.

- Here is the original version of the code.

In [None]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: (x[-3],float(x[-1])))\
    .reduceByKey(lambda amount1,amount2: amount1+amount2)\
    .sortBy(lambda state_amount:state_amount[1],ascending=False) \
    .collect()

- Here is the code with argument unpacking.

In [None]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda (id,date,store,state,product,amount): (state,float(amount)))\
    .reduceByKey(lambda amount1,amount2: amount1+amount2)\
    .sortBy(lambda (state,amount):amount,ascending=False) \
    .collect()

- In this case, because the tuple is long, argument unpacking
  is a judgment call.

GroupByKey
----------

`reduceByKey` lets us aggregate values using sum, max, min, and other
associative operations. But what about non-associative operations like
average? How can we calculate them?

- There are several ways to do this.

- The first approach is to change the RDD tuples so that the operation
  becomes associative. 

- Instead of `(state, amount)` use `(state, (amount, count))`.

- The second approach is to use `groupByKey`, which is like
  `reduceByKey` except it gathers together all the values in an
  iterator. 
  
- The iterator can then be reduced in a `map` step immediately after
  the `groupByKey`.

Q: Calculate the average sales per state.

- Approach 1: Restructure the tuples.

In [None]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: (x[-3],(float(x[-1]),1)))\
    .reduceByKey(lambda (amount1,count1),(amount2,count2): \
        (amount1+amount2, count1+count2))\
    .map(lambda (state,(amount,count)): (state, amount/count))\
    .collect()

- Note the argument unpacking we are doing in `reduceByKey` to name
  the elements of the tuples.
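Why carry `(amount, count)` pairs instead of averaging directly? A plain-Python sketch of approach 1 on the same sales amounts: adding pairs is associative, so partial pairs can be combined in any grouping and divided only at the end. The helper `add_pairs` is a hypothetical name. The last line shows that averaging averages gives the wrong answer.

```python
sales = [("WA", 300.0), ("OR", 450.0), ("CA", 200.0),
         ("CA", 330.0), ("WA", 750.0), ("CA", 200.0)]


def add_pairs(a, b):
    """Associative combine for (sum, count) pairs."""
    return (a[0] + b[0], a[1] + b[1])


totals = {}
for state, amount in sales:
    pair = (amount, 1)  # like map(lambda x: (state, (amount, 1)))
    totals[state] = add_pairs(totals[state], pair) if state in totals else pair

averages = {state: s / c for state, (s, c) in totals.items()}
print(sorted(averages.items()))

# Averaging averages is not the same: mean(mean(200, 330), 200) != mean(200, 330, 200)
print(((200.0 + 330.0) / 2 + 200.0) / 2, (200.0 + 330.0 + 200.0) / 3)
```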

- Approach 2: Use `groupByKey`.

In [None]:
def mean(values):
    total = 0.0; count = 0
    for x in values:
        total += x; count += 1
    return total/count

#a hand-written mean: the lecturer notes that numpy won't work with these iterators
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: (x[-3],float(x[-1])))\
    .groupByKey() \
    .map(lambda (state,values): (state, mean(values)))\
    .collect()

- Note that we are using unpacking again.

Pop Quiz
--------

<details><summary>
Q: What would be the disadvantage of not using unpacking?
</summary>
1. We will need to drill down into the elements.
<br>
2. The code will be harder to read.
</details>

<details><summary>
Q: What are the pros and cons of `reduceByKey` vs `groupByKey`?
</summary>
1. `groupByKey` stores all the values for a particular key as an iterable.
<br>
2. This will take up space in memory or on disk.
<br>
3. `reduceByKey` therefore is more scalable.
<br>
4. However, `groupByKey` does not require associative reducer
   operation.
<br>
5. For this reason `groupByKey` can be easier to program with.
</details>

<!--

Find Highest Revenue State

Cache and Persist

Checkpoint

Narrow vs Wide Operations

Partitions

Broadcast Variables

Accumulators

x = sc.parallelize(xrange(10)).filter(lambda x: x % 2 == 0).collect()

x = sc.parallelize([1,2,3,4,5,5,1,3]).distinct().collect()

x = sc.parallelize([1,2,3,4,5,5,1,3]).mean()

x = sc.parallelize([1,2,3,4,5,5,1,3]).mean()




-->

Joins
-----

Q: Given a table of employees and locations find the cities that the employees live in.
 - The easiest way to do this is with a join.

In [None]:
# Employees: emp_id, loc_id, name
employee_data = [
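    (101, 14, 'Alice'),
    (102, 15, 'Bob'),
    (103, 14, 'Chad'),
    (104, 15, 'Jen'),
    (105, 13, 'Dee') ]

# Locations: loc_id, location
# (plain decimal IDs: a leading zero such as 014 is an octal literal in Python 2 and a syntax error in Python 3)
location_data = [
    (14, 'SF'),
    (15, 'Seattle'),
    (16, 'Portland')]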

employees = sc.parallelize(employee_data)
locations = sc.parallelize(location_data)

# Re-key employee records with loc_id
employees2 = employees.map(lambda (emp_id,loc_id,name):(loc_id,name))

# Now join.
employees2.join(locations).collect()

Pop Quiz
--------

<details><summary>
Q: How do we keep employees that don't have a valid location ID in the final result?
</summary>
1. Use `leftOuterJoin` to keep employees without a matching location ID.
<br>
2. Use `rightOuterJoin` to keep locations without employees.
<br>
3. Use `fullOuterJoin` to keep both.
</details>
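To see what the join variants return, here is a plain-Python nested-loop sketch on the same re-keyed data. The helpers `inner_join` and `left_outer_join` are hypothetical and only mimic the output shape `(key, (left, right))`, with `None` for a missing match; Spark itself shuffles both RDDs by key instead of looping over every pair.

```python
employee_pairs = [(14, 'Alice'), (15, 'Bob'), (14, 'Chad'), (15, 'Jen'), (13, 'Dee')]  # (loc_id, name)
location_pairs = [(14, 'SF'), (15, 'Seattle'), (16, 'Portland')]                      # (loc_id, location)


def inner_join(left, right):
    """(key, (l, r)) for every pair of records with matching keys, like rdd.join."""
    return [(k, (l, r)) for k, l in left for k2, r in right if k == k2]


def left_outer_join(left, right):
    """Like leftOuterJoin: left records without a match are kept, paired with None."""
    out = []
    for k, l in left:
        matches = [r for k2, r in right if k2 == k]
        out.extend((k, (l, r)) for r in (matches or [None]))
    return out


print(inner_join(employee_pairs, location_pairs))
print(left_outer_join(employee_pairs, location_pairs))
```

Dee (location 13) disappears from the inner join but survives the left outer join as `(13, ('Dee', None))`.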

Caching and Persistence
=======================

RDD Caching
-----------

 - Consider this Spark job

In [None]:
n = 100000
%time rdd1 = sc.parallelize(xrange(n))
%time rdd2 = rdd1.map(lambda x:x*x)

 - Until we execute an action the RDDs do nothing.
 - Now lets force execution of the RDD by calling an action on it.

In [None]:
%time c = rdd2.count()

 - Notice that Spark is not caching rdd1 or rdd2.
 - The RDD does no work until an action is called. When an action is called, Spark computes the answer and then throws away all the intermediate data.
 - If you have an RDD that you are going to reuse in your computation you can use cache() to make Spark cache the RDD.
 
RDD Caching
-----------

Q: Repeat the previous computation with cache enabled on the RDDs.
 - Set up the RDDs.

In [None]:
n = 100000
rdd1 = sc.parallelize(xrange(n))
rdd2 = rdd1.map(lambda x:x*x)

#enable caching on rdd1
rdd1.cache();  #once rdd1 is computed, Spark will keep its data in memory

#observe the performance: not much better, because rdd2 still has to go through all the data
%time rdd2.count()

#now cache rdd2 too
rdd2.cache();

#this count computes rdd2 and keeps it; later actions on rdd2 read it from memory
%time rdd2.count()
#4.91 s down to 1.65 s


In [None]:
#in class notes

rdd1 = sc.parallelize([random.random() for _ in xrange(1000000)])
rdd2 = rdd1.sortBy(lambda x:x)
rdd2.cache()
rdd3 = rdd2.filter(lambda x: x>0.5)

%time rdd3.count()
#wall time: 344 ms; this first run computes rdd2 and caches it

%time rdd3.count()
#running it again drops to 57.6 ms, because rdd2 is now read from the cache.
#so if you use an RDD multiple times, cache it,
#or else Spark will recompute it every time

Notes
-----

 - Calling `cache()` flips a flag on the RDD.
 - The data is not cached until an action is called.
 - You can uncache an RDD using `unpersist()`.
 
Pop Quiz
--------

Q: Will `unpersist` uncache the RDD immediately, or does it wait for an action?
 
Caching and Persistence
-----------------------

Q: Persist RDD to disk instead of caching it in memory.
 - You can cache RDDs at different levels.
 - Here is an example.

In [None]:
import pyspark
rdd = sc.parallelize(xrange(100))
rdd.persist(pyspark.StorageLevel.DISK_ONLY)

- Every time you use the RDD after it has been computed, it will be read from disk.

- Q: Will the RDD be stored on disk as soon as you execute this cell?
  No. Persistence only takes effect once an action computes the RDD.


Persistence Levels
------------------

Level                  |Meaning
-----                  |-------
`MEMORY_ONLY`          |Same as `cache()`
`MEMORY_AND_DISK`      |Cache in memory, then overflow to disk
`MEMORY_AND_DISK_SER`  |Like `MEMORY_AND_DISK`, but keep cached objects serialized instead of live
`DISK_ONLY`            |Cache to disk, not to memory

Notes
----
 - `MEMORY_AND_DISK_SER` is a good compromise between the levels.
 - It is reasonably fast, while serialized objects take up less memory than live ones.
 - Make sure you unpersist when you don't need the RDD any more.

Spark Performance
=================

Narrow and Wide Transformations
-------------------------------

 - A transformation is narrow if each partition of the parent RDD is used by at most one partition of the child RDD.
 - A transformation is wide if a partition of the parent RDD can be used by multiple partitions of the child RDD.
 - Narrow transformations are map-like, while wide transformations are reduce-like.
 - Narrow transformations are faster because they do not move data between executors; wide transformations require a shuffle, which moves data across the network and makes them slower.

- Note (drxt): before wide operations such as `join` and `reduceByKey`, make sure you have as little data as possible, because it is going to be sent between all the machines.
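The narrow/wide distinction can be sketched with partitions modeled as plain Python lists. In `narrow_map`, each output partition is built from exactly one input partition, so nothing moves. In `wide_shuffle`, records are routed to output partitions by a hash of their key, so records from every input partition can end up anywhere. Both helpers are hypothetical; the routing is similar in spirit to Spark's hash partitioning but is not its implementation.

```python
def narrow_map(partitions, f):
    """Narrow: each output partition depends on exactly one input partition."""
    return [[f(x) for x in part] for part in partitions]


def wide_shuffle(partitions, num_out):
    """Wide: records are routed to output partitions by key hash, so data moves between partitions."""
    out = [[] for _ in range(num_out)]
    for part in partitions:
        for key, value in part:
            out[hash(key) % num_out].append((key, value))
    return out


parts = [[("WA", 300.0), ("OR", 450.0)], [("CA", 200.0), ("WA", 750.0)]]
mapped = narrow_map(parts, lambda kv: (kv[0], kv[1] * 2))
shuffled = wide_shuffle(parts, 3)
print(mapped)
print(shuffled)
```

After the shuffle, all records for a given key sit in the same partition, which is exactly what a reduce-like operation such as `reduceByKey` or `join` needs.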

Repartitioning
--------------

 - Over time partitions can get skewed.
 - Or you might have less data or more data than you started with.
 - You can rebalance your partitions using `repartition` or `coalesce`.
 - `coalesce` is narrow while `repartition` is wide.

drxt notes:
 - Wide operations give you the opportunity to ask for a new number of partitions.
 - Wide operations like `sortByKey` take an optional argument for the number of partitions.
 - For example, after a `reduceByKey` you may no longer need 100 partitions, so you can ask for fewer. The same goes for `join`.
 - Most wide operations accept the number of partitions as an argument.
 



Misc
----

### Amazon S3
 - "s3:" URLs break when the Secret Key contains a slash, even if it is encoded: <https://issues.apache.org/jira/browse/HADOOP-3733>
 - The Spark 1.3.1 / Hadoop 2.6 prebuilt package has broken S3 filesystem access: <https://issues.apache.org/jira/browse/SPARK-7442>

Guidelines for Partitions
-------------------------

 - If a task takes less than about 120 ms, the partitions are probably too small.
 - Aim for roughly 30 to 100 MB of data per partition; smaller partitions mostly waste overhead.
 - If partitions are much larger, each task gets bogged down processing all of its records.
 - `coalesce()` is also a good way to reduce the number of partitions.
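Turning the 30 to 100 MB guideline into a partition count is a single ceiling division. In the sketch below, `suggested_partitions` is a hypothetical helper, and the 64 MB default target is an arbitrary pick from inside that range, not a Spark setting. The resulting number could then be passed to `repartition(n)` or `coalesce(n)`.

```python
import math


def suggested_partitions(total_mb, target_mb=64):
    """Pick a partition count so each partition holds roughly target_mb of data (at least 1)."""
    return max(1, int(math.ceil(total_mb / float(target_mb))))


print(suggested_partitions(10 * 1024))  # 10 GB at about 64 MB per partition
print(suggested_partitions(50))         # tiny dataset: one partition is enough
```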