# Exploring Data with DataFrames and Spark SQL
In this notebook, I will explore data using the Spark DataFrames API and Spark SQL.

## Load Data Using an Explicit Schema
To explore data, I must load it into a programmatic data object such as a DataFrame. If the structure of the data is known ahead of time, I can explicitly specify the schema for the DataFrame.
In this exercise, I will work with data that records details of flights.
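An explicit schema fixes each column's name and type before any data is read. The sketch below shows that idea in plain Python so it runs without a Spark session. It applies a fixed list of column names and types to CSV text. `FLIGHT_SCHEMA` and `read_with_schema` are hypothetical names, not PySpark APIs. Empty or unparseable fields become `None`, which is loosely similar to the nulls Spark produces in its default PERMISSIVE mode. The `False` nullable flag in each `StructField` does not, by itself, keep nulls out of this data. The **describe** output later in this notebook counts fewer **DepDelay** and **ArrDelay** values than rows.

```python
import csv
import io

# Hypothetical plain-Python mirror of flightSchema: (column name, type) pairs.
FLIGHT_SCHEMA = [
    ("DayofMonth", int),
    ("DayOfWeek", int),
    ("Carrier", str),
    ("OriginAirportID", int),
    ("DestAirportID", int),
    ("DepDelay", int),
    ("ArrDelay", int),
]


def read_with_schema(text, schema):
    """Parse CSV text that has a header row into dicts typed by `schema`.

    Empty or unparseable fields become None (a rough analogue of Spark
    turning malformed or empty values into nulls).
    """
    reader = csv.reader(io.StringIO(text))
    next(reader)  # skip the header row, as header=True does
    rows = []
    for record in reader:
        row = {}
        for (name, cast), raw in zip(schema, record):
            try:
                row[name] = cast(raw) if raw != "" else None
            except ValueError:
                row[name] = None
        rows.append(row)
    return rows


sample = (
    "DayofMonth,DayOfWeek,Carrier,OriginAirportID,DestAirportID,DepDelay,ArrDelay\n"
    "19,5,DL,11433,13303,-3,1\n"
    "19,5,DL,14869,12478,,-8\n"  # made-up row with a missing DepDelay
)
rows = read_with_schema(sample, FLIGHT_SCHEMA)
print(rows[1]["DepDelay"])  # None
```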


In [3]:
from pyspark.sql.types import *

In [4]:
flightSchema = StructType([
  StructField("DayofMonth", IntegerType(), False),
  StructField("DayOfWeek", IntegerType(), False),
  StructField("Carrier", StringType(), False),
  StructField("OriginAirportID", IntegerType(), False),
  StructField("DestAirportID", IntegerType(), False),
  StructField("DepDelay", IntegerType(), False),
  StructField("ArrDelay", IntegerType(), False),
])

In [5]:
flights = spark.read.csv('/Users/EDHLINA/Downloads/DAT202/data/raw-flight-data.csv', schema=flightSchema, header=True)

In [6]:
flights.show()

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          10397|        15016|      -1|     -19|
|        19|        5|     DL|          15016|        10397|       0|      -1|
|        19|        5|     DL|          10397|        14869|      15|      24|
|        19|        5|     DL|          10397|        10423|      33|      34|
|        19|        5|     DL|          11278|      

## Infer a Data Schema
If the structure of the data source is unknown, you can have Spark automatically infer the schema.
In this case, I will load data about airports without knowing the schema.
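With `inferSchema=True`, Spark samples the column values to choose a type for each column. The PySpark documentation notes that this needs one extra pass over the data. The sketch below shows the general idea with simplified rules that I chose for illustration: try `int`, then `float`, and fall back to `str`. It is not Spark's actual inference algorithm. `infer_type` and `_parses` are hypothetical helpers.

```python
def _parses(cast, value):
    """True if cast(value) succeeds."""
    try:
        cast(value)
        return True
    except ValueError:
        return False


def infer_type(values):
    """Guess a column's type from its raw string values.

    Simplified, hypothetical rules (not Spark's implementation): try int,
    then float, and fall back to str. Empty strings are treated as missing
    and do not influence the result.
    """
    candidates = [int, float]
    seen_value = False
    for value in values:
        if value == "":
            continue
        seen_value = True
        # Keep only the candidate types that can parse every value seen so far.
        candidates = [cast for cast in candidates if _parses(cast, value)]
        if not candidates:
            return str
    return candidates[0] if seen_value else str


airport_columns = {
    "airport_id": ["10165", "10299", "10304"],
    "city": ["Adak Island", "Anchorage", "Aniak"],
}
inferred = {name: infer_type(vals).__name__ for name, vals in airport_columns.items()}
print(inferred)  # {'airport_id': 'int', 'city': 'str'}
```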


In [10]:
airports = spark.read.csv('/Users/EDHLINA/Downloads/DAT202/data/airports.csv', inferSchema=True, header=True)
airports.show()


+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
|     10304|      Aniak|   AK|       Aniak Airport|
|     10754|     Barrow|   AK|Wiley Post/Will R...|
|     10551|     Bethel|   AK|      Bethel Airport|
|     10926|    Cordova|   AK|Merle K Mudhole S...|
|     14709|  Deadhorse|   AK|   Deadhorse Airport|
|     11336| Dillingham|   AK|  Dillingham Airport|
|     11630|  Fairbanks|   AK|Fairbanks Interna...|
|     11997|   Gustavus|   AK|    Gustavus Airport|
|     12523|     Juneau|   AK|Juneau International|
|     12819|  Ketchikan|   AK|Ketchikan Interna...|
|     10245|King Salmon|   AK| King Salmon Airport|
|     10170|     Kodiak|   AK|      Kodiak Airport|
|     13970|   Kotzebue|   AK| Ralph Wien Memorial|
|     13873|       Nome|   AK|        Nome Airport|
|     14256|

## Use DataFrame Methods
Spark DataFrames provide functions that you can use to extract and manipulate data. For example, you can use the **select** function to return a new DataFrame containing columns selected from an existing DataFrame.
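**select** performs a projection: it keeps the named columns and drops the rest, and it leaves the source DataFrame unchanged. The sketch below models rows as Python dictionaries. That modelling choice is only an assumption for illustration, because a Spark DataFrame is distributed and lazily evaluated. `select` here is a hypothetical function, not the PySpark method.

```python
def select(rows, *columns):
    """Project each row onto the named columns, returning new row dicts.

    As with DataFrame.select, the input rows are not modified.
    """
    return [{name: row[name] for name in columns} for row in rows]


airports = [
    {"airport_id": 10165, "city": "Adak Island", "state": "AK", "name": "Adak"},
    {"airport_id": 10304, "city": "Aniak", "state": "AK", "name": "Aniak Airport"},
]
cities = select(airports, "city", "name")
print(cities)
# [{'city': 'Adak Island', 'name': 'Adak'}, {'city': 'Aniak', 'name': 'Aniak Airport'}]
```

In PySpark itself, **select** also accepts column objects such as `airports.city` in place of column-name strings.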

In [11]:
cities = airports.select("city", "name")
cities.show()

+-----------+--------------------+
|       city|                name|
+-----------+--------------------+
|Adak Island|                Adak|
|  Anchorage|Ted Stevens Ancho...|
|      Aniak|       Aniak Airport|
|     Barrow|Wiley Post/Will R...|
|     Bethel|      Bethel Airport|
|    Cordova|Merle K Mudhole S...|
|  Deadhorse|   Deadhorse Airport|
| Dillingham|  Dillingham Airport|
|  Fairbanks|Fairbanks Interna...|
|   Gustavus|    Gustavus Airport|
|     Juneau|Juneau International|
|  Ketchikan|Ketchikan Interna...|
|King Salmon| King Salmon Airport|
|     Kodiak|      Kodiak Airport|
|   Kotzebue| Ralph Wien Memorial|
|       Nome|        Nome Airport|
| Petersburg|Petersburg James ...|
|      Sitka|Sitka Rocky Gutie...|
| St. Mary's|  St. Mary's Airport|
|   Unalaska|    Unalaska Airport|
+-----------+--------------------+
only showing top 20 rows



## Combine Operations
You can combine functions in a single statement to perform multiple operations on a DataFrame. In this case, I will use the **join** function to combine the **flights** and **airports** DataFrames, and then use the **groupBy** and **count** functions to return the number of flights from each airport.
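The combined statement does two things. It performs an inner join that keeps only flights whose **OriginAirportID** matches an **airport_id**, and then it counts rows per **city**. The plain-Python sketch below reproduces that logic. It assumes **airport_id** values are unique, so a dictionary lookup can stand in for the join. The function name and the flight rows are made up for illustration.

```python
from collections import Counter


def count_by_origin_city(flights, airports):
    """Inner-join flights to airports on the origin airport ID, then count
    flights per city (similar in spirit to join(...).groupBy("city").count()).

    Assumes airport_id values are unique, so a dict lookup is enough.
    """
    city_by_id = {a["airport_id"]: a["city"] for a in airports}
    counts = Counter()
    for f in flights:
        origin = f["OriginAirportID"]
        if origin in city_by_id:  # inner join: flights with no matching airport are dropped
            counts[city_by_id[origin]] += 1
    return dict(counts)


airports = [
    {"airport_id": 10165, "city": "Adak Island"},
    {"airport_id": 10299, "city": "Anchorage"},
]
# Made-up flights; 99999 is a hypothetical ID with no matching airport.
flights = [
    {"OriginAirportID": 10299},
    {"OriginAirportID": 10165},
    {"OriginAirportID": 10299},
    {"OriginAirportID": 99999},
]
print(count_by_origin_city(flights, airports))  # {'Anchorage': 2, 'Adak Island': 1}
```

Because the grouping key is **city**, any airports that share a city value are combined into a single count. Grouping by **airport_id** or **name** would keep them separate.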

In [12]:
flightsByOrigin = flights.join(airports, flights.OriginAirportID == airports.airport_id).groupBy("city").count()
flightsByOrigin.show()

+-----------------+------+
|             city| count|
+-----------------+------+
|          Phoenix| 90281|
|            Omaha| 13537|
|   Raleigh/Durham| 28436|
|        Anchorage|  7777|
|           Dallas| 19503|
|          Oakland| 25503|
|      San Antonio| 23090|
|     Philadelphia| 47659|
|       Louisville| 10953|
|Dallas/Fort Worth|105024|
|      Los Angeles|118684|
|       Sacramento| 25193|
|     Indianapolis| 18099|
|        Cleveland| 25261|
|        San Diego| 45783|
|    San Francisco| 84675|
|        Nashville| 34927|
|    Oklahoma City| 13967|
|          Detroit| 62879|
|         Portland| 30640|
+-----------------+------+
only showing top 20 rows



## Count the Rows in a DataFrame
Now that you're familiar with working with DataFrames, a key task when building predictive solutions is to explore the data and determine statistics that will help you understand it before building predictive models. For example, how many rows of flight data do you actually have?

In [13]:
flights.count()


2719418

## Determine Summary Statistics
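Predictive modeling is based on statistics and probability, so you will often start by looking at summary statistics. The **describe** function returns a DataFrame containing the **count**, **mean**, **standard deviation**, **minimum**, and **maximum** values for each numeric and string column. For a string column such as **Carrier**, the mean and standard deviation are null, and the minimum and maximum are compared as strings.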
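For a single numeric column, these statistics can be reproduced with a short plain-Python function. The sketch makes two assumptions about Spark's behaviour. First, missing values are skipped, which explains why the **count** row in the output below shows fewer **DepDelay** and **ArrDelay** values than rows. Second, the standard deviation is the sample standard deviation; in Spark SQL, `stddev` is an alias of `stddev_samp`. `describe` here is a hypothetical helper, not the PySpark method.

```python
import math


def describe(values):
    """count, mean, stddev, min and max for one numeric column.

    None values are skipped, so count is the number of non-missing values.
    stddev is the sample standard deviation (n - 1 denominator); it is None
    when fewer than two values are present.
    """
    present = [v for v in values if v is not None]
    n = len(present)
    if n == 0:
        return {"count": 0, "mean": None, "stddev": None, "min": None, "max": None}
    mean = sum(present) / n
    stddev = None
    if n > 1:
        stddev = math.sqrt(sum((v - mean) ** 2 for v in present) / (n - 1))
    return {"count": n, "mean": mean, "stddev": stddev,
            "min": min(present), "max": max(present)}


stats = describe([2, 4, None, 4, 4, 5, 5, 7, 9])
print(stats["count"], stats["mean"])  # 8 5.0
```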

In [14]:
flights.describe().show()


+-------+-----------------+------------------+-------+------------------+------------------+-----------------+-----------------+
|summary|       DayofMonth|         DayOfWeek|Carrier|   OriginAirportID|     DestAirportID|         DepDelay|         ArrDelay|
+-------+-----------------+------------------+-------+------------------+------------------+-----------------+-----------------+
|  count|          2719418|           2719418|2719418|           2719418|           2719418|          2691974|          2690385|
|   mean|15.79747468024408|3.8983907586108497|   null| 12742.26441172339|12742.455345592329|10.53686662649788| 6.63768791455498|
| stddev|8.799860168985367|1.9859881390373617|   null|1501.9729397025808|1501.9692528927785|36.09952806643081|38.64881489390021|
|    min|                1|                 1|     9E|             10140|             10140|              -63|              -94|
|    max|               31|                 7|     YV|             15376|             15376|     

## Determine the Presence of Duplicates
The data I have to work with won't always be perfect. Often I'll want to *clean* it, for example by detecting and removing duplicates that might affect my model. I can use the **dropDuplicates** function to create a new DataFrame with the duplicates removed. Comparing its row count with the original count tells me how many rows are duplicates of other rows.
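With no arguments, **dropDuplicates** keeps one copy of each distinct row. The number of rows it removes is therefore the total row count minus the number of distinct rows. The sketch below shows that arithmetic with rows as tuples. Two rows are taken from the flight data shown later in this notebook, and one is a made-up duplicate. `count_duplicates` is a hypothetical helper. PySpark's **dropDuplicates** also accepts a list of column names, in which case only those columns are compared.

```python
def count_duplicates(rows):
    """Number of rows that are exact copies of an earlier row.

    Keeping one copy of each distinct row removes exactly
    len(rows) - len(set(rows)) rows.
    """
    return len(rows) - len(set(rows))


rows = [
    (1, 1, "9E", 13487, 11042, -5, -16),
    (1, 1, "9E", 13487, 11042, -5, -16),  # made-up exact duplicate
    (1, 1, "AA", 10821, 11298, -1, -23),
]
print(count_duplicates(rows))  # 1
```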

In [15]:
flights.count() - flights.dropDuplicates().count()

22435

## Identify Missing Values
As well as determining whether duplicates exist in my data, I should detect missing values. I can then either remove rows containing missing data or replace the missing values with a suitable replacement. The **dropna** function creates a DataFrame with the rows containing missing data removed. I can specify a subset of columns, and whether a row should be removed if *any* or *all* of those values are missing. The cell below applies **dropna** after **dropDuplicates** and subtracts the result from the original row count. Its result (46233) therefore includes the 22435 duplicate rows found above. That leaves 46233 - 22435 = 23798 distinct rows with a missing **ArrDelay** or **DepDelay** value.
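The sketch below makes the `how` and `subset` semantics explicit. It works on rows modelled as dictionaries, with `None` standing for a missing value. That representation is an illustrative assumption and not how Spark stores data. `dropna` here is a hypothetical function that mirrors the two parameters used in the cell below.

```python
def dropna(rows, how="any", subset=None):
    """Drop rows with missing (None) values in the checked columns.

    how="any": drop a row if any checked column is None.
    how="all": drop a row only if every checked column is None.
    subset: the columns to check (default: all columns of the row).
    """
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    kept = []
    for row in rows:
        columns = subset if subset is not None else list(row)
        missing = [row[c] is None for c in columns]
        drop = any(missing) if how == "any" else all(missing)
        if not drop:
            kept.append(row)
    return kept


rows = [
    {"Carrier": "DL", "DepDelay": 5, "ArrDelay": None},   # one delay missing
    {"Carrier": "DL", "DepDelay": None, "ArrDelay": None},  # both delays missing
    {"Carrier": "AA", "DepDelay": 1, "ArrDelay": 2},      # complete
]
print(len(dropna(rows, how="any", subset=["ArrDelay", "DepDelay"])))  # 1
print(len(dropna(rows, how="all", subset=["ArrDelay", "DepDelay"])))  # 2
```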

In [16]:
flights.count() - flights.dropDuplicates().dropna(how="any", subset=["ArrDelay", "DepDelay"]).count()

46233

## Clean the Data
Now that I've identified that there are duplicates and missing values, I can clean the data by removing the duplicates and replacing the missing values. The **fillna** function replaces missing values with a specified replacement value. In this case, I'll remove all duplicate rows and replace missing **ArrDelay** and **DepDelay** values with **0**.
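**fillna** replaces values but never removes rows. The cleaned row count in the cell below (2696983) should therefore equal the original 2719418 rows minus the 22435 duplicates, and it does. The plain-Python sketch below shows the replacement step. `fillna` here is a hypothetical function. One detail it does not model: in PySpark, columns in `subset` whose type does not match the fill value (for example, a string column when filling with `0`) are ignored.

```python
def fillna(rows, value, subset=None):
    """Return copies of rows with None replaced by `value` in the given columns."""
    filled = []
    for row in rows:
        columns = subset if subset is not None else list(row)
        new_row = dict(row)  # copy, so the input rows are not modified
        for c in columns:
            if new_row[c] is None:
                new_row[c] = value
        filled.append(new_row)
    return filled


rows = [{"Carrier": None, "DepDelay": None, "ArrDelay": 4}]
filled = fillna(rows, 0, subset=["ArrDelay", "DepDelay"])
print(filled)  # [{'Carrier': None, 'DepDelay': 0, 'ArrDelay': 4}]
```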

In [17]:
data = flights.dropDuplicates().fillna(value=0, subset=["ArrDelay", "DepDelay"])
data.count()

2696983

## Check Summary Statistics
After cleaning the data, you should re-check the statistics - removing rows and changing values may affect the distribution of the data, which in turn could affect any predictive models you might create.
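Filling missing values with a constant changes the statistics even though no row is removed. Suppose a column has n observed values with mean m, and k missing values are filled with 0. The new mean is then m * n / (n + k), which moves toward 0 whenever m is positive. In the summary below, the cleaned **DepDelay** mean is slightly lower than the original. That change also reflects the removed duplicates. The hypothetical helper below, `mean_after_fill`, shows the effect on made-up numbers.

```python
import statistics


def mean_after_fill(observed, n_missing, fill_value=0):
    """Mean of a column before and after `n_missing` nulls are filled.

    `observed` holds the non-missing values; a null-skipping summary such as
    describe() reports the "before" mean, because nulls are excluded.
    """
    before = statistics.mean(observed)
    after = statistics.mean(list(observed) + [fill_value] * n_missing)
    return before, after


before, after = mean_after_fill([30, 10, 20], n_missing=2)
print(before, after)  # 20 12
```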

In [18]:
data.describe().show()

+-------+------------------+------------------+-------+------------------+-----------------+------------------+------------------+
|summary|        DayofMonth|         DayOfWeek|Carrier|   OriginAirportID|    DestAirportID|          DepDelay|          ArrDelay|
+-------+------------------+------------------+-------+------------------+-----------------+------------------+------------------+
|  count|           2696983|           2696983|2696983|           2696983|          2696983|           2696983|           2696983|
|   mean|15.798996508320593| 3.900369412784582|   null|12742.459424846207|12742.85937657004|10.531134234068217|6.6679285705545785|
| stddev|  8.80126719913545|1.9864582421701973|   null|1502.0359941370602|1501.993958981798| 36.06172819056574| 38.58386147358071|
|    min|                 1|                 1|     9E|             10140|            10140|               -63|               -94|
|    max|                31|                 7|     YV|             15376|         

## Explore Relationships in the Data
Predictive modeling is largely based on statistical relationships between fields in the data. To design a good model, you need to understand how the data points relate to one another and identify any apparent correlation. The **corr** function calculates the Pearson correlation coefficient of two columns. This is a value between -1 and 1 that indicates the strength of the linear relationship between them. A strong positive correlation (near 1) indicates that high values for one column are often found with high values for the other. A strong negative correlation (near -1) indicates that *low* values for one column are often found with *high* values for the other. A correlation near 0 indicates little apparent linear relationship between the fields.
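The Pearson coefficient is the covariance of the two columns divided by the product of their standard deviations. A plain-Python sketch of that formula follows; `pearson` is a hypothetical helper. It raises `ZeroDivisionError` if either column is constant, because the coefficient is undefined in that case.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need two sequences of equal length >= 2")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # The 1/n (or 1/(n-1)) factors cancel, so plain sums are enough.
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


print(round(pearson([1, 2, 3, 4], [2, 4, 6, 8]), 6))  # 1.0
print(round(pearson([1, 2, 3], [3, 2, 1]), 6))        # -1.0
```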

In [19]:
data.corr("DepDelay", "ArrDelay")

0.9392630367706958

## Use Spark SQL
In addition to using the DataFrame API directly to query data, I can register DataFrames as temporary views and use Spark SQL to query them with the SQL language. SQL is often more intuitive for querying tabular data structures.
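A view gives tabular data a name that SQL can refer to. That idea is not specific to Spark. As a swapped-in illustration that needs only the Python standard library, the sketch below loads a few made-up rows into an in-memory SQLite table named `flightData`. It then runs the same GROUP BY query used in the next cell. In Spark, a view created with `createOrReplaceTempView` lasts only as long as the SparkSession that created it and is not written to storage.

```python
import sqlite3

# A few made-up (DayOfWeek, ArrDelay) rows standing in for the flight data.
rows = [(1, 10), (1, -4), (2, 6), (2, 2), (6, -3)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flightData (DayOfWeek INTEGER, ArrDelay INTEGER)")
conn.executemany("INSERT INTO flightData VALUES (?, ?)", rows)

query = """
    SELECT DayOfWeek, AVG(ArrDelay) AS AvgDelay
    FROM flightData
    GROUP BY DayOfWeek
    ORDER BY DayOfWeek
"""
result = conn.execute(query).fetchall()
conn.close()
print(result)  # [(1, 3.0), (2, 4.0), (6, -3.0)]
```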

In [20]:
data.createOrReplaceTempView("flightData")

spark.sql("SELECT DayOfWeek, AVG(ArrDelay) AS AvgDelay FROM flightData GROUP BY DayOfWeek ORDER BY DayOfWeek").show()

+---------+------------------+
|DayOfWeek|          AvgDelay|
+---------+------------------+
|        1| 7.077989660973244|
|        2|  4.39237404158651|
|        3| 7.234625279280266|
|        4|10.775574715480056|
|        5|  8.71110560964396|
|        6|2.1437428120738304|
|        7|  5.25403935972552|
+---------+------------------+



In [21]:
spark.sql("SELECT DepDelay, ArrDelay FROM flightData").show()


+--------+--------+
|DepDelay|ArrDelay|
+--------+--------+
|      -5|     -16|
|      -8|      -4|
|      -4|       2|
|      -1|     -23|
|      -1|     -11|
|     109|     106|
|       0|     -11|
|       3|      -1|
|       1|     -12|
|       2|     -12|
|      -3|     -10|
|      15|      11|
|       0|     -19|
|       6|      -8|
|      -2|     -12|
|       1|      61|
|     140|     130|
|      27|      13|
|       5|       4|
|      -9|     -18|
+--------+--------+
only showing top 20 rows



In [23]:
airports.createOrReplaceTempView("airportData")

In [24]:
spark.sql("SELECT a.name, AVG(f.ArrDelay) AS avgdelay FROM flightData AS f JOIN airportData AS a ON f.DestAirportID = a.airport_id GROUP BY a.name").show()

+--------------------+-------------------+
|                name|           avgdelay|
+--------------------+-------------------+
|     Eppley Airfield|  9.023493663381013|
|     Kahului Airport| 4.4546988807958785|
|San Diego Interna...|  5.773734422383071|
|            Bob Hope|    4.4999646568177|
|Hartsfield-Jackso...|  7.278792574874507|
|Sacramento Intern...| 5.8097549235891055|
|Chicago O'Hare In...|  9.796627713118575|
|   Will Rogers World| 10.492794350771005|
|Ted Stevens Ancho...|-1.3845654993514915|
|Raleigh-Durham In...|  6.549666761202496|
|Minneapolis-St Pa...|  2.910011511319464|
|Metropolitan Oakl...|   5.14337972166998|
|Norman Y. Mineta ...|  5.055079508939606|
|Southwest Florida...| 4.0729083025533965|
|San Antonio Inter...|  8.470013521175906|
|Cincinnati/Northe...|  4.714566846711049|
|           LaGuardia| 11.209838721036693|
|     William P Hobby|   7.54095066518847|
|Philadelphia Inte...|  8.517859028789895|
| Miami International| 3.5782458645721764|
+----------

In [25]:
data.createOrReplaceTempView("flightData2")  # replaces registerTempTable, deprecated since Spark 2.0

In [27]:
spark.sql("SELECT * FROM flightData2").show()

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|         1|        1|     9E|          13487|        11042|      -5|     -16|
|         1|        1|     9E|          14122|        11193|      -8|      -4|
|         1|        1|     9E|          14524|        11433|      -4|       2|
|         1|        1|     AA|          10821|        11298|      -1|     -23|
|         1|        1|     AA|          11042|        11298|      -1|     -11|
|         1|        1|     AA|          11298|        10423|     109|     106|
|         1|        1|     AA|          11298|        13495|       0|     -11|
|         1|        1|     AA|          11298|        14057|       3|      -1|
|         1|        1|     AA|          11298|        14679|       1|     -12|
|         1|        1|     AA|          11298|      