# 3.1 Introduction to frameworks

# Apache Spark

## What is Spark?

- Spark is a framework for distributed processing.
- It is a streamlined alternative to Map-Reduce.
- Spark applications can be written in Scala, Java, or Python.

## Why Spark?

Why learn Spark?

- Spark enables you to analyze petabytes of data.
- Spark is significantly faster than Map-Reduce.
- Paradoxically, Spark's API is simpler than the Map-Reduce API.

## Origins

- Spark was started at UC Berkeley's AMPLab (AMP = Algorithms Machines People) in 2009.
- After being open sourced in 2010 under a BSD license, the project was donated in 2013 to the Apache Software Foundation and switched its license to Apache 2.0.
- Spark is one of the most active projects in the Apache Software Foundation and one of the most active open source big data projects.

## Essence of Spark

What is the basic idea of Spark?
- Spark takes the Map-Reduce paradigm and changes it in some critical ways:
  - Instead of writing single Map-Reduce jobs, a Spark job consists of a series of map and reduce functions.
  - Moreover, the intermediate data is kept in memory instead of being written to disk.

## Spark Ecosystem

<img src='assets/spark_ecosystem.png'>

### Pop Quiz

<details>
<summary>Q: Since Spark keeps intermediate data in memory to get speed, what does it make us give up? Where's the catch?</summary>
1. Spark trades memory for performance.
<br>
2. While Spark jobs are faster, they also consume more memory.
<br>
3. Spark outshines Map-Reduce in iterative algorithms where the overhead of saving the results of each step to HDFS slows down Map-Reduce.
<br>
4. For non-iterative algorithms Spark is comparable to Map-Reduce.
</details>

## Spark Logging

Q: How can I make Spark logging less verbose?
- By default Spark logs messages at the `INFO` level.
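- Here are the steps to make it print only warnings and errors.

```sh
cd $SPARK_HOME/conf
cp log4j.properties.template log4j.properties
```

- Edit `log4j.properties` and replace `rootCategory=INFO` with `rootCategory=WARN`. Use `ERROR` if you want to see errors only.
- Spark 3.3 and later use Log4j 2. In those versions, copy `log4j2.properties.template` to `log4j2.properties` instead and set `rootLogger.level = warn`.
- You can also change the level from a running session with `sc.setLogLevel('WARN')`.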

## Spark Execution

<img src='assets/spark_execution.png'>

## Spark Terminology

Term | Meaning
--- |---
Driver | Process that runs the application's main program and holds the `SparkContext`
Executor | Process that executes one or more Spark tasks
Master | Process which manages applications across the cluster, e.g., Spark Master
Worker | Process which manages executors on a particular worker node, e.g. Spark Worker

## Spark Job

Q: Flip a coin 1000 times using Python's `random()` function. What
fraction of the time do you get heads?

- Initialize Spark.

In [1]:
import pyspark as ps

spark = ps.sql.SparkSession.builder \
    .master('local[4]') \
    .appName('spark-lecture') \
    .getOrCreate()

sc = spark.sparkContext

- Define and run the Spark job.

In [2]:
import random 

n = 1000


heads = (sc.parallelize(range(n))
    .map(lambda _: random.random())
    .filter(lambda r: r <= 0.5)
    .count())

tails = n - heads
ratio = 1. * heads / n

print('heads =', heads)
print('tails =', tails)
print('ratio =', ratio)


heads = 500
tails = 500
ratio = 0.5
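
For comparison, here is a minimal plain-Python sketch of the same coin-flip computation on a single machine, with no Spark involved. The `flip_coins` helper and its `seed` parameter are hypothetical additions. The generator expressions stand in for `map`, `filter`, and `count`. Spark computes the same kind of answer, but it splits the `n` draws across partitions that run in parallel.

```python
import random

def flip_coins(n, seed=None):
    """Single-machine version of the coin-flip job above (no Spark)."""
    rng = random.Random(seed)                   # private generator, reproducible with a seed
    draws = (rng.random() for _ in range(n))    # plays the role of .map(lambda _: random.random())
    heads = sum(1 for r in draws if r <= 0.5)   # plays the role of .filter(...).count()
    return heads, n - heads, heads / n

heads, tails, ratio = flip_coins(1000, seed=42)
print('heads =', heads)
print('tails =', tails)
print('ratio =', ratio)
```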


In [3]:
rdd = sc.parallelize(range(n))

In [4]:
rdd2 = rdd.map(lambda x: random.random())

In [5]:
import numpy as np
np.array(rdd2.collect())

array([5.88613435e-01, 4.70583534e-01, 9.40173231e-01, 6.06538206e-01,
       5.82739900e-01, 3.10012393e-01, 5.73868817e-01, 6.89968386e-01,
       8.05385543e-01, 8.31527851e-01, 3.73643538e-01, 6.83200513e-01,
       9.87681135e-01, 5.97460448e-01, 2.06279091e-01, 1.16375958e-01,
       6.22979381e-01, 3.88023201e-01, 4.06208955e-01, 4.02780758e-01,
       1.26752278e-01, 4.71198052e-01, 1.38770302e-01, 9.55091253e-01,
       1.46051682e-01, 3.66150949e-01, 6.30171247e-01, 3.38405921e-01,
       7.36963580e-01, 6.49149873e-01, 6.22002556e-01, 6.83923157e-01,
       5.88409289e-01, 3.41972056e-01, 1.73497383e-01, 2.16038453e-01,
       9.26765960e-01, 6.04240841e-01, 1.58101826e-01, 6.70591572e-01,
       6.25861799e-01, 6.76359928e-01, 4.68743415e-01, 9.07214895e-01,
       7.03949369e-02, 5.67871888e-01, 1.51646617e-01, 9.89273783e-01,
       4.70955478e-01, 8.33082300e-01, 6.06468677e-01, 2.73051158e-01,
       8.41822357e-01, 2.91742545e-01, 5.17441674e-01, 2.61859311e-01,
       ...

In [6]:
len(rdd2.collect())

1000

In [7]:
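rdd3 = rdd2.filter(lambda r: r <= 0.5)

In [8]:
rdd3.count()  # count is an action, like collect and mean; map and filter are transformations

511

- This is not exactly 500. Each run draws new random numbers, and because `rdd2` is not cached, every action on it (such as the earlier `collect`) recomputes it with fresh draws.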

### Notes

- `sc.parallelize` creates an RDD.
- `map` and `filter` are *transformations*.
  - They create new RDDs from existing RDDs.
- `count` is an *action*. It triggers the computation and sends its result (here, the number of elements) back to the driver.

## Spark Terminology

Term | Meaning
--- | ---
RDD | *Resilient Distributed Dataset* or a distributed sequence of records
Spark Job | Sequence of transformations on data with a final action
Transformation | Spark operation that produces an RDD
Action | Spark operation that produces a local object
Spark Application | Sequence of Spark jobs and other code

- In a Spark job, the driver sends the work to the cluster. The computation runs on the *executors*, and the result of the action is sent back to the driver.

### Pop Quiz

<details>
<summary>
In this Spark job, what is the transformation and what is the action?
<br>
`sc.parallelize(range(10)).filter(lambda x: x % 2 == 0).collect()`
</summary>
1. `filter` is the transformation.
<br>
2. `collect` is the action.
</details>

## Lambda vs. Functions

- Instead of a `lambda`, you can pass a function defined with `def` to `map`, `filter`, and other RDD transformations.
- Use `lambda` for short functions.
- Use `def` for more substantial functions.
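
Python's built-in `filter` treats the two forms the same way that RDD transformations do. Here is a minimal plain-Python sketch (no Spark) with a hypothetical `is_even` helper:

```python
def is_even(x):
    """Named equivalent of lambda x: x % 2 == 0."""
    return x % 2 == 0

nums = range(10)
with_lambda = list(filter(lambda x: x % 2 == 0, nums))
with_def = list(filter(is_even, nums))

print(with_lambda == with_def)  # True
print(with_def)                 # [0, 2, 4, 6, 8]
```

On an RDD the call looks the same, for example `rdd.filter(is_even)`.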

## Finding Primes

Q: Find all the primes less than 100.

- Define function to determine if a number is prime.

In [9]:
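def is_prime(number):
    # 0, 1, and negative numbers are not prime
    if number < 2:
        return False
    # A composite number always has a factor no larger than its square root
    factor_min = 2
    factor_max = int(number ** 0.5) + 1
    for factor in range(factor_min, factor_max):
        if number % factor == 0:
            return False
    return True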

- Use this to filter out non-primes.

In [10]:
numbers = range(2, 100)

primes = (sc.parallelize(numbers)
    .filter(is_prime)
    .collect())

print(primes)

[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97]


In [11]:
numbers = range(2, 10)

In [12]:
rdd = sc.parallelize(numbers)

In [13]:
rdd2 = rdd.filter(is_prime)

In [14]:
rdd3 = rdd2.filter(lambda x: x > 5)

In [15]:
rdd3.collect()

[7]

### Pop Quiz

<img src='assets/spark_execution.png'>

<details>
<summary>Q: Where does `is_prime` execute?</summary>
A: On the executors.
</details>

<details>
<summary>Q: Where do the elements returned by `collect` end up?</summary>
A: On the driver.
</details>

### Transformations and Actions

- Common RDD Constructors

Expression | Meaning
--- | ---
`sc.parallelize(list)` | Create RDD of elements of list
`sc.textFile(path)` | Create RDD of lines from file

- Common Transformations

Expression | Meaning
--- | ---
`filter(lambda x: x % 2 == 0)` | Keep only the even elements
`map(lambda x: x * 2)` | Multiply each RDD element by `2`
`map(lambda x: x.split())` | Split each string into a list of words
`flatMap(lambda x: x.split())` | Split each string into words and flatten the result into one sequence
`sample(withReplacement=True, fraction=0.25)` | Sample roughly 25% of the elements, with replacement
`union(rdd)` | Append `rdd` to existing RDD
`distinct()` | Remove duplicates in RDD
`sortBy(lambda x: x, ascending=False)` | Sort elements in descending order

- Common Actions

Expression | Meaning
--- | ---
`collect()` | Return all elements to the driver as an in-memory list
`take(3)` | First 3 elements of the RDD
`top(3)` | Largest 3 elements of the RDD, in descending order
`takeSample(withReplacement=True, num=3)` | Random sample of 3 elements, with replacement
`sum()` | Sum of the elements (assumes numeric elements)
`mean()` | Mean of the elements (assumes numeric elements)
`stdev()` | Standard deviation of the elements (assumes numeric elements)
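
Here is a plain-Python sketch of what several of these actions return. It uses a small local list in place of an RDD. The `data` list and the seeded `random.Random` are illustrative only. On a real RDD, `take` follows partition order, and sampling uses Spark's own random generator.

```python
import heapq
import random

data = [5, 1, 4, 2, 3]

first_three = data[:3]                 # like rdd.take(3): first elements, in order
top_three = heapq.nlargest(3, data)    # like rdd.top(3): largest elements, descending
total = sum(data)                      # like rdd.sum()
average = total / len(data)            # like rdd.mean()

rng = random.Random(0)
with_replacement = rng.choices(data, k=3)     # like rdd.takeSample(True, 3)
without_replacement = rng.sample(data, k=3)   # like rdd.takeSample(False, 3)

print(first_three, top_three, total, average)  # [5, 1, 4] [5, 4, 3] 15 3.0
```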

### Pop Quiz

Q: What will this output?

In [16]:
sc.parallelize([1, 3, 2, 2, 1, 4, 5]).distinct().collect()

[4, 1, 5, 2, 3]

Q: What will this output?

In [17]:
sc.parallelize(range(2, 10)).sortBy(lambda x: x, ascending=False).collect()

[9, 8, 7, 6, 5, 4, 3, 2]

Q: What will this output?

In [18]:
%%writefile input.txt
hello world
another line
yet another line
yet another another line

Writing input.txt


In [19]:
sc.textFile('input.txt').map(lambda x: x.split()).count()

4

Q: What about this?

In [20]:
sc.textFile('input.txt').flatMap(lambda x: x.split()).count()

11

## Section 3.2 

## Map vs. FlatMap

Here's the difference between `map` and `flatMap`:

- Map:

In [21]:
sc.textFile('input.txt') \
    .map(lambda x: x.split()) \
    .collect()

[['hello', 'world'],
 ['another', 'line'],
 ['yet', 'another', 'line'],
 ['yet', 'another', 'another', 'line']]

In [22]:
rdd = sc.textFile('input.txt')

In [23]:
rdd2 = rdd.map(lambda x: x.split())

In [24]:
rdd2.collect()

[['hello', 'world'],
 ['another', 'line'],
 ['yet', 'another', 'line'],
 ['yet', 'another', 'another', 'line']]

- FlatMap:

In [25]:
sc.textFile('input.txt') \
    .flatMap(lambda x: x.split()) \
    .collect()

['hello',
 'world',
 'another',
 'line',
 'yet',
 'another',
 'line',
 'yet',
 'another',
 'another',
 'line']
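
The same distinction in plain Python, as a sketch without Spark: a list comprehension behaves like `map` (one output per input), while `itertools.chain.from_iterable` flattens the per-line lists the way `flatMap` does. The four lines are copied from `input.txt`.

```python
from itertools import chain

lines = ['hello world', 'another line', 'yet another line', 'yet another another line']

mapped = [line.split() for line in lines]                         # like map: one list per line
flat = list(chain.from_iterable(line.split() for line in lines))  # like flatMap: one flat list

print(len(mapped), len(flat))  # 4 11
```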

## PairRDD

At this point we know how to aggregate values across an RDD. If we have an RDD containing sales transactions we can find the total revenue across all transactions.

Q: Using the following sales data find the total revenue across all transactions.

In [26]:
%%writefile sales.txt
#ID    Date           Store   State  Product    Amount
101    11/13/2014     100     WA     331        300.00
104    11/18/2014     700     OR     329        450.00
102    11/15/2014     203     CA     321        200.00
106    11/19/2014     202     CA     331        330.00
103    11/17/2014     101     WA     373        750.00
105    11/19/2014     202     CA     321        200.00

Writing sales.txt


In [27]:
sc.textFile('sales.txt').top(2)

['106    11/19/2014     202     CA     331        330.00',
 '105    11/19/2014     202     CA     321        200.00']

- Read the file.

In [28]:
sc.textFile('sales.txt')\
    .take(2)

['#ID    Date           Store   State  Product    Amount',
 '101    11/13/2014     100     WA     331        300.00']

- Split the lines.

In [29]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .top(2)

[['106', '11/19/2014', '202', 'CA', '331', '330.00'],
 ['105', '11/19/2014', '202', 'CA', '321', '200.00']]

- Remove the header line, which starts with `#` (first attempt).

In [30]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: x[0].startswith('#'))\
    .take(3)

[['#ID', 'Date', 'Store', 'State', 'Product', 'Amount']]

- Try again.

In [31]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .take(3)

[['101', '11/13/2014', '100', 'WA', '331', '300.00'],
 ['104', '11/18/2014', '700', 'OR', '329', '450.00'],
 ['102', '11/15/2014', '203', 'CA', '321', '200.00']]

- Pick last field.

In [32]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: x[-1])\
    .take(3)

['300.00', '450.00', '200.00']

In [36]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: x[-1])\
    .collect()

['300.00', '450.00', '200.00', '330.00', '750.00', '200.00']

- Convert to float and then sum.

In [37]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: float(x[-1]))\
    .sum()

2230.0

## ReduceByKey

Q: Calculate the revenue per state.

- Instead of creating a sequence of revenue numbers we can create tuples of states and revenues.

In [38]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: (x[-3], float(x[-1])))\
    .collect()

[('WA', 300.0),
 ('OR', 450.0),
 ('CA', 200.0),
 ('CA', 330.0),
 ('WA', 750.0),
 ('CA', 200.0)]

- Now use `reduceByKey` to add them up.

In [39]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: (x[-3], float(x[-1])))\
    .reduceByKey(lambda amount1, amount2: amount1 + amount2)\
    .collect()

[('CA', 730.0), ('WA', 1050.0), ('OR', 450.0)]
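
Here is a plain-Python sketch (no Spark) of what `reduceByKey` computes for the pairs above. `reduce_by_key` is a hypothetical helper. It collects the values for each key in one dictionary on one machine and folds them with the given function. Spark, by contrast, combines values inside each partition first and then merges the partial results after a shuffle.

```python
from functools import reduce

pairs = [('WA', 300.0), ('OR', 450.0), ('CA', 200.0),
         ('CA', 330.0), ('WA', 750.0), ('CA', 200.0)]

def reduce_by_key(func, pairs):
    """Group values by key, then fold each group with func (single machine, no shuffle)."""
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in groups.items()}

totals = reduce_by_key(lambda amount1, amount2: amount1 + amount2, pairs)
print(totals)  # {'WA': 1050.0, 'OR': 450.0, 'CA': 730.0}
```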

Q: Find the state with the highest total revenue.

- You can either use the action `top` or the transformation `sortBy`.

In [42]:
# Sort the states by Amount in descending order
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: (x[-3], float(x[-1])))\
    .reduceByKey(lambda amount1, amount2: amount1 + amount2)\
    .sortBy(lambda state_amount: state_amount[1], ascending=False)\
    .collect()

[('WA', 1050.0), ('CA', 730.0), ('OR', 450.0)]

In [40]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: (x[-3], float(x[-1])))\
    .reduceByKey(lambda amount1, amount2: amount1 + amount2)\
    .sortBy(lambda state_amount: state_amount[1])\
    .map(lambda x: x[1])\
    .sum()

2230.0

### Pop Quiz

<details>
<summary>Q: What does `reduceByKey` do?</summary>
1. It is like a reducer.
<br>
2. If the RDD is made up of key-value pairs, it combines the values across all tuples with the same key by using the function we pass to it.
<br>
3. It only works on RDDs made up of key-value pairs or 2-tuples.
</details>

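### Notes

- `reduceByKey` only works on RDDs made up of 2-tuples (key-value pairs).
- `reduceByKey` works as both a combiner and a reducer. It first merges the values for each key within each partition, then merges those partial results after the shuffle.
- It therefore requires the operation to be associative and commutative.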
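
Why the combine step needs an associative (and, in Spark, also commutative) function: Spark may merge values in any grouping, first inside each partition and then across partitions. The plain-Python sketch below (no Spark) uses a hypothetical `combine_by_partition` helper that applies a function within each partition and then across the partial results. With addition, the answer does not depend on how the values are split. With subtraction, it does. The sketch only varies the grouping; Spark can also change the order, which is why commutativity matters too.

```python
from functools import reduce

values = [8, 4, 2, 1]

def combine_by_partition(func, partitions):
    """Reduce each partition locally, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions]
    return reduce(func, partials)

def add(a, b):
    return a + b

def sub(a, b):
    return a - b

# Addition is associative: same answer however the values are partitioned.
print(reduce(add, values), combine_by_partition(add, [[8, 4], [2, 1]]))  # 15 15
# Subtraction is not: ((8 - 4) - 2) - 1 = 1, but (8 - 4) - (2 - 1) = 3.
print(reduce(sub, values), combine_by_partition(sub, [[8, 4], [2, 1]]))  # 1 3
```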

## Word Count

Q: Implement word count in Spark.

- Create some input.

In [43]:
%%writefile input.txt
hello world
another line
yet another line
yet another another line

Overwriting input.txt


- Count the words.

In [44]:
sc.textFile('input.txt')\
    .flatMap(lambda line: line.split())\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda count1, count2: count1 + count2)\
    .collect()

[('world', 1), ('line', 3), ('yet', 2), ('hello', 1), ('another', 4)]

In [46]:
rdd = sc.textFile('input.txt')\
    .flatMap(lambda line: line.split())

In [47]:
rdd2 = rdd.map(lambda word: (word, 1))

In [48]:
rdd2.collect()

[('hello', 1),
 ('world', 1),
 ('another', 1),
 ('line', 1),
 ('yet', 1),
 ('another', 1),
 ('line', 1),
 ('yet', 1),
 ('another', 1),
 ('another', 1),
 ('line', 1)]

In [49]:
rdd2.reduceByKey(lambda a, b: a + b).collect()

[('world', 1), ('line', 3), ('yet', 2), ('hello', 1), ('another', 4)]
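
On a single machine, Python's `collections.Counter` does the whole `flatMap`, `map`, `reduceByKey` pipeline in one step. Here is a sketch for comparison (no Spark), using the lines of `input.txt`:

```python
from collections import Counter

lines = ['hello world', 'another line', 'yet another line', 'yet another another line']

# Split every line into words (flatMap), then count occurrences per word (map + reduceByKey).
counts = Counter(word for line in lines for word in line.split())
print(counts.most_common(2))  # [('another', 4), ('line', 3)]
```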

## Making List Indexing Readable

- While this code looks reasonable, the list indexes are cryptic and hard to read.

In [66]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: (x[-3], float(x[-1])))\
    .reduceByKey(lambda amount1, amount2: amount1 + amount2)\
    .sortBy(lambda state_amount: state_amount[1], ascending=False)\
    .collect()

[('WA', 1050.0), ('CA', 730.0), ('OR', 450.0)]

- We can make this more readable using Python's argument unpacking feature.

## Argument Unpacking

Q: Which version of `getCity` is more readable and why?

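- Consider this code. Python 2 allowed a tuple to be unpacked directly in a function's parameter list. Python 3 removed that feature (PEP 3113), so `getCity2` unpacks its argument in the first line of its body.

In [67]:
client = ('Dmitri', 'Smith', 'SF')

def getCity1(client):
    return client[2]

def getCity2(client):
    first, last, city = client
    return city

print('getCity1(client) =', getCity1(client))
print('getCity2(client) =', getCity2(client))

getCity1(client) = SF
getCity2(client) = SF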


- What is the difference between `getCity1` and `getCity2`?
- Which is more readable?
- What is the essence of argument unpacking?

### Pop Quiz

<details>
<summary>Q: Can argument unpacking work for deeper nested structures?</summary>
A: Yes. It can work for arbitrarily nested tuples and lists.
</details>

<details>
<summary>Q: How would you write `getCity` given
<br>
`client = ('Dmitri','Smith', ('123 Eddy','SF','CA'))`
</summary>
`def getCity(client): first, last, (street, city, state) = client; return city`
</details>



- Let's test this out.

In [68]:
client = ('Dmitri', 'Smith', ('123 Eddy', 'SF', 'CA'))

def getCity(client):
    first, last, (street, city, state) = client
    return city

getCity(client)

'SF'

- Whenever you find yourself indexing into a tuple, consider using argument unpacking to make the code more readable.
- For comparison, here is the same function written with tuple indexing instead.

In [69]:
def badGetCity(client):
    return client[2][1]

badGetCity(client)

'SF'

## Argument Unpacking In Spark

Q: Rewrite the last Spark job using argument unpacking.

- Here is the original version of the code:

In [70]:
sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: (x[-3], float(x[-1])))\
    .reduceByKey(lambda amount1, amount2: amount1 + amount2)\
    .sortBy(lambda state_amount: state_amount[1], ascending=False)\
    .collect()

[('WA', 1050.0), ('CA', 730.0), ('OR', 450.0)]

- Here is the code with argument unpacking. Python 3 does not allow unpacking in `lambda` parameters, so the unpacking moves into small named functions:

In [71]:
def to_state_amount(fields):
    sale_id, date, store, state, product, amount = fields
    return (state, float(amount))
def get_amount(state_amount):
    state, amount = state_amount
    return amount

sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(to_state_amount)\
    .reduceByKey(lambda amount1, amount2: amount1 + amount2)\
    .sortBy(get_amount, ascending=False)\
    .collect()

[('WA', 1050.0), ('CA', 730.0), ('OR', 450.0)]

- Because each record has many fields, whether unpacking makes this code more readable is a judgement call.

## GroupByKey

`reduceByKey` lets us aggregate values using sum, max, min, and other associative operations. But what about non-associative operations like average? How can we calculate them?

There are several ways to do this.
- The first approach is to change the RDD tuples so that the operation becomes associative. 
  - Instead of `(state, amount)` use `(state, (amount, count))`.
- The second approach is to use `groupByKey`, which is like `reduceByKey` except it gathers together all the values in an iterator. 
  - The iterator can then be reduced in a `map` step immediately after the `groupByKey`.

Q: Calculate the average sales per state.

- Approach 1: Restructure the tuples:

In [72]:
def add_amounts_counts(pair1, pair2):
    (amount1, count1), (amount2, count2) = pair1, pair2
    return (amount1 + amount2, count1 + count2)
def state_average(state_pair):
    state, (amount, count) = state_pair
    return (state, amount / count)

sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: (x[-3], (float(x[-1]), 1)))\
    .reduceByKey(add_amounts_counts)\
    .map(state_average)\
    .collect()

[('CA', 243.33333333333334), ('WA', 525.0), ('OR', 450.0)]

- Note the argument unpacking in `add_amounts_counts` and `state_average`, which names the elements of the tuples. Python 3 does not allow this unpacking directly in `lambda` parameters.

- Approach 2: Use `groupByKey`:

In [73]:
def mean(values):
    total, count = 0.0, 0
    for x in values:
        total += x
        count += 1
    return total / count

def state_mean(state_values):
    state, values = state_values
    return (state, mean(values))

sc.textFile('sales.txt')\
    .map(lambda x: x.split())\
    .filter(lambda x: not x[0].startswith('#'))\
    .map(lambda x: (x[-3], float(x[-1])))\
    .groupByKey()\
    .map(state_mean)\
    .collect()

[('CA', 243.33333333333334), ('WA', 525.0), ('OR', 450.0)]

- Note that we are using unpacking again, in `state_mean`.

### Pop Quiz

<details>
<summary>Q: What would be the disadvantage of not using unpacking?</summary>
1. We will need to drill down into the elements.
<br>
2. The code will be harder to read.
</details>

<details>
<summary>Q: What are the pros and cons of `reduceByKey` vs. `groupByKey`?</summary>
1. `groupByKey` stores all the values for a particular key as an iterable.
<br>
2. All of those values have to be shuffled across the network and held in memory, or spilled to disk.
<br>
3. `reduceByKey` combines values within each partition before the shuffle, so it is more scalable.
<br>
4. However, `groupByKey` does not require an associative reduce function.
<br>
5. For this reason `groupByKey` can be easier to program with.
</details>
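
The memory trade-off in this quiz shows up in a plain-Python sketch (no Spark) over the (state, amount) pairs from `sales.txt`. Approach 1 keeps only a running `(sum, count)` per state. Approach 2 first gathers every amount per state, which is what `groupByKey` has to materialize. Both give the same averages.

```python
from collections import defaultdict

pairs = [('WA', 300.0), ('OR', 450.0), ('CA', 200.0),
         ('CA', 330.0), ('WA', 750.0), ('CA', 200.0)]

# Approach 1: carry (sum, count) so the combine step is associative.
sums_counts = {}
for state, amount in pairs:
    total, count = sums_counts.get(state, (0.0, 0))
    sums_counts[state] = (total + amount, count + 1)
avg1 = {state: total / count for state, (total, count) in sums_counts.items()}

# Approach 2: gather every value per key first (what groupByKey materializes).
groups = defaultdict(list)
for state, amount in pairs:
    groups[state].append(amount)
avg2 = {state: sum(amounts) / len(amounts) for state, amounts in groups.items()}

print(avg1 == avg2)  # True
print(avg1['CA'])    # 243.33333333333334
```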

## Joins

Q: Given a table of employees and locations find the cities that the employees live in.

- The easiest way to do this is with a join. Here we use `leftOuterJoin`.

In [74]:
# Employees: emp_id, loc_id, name
employee_data = [
    (101, 14, 'Alice'),
    (102, 15, 'Bob'),
    (103, 14, 'Chad'),
    (104, 15, 'Jen'),
    (105, 13, 'Dee') ]

# Locations: loc_id, location
location_data = [
    (14, 'SF'),
    (15, 'Seattle'),
    (16, 'Portland')]

employees = sc.parallelize(employee_data)
locations = sc.parallelize(location_data)

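# Re-key employee records by loc_id (in Python 3, the unpacking goes in the function body)
def key_by_location(employee):
    emp_id, loc_id, name = employee
    return (loc_id, name)

employees2 = employees.map(key_by_location)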

# Now join.
employees2.leftOuterJoin(locations).collect()

[(13, ('Dee', None)),
 (14, ('Alice', 'SF')),
 (14, ('Chad', 'SF')),
 (15, ('Bob', 'Seattle')),
 (15, ('Jen', 'Seattle'))]

### Pop Quiz

<details>
<summary>Q: How can we keep employees that don't have a valid location ID in the final result?</summary>
1. Use `leftOuterJoin` to keep employees without location IDs.
<br>
2. Use `rightOuterJoin` to keep locations without employees. 
<br>
3. Use `fullOuterJoin` to keep both.
<br>
</details>
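
What `leftOuterJoin` produces can be sketched in plain Python (no Spark) with a dictionary index over the right-hand side. `left_outer_join` is a hypothetical helper. The input lists are the re-keyed `(loc_id, name)` and `(loc_id, location)` pairs from above. Unmatched left keys get `None`, just like Dee's record.

```python
from collections import defaultdict

employee_pairs = [(14, 'Alice'), (15, 'Bob'), (14, 'Chad'), (15, 'Jen'), (13, 'Dee')]
location_pairs = [(14, 'SF'), (15, 'Seattle'), (16, 'Portland')]

def left_outer_join(left, right):
    """Pair every left record with each matching right value, or with None if none match."""
    index = defaultdict(list)
    for key, value in right:
        index[key].append(value)
    return [(key, (lval, rval))
            for key, lval in left
            for rval in (index.get(key) or [None])]

for row in sorted(left_outer_join(employee_pairs, location_pairs)):
    print(row)
```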

## RDD Statistics

Q: How would you calculate the mean, variance, and standard deviation of a sample produced by Python's `random` function?

- Create an RDD and apply the statistical actions to it.

In [75]:
n = 1000

values = [random.random() for _ in range(n)]

rdd = sc.parallelize(values)

print('mean     =', rdd.mean())
print('variance =', rdd.variance())
print('stdev    =', rdd.stdev())

mean     = 0.506333143283
variance = 0.0813565893047
stdev    = 0.285230765004
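
Here is a single-machine cross-check with Python's `statistics` module, as a sketch. It relies on `RDD.variance()` and `RDD.stdev()` being population statistics (dividing by N), with `RDD.sampleVariance()` and `RDD.sampleStdev()` as the N - 1 versions. For uniform random numbers on [0, 1), the true variance is 1/12 ≈ 0.0833 and the standard deviation is about 0.289. Both are consistent with the values printed above.

```python
import random
import statistics

rng = random.Random(0)                     # seeded so the sketch is repeatable
sample = [rng.random() for _ in range(1000)]

avg = statistics.mean(sample)
variance = statistics.pvariance(sample)    # divides by N, like rdd.variance()
stdev = statistics.pstdev(sample)          # square root of the above, like rdd.stdev()
sample_stdev = statistics.stdev(sample)    # divides by N - 1, like rdd.sampleStdev()

print('mean     =', avg)
print('variance =', variance)
print('stdev    =', stdev)
```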


### Pop Quiz

<details>
<summary>Q: What requirement does an RDD have to satisfy before you can apply these statistical actions to it? 
</summary>
The RDD must consist of numeric elements.
</details>

<details>
<summary>Q: What is the advantage of using Spark vs NumPy to calculate mean or standard deviation?</summary>
The calculation is distributed across machines, so it scales to data sets that do not fit in one machine's memory.
</details>

## RDD Laziness

- Q: What is this Spark job doing?

In [76]:
n = 10000000

%time sc.parallelize(range(n)).map(lambda x: x + 1).count()

CPU times: user 12.7 ms, sys: 5.38 ms, total: 18.1 ms
Wall time: 1.26 s


10000000

- Q: How is the following job different from the previous one? How
  long do you expect it to take?

In [77]:
%time sc.parallelize(range(n)).map(lambda x: x + 1)

CPU times: user 2.52 ms, sys: 1.84 ms, total: 4.36 ms
Wall time: 7.46 ms


PythonRDD[210] at RDD at PythonRDD.scala:48

### Pop Quiz

<details>
<summary>Q: Why did the second job complete so much faster?</summary>
1. Because Spark is lazy. 
<br>
2. Transformations produce new RDDs and do no operations on the data.
<br>
3. Nothing happens until an action is applied to an RDD.
<br>
4. An RDD is the *recipe* for a transformation, rather than the *result* of the transformation.
</details>

<details>
<summary>Q: What is the benefit of keeping the recipe instead of the result of the transformation?</summary>
1. It saves memory.
<br>
2. It provides *resilience*.
<br>
3. If a partition of an RDD is lost (for example, because an executor fails), Spark knows how to recompute it from the recipe.
</details>
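
To make the recipe idea concrete, here is a toy plain-Python model (no Spark). `LazyPipeline` is a hypothetical class, far simpler than a real RDD: it has no partitions, distribution, or caching. Transformations only append a step to the recipe. Actions replay the recipe from the source data, so work happens only when an action runs, and it is repeated on every action.

```python
class LazyPipeline:
    """Toy stand-in for an RDD: transformations record steps, actions run them."""

    def __init__(self, source, steps=()):
        self.source = source
        self.steps = tuple(steps)

    def map(self, func):                 # transformation: returns a new recipe
        return LazyPipeline(self.source, self.steps + (('map', func),))

    def filter(self, pred):              # transformation: returns a new recipe
        return LazyPipeline(self.source, self.steps + (('filter', pred),))

    def _compute(self):
        items = iter(self.source)
        for kind, func in self.steps:
            items = map(func, items) if kind == 'map' else filter(func, items)
        return items

    def collect(self):                   # action: replays the recipe
        return list(self._compute())

    def count(self):                     # action: replays the recipe
        return sum(1 for _ in self._compute())

calls = []

def traced_increment(x):
    calls.append(x)                      # records every time the work is actually done
    return x + 1

p = LazyPipeline(range(5)).map(traced_increment).filter(lambda x: x % 2 == 0)
print(len(calls))    # 0: only a recipe so far
print(p.collect())   # [2, 4]
print(len(calls))    # 5
print(p.count())     # 2: recomputed from the source
```

The final `count` recomputes everything from the source, which is the behaviour that `cache()` avoids for real RDDs.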

## Writing Data

Besides reading data, Spark can also write data out to a file system.

Q: Calculate the squares of the integers from 0 to 9 and write them out to `squares.txt`.

- Make sure `squares.txt` does not exist.

In [78]:
!if [ -e squares.txt ] ; then rm -rf squares.txt ; fi

- Create the RDD and then save it to `squares.txt`.

In [79]:
rdd1 = sc.parallelize(range(10))
rdd2 = rdd1.map(lambda x: x ** 2)
rdd2.saveAsTextFile('squares.txt')

- Now look at the output.

In [80]:
!cat squares.txt

cat: squares.txt: Is a directory


- Looks like the output is a directory.

In [81]:
!ls -l squares.txt

total 32
-rw-r--r--  1 koyuki.nakamori  staff  0 Jun  8 22:13 _SUCCESS
-rw-r--r--  1 koyuki.nakamori  staff  4 Jun  8 22:13 part-00000
-rw-r--r--  1 koyuki.nakamori  staff  7 Jun  8 22:13 part-00001
-rw-r--r--  1 koyuki.nakamori  staff  6 Jun  8 22:13 part-00002
-rw-r--r--  1 koyuki.nakamori  staff  9 Jun  8 22:13 part-00003


- Let's take a look at the files.

In [82]:
!for i in squares.txt/part-*; do echo $i; cat $i; done

squares.txt/part-00000
0
1
squares.txt/part-00001
4
9
16
squares.txt/part-00002
25
36
squares.txt/part-00003
49
64
81


### Pop Quiz

<details>
<summary>Q: What's going on? Why are there four files (excluding `_SUCCESS`) in the output directory?</summary>
1. There were four threads processing the RDD, because the session was started with `local[4]`.
<br>
2. The RDD was split into four partitions. In local mode, the default is the number of worker threads, which is 4 here.
<br>
3. Each partition was processed in a different task.
</details>

## Partitions

Q: Can we control the number of partitions/tasks that Spark uses for processing data? Solve the same problem as above but this time with 5 tasks.

- Make sure `squares.txt` does not exist.

In [83]:
!if [ -e squares.txt ] ; then rm -rf squares.txt ; fi

- Create the RDD and then save it to `squares.txt`.

In [84]:
partitions = 5
rdd1 = sc.parallelize(range(10), partitions)
rdd2 = rdd1.map(lambda x: x ** 2)
rdd2.saveAsTextFile('squares.txt')

- Now look at the output.

In [85]:
!ls -l squares.txt

!for i in squares.txt/part-*; do echo $i; cat $i; done

total 40
-rw-r--r--  1 koyuki.nakamori  staff  0 Jun  8 22:13 _SUCCESS
-rw-r--r--  1 koyuki.nakamori  staff  4 Jun  8 22:13 part-00000
-rw-r--r--  1 koyuki.nakamori  staff  4 Jun  8 22:13 part-00001
-rw-r--r--  1 koyuki.nakamori  staff  6 Jun  8 22:13 part-00002
-rw-r--r--  1 koyuki.nakamori  staff  6 Jun  8 22:13 part-00003
-rw-r--r--  1 koyuki.nakamori  staff  6 Jun  8 22:13 part-00004
squares.txt/part-00000
0
1
squares.txt/part-00001
4
9
squares.txt/part-00002
16
25
squares.txt/part-00003
36
49
squares.txt/part-00004
64
81
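
The part files above suggest how `parallelize` cuts a `range` into contiguous slices: for `n` elements and `k` partitions, slice `i` appears to start at `floor(i * n / k)`. The sketch below is a plain-Python reconstruction inferred from that output (the helper `slice_range` is hypothetical). It reproduces both the 4-partition and the 5-partition layouts.

```python
def slice_range(n, num_slices):
    """Split range(n) into num_slices contiguous chunks cut at floor(i * n / num_slices)."""
    bounds = [i * n // num_slices for i in range(num_slices + 1)]
    return [list(range(bounds[i], bounds[i + 1])) for i in range(num_slices)]

for k in (4, 5):
    squares = [[x ** 2 for x in part] for part in slice_range(10, k)]
    print(k, squares)
# 4 [[0, 1], [4, 9, 16], [25, 36], [49, 64, 81]]
# 5 [[0, 1], [4, 9], [16, 25], [36, 49], [64, 81]]
```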


### Pop Quiz

<details>
<summary>Q: How many partitions does Spark use by default?</summary>
1. For operations like parallelize, it depends on the cluster manager:
<br>
  - Local mode: number of cores on the local machine, or `N` for a `local[N]` master
<br>
  - Others: total number of cores on all executor nodes or 2, whichever is larger
<br>
2. If you read an HDFS file into an RDD, Spark uses one partition per block.
<br>
3. If you read a file into an RDD from S3 or some other source, Spark typically uses 1 partition per 32 MB of data. The exact size depends on the block size that the file system connector reports.
</details>

<details>
<summary>Q: If I read a file that is 200 MB into an RDD, how many partitions will that have?</summary>
1. If the file is on HDFS with the default 128 MB block size, it spans two blocks, so it will produce 2 partitions.
<br>
2. If the file is on S3 or some other file system with 32 MB chunks, it will produce 7 partitions (200 / 32 = 6.25, rounded up).
<br>
3. You can also request a minimum number of partitions by passing the `minPartitions` argument to `textFile`.
</details>
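
Here is the arithmetic behind the answer above, as a rough rule of thumb: one partition per block or chunk, rounded up. `approx_partitions` is a hypothetical helper. It ignores Hadoop's split-size calculation and the `minPartitions` setting, so real partition counts can differ.

```python
import math

def approx_partitions(file_mb, chunk_mb):
    """Rule of thumb: one partition per block/chunk, rounded up."""
    return math.ceil(file_mb / chunk_mb)

print(approx_partitions(200, 128))  # 2  (HDFS, 128 MB blocks)
print(approx_partitions(200, 32))   # 7  (32 MB chunks)
```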

## Spark Terminology

<img src='assets/spark_execution.png'>

Term | Meaning
--- |---
Task | Unit of work that processes one partition; it runs on a single thread (core) of an executor
Partition | Chunk of an RDD's data that is processed by a single task
Record | Single element of an RDD; a partition is made up of records

### Notes

- Every Spark application gets executors when you create a new `SparkContext`.
- You can specify how many cores to assign to each executor.
- Each core runs one task at a time, so in practice a core corresponds to one task thread.
- The number of cores determines how many tasks can run concurrently on an executor.
- Each task corresponds to one partition.

### Pop Quiz

<details>
<summary>
Q: Suppose you have 2 executors, each with 2 cores--so a total of 4
cores. And you start a Spark job with 8 partitions. How many tasks
will run concurrently?
</summary>
4 tasks will execute concurrently.
</details>

<details>
<summary>Q: What happens to the other partitions?</summary>
1. The other partitions wait in queue until a task thread becomes available.
<br>
2. Think of cores as turnstile gates at a train station, and partitions as people.
<br>
3. The number of turnstiles determines how many people can get through at once.
</details>
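
The turnstile picture can be written down as arithmetic. Assume every task takes about the same time and each core runs one task at a time. The number of rounds ("waves") of tasks is then the number of partitions divided by the total number of cores, rounded up. `task_waves` is a hypothetical helper, not a Spark API.

```python
import math

def task_waves(num_partitions, executors, cores_per_executor):
    """Return (concurrent task slots, number of waves), assuming equal-length tasks."""
    slots = executors * cores_per_executor
    return slots, math.ceil(num_partitions / slots)

slots, waves = task_waves(8, executors=2, cores_per_executor=2)
print(slots, waves)  # 4 2
```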

<details>
<summary>Q: How many Spark jobs can you have in a Spark application?</summary>
As many as you want.
</details>

<details>
<summary>Q: How many Spark applications and Spark jobs are in this IPython Notebook?</summary>
1. There is one Spark application because there is one `SparkContext`.
<br>
2. There is roughly one Spark job per action we have invoked on an RDD, although some operations, such as `sortBy`, launch extra jobs of their own.
</details>

## Stock Quotes

Q: Find the date on which AAPL's stock price was the highest.

Suppose you have stock market data from Yahoo! for AAPL from <http://finance.yahoo.com/q/hp?s=AAPL+Historical+Prices>. The data is in CSV format and has these values.

Date | Open | High | Low | Close | Volume | Adj Close
---- | --- | --- | --- | -----   |------ | ---
11-18-2014 | 113.94 | 115.69 | 113.89 | 115.47 | 44,200,300 | 115.47
11-17-2014 | 114.27 | 117.28 | 113.30 | 113.99 | 46,746,700 | 113.99

Here is what the CSV looks like:
    
    csv = [
      "#Date,Open,High,Low,Close,Volume,Adj Close\n",
      "2014-11-18,113.94,115.69,113.89,115.47,44200300,115.47\n",
      "2014-11-17,114.27,117.28,113.30,113.99,46746700,113.99\n",
    ]

Let's find the date on which the price was the highest.

<details>
<summary>Q: What two fields do we need to extract?</summary>
1. *Date* and *Adj Close*.
<br>
2. We want to use *Adj Close* instead of *High* so our calculation is not affected by stock splits.
</details>

<details><summary>Q: What field should we sort on?</summary>
*Adj Close*
</details>

<details>
<summary>Q: What sequence of operations would we need to perform?</summary>
1. Use `filter` to remove the header line.
<br>
2. Use `map` to split each row into fields.
<br>
3. Use `map` to extract *Adj Close* and *Date*.
<br>
4. Use `sortBy` to sort descending on *Adj Close*.
<br>
5. Use `take(1)` to get the highest value.
</details>

- Here is full source.

In [86]:
csv = [
  "#Date,Open,High,Low,Close,Volume,Adj Close\n",
  "2014-11-18,113.94,115.69,113.89,115.47,44200300,115.47\n",
  "2014-11-17,114.27,117.28,113.30,113.99,46746700,113.99\n",
]

sc.parallelize(csv) \
  .filter(lambda line: not line.startswith("#")) \
  .map(lambda line: line.split(",")) \
  .map(lambda fields: (float(fields[-1]), fields[0])) \
  .sortBy(lambda close_date: close_date[0], ascending=False) \
  .take(1)

[(115.47, '2014-11-18')]
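
Sorting every row just to keep one is more work than the question needs. A single pass that tracks the maximum is enough. Here is a plain-Python sketch of that idea on the same three lines (no Spark). On an RDD, the analogous single-pass actions would be `max()` or `top(1)`. Both work here because Python tuples compare by their first element (*Adj Close*) first.

```python
csv = [
  "#Date,Open,High,Low,Close,Volume,Adj Close\n",
  "2014-11-18,113.94,115.69,113.89,115.47,44200300,115.47\n",
  "2014-11-17,114.27,117.28,113.30,113.99,46746700,113.99\n",
]

rows = (line.rstrip("\n").split(",") for line in csv if not line.startswith("#"))
# One pass: (adj_close, date) tuples compare by Adj Close first, then by date.
best = max((float(fields[-1]), fields[0]) for fields in rows)
print(best)  # (115.47, '2014-11-18')
```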

- Here is a program that finds the high for any stock symbol by downloading its price history into memory. It is commented out because the Yahoo! download endpoint it uses is no longer available.

In [89]:
# import re
# import urllib.request

# def get_stock_high(symbol):
#     url = 'http://real-chart.finance.yahoo.com/table.csv?s=' + symbol + '&g=d&ignore=.csv'
#     csv = urllib.request.urlopen(url).read().decode('utf-8')
#     csv_lines = csv.split('\n')
#
#     # Keep data rows (they start with a digit), then sort by Adj Close, descending
#     stock_rdd = sc.parallelize(csv_lines)\
#         .filter(lambda line: re.match(r'\d', line))\
#         .map(lambda line: line.split(","))\
#         .map(lambda fields: (float(fields[-1]), fields[0]))\
#         .sortBy(lambda close_date: close_date[0], ascending=False)
#
#     return stock_rdd.take(1)

# get_stock_high('AAPL')

### Notes

- Spark is high-level like Hive and Pig.
- At the same time it does not invent a new language.
- This allows it to leverage the ecosystem of tools that Python, Scala, and Java provide.

## RDD Caching

- Consider this Spark job.

In [None]:
n = 500000
numbers = [random.random() for _ in range(n)]
rdd1 = sc.parallelize(numbers)
rdd2 = rdd1.sortBy(lambda number: number)

- Let's time running `count()` on `rdd2`.

In [None]:
%time rdd2.count()
%time rdd2.count()
%time rdd2.count()

- The RDD does no work until an action is called. When an action is called, Spark computes the answer and then discards the intermediate data, so the next action recomputes everything from scratch.
- If you have an RDD that you are going to reuse in your computation you can use `cache()` to make Spark cache the RDD.

- Let's cache it and try again.

In [88]:
rdd2.cache()

%time rdd2.count()
%time rdd2.count()
%time rdd2.count()

CPU times: user 6.19 ms, sys: 2.38 ms, total: 8.58 ms
Wall time: 147 ms
CPU times: user 4.99 ms, sys: 1.8 ms, total: 6.79 ms
Wall time: 45.5 ms
CPU times: user 6.23 ms, sys: 1.96 ms, total: 8.18 ms
Wall time: 53.8 ms


500000

- Caching the RDD speeds up the job because the RDD does not have to be computed from scratch again.

### Notes

- Calling `cache()` flips a flag on the RDD. 
- The data is not cached until an action is called.
- You can uncache an RDD using `unpersist()`.

### Pop Quiz

<details>
<summary>Q: Will `unpersist` uncache the RDD immediately or does it wait for an action?</summary>
A: It unpersists immediately.
</details>

## Caching and Persistence

Q: Persist RDD to disk instead of caching it in memory.
- You can cache RDDs at different levels.
- Here is an example.

In [49]:
from pyspark import StorageLevel

rdd = sc.parallelize(range(100))
rdd.persist(StorageLevel.DISK_ONLY)

PythonRDD[190] at RDD at PythonRDD.scala:48

### Pop Quiz

<details>
<summary>Q: Will the RDD be stored on disk at this point?</summary>
A: No. It will get stored after we call an action.
</details>

## Persistence Levels

Level | Meaning
--- | ---
`MEMORY_ONLY` | Same as `cache()`
`MEMORY_AND_DISK` | Cache in memory then overflow to disk
`MEMORY_AND_DISK_SER` | Like `MEMORY_AND_DISK`, but keep cached objects serialized instead of as live objects
`DISK_ONLY` | Store on disk only, not in memory

### Notes

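- `MEMORY_AND_DISK_SER` is a good compromise between the levels. It is reasonably fast and uses less memory than keeping live objects.
- In PySpark, cached data is always serialized with Pickle, so choosing a serialized level makes no difference there.
- Make sure you unpersist when you don't need the RDD any more.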

## Spark MLlib

- MLlib is Spark’s machine learning (ML) library.
- Its goal is to make practical machine learning scalable and easy.
- It consists of common learning algorithms, including:
  - Classification/Regression
    - Logistic Regression, Support vector machine (SVM), Naive Bayes, Gradient Boosted Trees, Random Forests, Multilayer Perceptron (a feedforward neural network), Generalized linear regression (GLM)
  - Recommenders/Collaborative Filtering
    - Alternating least squares (ALS) matrix factorization, optionally with non-negativity constraints
  - Decomposition
    - Singular value decomposition (SVD), Principal component analysis (PCA)
  - Clustering
    - K-Means, Latent Dirichlet allocation (LDA)

### Misc

- *"s3:" URLs break when Secret Key contains a slash, even if encoded* <https://issues.apache.org/jira/browse/HADOOP-3733>