# Learning Spark with Python

## Initializing Spark
To write a Spark program in Python, we first need to import a few Spark classes.

```python
from pyspark import SparkContext, SparkConf
```

To communicate with Spark, we need to create a `SparkContext` object. The `SparkContext` tells Spark how to access a cluster.

```python
conf = SparkConf().setAppName("Learning Spark")
sc = SparkContext(conf=conf)
```

## Working with RDDs

`RDDs`, or *Resilient Distributed Datasets*, are immutable (they can't be modified once created), distributed collections of elements, partitioned across the nodes of a cluster so they can be processed in parallel. RDDs were Spark's original API for working with data; Spark 1.3 added the `DataFrame` API and Spark 1.6 added the `Dataset` API. Even with these newer APIs available, there's no better place to begin learning Spark than with RDDs.

Let's begin our notebook by reading a `.csv` file into an RDD. The sample data contains sales information from an imaginary stationery company and has the following schema:

`|OrderDate|Region|Rep|Item|Units|Unit Cost|Total|`

*Sample data courtesy: http://www.contextures.com/xlSampleData01.html*

```python
raw_rdd = sc.textFile("sample_data/stationery.csv")
print(raw_rdd.take(3))  # take(n) returns the first n elements of an RDD as a list
```

```
[u'OrderDate,Region,Rep,Item,Units,Unit Cost,Total', u'1/6/2016,East,Jones,Pencil,95,1.99,189.05', u'1/23/2016,Central,Kivell,Binder,50,19.99,999.50']
```


Looking at the printed list above, we can see that although everything was read in, each row is a single long string rather than a list of comma-separated fields. The document's header is also included as the first element of the RDD. This happens because `textFile()` reads a file as plain text, producing one string per line; it knows nothing about the CSV format. The newer `DataFrame` and `Dataset` APIs include built-in readers for structured formats such as CSV. For now, how do we correct this?

## Transformations and Actions
RDDs support two kinds of operations: **transformations** and **actions**. **Transformations** construct new RDDs from existing ones, while **actions** compute a result from an RDD and either return it to the driver program or write it to external storage, such as a file. Transformations are evaluated lazily: Spark records them but computes nothing until an action needs a result.

*Check out Spark's Programming guide for a more complete list of available methods:*
http://spark.apache.org/docs/latest/programming-guide.html

### Transformations
Below, you will find descriptions and examples of the transformations we will use to properly format our data.  

* `map(func)`: Applies a function to each element of an RDD and returns a new RDD containing the results.
```python
squared = rdd.map(lambda x: x * x)
```
* `filter(func)`: Takes a function that returns a boolean and returns a new RDD containing only the elements for which it returns `True`.
```python
filtered = rdd.filter(lambda x: x > 10)
```
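
Both transformations take an ordinary Python function, so their behaviour can be previewed with Python's built-in `map()` and `filter()` on a local list. The sketch below is only a local analogy with no Spark involved, and the `calls` list is a hypothetical tracing helper. Like Spark transformations, Python 3's `map()` and `filter()` are lazy: nothing is computed until something consumes the result, much as an RDD transformation only runs when an action asks for a result.

```python
calls = []  # records every element the mapping function actually processes

def square(x):
    calls.append(x)
    return x * x

data = [3, 8, 11, 15]

squared = map(square, data)                   # like rdd.map(...): nothing computed yet
filtered = filter(lambda x: x > 10, squared)  # like rdd.filter(...): still nothing computed
print(calls)   # [] (no work has happened so far)

result = list(filtered)  # like an action such as collect(): forces the computation
print(result)  # [64, 121, 225]
print(calls)   # [3, 8, 11, 15]
```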

Now that we've seen some example transformations, we can apply these to our RDD to properly format it.

```python
list_rdd = raw_rdd.map(lambda line: [word.strip() for word in line.split(',')])  # split each line into fields
header = list_rdd.first()                           # the header row, as a list of column names
rdd = list_rdd.filter(lambda line: line != header)  # keep every row except the header
rdd.persist()  # Keep the RDD in memory once computed, so later actions don't re-read and re-parse the file
print(rdd.take(3))
```

```
[[u'1/6/2016', u'East', u'Jones', u'Pencil', u'95', u'1.99', u'189.05'], [u'1/23/2016', u'Central', u'Kivell', u'Binder', u'50', u'19.99', u'999.50'], [u'2/9/2016', u'Central', u'Jardine', u'Pencil', u'36', u'4.99', u'179.64']]
```


We now have an RDD of the form `RDD[List[String]]` - something we can work with!
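
Because the function passed to `map()` is plain Python, it can be tried out locally on a couple of lines before running it on a cluster. The sketch below reuses two raw lines from the output above; `parse_line` and `parse_line_csv` are hypothetical names, and the quoted `"Smith, J."` row is invented for illustration. It also shows a caveat: splitting on commas breaks if a field ever contains a quoted comma, whereas Python's standard `csv` module handles quoting. The sample data does not appear to need this, so treat the `csv` variant as a precaution.

```python
import csv

raw_lines = [
    "OrderDate,Region,Rep,Item,Units,Unit Cost,Total",
    "1/6/2016,East,Jones,Pencil,95,1.99,189.05",
]

def parse_line(line):
    # Same logic as the lambda in the cell: split on commas, trim whitespace
    return [word.strip() for word in line.split(',')]

def parse_line_csv(line):
    # Alternative: csv.reader understands quoted fields such as "Smith, J."
    return next(csv.reader([line]))

rows = [parse_line(line) for line in raw_lines]
header = rows[0]
data = [row for row in rows if row != header]  # drops the header, like rdd.filter(...)
print(data)  # [['1/6/2016', 'East', 'Jones', 'Pencil', '95', '1.99', '189.05']]

quoted = '1/6/2016,East,"Smith, J.",Pencil,95,1.99,189.05'
print(len(parse_line(quoted)))      # 8: the quoted comma splits one field in two
print(len(parse_line_csv(quoted)))  # 7
```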

### Actions
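Now, let's say that we want to find the total revenue made by our stationery company (remember that the Total column is the last one). First, let's cover the `reduce` action:
* `reduce(func)`: Takes an aggregating function that operates on two elements and returns a single element of the same type. Because Spark applies it within and across partitions in no fixed order, the function should be commutative and associative (addition is; subtraction is not).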
```python
rdd_sum = rdd.reduce(lambda a, b: a + b)
```
Also note that `.first()` and `.take()` are actions too, since they return values to the driver program.
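
Spark reduces each partition separately and then combines the per-partition results, which is why the function passed to `reduce()` should be commutative and associative. The local sketch below imitates that two-step process with Python's `functools.reduce`; `partitioned_reduce` is a hypothetical helper that splits a list into contiguous chunks, and real Spark partitioning and combine order may differ. Addition gives the same answer either way, while subtraction does not.

```python
from functools import reduce
from operator import add, sub

def partitioned_reduce(func, data, num_partitions):
    # Split the list into contiguous chunks, standing in for partitions
    size = max(1, -(-len(data) // num_partitions))  # ceiling division
    partitions = [data[i:i + size] for i in range(0, len(data), size)]
    # Step 1: reduce each partition on its own
    partials = [reduce(func, part) for part in partitions]
    # Step 2: combine the per-partition results
    return reduce(func, partials)

values = [1, 2, 3, 4, 5, 6, 7, 8]
print(reduce(add, values), partitioned_reduce(add, values, 3))  # 36 36
print(reduce(sub, values), partitioned_reduce(sub, values, 3))  # -34 4
```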

Next, let's see how we can apply reduce to our RDD to get the total sales.

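```python
total_sales = (rdd.map(lambda line: float(line[-1]))  # Total is the last column
               .reduce(lambda a, b: a + b))
print(round(total_sales, 2))  # round to cents; floating-point sums can carry tiny errors
```

```
19627.88
```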


### PairRDD Transformations
Often, we find ourselves working with data in a `(key, value)` format. Spark has built-in operations for working with data like this: when each element of an RDD is a two-element Python tuple such as `(1, 2)`, these operations treat the first item as the key and the second as the value.
* `reduceByKey(func)`: When called on an RDD of `(key, value)` pairs, returns an RDD of `(key, aggregated value)` pairs, where the values for each key are aggregated with `func`.
```python
rdd_rbk = rdd.reduceByKey(lambda a, b: a + b)
```
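
Conceptually, `reduceByKey()` groups the values that share a key and folds each group with `func`. The plain-Python sketch below imitates the result on a few `(Item, Units)` pairs taken from the first rows of the sample data; the `reduce_by_key` helper is hypothetical and not part of Spark. Unlike this sketch, Spark also combines values within each partition before shuffling data between machines, and the order of the output pairs is not guaranteed.

```python
from functools import reduce
from operator import add

def reduce_by_key(func, pairs):
    # Group values by key (dicts keep insertion order in Python 3.7+)
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    # Fold each key's values with func
    return [(key, reduce(func, values)) for key, values in grouped.items()]

pairs = [("Pencil", 95), ("Binder", 50), ("Pencil", 36), ("Pen", 27)]
result = reduce_by_key(add, pairs)
print(result)  # [('Pencil', 131), ('Binder', 50), ('Pen', 27)]
```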
Below, we'll use `reduceByKey()` to count the number of units sold for each stationery item.

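```python
item_units_rdd = rdd.map(lambda line: (line[3], int(line[4])))  # (Item, Units) pairs
unit_sums_rdd = item_units_rdd.reduceByKey(lambda a, b: a + b)   # total Units per Item
print(unit_sums_rdd.collect())
sc.stop()  # shut down this SparkContext; the DataFrame section below starts a new session
```

```
[(u'Binder', 722), (u'Pen Set', 395), (u'Pen', 278), (u'Pencil', 716), (u'Desk', 10)]
```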


## Working with DataFrames
The `DataFrame` API was introduced in Spark 1.3. `DataFrames` are organized into named columns, similar to a table in a relational database, and provide a more structured approach to working with distributed data. (`Datasets`, added in Spark 1.6, combine the functionality of `RDDs` and `DataFrames`, but they are only available in Scala and Java, not in Spark's Python API.)

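Because Spark knows the structure of a `DataFrame`, it can cache `DataFrames` in a compressed columnar format that can use considerably less memory than cached `RDDs`, and `DataFrames` can be queried with Spark SQL. When working with `DataFrames`, the entry point is a `SparkSession` (introduced in Spark 2.0), which creates or reuses the underlying `SparkContext` and its configuration. The `SparkContext` remains available as `spark.sparkContext`, so you can still work with `RDDs`: the Spark developers introduced `SparkSession` to provide a single entry point to a Spark program, whether it uses `RDDs` or `DataFrames`.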

```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Learning Spark Dataframes").getOrCreate()
```

Now, let's read in the `.csv` file, this time using the `DataFrame` API.

```python
df = (spark.read.format('csv')
      .option('header', 'true')       # use the first line as column names
      .option('inferSchema', 'true')  # scan the data to infer column types
      .load('sample_data/stationery.csv'))
df.printSchema()
df.show()
```

```
root
 |-- OrderDate: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Rep: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Units: integer (nullable = true)
 |-- Unit Cost: double (nullable = true)
 |-- Total: double (nullable = true)

+----------+-------+--------+-------+-----+---------+-------+
| OrderDate| Region|     Rep|   Item|Units|Unit Cost|  Total|
+----------+-------+--------+-------+-----+---------+-------+
|  1/6/2016|   East|   Jones| Pencil|   95|     1.99| 189.05|
| 1/23/2016|Central|  Kivell| Binder|   50|    19.99|  999.5|
|  2/9/2016|Central| Jardine| Pencil|   36|     4.99| 179.64|
| 2/26/2016|Central|    Gill|    Pen|   27|    19.99| 539.73|
| 3/15/2016|   West| Sorvino| Pencil|   56|     2.99| 167.44|
|  4/1/2016|   East|   Jones| Binder|   60|     4.99|  299.4|
| 4/18/2016|Central| Andrews| Pencil|   75|     1.99| 149.25|
|  5/5/2016|Central| Jardine| Pencil|   90|     4.99|  449.1|
| 5/22/2016|   West|Thompson| Pencil|   32|     1.99|  63.68|
```

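We can see that the header was mapped to the `DataFrame` column names, and, because we set `inferSchema`, the `Units`, `Unit Cost`, and `Total` columns were parsed as numbers. There was no need to split up any strings!
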
### Selecting, Filtering, and GroupBy
In Python, you can access a `DataFrame`'s columns either by attribute (`df.age`) or by indexing (`df['age']`). While the former is convenient for interactive data exploration, the latter form is recommended: it is future-proof and won't break with column names that are also attributes of the `DataFrame` class. Indexing is also the only option for column names that aren't valid Python identifiers, such as `Unit Cost` in our data.

Below are two examples of how to select and filter your data.

```python
df.select(df["Item"], df["Units"]).show(5)
df.filter((df["Unit Cost"] > 5) & (df["Region"] == "East")).show(5)
```

```
+------+-----+
|  Item|Units|
+------+-----+
|Pencil|   95|
|Binder|   50|
|Pencil|   36|
|   Pen|   27|
|Pencil|   56|
+------+-----+
only showing top 5 rows

+----------+------+------+-------+-----+---------+-------+
| OrderDate|Region|   Rep|   Item|Units|Unit Cost|  Total|
+----------+------+------+-------+-----+---------+-------+
|  6/8/2016|  East| Jones| Binder|   60|     8.99|  539.4|
| 7/29/2016|  East|Parent| Binder|   81|    19.99|1619.19|
| 9/18/2016|  East| Jones|Pen Set|   16|    15.99| 255.84|
|10/22/2016|  East| Jones|    Pen|   64|     8.99| 575.36|
| 11/8/2016|  East|Parent|    Pen|   15|    19.99| 299.85|
+----------+------+------+-------+-----+---------+-------+
only showing top 5 rows
```

### Spark SQL
To query a `DataFrame` with Spark SQL, we first register it as a temporary view and then pass an SQL query that refers to that view to `spark.sql()`, which returns the result as a new `DataFrame`.

```python
df.createOrReplaceTempView("stationery")
query = "SELECT Rep, SUM(Units) FROM stationery GROUP BY Rep"
sql_df = spark.sql(query)
sql_df.show()
```

```
+--------+----------+
|     Rep|sum(Units)|
+--------+----------+
|   Jones|       396|
| Jardine|       281|
|   Smith|       156|
| Sorvino|       142|
|  Kivell|       193|
|  Parent|       170|
| Andrews|       183|
|Thompson|        89|
|  Morgan|       173|
|    Gill|       213|
|  Howard|       125|
+--------+----------+
```
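
Because `spark.sql()` accepts ordinary SQL, a simple aggregation like this one can be sanity-checked locally with Python's built-in `sqlite3` module. The sketch below loads only the `(Rep, Units)` values from the first nine rows printed by `df.show()` above, so its totals cover that subset and will not match the full table. It also adds `ORDER BY Rep`, since Spark does not guarantee the order of `GROUP BY` results. SQL dialects differ in places, so treat this as a convenience rather than a substitute for running the query on Spark.

```python
import sqlite3

# (Rep, Units) from the first nine rows shown by df.show()
rows = [("Jones", 95), ("Kivell", 50), ("Jardine", 36), ("Gill", 27), ("Sorvino", 56),
        ("Jones", 60), ("Andrews", 75), ("Jardine", 90), ("Thompson", 32)]

conn = sqlite3.connect(":memory:")  # throwaway in-memory database
conn.execute("CREATE TABLE stationery (Rep TEXT, Units INTEGER)")
conn.executemany("INSERT INTO stationery VALUES (?, ?)", rows)

query = "SELECT Rep, SUM(Units) FROM stationery GROUP BY Rep ORDER BY Rep"
result = conn.execute(query).fetchall()
conn.close()
print(result)
```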



### User-Defined Functions (UDFs)

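Let's say we need to apply a custom Python function to one or more columns, similar to what `map` does for an RDD. We use user-defined functions (UDFs) to accomplish this: a UDF takes one or more `DataFrame` columns as input and returns a new column computed row by row.

Note that Spark treats UDFs as a black box, meaning its optimizer can't look inside them to optimize them. Python UDFs also require sending each row between the JVM and a Python worker process, so prefer a built-in column expression when one exists; for example, `df["Units"] * df["Unit Cost"]` computes the same product as the UDF below.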

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

def multiplied(a, b):
    # Called once per row; returns None if either value is missing
    if a is None or b is None:
        return None
    return a * b

multiplied_udf = udf(multiplied, FloatType())  # declare the UDF's return type

# withColumn returns a new DataFrame; show() just prints it (and returns None)
multiplied_df = df.withColumn("Multiplied Total", multiplied_udf(df["Units"], df["Unit Cost"]))
multiplied_df.show()
```

```
+----------+-------+--------+-------+-----+---------+-------+----------------+
| OrderDate| Region|     Rep|   Item|Units|Unit Cost|  Total|Multiplied Total|
+----------+-------+--------+-------+-----+---------+-------+----------------+
|  1/6/2016|   East|   Jones| Pencil|   95|     1.99| 189.05|          189.05|
| 1/23/2016|Central|  Kivell| Binder|   50|    19.99|  999.5|           999.5|
|  2/9/2016|Central| Jardine| Pencil|   36|     4.99| 179.64|          179.64|
| 2/26/2016|Central|    Gill|    Pen|   27|    19.99| 539.73|          539.73|
| 3/15/2016|   West| Sorvino| Pencil|   56|     2.99| 167.44|          167.44|
|  4/1/2016|   East|   Jones| Binder|   60|     4.99|  299.4|           299.4|
| 4/18/2016|Central| Andrews| Pencil|   75|     1.99| 149.25|          149.25|
|  5/5/2016|Central| Jardine| Pencil|   90|     4.99|  449.1|           449.1|
| 5/22/2016|   West|Thompson| Pencil|   32|     1.99|  63.68|           63.68|
|  6/8/2016|   East|   Jones| Binder|   60|     8.99
```

## Where to go from here?

We've only scratched the surface of what Spark is capable of. A good next step is the Spark Programming Guide at http://spark.apache.org/docs/latest/programming-guide.html, along with the guides for each of Spark's libraries. To build directly on what you've learned here, look into:

* Pseudo-set operations like `rdd.distinct()`, `rdd.union()`, `rdd.intersection()`, etc.
* Additional transformations like `flatMap()`, and additional actions like `aggregate()`.