# RDDs, Dataframes, and Datasets

## RDDs

Resilient Distributed Datasets (We talked about these!). A new range of API's has been introduced to let people take advantage of Spark's parallel execution framework and fault tolerance without making the same set of mistakes.

## Dataframes

- RDD's with named *untyped* columns.
- Columnar storage
  - Similar optimizations for OLAP queries as vertica
- Memory Management (Tungsten)
  - direct control of data storage in memory
    - cpu cache, and read ahead
  - largest source of performance increase
- avoids java serialization (or other not as slow but still slow serialization)
  - Kryo serialization
  - compression
- no garbage collection overhead
- Execution plans (Catalyst Optimizer)
  - rule based instead of cost-based optimizer

## Datasets

adds to Dataframes
- compile time safety
- API only available through the scala (python has no type safety)

Encoders act as liason between JVM object and off-heap memory (the new formats introduced with Tungsten)

## Let's load a file

1. select 'Tables'
2. in new tab, select 'Create Table'
3. we could really select anything (from file upload, s3, DBFS, or JDBC) here but for now we will upload the 'mallard.csv' from the vertica demo
  (https://s3-us-west-2.amazonaws.com/cse599c-sp17/mallard.csv)
4. select preview table
5. we can name the table, select our file delimiter, etc.
6. retrieve the DBFS path befor
7. select 'create table'

In [4]:
# set file path for mallard.csv import
#mallardFilePath = '/FileStore/tables/<uuid>/mallard.csv'
mallardFilePath = '/FileStore/tables/7n7zqjcx1492482730270/mallard.csv'
#mallardFilePath = ''

# mallard.csv header
# event-id,timestamp,location-long,location-lat

In [5]:
# Import raw rdd convert to dataframe
from pyspark.sql import types

# Create RDD from csv file
# pysqark RDD documentation: http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD
mallard_rdd = sc.textFile(mallardFilePath)

mallard_rdd.take(10)
mallard_rdd.count()

In [6]:
def rdd_head(rdd,n=3):
  displayHTML("Count: {} <br> {}".format(rdd.count(), "<br>".join(str(row) for row in rdd.take(n))) )

rdd_head(mallard_rdd)

In [7]:
#Skip header row
header_row = mallard_rdd.first()
mallard_rdd = mallard_rdd.filter(lambda row : not row.startswith('event-id'))
rdd_head(mallard_rdd)


In [8]:
#Split rows and convert
from datetime import datetime

def make_row(row):
  row = row.split(',')
  return ( int(row[0]), datetime.strptime(row[1],'%Y-%m-%d %H:%M:%S.000') , float(row[2]) , float(row[3]) )

mallard_rdd = mallard_rdd.map(make_row)

rdd_head(mallard_rdd)

#Question: What day had the most sightings?

In [10]:
# Use groupBy and map
mallard_rdd_days = mallard_rdd.groupBy( lambda row: str(row[1].date()) ).map(lambda row: (row[0],len(row[1])) )

rdd_head(mallard_rdd_days)

In [11]:
# Sort by count descending
mallard_rdd_days_descending = mallard_rdd_days.sortBy( lambda row: row[1], ascending=False)
rdd_head(mallard_rdd_days_descending)

In [12]:
# pyspark Dataframe documentation: http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame
mallard_dataframe = mallard_rdd.toDF()
mallard_dataframe.printSchema()
mallard_dataframe.count()

In [13]:
# Make schema for mallard dataframe
from datetime import datetime

header_row = header_row.split(',')
def row_func(row):
  return { header : obj for header , obj in zip(header_row,row) }
    

mallard_dataframe = mallard_rdd.map(row_func).toDF()
mallard_dataframe.printSchema()
mallard_dataframe.head(10)

In [14]:
mallard = sqlContext.read.format("com.databricks.spark.csv").options(header='true', inferschema='true', delimiter=',').load(mallardFilePath)
display(mallard)

In [15]:
sortMallard = mallard.sort("location-long","location-lat")
display(sortMallard)

#Question: What day had the most sightings?

In [17]:
#pyspark sql functions: http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#module-pyspark.sql.functions
# offers programatic SQL type commands
import pyspark.sql.functions as sf

mallard_days = mallard \
  .select(sf.col('timestamp').cast('date').alias('date')) \
  .groupBy('date') \
  .agg(sf.count("date").alias("count")) \
  .sort(sf.col("count").desc())

display(mallard_days)

In [18]:
# A more complicated example to show the expressiveness of pysqark 
# Calculate number of sightings in 1.1km squares (~= 1 decimal places of latitude and longitude )

countOneKM = mallard.withColumn("round-long",sf.round("location-long",1)) \
  .withColumn("round-lat",sf.round("location-lat",1))\
  .select("round-long","round-lat") \
  .groupBy("round-long", "round-lat") \
  .agg(sf.count("round-long").alias('location-bin')) \
  .sort(sf.col("location-bin").desc())
display(countOneKM)

# Use Case: On-Time Flight Performance

This notebook provides an analysis of On-Time Flight Performance and Departure Delays

Source Data: 
* [OpenFlights: Airport, airline and route data](http://openflights.org/data.html)
* [United States Department of Transportation: Bureau of Transportation Statistics (TranStats)](http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time)
 * Note, the data used here was extracted from the US DOT:BTS between 1/1/2014 and 3/31/2014*

References:
* [GraphFrames User Guide](http://graphframes.github.io/user-guide.html)
* [GraphFrames: DataFrame-based Graphs (GitHub)](https://github.com/graphframes/graphframes)
* [D3 Airports Example](http://mbostock.github.io/d3/talk/20111116/airports.html)

### Preparation
Extract the Airports and Departure Delays information from S3 / DBFS

In [21]:
# Set File Paths
tripdelaysFilePath = "/databricks-datasets/flights/departuredelays.csv"
airportsnaFilePath = "/databricks-datasets/flights/airport-codes-na.txt"

In [22]:
# Obtain airports dataset
airportsna = sqlContext.read.format("com.databricks.spark.csv").options(header='true', inferschema='true', delimiter='\t').load(airportsnaFilePath)
airportsna.registerTempTable("airports_na")

In [23]:
# Obtain departure Delays data
departureDelays = sqlContext.read.format("com.databricks.spark.csv").options(header='true').load(tripdelaysFilePath)
departureDelays.registerTempTable("departureDelays")
departureDelays.cache()

In [24]:
airportsna.printSchema()

In [25]:
departureDelays.printSchema()

In [26]:
# use pysqark.sql.functions which is already imported as sf
sortDelays = departureDelays.sort("delay")
sortDelays.head(3)

In [27]:
# register the DataFrame as a temp table so that we can query it using SQL language
departureDelays.registerTempTable("depature_delays")
sortDelays_sql = sqlContext.sql("SELECT * FROM depature_delays ORDER BY delay")
sortDelays_sql.head(3)

In [28]:
# We can also do more complex selections
longAvgDistByDest = departureDelays.groupBy("destination")\
  .agg(sf.avg("distance").alias("avg_dist"))\
  .where("avg_dist > 1000")
longAvgDistByDest.head(3)

In [29]:
# and again with a declarative command
longAvgDistByDest_sql = sqlContext.sql("SELECT destination, avg(distance) AS avg_dist FROM depature_delays GROUP BY destination HAVING avg_dist > 1000")
longAvgDistByDest_sql.head(3)

In [30]:
# we can also use the python fluent API to execute joins
from pyspark.sql.functions import col
delayedSeaDest = departureDelays.join(airportsna, departureDelays["destination"] == airportsna["IATA"], 'inner')\
  .filter(col("origin") == 'SEA')\
  .groupBy("destination")\
  .agg(sf.avg("delay").alias("avg_delay"))\
  .orderBy(sf.col("avg_delay").desc())
delayedSeaDest.head(3)

In [31]:
# and again with a declarative command
airportsna.registerTempTable("airports")
delayedSeaDest_sql = sqlContext.sql("SELECT destination, avg(delay) AS avg_delay FROM depature_delays, airports WHERE origin = 'SEA' AND destination = IATA GROUP BY destination ORDER BY -avg_delay")
delayedSeaDest_sql.head(3)

In [32]:
delayedSeaDest.explain()

In [33]:
delayedSeaDest_sql.explain()