## Exploratory Data Analysis for 2009 - 2015 Flight Delays and Cancellations

####  Data Profiling 

##### Import required libraries

In [0]:
sc.install_pypi_package("pandas==0.25.1") #Install pandas version 0.25.1 
sc.install_pypi_package("matplotlib", "https://pypi.org/simple") #Install matplotlib from given PyPI repository

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,application_1587675686828_0005,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting pandas==0.25.1
  Using cached pandas-0.25.1-cp36-cp36m-manylinux1_x86_64.whl (10.5 MB)
Collecting python-dateutil>=2.6.1
  Using cached python_dateutil-2.8.1-py2.py3-none-any.whl (227 kB)
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-0.25.1 python-dateutil-2.8.1

Collecting matplotlib
  Using cached matplotlib-3.2.1-cp36-cp36m-manylinux1_x86_64.whl (12.4 MB)
Collecting pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1
  Using cached pyparsing-2.4.7-py2.py3-none-any.whl (67 kB)
Collecting kiwisolver>=1.0.1
  Using cached kiwisolver-1.2.0-cp36-cp36m-manylinux1_x86_64.whl (88 kB)
Collecting cycler>=0.10
  Using cached cycler-0.10.0-py2.py3-none-any.whl (6.5 kB)
Installing collected packages: pyparsing, kiwisolver, cycler, matplotlib
Successfully installed cycler-0.10.0 kiwisolver-1.2.0 matplotlib-3.2.1 pyparsing-2.4.7

In [0]:
from pyspark.sql.types import *
from pyspark.sql import functions as F 
from pyspark.sql.functions import isnan, when, count, col
import pandas as pd 

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

##### Create Schema

In [0]:
flightSchema = StructType([
    StructField('fl_date', DateType(), False),
    StructField('op_carrier', StringType(), False),
    StructField('op_carrier_fl_num', IntegerType(), False),
    StructField('origin', StringType(), False), 
    StructField('dest', StringType(), False),
    StructField('crs_dep_time', IntegerType(), False),
    StructField('dep_time', DoubleType(), False),
    StructField('dep_delay', DecimalType(), False),
    StructField('taxi_out', DoubleType(), False),
    StructField('wheels_off', DoubleType(), False),
    StructField('wheels_on', DoubleType(), False),
    StructField('taxi_in', DoubleType(), False),
    StructField('crs_arr_time', IntegerType(), False),
    StructField('arr_time', DoubleType(), False),
    StructField('arr_delay', DecimalType(), False),
    StructField('cancelled', DecimalType(), False),
    StructField('cancellation_code', StringType(), False),
    StructField('csr_elapsed_time', DecimalType(), False),
    StructField('actual_elapsed_time', DecimalType(), False)
])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

##### Structure in which data can be read

In [0]:
flight_df = spark.read.format('csv').load('s3://flightanalysis/*.csv', schema=flightSchema) 
flight_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- fl_date: date (nullable = true)
 |-- op_carrier: string (nullable = true)
 |-- op_carrier_fl_num: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- crs_dep_time: integer (nullable = true)
 |-- dep_time: double (nullable = true)
 |-- dep_delay: decimal(10,0) (nullable = true)
 |-- taxi_out: double (nullable = true)
 |-- wheels_off: double (nullable = true)
 |-- wheels_on: double (nullable = true)
 |-- taxi_in: double (nullable = true)
 |-- crs_arr_time: integer (nullable = true)
 |-- arr_time: double (nullable = true)
 |-- arr_delay: decimal(10,0) (nullable = true)
 |-- cancelled: decimal(10,0) (nullable = true)
 |-- cancellation_code: string (nullable = true)
 |-- csr_elapsed_time: decimal(10,0) (nullable = true)
 |-- actual_elapsed_time: decimal(10,0) (nullable = true)

##### Chunking Data

In [0]:
flight_df.count() #Number of rows in total 

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

61556974

Our null analysis shows that the following features have a lot of null values: We will delete these rows, and omit some unnecessary columns as well. 

In [0]:
for col in flight_df.columns:
    print(str(col), "\t", "with null values: ", flight_df.filter(flight_df[col].isNull()).count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

fl_date 	 with null values:  10
op_carrier 	 with null values:  0
op_carrier_fl_num 	 with null values:  10
origin 	 with null values:  0
dest 	 with null values:  0
crs_dep_time 	 with null values:  6096772
dep_time 	 with null values:  935733
dep_delay 	 with null values:  940685
taxi_out 	 with null values:  963911
wheels_off 	 with null values:  963906
wheels_on 	 with null values:  997026
taxi_in 	 with null values:  997025
crs_arr_time 	 with null values:  6096772
arr_time 	 with null values:  997025
arr_delay 	 with null values:  1121361
cancelled 	 with null values:  10
cancellation_code 	 with null values:  60583755
csr_elapsed_time 	 with null values:  10
actual_elapsed_time 	 with null values:  70

In [0]:
flight_df = flight_df.na.drop()
flight_df.show(2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+----------------+-------------------+
|fl_date|op_carrier|op_carrier_fl_num|origin|dest|crs_dep_time|dep_time|dep_delay|taxi_out|wheels_off|wheels_on|taxi_in|crs_arr_time|arr_time|arr_delay|cancelled|cancellation_code|csr_elapsed_time|actual_elapsed_time|
+-------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+----------------+-------------------+
+-------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+----------------+-------------------+

We will be analyzing the data per year, hence we will keep only the year data from the date column

In [0]:
from pyspark.sql.functions import year 
from pyspark.sql.functions import to_date
flight_df = flight_df.withColumn('year', year(flight_df.fl_date)).na.drop() #adding a columns 
flight_df = flight_df.drop('fl_date')
flight_df.createOrReplaceTempView("flight_data") #create temporary table

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+----------------+-------------------+----+
|op_carrier|op_carrier_fl_num|origin|dest|crs_dep_time|dep_time|dep_delay|taxi_out|wheels_off|wheels_on|taxi_in|crs_arr_time|arr_time|arr_delay|cancelled|cancellation_code|csr_elapsed_time|actual_elapsed_time|year|
+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+----------------+-------------------+----+
+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+----------------+-------------------+----+

We will chunk up the dataframe into 3 major parts to facilitate with data wrangling: 
 - Contains flight basic information
 - Contains flight delays information 
 - Contains flight cancellation information 

In [0]:
#Basic flight profile 
profile_df = spark.sql('''select year as date, op_carrier_fl_num as flight_number, op_carrier as flight_identifier, origin as origin, dest as destination from flight_data''')
profile_df.createOrReplaceTempView("basics")
profile_df.show(2)
profile_df.coalesce(1).write.option("delimiter", ",").option("header", "true").mode("overwrite").csv('s3://flightanalysis/cleaned-dataset/flight-profile')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-------------+-----------------+------+-----------+
|date|flight_number|flight_identifier|origin|destination|
+----+-------------+-----------------+------+-----------+
+----+-------------+-----------------+------+-----------+

In [0]:
#flight delay details per airline 
delays_df = spark.sql('''select year as date, op_carrier as flight_identifier, crs_dep_time as plan_time, dep_time as actual_time, dep_delay as dep_delay from flight_data''')
delays_df.createOrReplaceTempView("delays")
delays_df.show(2)
delays_df.coalesce(1).write.option("delimiter", ",").option("header", "true").mode("overwrite").csv('s3://flightanalysis/cleaned-dataset/flight-delays')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------------+---------+-----------+---------+
|date|flight_identifier|plan_time|actual_time|dep_delay|
+----+-----------------+---------+-----------+---------+
+----+-----------------+---------+-----------+---------+

In [0]:
#flight cancellation details per airline 
cancellation_df = spark.sql('''select year as date, op_carrier_fl_num as flight_number, op_carrier as flight_identifier, cancelled as cancel, cancellation_code as reason from flight_data''')
cancellation_df.createOrReplaceTempView("cancellations")
cancellation_df.show(2)
cancellation_df.coalesce(1).write.option("delimiter", ",").option("header", "true").mode("overwrite").csv('s3://flightanalysis/cleaned-dataset/flight-cancellation')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-------------+-----------------+------+------+
|date|flight_number|flight_identifier|cancel|reason|
+----+-------------+-----------------+------+------+
+----+-------------+-----------------+------+------+

#### Data Wrangling 

##### Queries on dataframe - Profile

In [0]:
#Find the total number of flight operated by a specific airline 
airline_frequency = spark.sql('''select flight_identifier, COUNT(flight_identifier) identifier_count from basics group by flight_identifier order by identifier_count DESC''')
#show the 10 most airlines with flight operations 
airline_frequency.take(10)
airline_frequency.createOrReplaceTempView("Airline_Frequency")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…