### Introduction
Welcome to a super quick Spark workshop.
- First, we will discuss how Spark works internally
- Then we will play around with the different APIs Spark provides, using data I downloaded from the US Department of Education website.

### RDD
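- "Resilient Distributed Dataset" (RDD) is the core abstraction in Spark.
- When you read data from a source, it is loaded into the cluster as an RDD (DataFrames, used later in this workshop, are built on top of RDDs)
- An RDD is split into partitions spread across the cluster, so it can be operated on in parallel, and it is resilient because Spark remembers the lineage of transformations that produced it and can recompute lost partitions
- RDDs support two kinds of operations: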
    - Transformations
    - Actions
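- Transformations (e.g. `map`, `filter`, `reduceByKey`) are lazy: a transformation on an RDD returns another RDD without actually computing anything!
- Actions (e.g. `count`, `take`, `collect`) trigger the computation and return a result to the driver, or write it out, instead of returning another RDD
- Spark also keeps data in memory wherever possible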

### Under the hood
- Starting from an RDD, every operation you define on it adds a step to a DAG (directed acyclic graph) that Spark will later execute on the cluster
- A transformation only extends the DAG, nothing more
- An action causes Spark to execute the DAG
- Every time you run an action, the complete DAG up to that point is re-evaluated; apart from reusing shuffle files it has already written, Spark does not keep intermediate results between actions
- If you don't want a heavy computation to be re-evaluated, you have to explicitly persist its result in memory and/or on disk with `cache()` or `persist()`
- Spark splits the DAG into stages. Each stage is a group of tasks that can run together without shuffling data, so stage boundaries fall at shuffles
- Shuffles are your dire enemy: they move data between executors over the network and write it to disk along the way
- Each stage consists of as many tasks as there are partitions in the RDD (draw some random garbage on the board to impress the audience at this point)


In [46]:
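from pyspark.sql.functions import desc
# `spark` is pre-created by the pyspark shell (or a notebook launched through it); as the last expression it displays the session.
spark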

### RDD Exercise

In [35]:
input_dataframe = spark.read.csv(
    # mode="DROPMALFORMED" silently drops rows that don't parse (e.g. wrong number of fields)
    "./students.csv", header=True, mode="DROPMALFORMED"
)

In [36]:
input_rdd = input_dataframe.rdd

In [37]:
input_rdd.take(5)

[Row(UNITID='100654', OPEID='00100200', OPEID6='001002', INSTNM='Alabama A & M University', CITY='Normal', STABBR='AL', INSTURL='www.aamu.edu/', NPCURL='https://galileo.aamu.edu/NetPriceCalculator/npcalc.htm', HCM2='0', PREDDEG='3', HIGHDEG='4', CONTROL='1', LOCALE='12', HBCU='1', PBI='0', ANNHI='0', TRIBAL='0', AANAPII='0', HSI='0', NANTI='0', MENONLY='0', WOMENONLY='0', RELAFFIL='NULL', SATVR25='365', SATVR75='485', SATMT25='360', SATMT75='495', SATWR25='370', SATWR75='457', SATVRMID='425', SATMTMID='428', SATWRMID='414', ACTCM25='16', ACTCM75='19', ACTEN25='14', ACTEN75='20', ACTMT25='15', ACTMT75='18', ACTWR25='NULL', ACTWR75='NULL', ACTCMMID='18', ACTENMID='17', ACTMTMID='17', ACTWRMID='NULL', SAT_AVG='929', SAT_AVG_ALL='929', PCIP01='0.0375', PCIP03='0.0177', PCIP04='0.0088', PCIP05='0', PCIP09='0', PCIP10='0.0442', PCIP11='0.0353', PCIP12='0', PCIP13='0.1192', PCIP14='0.0751', PCIP15='0.0287', PCIP16='0', PCIP19='0.0199', PCIP22='0', PCIP23='0.0132', PCIP24='0.0618', PCIP25='0',

In [38]:
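city_count = (
    input_rdd
    .map(lambda row: (row['CITY'], 1))                    # (city, 1) per row
    .reduceByKey(lambda count1, count2: count1 + count2)  # sum per city; shuffles by key
    .sortBy(lambda pair: pair[1], ascending=False)        # most common cities first
)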

In [39]:
city_count.take(10)

[('New York', 83),
 ('Chicago', 73),
 ('Houston', 66),
 ('Los Angeles', 53),
 ('Brooklyn', 49),
 ('Miami', 47),
 ('San Antonio', 45),
 ('Philadelphia', 40),
 ('Atlanta', 38),
 ('Dallas', 37)]

### Spark SQL

In [43]:
input_dataframe.printSchema()

root
 |-- UNITID: string (nullable = true)
 |-- OPEID: string (nullable = true)
 |-- OPEID6: string (nullable = true)
 |-- INSTNM: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STABBR: string (nullable = true)
 |-- INSTURL: string (nullable = true)
 |-- NPCURL: string (nullable = true)
 |-- HCM2: string (nullable = true)
 |-- PREDDEG: string (nullable = true)
 |-- HIGHDEG: string (nullable = true)
 |-- CONTROL: string (nullable = true)
 |-- LOCALE: string (nullable = true)
 |-- HBCU: string (nullable = true)
 |-- PBI: string (nullable = true)
 |-- ANNHI: string (nullable = true)
 |-- TRIBAL: string (nullable = true)
 |-- AANAPII: string (nullable = true)
 |-- HSI: string (nullable = true)
 |-- NANTI: string (nullable = true)
 |-- MENONLY: string (nullable = true)
 |-- WOMENONLY: string (nullable = true)
 |-- RELAFFIL: string (nullable = true)
 |-- SATVR25: string (nullable = true)
 |-- SATVR75: string (nullable = true)
 |-- SATMT25: string (nullable = true)
 |-- SAT

In [47]:
input_dataframe.groupBy('CITY').count().sort(desc('count')).show()

+------------+-----+
|        CITY|count|
+------------+-----+
|    New York|   83|
|     Chicago|   73|
|     Houston|   66|
| Los Angeles|   53|
|    Brooklyn|   49|
|       Miami|   47|
| San Antonio|   45|
|Philadelphia|   40|
|     Atlanta|   38|
|      Dallas|   37|
|    Columbus|   35|
|   San Diego|   35|
|    Portland|   34|
|Indianapolis|   34|
| Springfield|   33|
|     Phoenix|   33|
|Jacksonville|   33|
|     Memphis|   32|
|  Pittsburgh|   31|
|      Boston|   31|
+------------+-----+
only showing top 20 rows



In [52]:
input_dataframe.createOrReplaceTempView("students")
spark.sql('SELECT CITY, COUNT(CITY) as count FROM students GROUP BY CITY ORDER BY count DESC').show()

+------------+-----+
|        CITY|count|
+------------+-----+
|    New York|   83|
|     Chicago|   73|
|     Houston|   66|
| Los Angeles|   53|
|    Brooklyn|   49|
|       Miami|   47|
| San Antonio|   45|
|Philadelphia|   40|
|     Atlanta|   38|
|      Dallas|   37|
|   San Diego|   35|
|    Columbus|   35|
|    Portland|   34|
|Indianapolis|   34|
| Springfield|   33|
|     Phoenix|   33|
|Jacksonville|   33|
|     Memphis|   32|
|  Pittsburgh|   31|
|  Washington|   31|
+------------+-----+
only showing top 20 rows

