In [1]:
import pyspark as ps

# Spark
### or: the power of laziness

Brandon Martin-Anderson, March 2019, with great debt to Moses Marsh &c.

### Imperative programming: all rules and no mercy

How does normal computer programming work?

1. Think of something you want done.
2. Describe **precisely** how it is accomplished.
3. The computer executes **precisely** your commands, quickly and mercilessly.

This is called **imperative programming**, marked by a **tight coupling** of intention and implementation.

This has a number of advantages:
* Even simple languages can execute any algorithm ("Turing complete")
* Very efficient implementations possible

The disadvantages, of course:
* Difficult to learn
* Debugging is time-consuming
* **Jumps first without looking**
 * executing an O(n!) traveling salesman algorithm
 * opening a 300-million-line CSV

### Declarative programming: don't make me think

On the other hand, a **declarative language** allows you to specify **what** you want done. The implementation is left to the machine.

A familiar example:

```SQL
SELECT name, balance 
FROM accounts 
ORDER BY balance DESC 
LIMIT 10
```

An SQL query allows a **loose coupling** of intention and implementation. How is a database implemented? Who knows! Who cares! SQL lets you use a **chain of command**. You say what you want, and the machine employee figures it out.
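To see that loose coupling in action without a database server, here is a minimal sketch using Python's built-in `sqlite3` module. The `accounts` table and its four rows are made up for illustration; only the query comes from the example above.

```python
import sqlite3

# In-memory database with a hypothetical `accounts` table (made-up rows).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (name TEXT, balance REAL)")
conn.executemany(
    "INSERT INTO accounts VALUES (?, ?)",
    [("ann", 120.0), ("bob", 75.5), ("cat", 310.25), ("dan", 42.0)],
)

query = "SELECT name, balance FROM accounts ORDER BY balance DESC LIMIT 10"

# We state *what* we want; SQLite decides how to scan, sort, and limit.
top_accounts = conn.execute(query).fetchall()
print(top_accounts)

# EXPLAIN QUERY PLAN shows the strategy the engine picked on our behalf.
plan = conn.execute("EXPLAIN QUERY PLAN " + query).fetchall()
print(plan)
conn.close()
```

Nothing in the query says how to sort or which rows to read first; the plan is the engine's own decision.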

Advantages:

* Easier to learn
* Less debugging
* **May respond appropriately to scale**
 * Might refuse to overload memory
 * Might pursue its own indexing/caching strategy
 * Might distribute jobs to subcontractors
   * Potentially even **on different machines**
   
This strategy enables operations on **extremely large** datasets; potentially involving thousands of machines.
 
## General Purpose Declarative Programming

It turns out that in order to make use of a chain of command, you need to express your algorithm in alignment with two important principles: laziness, and immutability.

### Laziness

Any given operation (and all operations that depend on it) can be deferred to a later time. This allows the chain of command to schedule operations in whatever order it determines is most efficient.
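Plain Python already has a small version of this idea. The sketch below uses the built-in `map`, which returns a lazy iterator; the `log` list is just a hypothetical helper to show when work actually happens.

```python
log = []

def square(x):
    log.append(f"square({x})")  # record each time real work happens
    return x ** 2

# Building the pipeline does no work: map() returns a lazy iterator.
pipeline = map(square, [1, 2, 3])
work_done_before = list(log)  # snapshot of the log: still empty

# Consuming the iterator is what triggers the computation.
result = list(pipeline)

print(work_done_before)
print(result)
print(log)
```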

### Immutability

All operations generate new data and do not modify their input. This enables automatic analysis of the optimal order of operations without risking corrupting data or introducing bugs.
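A tiny illustration in plain Python, using a tuple as the immutable input (the values are arbitrary):

```python
prices = (3, 1, 2)  # a tuple cannot be modified in place

# Each step returns new data and leaves its input untouched.
doubled = tuple(p * 2 for p in prices)
ordered = tuple(sorted(prices))

# Because `prices` never changes, the two steps above could run in either
# order (or at the same time) and still produce the same results.
print(prices, doubled, ordered)
```

Had `ordered` been produced by sorting a list in place, the result of `doubled` would depend on which step ran first.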

## DAG

Lazy operations on immutable data are joined into a **directed acyclic graph (DAG)** of operations. Tasks expressed in this form are the basis of a class of declarative programming environments broadly known as **MapReduce**.

MapReduce-style systems like Hadoop, Spark, Amazon EMR, etc. are characterized by **high parallelism** and **portability**. If you go to the trouble of expressing your problem in terms of a lazy DAG, it's an easy additional step to run your analysis on a **thousand computers in an hour**, instead of **one computer for a thousand hours**.
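The shape of such a job can be sketched on one machine with the standard library. This is only a toy: the four "workers" are threads in one process (an assumption made for the demo, not how a real cluster runs), but the split / map / reduce structure is the same.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce

def sum_of_squares(partition):
    # "map" side: each worker handles one partition independently
    return sum(x ** 2 for x in partition)

data = range(1000)
n_workers = 4

# split the input into one slice per worker
partitions = [data[i::n_workers] for i in range(n_workers)]

with ThreadPoolExecutor(max_workers=n_workers) as pool:
    partials = list(pool.map(sum_of_squares, partitions))

# "reduce" side: combine the partial results into one answer
total = reduce(lambda a, b: a + b, partials)
print(total)
```

Because no partition depends on any other, the number of workers can change without touching `sum_of_squares`.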

Today we're going to learn specifically about Spark, but the principles extend to similar systems.

# Getting into it: Spark

The Spark execution environment is divided into three different programs: the driver, the master, and the workers.

### Driver

AKA the "client"; typically this is a short computer program written by the end user. It's similar in length and purpose to an SQL query: it expresses the intent of the program. Today, our driver is **this** Jupyter notebook.

### Master

AKA the "scheduler" or "cluster manager". This is the program the Driver talks to, usually over a specific protocol, much as an SQL client talks to a database server.

### Workers

AKA "worker node". Performs the bulk of operations. Workers are usually mediated by the Master, which performs the distribution and collection of work on the Driver's behalf.


In [2]:
import pyspark as ps

Create a local Spark session with four worker threads, all running on the current machine.

In [3]:
spark = (ps.sql.SparkSession
         .builder
         .master('local[4]')
         .appName('lecture')
         .getOrCreate()
        )

The "spark context" is our communication channel to the master.

In [4]:
sc = spark.sparkContext
sc

### Imperative operation: squares

Spell out exactly how to perform the operation, in place:

In [5]:
nums = list(range(100))
for i in range(100):
    nums[i] = nums[i]**2
nums[0:10]

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

### Spark operation: squares

```
--(immutable input)--->[lazy operation]
```

In [6]:
# immutable, distributable input, created by the master via `sc`
inp = sc.parallelize(range(1000000000000000000))
inp

PythonRDD[1] at RDD at PythonRDD.scala:53

In [7]:
# lazy operation - not yet executed
outp = inp.map( lambda x: x**2 )
outp

PythonRDD[2] at RDD at PythonRDD.scala:53

In [8]:
outp

PythonRDD[2] at RDD at PythonRDD.scala:53

Asking to see the output forces the lazy DAG to actually execute.

In [9]:
outp.take(10)

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
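The same trick works in plain Python, which may make it less mysterious. A sketch with `itertools.islice` standing in for `take`:

```python
from itertools import islice

huge = range(10**18)                   # described, not materialized
squares = map(lambda x: x ** 2, huge)  # lazy: nothing computed yet
first_ten = list(islice(squares, 10))  # only ten elements are ever squared
print(first_ten)
```

Nobody ever builds the full sequence of 10^18 squares; asking for ten values is what decides how much work gets done.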

### Operation chaining, `filter`

In [10]:
def my_square(x):
    return x**2

inp = sc.parallelize(range(100))

outp = (
inp.map(my_square)
.map(lambda x: -x)
.filter(lambda x: x<-10)
)

outp

PythonRDD[5] at RDD at PythonRDD.scala:53

In [11]:
outp

PythonRDD[5] at RDD at PythonRDD.scala:53

In [12]:
outp.take(10)

[-16, -25, -36, -49, -64, -81, -100, -121, -144, -169]

- `sc.parallelize` creates a "Resilient Distributed Dataset" (RDD).
- `map` and `filter` are *transformations*.
  - They create new RDDs from existing RDDs.
- `count`, `take`, and `collect` are *actions* that bring the data from the RDDs back to the driver.
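To make the transformation/action split concrete, here is a toy stand-in written in plain Python. `LazyDataset` is a hypothetical class invented for this sketch; it is not how Spark is implemented, and it runs on one machine only.

```python
from itertools import islice

class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them on demand."""

    def __init__(self, source, steps=()):
        self._source = source        # must be re-iterable, e.g. a range or list
        self._steps = tuple(steps)

    # --- transformations: return a NEW dataset, compute nothing ---
    def map(self, fn):
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, pred):
        return LazyDataset(self._source, self._steps + (("filter", pred),))

    # --- actions: run the recorded steps and return plain Python values ---
    def _run(self):
        items = iter(self._source)
        for kind, fn in self._steps:
            items = map(fn, items) if kind == "map" else filter(fn, items)
        return items

    def take(self, n):
        return list(islice(self._run(), n))

    def collect(self):
        return list(self._run())

    def count(self):
        return sum(1 for _ in self._run())


inp = LazyDataset(range(100))
outp = inp.map(lambda x: x ** 2).map(lambda x: -x).filter(lambda x: x < -10)
print(outp.take(10))
print(outp.count())
```

Each transformation only appends to a list of recorded steps; `take`, `collect`, and `count` are the only methods that iterate over data.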

### Transformations and Actions

We won't spend long on this; the RDD API has mostly been superseded by DataFrames as a way to interact with Spark.

- Common RDD Constructors

Expression | Meaning
--- | ---
`sc.parallelize(list)` | Create RDD of elements of list
`sc.textFile(path)` | Create RDD of lines from file

- Common Transformations

Expression | Meaning
--- | ---
`filter(lambda x: x % 2 == 0)` | Discard non-even elements
`map(lambda x: x * 2)` | Multiply each RDD element by `2`
`map(lambda x: x.split())` | Split each string into words
`flatMap(lambda x: x.split())` | Split each string into words and flatten sequence
`sample(withReplacement=True, fraction=0.25)` | Create sample of 25% of elements with replacement
`union(rdd)` | Append `rdd` to existing RDD
`distinct()` | Remove duplicates in RDD
`sortBy(lambda x: x, ascending = False)` | Sort elements in descending order

- Common Actions

Expression | Meaning
--- | ---
`collect()` | Convert RDD to in-memory list 
`take(3)` | First 3 elements of RDD 
`takeSample(withReplacement=True, num=3)` | Create sample of 3 elements with replacement
`sum()` | Find element sum (assumes numeric elements)
`mean()` | Find element mean (assumes numeric elements)
`stdev()` | Find element standard deviation (assumes numeric elements)
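
The difference between `map` and `flatMap` with `split` is the one most likely to trip people up. A plain-Python sketch of the two result shapes, with made-up input lines:

```python
from itertools import chain

lines = ["the quick brown", "fox jumps"]

# map: one output element per input element -> a list of lists
mapped = [line.split() for line in lines]

# flatMap: apply the function, then flatten one level -> a single list
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(mapped)
print(flat_mapped)
```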

In [13]:
inp = sc.textFile( "data/sales.txt"  )

In [14]:
inp.collect()

['#ID    Date           Store   State  Product    Amount',
 '101    11/13/2014     100     WA     331        300.00',
 '104    11/18/2014     700     OR     329        450.00',
 '102    11/15/2014     203     CA     321        200.00',
 '106    11/19/2014     202     CA     331        330.00',
 '103    11/17/2014     101     WA     373        750.00',
 '105    11/19/2014     202     CA     321        200.00']

In [15]:
rdd2 = inp.map(lambda x: x.split())

In [16]:
def casting_function(row):
    (id,date,store,state,product,amount) = row
    return (int(id), date, int(store), state, int(product), float(amount))

rdd3 = rdd2.filter( lambda row: not row[0][0]=="#" )

In [17]:
rdd4 = rdd3.map( casting_function )

In [18]:
rdd4.collect()

[(101, '11/13/2014', 100, 'WA', 331, 300.0),
 (104, '11/18/2014', 700, 'OR', 329, 450.0),
 (102, '11/15/2014', 203, 'CA', 321, 200.0),
 (106, '11/19/2014', 202, 'CA', 331, 330.0),
 (103, '11/17/2014', 101, 'WA', 373, 750.0),
 (105, '11/19/2014', 202, 'CA', 321, 200.0)]

In [19]:
rdd5 = rdd4.map( lambda x: x[5] )

In [20]:
rdd5.sum()

2230.0

In [21]:
inp.filter(lambda x:x[0]!="#").map(lambda x:x.split()).map(lambda x:float(x[-1])).sum()

2230.0

### More complicated operations

In [22]:
# reads a text file line by line
rdd1 = sc.textFile('data/sales.txt')
rdd1

data/sales.txt MapPartitionsRDD[13] at textFile at NativeMethodAccessorImpl.java:0

In [23]:
rdd2 = rdd1.sample(False, 0.5)
rdd3 = rdd2.map(lambda x : x.split())
rdd4 = rdd3.filter(lambda row: not row[0].startswith('#'))

def casting_function(row):
    id_, date, store, state, product, amount = row
    return (int(id_), date, int(store), state, int(product), float(amount))
rdd5 = rdd4.map(casting_function)

See? Lazy bones.

In [24]:
rdd5

PythonRDD[14] at RDD at PythonRDD.scala:53

In [25]:
rdd5.collect()

[(101, '11/13/2014', 100, 'WA', 331, 300.0),
 (102, '11/15/2014', 203, 'CA', 321, 200.0),
 (106, '11/19/2014', 202, 'CA', 331, 330.0),
 (103, '11/17/2014', 101, 'WA', 373, 750.0)]

In [26]:
rdd3.collect()

[['101', '11/13/2014', '100', 'WA', '331', '300.00'],
 ['102', '11/15/2014', '203', 'CA', '321', '200.00'],
 ['106', '11/19/2014', '202', 'CA', '331', '330.00'],
 ['103', '11/17/2014', '101', 'WA', '373', '750.00']]

In [27]:
rdd1.collect()

['#ID    Date           Store   State  Product    Amount',
 '101    11/13/2014     100     WA     331        300.00',
 '104    11/18/2014     700     OR     329        450.00',
 '102    11/15/2014     203     CA     321        200.00',
 '106    11/19/2014     202     CA     331        330.00',
 '103    11/17/2014     101     WA     373        750.00',
 '105    11/19/2014     202     CA     321        200.00']

## Find the date on which AAPL's stock price was the highest

In [28]:
rdd_aapl_raw = sc.textFile('data/aapl.csv')

In [29]:
rdd_aapl_raw

data/aapl.csv MapPartitionsRDD[17] at textFile at NativeMethodAccessorImpl.java:0

In [30]:
rdd_aapl_raw.take(5)

['Date,Open,High,Low,Close,Volume,Adj Close',
 '2016-10-25,117.949997,118.360001,117.309998,118.25,39190300,118.25',
 '2016-10-24,117.099998,117.739998,117.00,117.650002,23538700,117.650002',
 '2016-10-21,116.809998,116.910004,116.279999,116.599998,23192700,116.599998',
 '2016-10-20,116.860001,117.379997,116.330002,117.059998,24125800,117.059998']

### Task

Now, design a pipeline that would:
1. filter out headers
2. split each line based on comma
3. keep only fields for Date (col 0) and Close (col 4)
4. order by Close in descending order

### Code

In [31]:
rddout = rdd_aapl_raw # apply transformation here...

rddout.take(5)

['Date,Open,High,Low,Close,Volume,Adj Close',
 '2016-10-25,117.949997,118.360001,117.309998,118.25,39190300,118.25',
 '2016-10-24,117.099998,117.739998,117.00,117.650002,23538700,117.650002',
 '2016-10-21,116.809998,116.910004,116.279999,116.599998,23192700,116.599998',
 '2016-10-20,116.860001,117.379997,116.330002,117.059998,24125800,117.059998']
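
If you want to check your reasoning before writing the Spark version, here is one possible plain-Python sketch of the four steps. It runs only on the five lines shown above (not the whole file), so its answer is the best of those four days, not of the full dataset. On an RDD the corresponding calls would be `filter`, `map`, `map`, and `sortBy(..., ascending=False)`.

```python
lines = [
    'Date,Open,High,Low,Close,Volume,Adj Close',
    '2016-10-25,117.949997,118.360001,117.309998,118.25,39190300,118.25',
    '2016-10-24,117.099998,117.739998,117.00,117.650002,23538700,117.650002',
    '2016-10-21,116.809998,116.910004,116.279999,116.599998,23192700,116.599998',
    '2016-10-20,116.860001,117.379997,116.330002,117.059998,24125800,117.059998',
]

# 1. filter out the header line
rows = (line for line in lines if not line.startswith('Date'))
# 2. split each line on commas
fields = (line.split(',') for line in rows)
# 3. keep only Date (col 0) and Close (col 4), casting Close to float
date_close = ((f[0], float(f[4])) for f in fields)
# 4. order by Close, descending (this step forces the lazy generators to run)
ranked = sorted(date_close, key=lambda dc: dc[1], reverse=True)

print(ranked[0])
```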

## Caching / Persistency

- An RDD does no work until an action is called. When an action is called, Spark computes the answer and then discards all the intermediate data.
- If you have an RDD that you are going to reuse in your computation you can use cache() to make Spark cache the RDD.
- This is especially useful if you have to run the same computation over and over again on one RDD: one use case ? oh I don't know maybe... **MACHINE LEARNING !!!**

In [32]:
import random
import math

num_count = 500*1000
num_list = [random.random() for i in range(num_count)]
rdd1 = sc.parallelize(num_list)
rdd2 = rdd1.sortBy(lambda num: num)

In [33]:
%time rdd2.count()
%time rdd2.count()
%time rdd2.count()

CPU times: user 0 ns, sys: 10 ms, total: 10 ms
Wall time: 5.09 s
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 979 ms
CPU times: user 20 ms, sys: 0 ns, total: 20 ms
Wall time: 1.49 s


500000

Let's cache it and try again.

In [34]:
rdd2.cache()
%time rdd2.count()
%time rdd2.count()
%time rdd2.count()

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 1.5 s
CPU times: user 10 ms, sys: 10 ms, total: 20 ms
Wall time: 192 ms
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 272 ms


500000

- Caching the RDD speeds up the job because the RDD does not have to be computed from scratch again.
- Calling cache() flips a flag on the RDD.
- The data is not cached until an action is called.
- You can uncache an RDD using unpersist().
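
The "flag now, data later" behaviour can be modelled in a few lines of plain Python. `CachedLazy` is a hypothetical toy class written for this sketch, not Spark's implementation; the `runs` counter exists only so we can see how often the expensive computation happens.

```python
import random

class CachedLazy:
    """Toy model of cache(): a flag now, stored data only after an action."""

    def __init__(self, compute):
        self._compute = compute   # zero-argument function producing a list
        self._want_cache = False
        self._stored = None
        self.runs = 0             # how many times the full computation ran

    def cache(self):
        self._want_cache = True   # only flips a flag; computes nothing
        return self

    def unpersist(self):
        self._want_cache = False
        self._stored = None
        return self

    def count(self):              # an action
        if self._stored is not None:
            return len(self._stored)
        self.runs += 1
        data = self._compute()
        if self._want_cache:
            self._stored = data   # keep the result for later actions
        return len(data)


rng = random.Random(0)
nums = [rng.random() for _ in range(1000)]
ds = CachedLazy(lambda: sorted(nums))

ds.count(); ds.count()
runs_uncached = ds.runs       # recomputed on every action

ds.cache()
runs_after_flag = ds.runs     # cache() alone did no work

ds.count(); ds.count()
runs_cached = ds.runs         # one more run to fill the cache, then reuse

print(runs_uncached, runs_after_flag, runs_cached)
```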

# DataFrames in Spark

# Overview

## RDDs versus DataFrames

What is a DataFrame?
- DataFrames are the primary abstraction in Spark SQL.
- Think of DataFrames as RDDs with a schema.

What is a schema?
- Schemas are metadata about your data.
- Schemas define table names, column names, and column types over your data.
- Schemas enable using SQL and DataFrame syntax to query your RDDs, instead of using column positions.

What is Spark SQL?
- Spark SQL takes basic RDDs and puts a schema on them.

What is a schema again?
- Schema = Table Names + Column Names + Column Types

What are the pros of schemas?
- Schemas enable using column names instead of column positions
- Schemas enable queries using SQL and DataFrame syntax
- Schemas make your data more structured.
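
The "names instead of positions" benefit is easy to feel even without Spark. A sketch using `typing.NamedTuple` as a stand-in for a schema; the `Sale` class is hypothetical, and unlike a Spark schema it does not enforce the declared types.

```python
from typing import NamedTuple

class Sale(NamedTuple):
    # column names + column types, in order
    id: int
    date: str
    store: int
    state: str
    product: int
    amount: float

raw = (101, '11/13/2014', 100, 'WA', 331, 300.0)
sale = Sale(*raw)

by_position = raw[5]      # you have to remember that amount is column 5
by_name = sale.amount     # the schema lets you say what you mean

print(by_position, by_name)
print(Sale._fields)
```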

# Operational DataFrames in Python

We'll proceed along the usual spark flow (see above).
1. create the environment to run Spark / Spark SQL from python
2. create DataFrames from RDDs or from files
3. run some transformations
4. execute actions to obtain values (local objects in python)

In [35]:
spark = (ps.sql.SparkSession
         .builder
         .master('local[4]')
         .appName('lecture')
         .getOrCreate()
        )
sc = spark.sparkContext

* In RDD land, `sc` is the connection to the cluster.
* In DF land, `spark` is the connection to the cluster.

```¯\_(ツ)_/¯```

In [36]:
spark

In [37]:
sc

## Creating a DataFrame manually

### From an RDD (specifying schema)

You can create a DataFrame from an existing RDD (whatever source you used to create this one), if you add a schema.

To build a schema, you will use existing data types provided in the [`pyspark.sql.types`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types) module. Here's a list of the most useful ones (subjective criteria).

| Types | Python-like type |
| - | - |
| StringType | string |
| IntegerType | int |
| FloatType | float |
| ArrayType\* | array or list |
| MapType | dict |

\* see later UDF functions on how to use that

In [38]:
!head data/sales.csv

#ID,Date,Store,State,Product,Amount
101,11/13/2014,100,WA,331,300.00
104,11/18/2014,700,OR,329,450.00
102,11/15/2014,203,CA,321,200.00
106,11/19/2014,202,CA,331,330.00
103,11/17/2014,101,WA,373,750.00
105,11/19/2014,202,CA,321,200.00


In [40]:
def casting_function(row):
    (id, date, store, state, product, amount) = row
    return (int(id), date, int(store), state, int(product), float(amount))

rdd_sales = (
    sc.textFile('data/sales.csv')
        .map(lambda rowstr : rowstr.split(","))
        .filter(lambda row: not row[0].startswith('#'))
        .map(casting_function)
            )

rdd_sales.collect()

[(101, '11/13/2014', 100, 'WA', 331, 300.0),
 (104, '11/18/2014', 700, 'OR', 329, 450.0),
 (102, '11/15/2014', 203, 'CA', 321, 200.0),
 (106, '11/19/2014', 202, 'CA', 331, 330.0),
 (103, '11/17/2014', 101, 'WA', 373, 750.0),
 (105, '11/19/2014', 202, 'CA', 321, 200.0)]

In [41]:
# import the many data types
from pyspark.sql.types import IntegerType, StringType, FloatType, StructType, StructField

# create a schema of your own
schema = StructType( [
    StructField('id',IntegerType(),True),
    StructField('date',StringType(),True),
    StructField('store',IntegerType(),True),
    StructField('state',StringType(),True),
    StructField('product',IntegerType(),True),
    StructField('amount',FloatType(),True) ] )

# feed that into a DataFrame
df = spark.createDataFrame(rdd_sales,schema)

# show the result
df.show()

# print the schema
df.printSchema()

+---+----------+-----+-----+-------+------+
| id|      date|store|state|product|amount|
+---+----------+-----+-----+-------+------+
|101|11/13/2014|  100|   WA|    331| 300.0|
|104|11/18/2014|  700|   OR|    329| 450.0|
|102|11/15/2014|  203|   CA|    321| 200.0|
|106|11/19/2014|  202|   CA|    331| 330.0|
|103|11/17/2014|  101|   WA|    373| 750.0|
|105|11/19/2014|  202|   CA|    321| 200.0|
+---+----------+-----+-----+-------+------+

root
 |-- id: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- store: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- product: integer (nullable = true)
 |-- amount: float (nullable = true)



### Reading from files (inferring schema)

Use [`spark.read.csv`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.csv) to load a CSV into a DataFrame. It accepts options for the header, quote character, and separator, and it can infer the schema.

In [42]:
# read CSV
df = spark.read.csv('data/sales.csv',
                         header=True,       # use headers or not
                         quote='"',         # char for quotes
                         sep=",",           # char for separation
                         inferSchema=True)  # do we infer schema or not ?



In [43]:
df

DataFrame[#ID: int, Date: string, Store: int, State: string, Product: int, Amount: double]

In [None]:
# prints the schema
df.printSchema()

# some functions are still valid
print("line count: {}".format(df.count()))

# show the table in a oh-so-nice format
df.show()

Use [`spark.read.json`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json) to load a JSON file into a DataFrame. You can specify every useful parameter in there. It can infer the schema.

In [32]:
# read JSON
df = spark.read.json('data/sales.json')

# prints the schema
df.printSchema()

# some functions are still valid
print("line count: {}".format(df.count()))

# show the table in a oh-so-nice format
df.show()

root
 |-- amount: double (nullable = true)
 |-- date: string (nullable = true)
 |-- id: long (nullable = true)
 |-- product: long (nullable = true)
 |-- state: string (nullable = true)
 |-- store: long (nullable = true)

line count: 6
+------+----------+---+-------+-----+-----+
|amount|      date| id|product|state|store|
+------+----------+---+-------+-----+-----+
| 300.0|11/13/2014|101|    331|   WA|  100|
| 450.0|11/18/2014|104|    329|   OR|  700|
| 200.0|11/15/2014|102|    321|   CA|  203|
| 330.0|11/19/2014|106|    331|   CA|  202|
| 750.0|11/17/2014|103|    373|   WA|  101|
| 200.0|11/19/2014|105|    321|   CA|  202|
+------+----------+---+-------+-----+-----+



In [33]:
df.collect()

[Row(amount=300.0, date='11/13/2014', id=101, product=331, state='WA', store=100),
 Row(amount=450.0, date='11/18/2014', id=104, product=329, state='OR', store=700),
 Row(amount=200.0, date='11/15/2014', id=102, product=321, state='CA', store=203),
 Row(amount=330.0, date='11/19/2014', id=106, product=331, state='CA', store=202),
 Row(amount=750.0, date='11/17/2014', id=103, product=373, state='WA', store=101),
 Row(amount=200.0, date='11/19/2014', id=105, product=321, state='CA', store=202)]

## Actions : turning your DataFrame into a local object

Some actions remain the same, so you won't have to learn Spark all over again.

Some new actions give you the possibility to describe and show the content in a more fashionable manner.

When used/executed in IPython or in a notebook, they **launch the processing of the DAG**. This is where Spark stops being **lazy**. This is where your script will take time to execute.

| Method | DF vs RDD? | Description |
| - | - | - |
| [`.collect()`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.collect) | identical | Return a list that contains all of the elements as Rows. |
| [`.count()`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.count) | identical | Return the number of elements. |
| [`.take(n)`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.take) | identical | Take the first `n` elements. |
| [`.first()`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.first) | identical | Return the first element. |
| [`.show(n)`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.show) | <span style="color:green">new</span> | Show the DataFrame in table format (`n=20` by default) |
| [`.toPandas()`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.toPandas) | <span style="color:green">new</span> | Convert the DF into a Pandas DF. |
| [`.printSchema()`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.printSchema) | <span style="color:green">new</span> | Display the schema. This is not an action (it doesn't launch the DAG), but it fits better in this category. |
| [`.describe(*cols)`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.describe) | <span style="color:green">new</span> | Compute summary statistics for the given columns. |
| [`.sum(*cols)`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.sum) | <span style="color:red">different</span> | Applies on GroupedData only (see transformations). |
| [`.mean(*cols)`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.mean) | <span style="color:red">different</span> | Applies on GroupedData only (see transformations). |
| [`.min(*cols)`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.min) | <span style="color:red">different</span> | Applies on GroupedData only (see transformations). |
| [`.max(*cols)`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.max) | <span style="color:red">different</span> | Applies on GroupedData only (see transformations). |


A really handy way to go from DAG -> Python.

In [34]:
df.toPandas() #heck that's useful

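| | amount | date | id | product | state | store |
| - | - | - | - | - | - | - |
| 0 | 300.0 | 11/13/2014 | 101 | 331 | WA | 100 |
| 1 | 450.0 | 11/18/2014 | 104 | 329 | OR | 700 |
| 2 | 200.0 | 11/15/2014 | 102 | 321 | CA | 203 |
| 3 | 330.0 | 11/19/2014 | 106 | 331 | CA | 202 |
| 4 | 750.0 | 11/17/2014 | 103 | 373 | WA | 101 |
| 5 | 200.0 | 11/19/2014 | 105 | 321 | CA | 202 |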


This is how `.collect()` returns things...

In [35]:
df.collect()

[Row(amount=300.0, date='11/13/2014', id=101, product=331, state='WA', store=100),
 Row(amount=450.0, date='11/18/2014', id=104, product=329, state='OR', store=700),
 Row(amount=200.0, date='11/15/2014', id=102, product=321, state='CA', store=203),
 Row(amount=330.0, date='11/19/2014', id=106, product=331, state='CA', store=202),
 Row(amount=750.0, date='11/17/2014', id=103, product=373, state='WA', store=101),
 Row(amount=200.0, date='11/19/2014', id=105, product=321, state='CA', store=202)]

In [36]:
# prints the schema
print("--- printSchema()")
df.printSchema()

# prints the table itself
print("--- show()")
df.show()

# show the statistics of all numerical columns
print("--- describe()")
df.describe().show()

# show the statistics of one specific column
print("--- describe(amount)")
df.describe("amount").show()

--- printSchema()
root
 |-- amount: double (nullable = true)
 |-- date: string (nullable = true)
 |-- id: long (nullable = true)
 |-- product: long (nullable = true)
 |-- state: string (nullable = true)
 |-- store: long (nullable = true)

--- show()
+------+----------+---+-------+-----+-----+
|amount|      date| id|product|state|store|
+------+----------+---+-------+-----+-----+
| 300.0|11/13/2014|101|    331|   WA|  100|
| 450.0|11/18/2014|104|    329|   OR|  700|
| 200.0|11/15/2014|102|    321|   CA|  203|
| 330.0|11/19/2014|106|    331|   CA|  202|
| 750.0|11/17/2014|103|    373|   WA|  101|
| 200.0|11/19/2014|105|    321|   CA|  202|
+------+----------+---+-------+-----+-----+

--- describe()
+-------+------------------+----------+------------------+------------------+-----+------------------+
|summary|            amount|      date|                id|           product|state|             store|
+-------+------------------+----------+------------------+------------------+-----+-----

## Transformations on DataFrames

- They are still **lazy**: Spark doesn't apply the transformation right away, it just builds on the **DAG**
- They transform a DataFrame into another because DataFrames are also **immutable**.
- They can be **wide** or **narrow** (whether they shuffle partitions or not).

You got that... DataFrames are just RDDs with a schema.

### selecting and adding columns

In [37]:
# read CSV
df_aapl = spark.read.csv('data/aapl.csv',
                         header=True,       # use headers or not
                         quote='"',         # char for quotes
                         sep=",",           # char for separation
                         inferSchema=True)  # do we infer schema or not ?

df_aapl.show(5)

df_aapl.printSchema()

+-------------------+----------+----------+----------+----------+--------+----------+
|               Date|      Open|      High|       Low|     Close|  Volume| Adj Close|
+-------------------+----------+----------+----------+----------+--------+----------+
|2016-10-25 00:00:00|117.949997|118.360001|117.309998|    118.25|39190300|    118.25|
|2016-10-24 00:00:00|117.099998|117.739998|     117.0|117.650002|23538700|117.650002|
|2016-10-21 00:00:00|116.809998|116.910004|116.279999|116.599998|23192700|116.599998|
|2016-10-20 00:00:00|116.860001|117.379997|116.330002|117.059998|24125800|117.059998|
|2016-10-19 00:00:00|    117.25|117.760002|113.800003|117.120003|20034600|117.120003|
+-------------------+----------+----------+----------+----------+--------+----------+
only showing top 5 rows

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: 

In [38]:
df_out = df_aapl.select("Open", "Close")

df_out.show(5)

+----------+----------+
|      Open|     Close|
+----------+----------+
|117.949997|    118.25|
|117.099998|117.650002|
|116.809998|116.599998|
|116.860001|117.059998|
|    117.25|117.120003|
+----------+----------+
only showing top 5 rows



In [39]:
df_aapl[["Open","Close"]].show(5)

+----------+----------+
|      Open|     Close|
+----------+----------+
|117.949997|    118.25|
|117.099998|117.650002|
|116.809998|116.599998|
|116.860001|117.059998|
|    117.25|117.120003|
+----------+----------+
only showing top 5 rows



#### `.withColumn("label", func)` : constant value

In [40]:
from pyspark.sql.functions import lit

df_out = df_aapl.withColumn("blabla", lit(34))

df_out[['Open','High','blabla']].show(5)

+----------+----------+------+
|      Open|      High|blabla|
+----------+----------+------+
|117.949997|118.360001|    34|
|117.099998|117.739998|    34|
|116.809998|116.910004|    34|
|116.860001|117.379997|    34|
|    117.25|117.760002|    34|
+----------+----------+------+
only showing top 5 rows



#### `.withColumn("label", func)` : column operations

In [41]:
df_out = (df_aapl
            .withColumn("diff", 
                         df_aapl['High'] - df_aapl['Low'])
            .select('Date', 'High', 'Low', 'diff')
         )

df_out.show(5)

+-------------------+----------+----------+------------------+
|               Date|      High|       Low|              diff|
+-------------------+----------+----------+------------------+
|2016-10-25 00:00:00|118.360001|117.309998|1.0500030000000038|
|2016-10-24 00:00:00|117.739998|     117.0|0.7399979999999999|
|2016-10-21 00:00:00|116.910004|116.279999| 0.630004999999997|
|2016-10-20 00:00:00|117.379997|116.330002|1.0499950000000098|
|2016-10-19 00:00:00|117.760002|113.800003|3.9599989999999963|
+-------------------+----------+----------+------------------+
only showing top 5 rows



#### `.withColumn("label", func)` : user defined function
`udf()` turns a normal Python function into something Spark can parallelize across its distributed data. `udf()` takes two arguments: a function and the data type the function will return.

In [42]:
import math
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, FloatType

def my_specialfunc(h,l,o,c):
    return ((h-l)*(math.exp(o-c)))

my_specialfunc_udf = udf(my_specialfunc, FloatType())

df_out = df_aapl.withColumn("special", my_specialfunc_udf(df_aapl['High'], 
                                                          df_aapl['Low'], 
                                                          df_aapl['Open'], 
                                                          df_aapl['Close']))

df_out.select('High', 'Low', 'Open', 'Close', 'special').show()

+----------+----------+----------+----------+----------+
|      High|       Low|      Open|     Close|   special|
+----------+----------+----------+----------+----------+
|118.360001|117.309998|117.949997|    118.25|0.77785903|
|117.739998|     117.0|117.099998|117.650002|   0.42694|
|116.910004|116.279999|116.809998|116.599998|0.77722335|
|117.379997|116.330002|116.860001|117.059998|0.85966575|
|117.760002|113.800003|    117.25|117.120003| 4.5097456|
|118.209999|117.449997|    118.18|117.470001| 1.5458359|
|117.839996|116.779999|117.330002|117.550003|0.85066664|
|118.169998|117.129997|117.879997|117.629997| 1.3353877|
|117.440002|115.720001|116.790001|116.980003| 1.4223677|
|117.980003|    116.75|117.349998|117.339996| 1.2423673|
|118.690002|116.199997|117.699997|116.300003| 10.097407|
|    116.75|114.720001|115.019997|116.050003| 0.7247194|
|114.559998|113.510002|114.309998|114.059998| 1.3482215|
|114.339996|113.129997|113.699997|113.889999| 1.0006177|
|113.660004|112.690002|113.4000
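
Since a UDF wraps an ordinary Python function, you can sanity-check that function locally before handing it to `udf()`. A minimal sketch using the first AAPL row shown earlier; the printed value should agree with the first `special` entry above up to single-precision rounding.

```python
import math

def my_specialfunc(h, l, o, c):
    return (h - l) * math.exp(o - c)

# First row of the AAPL table: High, Low, Open, Close
value = my_specialfunc(118.360001, 117.309998, 117.949997, 118.25)
print(value)
```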

### aggregating and sorting columns

In [43]:
df.show()

+------+----------+---+-------+-----+-----+
|amount|      date| id|product|state|store|
+------+----------+---+-------+-----+-----+
| 300.0|11/13/2014|101|    331|   WA|  100|
| 450.0|11/18/2014|104|    329|   OR|  700|
| 200.0|11/15/2014|102|    321|   CA|  203|
| 330.0|11/19/2014|106|    331|   CA|  202|
| 750.0|11/17/2014|103|    373|   WA|  101|
| 200.0|11/19/2014|105|    321|   CA|  202|
+------+----------+---+-------+-----+-----+



In [44]:
from pyspark.sql import functions as F
df_out = df.groupBy("State").agg(F.sum("Amount"), F.avg('Product'))

df_out.show()

+-----+-----------+-----------------+
|State|sum(Amount)|     avg(Product)|
+-----+-----------+-----------------+
|   OR|      450.0|            329.0|
|   CA|      730.0|324.3333333333333|
|   WA|     1050.0|            352.0|
+-----+-----------+-----------------+



#### `.orderBy()` : sorting by a column

In [45]:
df_out = (df.groupBy("State")
                  .agg(F.sum("Amount"))
                  .orderBy("sum(Amount)", ascending=False)
         )

df_out.show()

+-----+-----------+
|State|sum(Amount)|
+-----+-----------+
|   WA|     1050.0|
|   CA|      730.0|
|   OR|      450.0|
+-----+-----------+
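
For contrast with the declarative version, this is roughly what the same group-sum-sort looks like when spelled out imperatively in plain Python. The `(state, amount)` pairs are copied from the sales table above.

```python
from collections import defaultdict

sales = [  # (state, amount) pairs from the sales table
    ('WA', 300.0), ('OR', 450.0), ('CA', 200.0),
    ('CA', 330.0), ('WA', 750.0), ('CA', 200.0),
]

totals = defaultdict(float)
for state, amount in sales:
    totals[state] += amount  # group by state, summing the amounts

# order by total, descending
ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
print(ranked)
```

Here we chose the data structure and the loop ourselves; with `groupBy(...).agg(...)` those choices are left to Spark.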



## Find the date on which AAPL's stock price was the highest

### Input DataFrame

In [46]:
# read CSV
df_aapl = spark.read.csv('data/aapl.csv',
                         header=True,       # use headers or not
                         quote='"',         # char for quotes
                         sep=",",           # char for separation
                         inferSchema=True)  # do we infer schema or not ?

df_aapl.show(5)

+-------------------+----------+----------+----------+----------+--------+----------+
|               Date|      Open|      High|       Low|     Close|  Volume| Adj Close|
+-------------------+----------+----------+----------+----------+--------+----------+
|2016-10-25 00:00:00|117.949997|118.360001|117.309998|    118.25|39190300|    118.25|
|2016-10-24 00:00:00|117.099998|117.739998|     117.0|117.650002|23538700|117.650002|
|2016-10-21 00:00:00|116.809998|116.910004|116.279999|116.599998|23192700|116.599998|
|2016-10-20 00:00:00|116.860001|117.379997|116.330002|117.059998|24125800|117.059998|
|2016-10-19 00:00:00|    117.25|117.760002|113.800003|117.120003|20034600|117.120003|
+-------------------+----------+----------+----------+----------+--------+----------+
only showing top 5 rows



### Task

Now, design a pipeline that would:

1. keep only fields for Date and Close
2. order by Close in descending order

### Code

In [171]:
# do it live

### Mid-late lecture reminder

* Why are we doing this?
* When should we use this?
* When should we **not** use this?

# The SQL Interface

I know you missed it. Let's run some SQL queries on these tables!

First we tell Spark to register our DataFrame under a table name in its SQL namespace:

In [172]:
# This registers a temporary view named 'aapl' **on the cluster**
df_aapl.createOrReplaceTempView('aapl')

Now we can write queries using `spark.sql`, and it will have access to any table we have registered like above. The output of the query is another spark dataframe.

In [173]:
df_sql = spark.sql("SELECT Open, Close, Close - Open as diff FROM aapl LIMIT 3")
df_sql.show()

+----------+----------+--------------------+
|      Open|     Close|                diff|
+----------+----------+--------------------+
|117.949997|    118.25|  0.3000030000000038|
|117.099998|117.650002|  0.5500040000000013|
|116.809998|116.599998|-0.20999999999999375|
+----------+----------+--------------------+



In [174]:
df_aapl.show()

+-------------------+----------+----------+----------+----------+--------+----------+
|               Date|      Open|      High|       Low|     Close|  Volume| Adj Close|
+-------------------+----------+----------+----------+----------+--------+----------+
|2016-10-25 00:00:00|117.949997|118.360001|117.309998|    118.25|39190300|    118.25|
|2016-10-24 00:00:00|117.099998|117.739998|     117.0|117.650002|23538700|117.650002|
|2016-10-21 00:00:00|116.809998|116.910004|116.279999|116.599998|23192700|116.599998|
|2016-10-20 00:00:00|116.860001|117.379997|116.330002|117.059998|24125800|117.059998|
|2016-10-19 00:00:00|    117.25|117.760002|113.800003|117.120003|20034600|117.120003|
|2016-10-18 00:00:00|    118.18|118.209999|117.449997|117.470001|24553500|117.470001|
|2016-10-17 00:00:00|117.330002|117.839996|116.779999|117.550003|23624900|117.550003|
|2016-10-14 00:00:00|117.879997|118.169998|117.129997|117.629997|35652200|117.629997|
|2016-10-13 00:00:00|116.790001|117.440002|115.720001|

In [176]:
df.createOrReplaceTempView('sales')

In [177]:
df.show()

+------+----------+---+-------+-----+-----+
|amount|      date| id|product|state|store|
+------+----------+---+-------+-----+-----+
| 300.0|11/13/2014|101|    331|   WA|  100|
| 450.0|11/18/2014|104|    329|   OR|  700|
| 200.0|11/15/2014|102|    321|   CA|  203|
| 330.0|11/19/2014|106|    331|   CA|  202|
| 750.0|11/17/2014|103|    373|   WA|  101|
| 200.0|11/19/2014|105|    321|   CA|  202|
+------+----------+---+-------+-----+-----+



In [178]:
query = '''SELECT state, SUM(Amount) as total 
            FROM sales 
            GROUP BY State 
            ORDER BY total DESC'''

spark.sql(query).show()

+-----+------+
|state| total|
+-----+------+
|   WA|1050.0|
|   CA| 730.0|
|   OR| 450.0|
+-----+------+

