## SparkSQL

In [1]:
# Show which Spark installation this kernel points at.
# SPARK_HOME is only read here, not set; a missing variable raises KeyError.
import os
spark_home = os.environ['SPARK_HOME']
print(spark_home)

/usr/hdp/current/spark2-client


In [2]:
# Use findspark to run Spark in the cluster
import findspark
# init() is called before the pyspark import below; keep this order.
# NOTE(review): presumably init() is what makes the pyspark package importable
# from the SPARK_HOME installation — confirm against the findspark docs.
findspark.init()
import pyspark

In [3]:
# Directory holding the input CSV files (trailing slash is required: a glob is appended later)
dataset_path = "/data/shared/spark/flight_data/csv_tiny/"

In [4]:
# Build (or reuse, if one already exists) the SparkSession used by every cell below.
# Configuration: master "local[1]", application name "Spark-SQL".
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.master("local[1]").appName("Spark-SQL")
spark = session_builder.getOrCreate()

Create a dataframe by loading the input csv dataset

In [5]:
# Load every *.csv file under dataset_path (file:// scheme) into one dataframe.
# The first line of each file is treated as the header and column types are inferred.
csv_glob = "file://" + dataset_path + "*.csv"
csv_reader = spark.read.option("header", "true").option("inferSchema", "true")
df = csv_reader.csv(csv_glob)

Get the names of the columns and the number of rows in the dataset

In [6]:
# Get the names of the columns in the dataset
col_names = df.columns
# Bare expression on the last line so the notebook displays the list
col_names

['Year',
 'Quarter',
 'Month',
 'DayofMonth',
 'DayOfWeek',
 'FlightDate',
 'UniqueCarrier',
 'AirlineID',
 'Carrier',
 'TailNum',
 'FlightNum',
 'OriginAirportID',
 'OriginAirportSeqID',
 'OriginCityMarketID',
 'Origin',
 'OriginCityName',
 'OriginState',
 'OriginStateFips',
 'OriginStateName',
 'OriginWac',
 'DestAirportID',
 'DestAirportSeqID',
 'DestCityMarketID',
 'Dest',
 'DestCityName',
 'DestState',
 'DestStateFips',
 'DestStateName',
 'DestWac',
 'CRSDepTime',
 'DepTime',
 'DepDelay',
 'DepDelayMinutes',
 'DepDel15',
 'DepartureDelayGroups',
 'DepTimeBlk',
 'TaxiOut',
 'WheelsOff',
 'WheelsOn',
 'TaxiIn',
 'CRSArrTime',
 'ArrTime',
 'ArrDelay',
 'ArrDelayMinutes',
 'ArrDel15',
 'ArrivalDelayGroups',
 'ArrTimeBlk',
 'Cancelled',
 'CancellationCode',
 'Diverted',
 'CRSElapsedTime',
 'ActualElapsedTime',
 'AirTime',
 'Flights',
 'Distance',
 'DistanceGroup',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay',
 'FirstDepTime',
 'TotalAddGTime',
 

In [7]:
# Get the number of rows in the dataset
numb_row = df.count()
# Bare expression on the last line so the notebook displays the count
numb_row

469489

Register the dataframe as a temporary table named `flights` (so it can be queried with SQL during this session) and also save it as a permanently stored table

In [8]:
# Register the dataframe as the temporary view "flights" so it can be queried with spark.sql().
# createOrReplaceTempView is the replacement for registerTempTable, which has been
# deprecated since Spark 2.0; it replaces any existing view of the same name,
# so this cell can be re-run safely.
df.createOrReplaceTempView("flights")

In [10]:
# Create a permanently-stored table from this dataframe.
# With the default save mode, saveAsTable raises an error when "flights_perm" already
# exists, so the cell would fail on every run after the first. Overwrite the table
# instead so the notebook survives a restart-and-run-all.
df.write.saveAsTable("flights_perm", mode="overwrite")

Get the top 10 airports with the greatest number of departures in the dataset

In [11]:
# Using the dataframe API: count departures per origin airport and keep the 10 busiest.
# show() only prints and returns None, so the dataframe is bound to dep_count first
# and displayed in a separate statement (previously dep_count ended up as None).
dep_count = (
    df.select("origin")
    .groupBy("origin")
    .count()
    .orderBy("count", ascending=False)
    .limit(10)
)
dep_count.show()

+------+-----+
|origin|count|
+------+-----+
|   ATL|30196|
|   ORD|24870|
|   DFW|23025|
|   DEN|18935|
|   LAX|17589|
|   SFO|13878|
|   IAH|13496|
|   PHX|12126|
|   LAS|11231|
|   SEA| 9316|
+------+-----+



In [12]:
# Same top-10 departure ranking, written as a SQL query against the "flights" table.
# Adjacent string literals are concatenated into a single query string.
query = (
    "SELECT origin,count(origin) AS numb_dep "
    "FROM flights "
    "GROUP BY origin "
    "ORDER BY count(origin) DESC "
    "LIMIT 10"
)
df_numb_dep = spark.sql(query)
df_numb_dep.show()

+------+--------+
|origin|numb_dep|
+------+--------+
|   ATL|   30196|
|   ORD|   24870|
|   DFW|   23025|
|   DEN|   18935|
|   LAX|   17589|
|   SFO|   13878|
|   IAH|   13496|
|   PHX|   12126|
|   LAS|   11231|
|   SEA|    9316|
+------+--------+



Save a subset dataframe that only contains carrier, airport, and departure delays, partitioning the output by carrier and airport into parquet format

In [22]:
# Keep only three columns from "flights": carrier, origin (departure airport)
# and depdelay (departure delay), then preview the result.
subset_query = "SELECT carrier,origin,depdelay FROM flights"
df_subset = spark.sql(subset_query)
df_subset.show()

+-------+------+--------+
|carrier|origin|depdelay|
+-------+------+--------+
|     AA|   JFK|    -9.0|
|     AA|   JFK|     2.0|
|     AA|   JFK|   -11.0|
|     AA|   JFK|    -8.0|
|     AA|   JFK|    -7.0|
|     AA|   JFK|    -4.0|
|     AA|   JFK|    -4.0|
|     AA|   JFK|    -4.0|
|     AA|   JFK|    -2.0|
|     AA|   JFK|    -2.0|
|     AA|   JFK|    -9.0|
|     AA|   JFK|   -12.0|
|     AA|   JFK|    -1.0|
|     AA|   JFK|    -6.0|
|     AA|   JFK|    -7.0|
|     AA|   JFK|    -9.0|
|     AA|   JFK|    -6.0|
|     AA|   JFK|    -5.0|
|     AA|   JFK|    -5.0|
|     AA|   JFK|    -8.0|
+-------+------+--------+
only showing top 20 rows



In [14]:
# Write the subset as parquet under $HOME/flight_data on the local filesystem,
# partitioned by carrier and origin. Existing output at that path is overwritten.
my_home = os.environ['HOME']
out_dir = "flight_data"
target_uri = "file://" + my_home + '/' + out_dir
partitioned_writer = df_subset.write.partitionBy("carrier", "origin")
partitioned_writer.parquet(target_uri, mode='overwrite')

Which flight had the longest delay?

In [15]:
# Find the flight number(s) whose departure delay equals the maximum departure delay
# in "flights" (the subquery computes that maximum).
max_delay = (
    "SELECT flightnum AS flight_number,depdelay AS departure_delay "
    "FROM flights "
    "where depdelay=(SELECT max(depdelay) FROM flights)"
)
df_max_delay = spark.sql(max_delay)
df_max_delay.show()

+-------------+---------------+
|flight_number|departure_delay|
+-------------+---------------+
|          290|         1727.0|
+-------------+---------------+



Get the 5 carriers with the smallest average departure delay across all airports. A flight counts as delayed when depdelay > 0 min, so only delayed flights enter the average

In [18]:
# Average departure delay per carrier, computed over delayed flights only (depdelay>0),
# rounded to 2 decimals; the 5 carriers with the lowest average are kept.
avg_delay = (
    "SELECT carrier,ROUND(avg(depdelay),2) AS avg_delay "
    "FROM flights "
    "WHERE depdelay>0 "
    "GROUP BY carrier "
    "ORDER BY avg_delay ASC "
    "LIMIT 5"
)
df_avg_delay = spark.sql(avg_delay)
df_avg_delay.show()

+-------+---------+
|carrier|avg_delay|
+-------+---------+
|     FL|     22.2|
|     WN|    24.82|
|     DL|    25.08|
|     US|    26.36|
|     AS|    26.53|
+-------+---------+



Which destinations are most likely to get delayed when the origin of the flight is JFK airport?

In [19]:
# Get the top 5 destinations with the highest average departure delay
# on flights whose origin is JFK (average rounded to 2 decimals).
dep_delay = (
    "SELECT origin,dest,ROUND(avg(depdelay),2) AS avg_dep_delay "
    "FROM flights "
    "WHERE origin='JFK' "
    "GROUP BY origin,dest "
    "ORDER BY avg_dep_delay DESC "
    "LIMIT 5"
)
df_dep_delay = spark.sql(dep_delay)
df_dep_delay.show()

+------+----+-------------+
|origin|dest|avg_dep_delay|
+------+----+-------------+
|   JFK| DTW|        26.73|
|   JFK| IND|        22.17|
|   JFK| PIT|         12.0|
|   JFK| IAD|        11.62|
|   JFK| ORD|        11.61|
+------+----+-------------+

