<DIV ALIGN=CENTER>

# Introduction to Spark: DataFrames
## Professor Robert J. Brunner
  
</DIV>  
-----
-----

## Introduction

In this IPython Notebook, we explore using Spark to perform data processing in a similar manner to our previous efforts with Pandas. For this we will use the airline data, which has been stored within a filesystem that is accessible from within our Spark cluster. We first initialize our Spark environment, after which we process the data within Spark by using a Spark DataFrame.

-----

In [1]:
# Release the existing SparkContext, if one is defined.
try:
    sc
except NameError:
    pass
else:
    sc.stop()

# Now handle initial import statements
from pyspark import SparkConf, SparkContext

# Create new Spark Configuration 
myconf = SparkConf()
myconf.setMaster('local[*]')
myconf.setAppName("INFO490 SP17 W14-NB2: Professor Brunner")
myconf.set('spark.executor.memory', '1g')

# Create and initialize a new Spark Context
sc = SparkContext(conf=myconf)

# Display Spark version information, which also verifies SparkContext is active
print("\nSpark version: {0}".format(sc.version))


Spark version: 2.0.1


-----

### Data Processing

In this Notebook, we will need sample data. To simplify acquiring data
to demonstrate using Spark DataFrames, we include the RDD code from the
[Introduction to Spark](intro2spark.ipynb) Notebook in the following
cell.

-----

In [2]:
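filename = '/home/data_scientist/data/2001/2001-1.csv'

text_file = sc.textFile(filename)

# Split each CSV line, keep nine columns (Year, Month, DayofMonth, DepTime,
# ArrDelay, DepDelay, Origin, Dest, Distance), and drop the header row.
col_data = text_file.map(lambda l: l.split(",")) \
            .map(lambda p: (p[0], p[1], p[2], p[4], p[14], p[15], p[16], p[17], p[18])) \
            .filter(lambda line: 'Year' not in line)

# Drop rows with a missing ('NA') value in any of the kept columns.
cols = col_data.filter(lambda line: 'NA' not in line)

# Convert the numeric columns to integers.
fields = cols.map(lambda p: (int(p[0]), int(p[1]), int(p[2]), int(p[3]),
                          int(p[4]), int(p[5]), p[6], p[7], int(p[8])))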

# Should be 480106 if everything works correctly
print('Number of entries in fields RDD = {0}'.format(fields.count()))

Number of entries in fields RDD = 480106

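The chain of `map` and `filter` calls above is easier to follow on a handful of lines. The following is a plain-Python sketch (no Spark involved) of the same per-line logic. The helper `make_line`, the filler value `"0"` for unused columns, and the sample lines are hypothetical, built only to mimic the 29-column layout of the airline data; the one valid data row reuses values from the first row displayed later by `df.show(5)`.

```python
# Column positions kept by the RDD cell (airline data set layout):
# 0 Year, 1 Month, 2 DayofMonth, 4 DepTime, 14 ArrDelay,
# 15 DepDelay, 16 Origin, 17 Dest, 18 Distance
SELECTED = (0, 1, 2, 4, 14, 15, 16, 17, 18)
# Positions *within the selected tuple* that hold integers
INT_POSITIONS = {0, 1, 2, 3, 4, 5, 8}


def make_line(year, month, day, dep_time, arr_delay, dep_delay, origin, dest, dist):
    """Hypothetical helper: build a 29-column CSV line, filling unused columns with '0'."""
    cols = ["0"] * 29
    cols[0], cols[1], cols[2], cols[4] = year, month, day, dep_time
    cols[14], cols[15], cols[16], cols[17], cols[18] = arr_delay, dep_delay, origin, dest, dist
    return ",".join(cols)


def parse_lines(lines):
    """Mimic map(split) -> map(select) -> filter(header) -> filter(NA) -> map(int)."""
    rows = []
    for line in lines:
        parts = line.split(",")
        selected = tuple(parts[i] for i in SELECTED)
        if "Year" in selected:  # header row
            continue
        if "NA" in selected:  # missing value in a kept column
            continue
        rows.append(tuple(int(v) if i in INT_POSITIONS else v
                          for i, v in enumerate(selected)))
    return rows


sample = [
    make_line("Year", "Month", "DayofMonth", "DepTime", "ArrDelay",
              "DepDelay", "Origin", "Dest", "Distance"),
    make_line("2001", "1", "17", "1806", "-3", "-4", "BWI", "CLT", "361"),
    make_line("2001", "1", "18", "NA", "NA", "NA", "BWI", "CLT", "361"),  # row with missing values
]

print(parse_lines(sample))  # [(2001, 1, 17, 1806, -3, -4, 'BWI', 'CLT', 361)]
```

Because the selection is positional, changing the kept columns means updating `SELECTED`, the integer positions, and the DataFrame schema together.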

-----

## Spark DataFrame

Spark supports a simplified [DataFrame][spdf] as part of the [Spark
SQL][spsql] library. We can create a DataFrame from an existing RDD by
also specifying the column labels and data types. The data types must
be one of the predefined [Spark SQL types][spdt]. After creating the
new DataFrame (which is backed by an RDD), we can perform many of the
same tasks with Spark that we performed with Pandas (though not all of
them, and often less concisely). The following code cells show how we
can take our 2001 flight data RDD and create a new DataFrame, which we
use in several subsequent code cells.
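To make the pairing of labels and types concrete, here is a plain-Python analogue rather than Spark code: each label is zipped with a converter function, and a tuple of raw strings becomes a dictionary keyed by column name, much as a DataFrame row lets us refer to columns by name. The converter functions are an assumption of this sketch; as far as we know, `createDataFrame` checks values against the declared Spark SQL types instead of converting them, which is why the RDD code applies `int()` before the DataFrame is built.

```python
# Plain-Python analogue (not Spark): pair each column label with a
# converter, mirroring how the schema pairs labels with Spark SQL types.
schema_string = "Year Month DayOfMonth DepTime ArrDelay DepDelay Origin Destination Distance"
converters = [int, int, int, int, int, int, str, str, int]

schema = list(zip(schema_string.split(), converters))


def to_record(values):
    """Turn a tuple of raw strings into a dict keyed by column label."""
    if len(values) != len(schema):
        raise ValueError("expected %d values, got %d" % (len(schema), len(values)))
    return {name: convert(value) for (name, convert), value in zip(schema, values)}


record = to_record(("2001", "1", "17", "1806", "-3", "-4", "BWI", "CLT", "361"))
print(record["Origin"], record["Distance"])  # BWI 361
```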

-----
[spdf]: https://spark.apache.org/docs/latest/sql-programming-guide.html#dataframes
[spsql]: https://spark.apache.org/sql/
[spdt]: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types

In [3]:
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)

# Column labels, in the same order as the tuples in the fields RDD
schemaString = "Year Month DayOfMonth DepTime ArrDelay DepDelay Origin Destination Distance"

fieldTypes = [IntegerType(), IntegerType(), IntegerType(),
              IntegerType(), IntegerType(), IntegerType(),
              StringType(), StringType(), IntegerType()]

# Each StructField is (name, data type, nullable)
f_data = [StructField(field_name, field_type, True)
          for field_name, field_type in zip(schemaString.split(), fieldTypes)]

schema = StructType(f_data)

In [4]:
df = sqlContext.createDataFrame(fields, schema)
print(df)

DataFrame[Year: int, Month: int, DayOfMonth: int, DepTime: int, ArrDelay: int, DepDelay: int, Origin: string, Destination: string, Distance: int]


-----

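In the following three code cells, we first `show` the first few rows
of the DataFrame, then use the `head` method, which returns a list of
`Row` objects that pair each value with its column name, and finally
use the `describe` method, which returns a new DataFrame of summary
statistics that we display with the `show` method. While the output is
less visually attractive than the Pandas result, we still obtain the
necessary information. (The long `stddev` entry for _Year_ is cut off
by `show`, which truncates long values; because every row has the same
year, that entry is floating-point round-off rather than a real
spread.)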
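Each row of the `describe` output is an ordinary summary statistic. As a plain-Python cross-check of what those rows mean, the sketch below computes the same five statistics for the _ArrDelay_ values of the five rows displayed by `df.show(5)`; the numbers naturally differ from the full-column values. We assume, based on our understanding of Spark SQL's `stddev` aggregate, that the reported standard deviation is the sample (n - 1) version, which is what `statistics.stdev` computes.

```python
import statistics

# ArrDelay values of the five rows displayed by df.show(5)
arr_delay = [-3, 4, 23, 10, 20]


def describe(values):
    """Summary statistics in the spirit of DataFrame.describe() (plain Python)."""
    return {
        "count": len(values),
        "mean": statistics.mean(values),
        # sample standard deviation (n - 1 in the denominator)
        "stddev": statistics.stdev(values),
        "min": min(values),
        "max": max(values),
    }


for name, value in describe(arr_delay).items():
    print(name, value)
```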

After these code cells, we access the DataFrame schema, first by using
the `printSchema` method to neatly display the schema, and then by
accessing a column directly, which we can do because we have named our
DataFrame columns.

-----

In [5]:
df.show(5)

+----+-----+----------+-------+--------+--------+------+-----------+--------+
|Year|Month|DayOfMonth|DepTime|ArrDelay|DepDelay|Origin|Destination|Distance|
+----+-----+----------+-------+--------+--------+------+-----------+--------+
|2001|    1|        17|   1806|      -3|      -4|   BWI|        CLT|     361|
|2001|    1|        18|   1805|       4|      -5|   BWI|        CLT|     361|
|2001|    1|        19|   1821|      23|      11|   BWI|        CLT|     361|
|2001|    1|        20|   1807|      10|      -3|   BWI|        CLT|     361|
|2001|    1|        21|   1810|      20|       0|   BWI|        CLT|     361|
+----+-----+----------+-------+--------+--------+------+-----------+--------+
only showing top 5 rows



In [6]:
df.head(4)

[Row(Year=2001, Month=1, DayOfMonth=17, DepTime=1806, ArrDelay=-3, DepDelay=-4, Origin='BWI', Destination='CLT', Distance=361),
 Row(Year=2001, Month=1, DayOfMonth=18, DepTime=1805, ArrDelay=4, DepDelay=-5, Origin='BWI', Destination='CLT', Distance=361),
 Row(Year=2001, Month=1, DayOfMonth=19, DepTime=1821, ArrDelay=23, DepDelay=11, Origin='BWI', Destination='CLT', Distance=361),
 Row(Year=2001, Month=1, DayOfMonth=20, DepTime=1807, ArrDelay=10, DepDelay=-3, Origin='BWI', Destination='CLT', Distance=361)]

In [7]:
df.describe().show()

+-------+--------------------+------+-----------------+-----------------+-----------------+------------------+-----------------+
|summary|                Year| Month|       DayOfMonth|          DepTime|         ArrDelay|          DepDelay|         Distance|
+-------+--------------------+------+-----------------+-----------------+-----------------+------------------+-----------------+
|  count|              480106|480106|           480106|           480106|           480106|            480106|           480106|
|   mean|              2001.0|   1.0|16.01370530674476|1359.660206287778|6.382288494624103| 8.781523246949632|716.9933556339641|
| stddev|1.136732532560936...|   0.0|8.936964382456553|487.2369594358406|31.04865060768924|27.966300686761794|568.6557196351681|
|    min|                2001|     1|                1|                1|              -80|               -59|               21|
|    max|                2001|     1|               31|             2400|             1688|      

In [8]:
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayOfMonth: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Distance: integer (nullable = true)



In [9]:
df.Year

Column<b'Year'>

-----

We can extract data from the DataFrame by using techniques similar to
those we used with Pandas. One difference is that we need to `filter`
the DataFrame, rather than directly accessing rows. In the following
cells, we first filter rows to count the flights that left O'Hare (ORD),
and then extract the flights that left O'Hare more than two hours late.
In the second case, we also transform the output to `select` the
_Destination_ column and a new column that approximates the _Distance_
in kilometers (by multiplying the mileage by 1.6).
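The filter-then-select chain reads the same way as a list comprehension. Below is a plain-Python sketch on a few hypothetical rows (the distances reuse values from the output in this Notebook; the delays are invented), using the exact factor of 1.609344 km per mile rather than the rounded 1.6.

```python
# Exact conversion factor for the international mile; the notebook rounds to 1.6
KM_PER_MILE = 1.609344

# Hypothetical rows: (Origin, Destination, DepDelay, Distance in miles).
# The distances match values shown in this Notebook; the delays are made up.
flights = [
    ("ORD", "PHL", 150, 678),
    ("ORD", "LAX", 30, 1745),
    ("BWI", "CLT", 130, 361),
]


def late_departures(rows, origin, min_delay):
    """filter(Origin == origin) -> filter(DepDelay > min_delay) -> select(Destination, km)."""
    return [(dest, dist * KM_PER_MILE)
            for orig, dest, delay, dist in rows
            if orig == origin and delay > min_delay]


print(late_departures(flights, "ORD", 120))
```

In the Spark cell itself, writing `(df['Distance'] * 1.609344).alias('DistanceKm')` inside `select` would give the computed column a readable name instead of the generated `(Distance * 1.6)` header.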

-----

In [10]:
df.filter(df['Origin'] == 'ORD').count()

27455

In [11]:
(df.filter(df['Origin'] == 'ORD')
   .filter(df['DepDelay'] > 120)
   .select(df['Destination'], df['Distance'] * 1.6)
   .show(10))

+-----------+-----------------+
|Destination| (Distance * 1.6)|
+-----------+-----------------+
|        PHL|           1084.8|
|        CLT|958.4000000000001|
|        MEM|            785.6|
|        MEM|            785.6|
|        MEM|            785.6|
|        STL|            412.8|
|        STL|            412.8|
|        PVD|           1358.4|
|        LAX|           2792.0|
|        LAX|           2792.0|
+-----------+-----------------+
only showing top 10 rows



-----

## Spark SQL

Given a Spark DataFrame, we can apply SQL statements directly against
the DataFrame by registering it as a temporary SQL table. The following
code cell demonstrates this: we register our DataFrame as a `flights`
table and execute a SQL statement that selects the same rows we obtained
from our previous DataFrame filter. Because this query returns the
original _Distance_ column, the values are in miles rather than
kilometers. In addition, without an `ORDER BY` clause Spark does not
guarantee the order of the returned rows, so the rows displayed by
`show` need not appear in the same order as before.
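Because the query is ordinary SQL, its logic can be tried out without a cluster. The sketch below runs the same `WHERE` clause against Python's built-in `sqlite3` module as a stand-in for Spark SQL (an assumption for illustration only; Spark's engine and SQL dialect differ in many details), on a few hypothetical rows, and adds `ORDER BY` so the output order is deterministic. The same `ORDER BY` clause can be appended to the Spark query.

```python
import sqlite3

# In-memory SQLite table standing in for the "flights" temporary table;
# the rows are hypothetical (Origin, Destination, DepDelay, Distance).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flights (Origin TEXT, Destination TEXT, "
             "DepDelay INTEGER, Distance INTEGER)")
conn.executemany(
    "INSERT INTO flights VALUES (?, ?, ?, ?)",
    [("ORD", "PHL", 150, 678),
     ("ORD", "LAX", 125, 1745),
     ("ORD", "STL", 10, 258),
     ("BWI", "CLT", 200, 361)],
)

# Same WHERE clause as the notebook's query, plus ORDER BY for a stable order
sql_q = ("SELECT Destination, Distance FROM flights "
         "WHERE Origin = 'ORD' AND DepDelay > 120 "
         "ORDER BY Destination")
rows = conn.execute(sql_q).fetchall()
conn.close()

print(rows)  # [('LAX', 1745), ('PHL', 678)]
```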

-----

In [12]:
df = sqlContext.createDataFrame(fields, schema)

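# Register the DataFrame as a temporary table; createOrReplaceTempView
# replaces registerTempTable, which is deprecated as of Spark 2.0.
df.createOrReplaceTempView("flights")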

# SQL can be run over DataFrames that have been registered as a table.
sql_q = "SELECT Destination, Distance FROM flights WHERE Origin = 'ORD' AND DepDelay > 120"

results = sqlContext.sql(sql_q)

# The results of SQL queries are DataFrames and support all the normal DataFrame operations.
results.show(10)

+-----------+--------+
|Destination|Distance|
+-----------+--------+
|        PHL|     678|
|        CLT|     599|
|        MEM|     491|
|        MEM|     491|
|        MEM|     491|
|        STL|     258|
|        STL|     258|
|        PVD|     849|
|        LAX|    1745|
|        LAX|    1745|
+-----------+--------+
only showing top 10 rows



-----
### Student Activity

In the preceding cells, we introduced Spark DataFrames and Spark SQL. Now that you have run the Notebook, go back and make the following changes to see how the results change.

1. Change the DataFrame to include different columns from the flights data. You might review the original [airline data set](http://stat-computing.org/dataexpo/2009/) website to see the column descriptions.

2. Use a SQL query on the `df` DataFrame to compute the mean distance of all flights from O'Hare (ORD) to Los Angeles International Airport (LAX).

3. Add an index column to this Spark DataFrame, which sequentially increases.

Additional, more advanced problems:

1. Using the Spark DataFrame, output the results of the `describe` function on all numeric columns.

2. Turn this Spark DataFrame into a Pandas DataFrame and make a regression plot of the arrival delay versus the departure delay by using Seaborn.

-----

### Ending the Spark Session

We must stop the `SparkContext` in order to release resources on the
instructional cluster before exiting this Notebook.

-----

In [13]:
sc.stop()