### Spark Session 3: Working with Spark SQL
### Thomas Cuna  |  MBD O2 
#### Concepts:

* Creating DataFrames from CSV input data format
* Performing basic data analysis using Spark SQL
* Saving a DataFrame as partitioned Parquet files

#### Dataset:

* Air flight data: a ~100 MB subset (for demonstration purposes)
* Available on the IE cluster at: /data/shared/spark/flight_data/csv_tiny

#### Basic Analysis:

1. Create a DataFrame by loading the input CSV dataset.
2. Report the names of the columns and the number of rows in the dataset.
3. Create an in-memory temporary table and a permanently stored table from this DataFrame.
4. Report the top 10 airports with the most departures in the dataset. Use both the DataFrame API and a direct SQL query.
5. Save a subset DataFrame that only contains the carrier, airport, and departure delay columns, partitioning the output by carrier and airport, in Parquet format.

#### Advanced:

1. Which flight has the longest departure delay?
2. Report the best 5 carriers (column carrier), i.e. those with the smallest average departure delay across all airports. Consider a flight delayed if depdelay > 0 min.
3. Which destinations from JFK are most likely to be delayed?



In [35]:
# findspark adds the cluster's Spark installation to sys.path so that pyspark can be imported
import findspark
findspark.init()
import pyspark

### Compulsory Part

1) Create a DataFrame by loading the input CSV dataset.

In [36]:
from pyspark.sql import SparkSession

# local[1]: run Spark inside this driver process with a single worker thread
spark = SparkSession \
    .builder \
    .master("local[1]") \
    .appName("Spark-SQL-Assignment3") \
    .getOrCreate()

In [37]:
dataset_path="/data/shared/spark/flight_data/csv_tiny/"

In [38]:
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("file://"+dataset_path+"*.csv")
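With `header` set to `true`, Spark takes the column names from the first line of each file; with `inferSchema`, it makes an extra pass over the data to choose a type for every column. The snippet below is a simplified plain-Python illustration of that idea, not Spark's actual algorithm (which also recognises timestamps, booleans and other types): each column gets the narrowest of integer, double or string that fits all of its non-empty values. The three-row CSV sample is hypothetical.

```python
import csv
import io

# Hypothetical CSV sample with three of the dataset's columns.
SAMPLE = """Year,Carrier,DepDelay
2014,AA,-9.0
2014,AA,2.0
2014,DL,
"""

def narrowest_type(values):
    """Return 'integer', 'double' or 'string' for a list of raw CSV strings.

    Empty strings are treated as nulls and ignored.
    """
    non_empty = [v for v in values if v != ""]
    if not non_empty:
        return "string"  # no evidence at all: fall back to string
    for name, parse in (("integer", int), ("double", float)):
        try:
            for v in non_empty:
                parse(v)
        except ValueError:
            continue  # some value does not fit; try the next, wider type
        return name
    return "string"

def infer_schema(text):
    """Map each header name to the narrowest type that fits its column."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)          # analogous to option("header", "true")
    rows = list(reader)
    columns = list(zip(*rows))     # transpose rows into columns
    return {name: narrowest_type(list(col)) for name, col in zip(header, columns)}

print(infer_schema(SAMPLE))
```

The extra inference pass costs time on large inputs; if the column types are already known, `spark.read.csv(path, header=True, schema=...)` with an explicit schema avoids it.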

In [15]:
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- FlightDate: timestamp (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- AirlineID: integer (nullable = true)
 |-- Carrier: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- OriginAirportID: integer (nullable = true)
 |-- OriginAirportSeqID: integer (nullable = true)
 |-- OriginCityMarketID: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- OriginCityName: string (nullable = true)
 |-- OriginState: string (nullable = true)
 |-- OriginStateFips: integer (nullable = true)
 |-- OriginStateName: string (nullable = true)
 |-- OriginWac: integer (nullable = true)
 |-- DestAirportID: integer (nullable = true)
 |-- DestAirportSeqID: integer (nullable = true)
 |-- DestCityMarketID: integer (nu

2) Report the names of the columns and the number of rows in the dataset.

Names of the columns

In [17]:
df.columns

['Year',
 'Quarter',
 'Month',
 'DayofMonth',
 'DayOfWeek',
 'FlightDate',
 'UniqueCarrier',
 'AirlineID',
 'Carrier',
 'TailNum',
 'FlightNum',
 'OriginAirportID',
 'OriginAirportSeqID',
 'OriginCityMarketID',
 'Origin',
 'OriginCityName',
 'OriginState',
 'OriginStateFips',
 'OriginStateName',
 'OriginWac',
 'DestAirportID',
 'DestAirportSeqID',
 'DestCityMarketID',
 'Dest',
 'DestCityName',
 'DestState',
 'DestStateFips',
 'DestStateName',
 'DestWac',
 'CRSDepTime',
 'DepTime',
 'DepDelay',
 'DepDelayMinutes',
 'DepDel15',
 'DepartureDelayGroups',
 'DepTimeBlk',
 'TaxiOut',
 'WheelsOff',
 'WheelsOn',
 'TaxiIn',
 'CRSArrTime',
 'ArrTime',
 'ArrDelay',
 'ArrDelayMinutes',
 'ArrDel15',
 'ArrivalDelayGroups',
 'ArrTimeBlk',
 'Cancelled',
 'CancellationCode',
 'Diverted',
 'CRSElapsedTime',
 'ActualElapsedTime',
 'AirTime',
 'Flights',
 'Distance',
 'DistanceGroup',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay',
 'FirstDepTime',
 'TotalAddGTime',
 

Number of rows

In [19]:
total=df.count()
print('Total number of rows: %d' % total)

Total number of rows: 469489


Or in SQL:

In [61]:
# createOrReplaceTempView replaces the deprecated registerTempTable (Spark 2.0+)
df.createOrReplaceTempView("flights")

In [62]:
spark.sql("select COUNT(*) from flights").show()

+--------+
|count(1)|
+--------+
|  469489|
+--------+



3) Create an in-memory temporary table and a permanently stored table from this DataFrame.

In [66]:
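# Session-scoped temporary view (replaces the deprecated registerTempTable); nothing is written to disk
df.createOrReplaceTempView("flights")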

In [67]:
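# Persist the data as a managed table (default source format, Parquet unless configured); overwrite makes the cell re-runnable
df.write.saveAsTable("flights_perm", mode="overwrite")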

4) Report the top 10 airports with the most departures in the dataset. Use both the DataFrame API and a direct SQL query.

In [68]:
#DataFrame API
df.select('origin').groupby('origin').count().orderBy('count', ascending=False).limit(10).show()

+------+-----+
|origin|count|
+------+-----+
|   ATL|30196|
|   ORD|24870|
|   DFW|23025|
|   DEN|18935|
|   LAX|17589|
|   SFO|13878|
|   IAH|13496|
|   PHX|12126|
|   LAS|11231|
|   SEA| 9316|
+------+-----+



In [69]:
#SQL query
spark.sql("SELECT origin, count(origin) FROM flights GROUP BY origin order by count(origin) desc limit 10").show()

+------+-------------+
|origin|count(origin)|
+------+-------------+
|   ATL|        30196|
|   ORD|        24870|
|   DFW|        23025|
|   DEN|        18935|
|   LAX|        17589|
|   SFO|        13878|
|   IAH|        13496|
|   PHX|        12126|
|   LAS|        11231|
|   SEA|         9316|
+------+-------------+
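Both versions perform the same computation: group the rows by `origin`, count each group, sort the counts in descending order and keep the first ten. A plain-Python equivalent on a small list makes this explicit; `collections.Counter.most_common` does the count-and-sort in one call. The airport codes below are hypothetical placeholders.

```python
from collections import Counter

# Hypothetical origin codes, one entry per flight (i.e. per row).
origins = ["AAA", "BBB", "AAA", "CCC", "BBB", "AAA", "DDD"]

def top_airports(codes, n=10):
    """Return (airport, departures) pairs, most departures first."""
    # Same idea as groupBy('origin').count().orderBy('count', ascending=False).limit(n)
    return Counter(codes).most_common(n)

print(top_airports(origins, n=2))
```

Neither Spark query specifies a tie-breaker, so airports with equal counts may be returned in any order; adding `origin` as a second sort key makes the result deterministic.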



5) Save a subset DataFrame that only contains the carrier, airport, and departure delay columns, partitioning the output by carrier and airport, in Parquet format.

In [70]:
df_subset=spark.sql(
    "select " 
    +"carrier, origin, depdelay "
    +"from flights"
    )
df_subset.show()

+-------+------+--------+
|carrier|origin|depdelay|
+-------+------+--------+
|     AA|   JFK|    -9.0|
|     AA|   JFK|     2.0|
|     AA|   JFK|   -11.0|
|     AA|   JFK|    -8.0|
|     AA|   JFK|    -7.0|
|     AA|   JFK|    -4.0|
|     AA|   JFK|    -4.0|
|     AA|   JFK|    -4.0|
|     AA|   JFK|    -2.0|
|     AA|   JFK|    -2.0|
|     AA|   JFK|    -9.0|
|     AA|   JFK|   -12.0|
|     AA|   JFK|    -1.0|
|     AA|   JFK|    -6.0|
|     AA|   JFK|    -7.0|
|     AA|   JFK|    -9.0|
|     AA|   JFK|    -6.0|
|     AA|   JFK|    -5.0|
|     AA|   JFK|    -5.0|
|     AA|   JFK|    -8.0|
+-------+------+--------+
only showing top 20 rows



In [77]:
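# Metastore table stored as Parquet, partitioned by carrier and airport (origin)
df_subset.write.partitionBy("carrier", "origin").saveAsTable("flights_subset", format="parquet", mode="overwrite")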

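Alternative solution: write the partitioned Parquet files directly to a directory in the home folder instead of a metastore table.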

In [73]:
import os
print(os.environ['SPARK_HOME'])

/usr/hdp/current/spark2-client


In [76]:
my_home=os.environ['HOME']
out_dir="flights_subset"
# Writes carrier=<X>/origin=<Y>/ sub-directories of Parquet files under ~/flights_subset
df_subset.write.partitionBy("carrier","origin").parquet(
        "file://"
        + my_home
        +'/'
        + out_dir,
        mode='overwrite'
    )
print('Completed')

Completed
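`partitionBy("carrier", "origin")` writes one sub-directory per distinct combination of the partition columns, named Hive-style as `column=value` (for example `carrier=AA/origin=JFK/`). The partition values live in the directory path rather than inside the Parquet files. The plain-Python sketch below only reproduces that grouping on a few hypothetical rows, to show which rows land in which directory; it does not write any Parquet.

```python
from collections import defaultdict

# Hypothetical rows in the same shape as df_subset: (carrier, origin, depdelay).
rows = [
    ("AA", "JFK", -9.0),
    ("AA", "JFK", 2.0),
    ("AA", "LAX", 15.0),
    ("DL", "JFK", None),
]

def partition_layout(rows):
    """Map each Hive-style directory path to the depdelay values stored under it.

    carrier and origin are encoded in the path, so only depdelay would be
    written inside the data files of each directory.
    """
    layout = defaultdict(list)
    for carrier, origin, depdelay in rows:
        path = f"carrier={carrier}/origin={origin}"
        layout[path].append(depdelay)
    return dict(layout)

for path, delays in sorted(partition_layout(rows).items()):
    print(path, delays)
```

When the root directory is read back with `spark.read.parquet(...)`, partition discovery restores `carrier` and `origin` as columns, and filters on them can skip whole directories.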


### Bonus Part

1) Which flight has the longest departure delay?

In [53]:
spark.sql("select flightnum, depdelay as delay from flights where depdelay=(select max(depdelay) from flights)").show()

+---------+------+
|flightnum| delay|
+---------+------+
|      290|1727.0|
+---------+------+
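The subquery returns every flight whose `depdelay` equals the maximum, so a tie would produce several rows, and `max` ignores null delays (for example, cancelled flights). Because a flight number repeats across carriers and days, also selecting `carrier` and `flightdate` would identify the flight unambiguously. The plain-Python sketch below mirrors that logic; the field names follow the dataset's columns, but the records are hypothetical.

```python
# Hypothetical records using a few of the dataset's column names.
flights = [
    {"carrier": "AA", "flightnum": 1, "flightdate": "2014-01-01", "depdelay": 30.0},
    {"carrier": "DL", "flightnum": 500, "flightdate": "2014-01-02", "depdelay": 120.0},
    {"carrier": "UA", "flightnum": 77, "flightdate": "2014-01-03", "depdelay": None},  # e.g. cancelled
    {"carrier": "WN", "flightnum": 12, "flightdate": "2014-01-04", "depdelay": 120.0},
]

def longest_delayed(records):
    """Return every record whose delay equals the maximum, skipping nulls like SQL MAX."""
    delays = [r["depdelay"] for r in records if r["depdelay"] is not None]
    if not delays:
        return []
    worst = max(delays)
    return [r for r in records if r["depdelay"] == worst]

for r in longest_delayed(flights):
    print(r["carrier"], r["flightnum"], r["flightdate"], r["depdelay"])
```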



2) Report the best 5 carriers (column carrier), i.e. those with the smallest average departure delay across all airports. Consider a flight delayed if depdelay > 0 min.

In [52]:
spark.sql("select carrier, round(avg(depdelay),4) as delay from flights where depdelay > 0 group by carrier order by avg(depdelay) asc limit(5)").show()

+-------+-------+
|carrier|  delay|
+-------+-------+
|     FL| 22.204|
|     WN|24.8247|
|     DL|25.0773|
|     US|26.3578|
|     AS|26.5338|
+-------+-------+
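Because of the `depdelay > 0` filter, this query ranks carriers by the average delay of their delayed flights only. Averaging over all flights, including early departures with negative `depdelay`, is a different measure and can reorder the carriers. The toy example below uses invented numbers for two hypothetical carriers to show the two measures disagreeing.

```python
from statistics import mean

# Hypothetical departure delays in minutes per carrier.
delays = {
    "XX": [-10.0, -10.0, -10.0, 40.0],  # mostly early, one long delay
    "YY": [5.0, 5.0, 5.0, 5.0],         # always slightly late
}

def avg_delay(values, delayed_only):
    """Average of all delays, or only of those > 0 (the query's definition)."""
    chosen = [v for v in values if v > 0] if delayed_only else values
    return mean(chosen) if chosen else None

for carrier, values in delays.items():
    print(carrier,
          "delayed only:", avg_delay(values, delayed_only=True),
          "all flights:", avg_delay(values, delayed_only=False))
```

With the delayed-only measure YY ranks better (5.0 vs 40.0); over all flights XX ranks better (2.5 vs 5.0).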



3) Which destinations from JFK are most likely to be delayed?

In [60]:
spark.sql("select dest, round(avg(depdelay),3) as avg_delay from flights where origin = 'JFK' group by dest order by avg(depdelay) desc limit 10").show()

+----+---------+
|dest|avg_delay|
+----+---------+
| DTW|   26.733|
| IND|   22.167|
| PIT|     12.0|
| IAD|   11.616|
| ORD|   11.613|
| SAT|     10.5|
| ATL|    9.333|
| BWI|    8.633|
| PDX|    8.371|
| CLT|    6.885|
+----+---------+
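This query ranks destinations by average departure delay, which a few very late flights can dominate, especially for destinations with few departures from JFK. If "most likely" is read as a probability, an alternative is the share of departures with `depdelay > 0` per destination; in Spark SQL this could be something like `avg(CASE WHEN depdelay > 0 THEN 1 ELSE 0 END)` with a `depdelay IS NOT NULL` filter. The plain-Python sketch below computes that share on hypothetical data with placeholder airport codes.

```python
from collections import defaultdict

# Hypothetical (dest, depdelay) pairs for flights departing JFK.
jfk_flights = [
    ("XXX", 90.0), ("XXX", -5.0), ("XXX", -3.0),
    ("YYY", 10.0), ("YYY", 4.0), ("YYY", -1.0),
    ("ZZZ", None),  # null delay, e.g. a cancelled flight: ignored
]

def delay_share(pairs):
    """Fraction of flights with depdelay > 0 per destination, ignoring null delays."""
    delayed = defaultdict(int)
    total = defaultdict(int)
    for dest, depdelay in pairs:
        if depdelay is None:
            continue
        total[dest] += 1
        delayed[dest] += depdelay > 0  # a bool adds 0 or 1
    return {dest: delayed[dest] / total[dest] for dest in total}

shares = delay_share(jfk_flights)
for dest, share in sorted(shares.items(), key=lambda kv: kv[1], reverse=True):
    print(dest, round(share, 3))
```

Here XXX has the higher average delay (about 27.3 min vs 4.3 min) but YYY is delayed more often (2 of 3 flights vs 1 of 3), so the two readings of the question give different rankings.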

