## Analyzing Airline On Time Performance using Python, Apcahe Spark and SQL

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, DoubleType, StringType
from pyspark.sql.window import Window

## Read Data Files in Spark Data Frames

In [2]:
df = spark.read.parquet('s3://data-engineering-take-home-problem-20170901/air-on-time-performance/parquet/')


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
df.printSchema()   

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- dayofmonth: string (nullable = true)
 |-- dayofweek: string (nullable = true)
 |-- deptime: string (nullable = true)
 |-- crsdeptime: string (nullable = true)
 |-- arrtime: string (nullable = true)
 |-- crsarrtime: string (nullable = true)
 |-- uniquecarrier: string (nullable = true)
 |-- flightnum: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- actualelapsedtime: string (nullable = true)
 |-- crselapsedtime: string (nullable = true)
 |-- airtime: string (nullable = true)
 |-- arrdelay: string (nullable = true)
 |-- depdelay: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- taxiin: string (nullable = true)
 |-- taxiout: string (nullable = true)
 |-- cancelled: string (nullable = true)
 |-- cancellationcode: string (nullable = true)
 |-- diverted: string (nullable = true)
 |-- carrierdelay:

In [4]:
df.show(500)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+--------+
|year|month|dayofmonth|dayofweek|deptime|crsdeptime|arrtime|crsarrtime|uniquecarrier|flightnum|tailnum|actualelapsedtime|crselapsedtime|airtime|arrdelay|depdelay|origin|dest|distance|taxiin|taxiout|cancelled|cancellationcode|diverted|carrierdelay|weatherdelay|nasdelay|securitydelay|lateaircraftdelay|yearpart|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+--------+
|2006|   12|        23|        6|   1309|      1303|   1651|      1

In [5]:
df1 = spark.read.csv('s3://data-engineering-take-home-problem-20170901/air-on-time-performance/airports/', header=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
df1.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+--------------------+----------------+-----+-------+-----------+------------+
|iata|             airport|            city|state|country|        lat|        long|
+----+--------------------+----------------+-----+-------+-----------+------------+
| 00M|            Thigpen |     Bay Springs|   MS|    USA|31.95376472|-89.23450472|
| 00R|Livingston Municipal|      Livingston|   TX|    USA|30.68586111|-95.01792778|
| 00V|         Meadow Lake|Colorado Springs|   CO|    USA|38.94574889|-104.5698933|
| 01G|        Perry-Warsaw|           Perry|   NY|    USA|42.74134667|-78.05208056|
| 01J|    Hilliard Airpark|        Hilliard|   FL|    USA| 30.6880125|-81.90594389|
+----+--------------------+----------------+-----+-------+-----------+------------+
only showing top 5 rows

In [7]:
df2 = spark.read.csv('s3://data-engineering-take-home-problem-20170901/air-on-time-performance/carriers/', header='True')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
df2.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+--------------------+
|Code|         Description|
+----+--------------------+
| 02Q|       Titan Airways|
| 04Q|  Tradewind Aviation|
| 05Q| Comlux Aviation, AG|
| 06Q|Master Top Linhas...|
| 07Q| Flair Airlines Ltd.|
+----+--------------------+
only showing top 5 rows

## Answers to the Questions

### Group A Question 1: What percentage of flights were cancelled each year from 1999 to 2003?

In [9]:
df_filtered = df.filter((df['year']  >= '1999') & (df['year'] <= '2003')).selectExpr("year", "cancelled")
df_grouped = df_filtered.groupby('year') \
                 .agg(F.count('cancelled'), F.sum('cancelled')) \
                 .withColumnRenamed('count(cancelled)','Total_Flights') \
                 .withColumnRenamed('Sum(cancelled)', 'Cancelled_Flights') \
                 .sort('year')

def dividing(a,b):
    return(float((a/b)*100))

udf_dividing = udf(dividing, DoubleType())
ans = df_grouped.withColumn('Perecentage of Flights Cancelled', udf_dividing('Cancelled_Flights','Total_Flights')) \
                .drop('Total_Flights', 'Cancelled_Flights') 

ans = ans.withColumn('Perecentage of Flights Cancelled', F.round(ans['Perecentage of Flights Cancelled'],2))                
ans.show()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+--------------------------------+
|year|Perecentage of Flights Cancelled|
+----+--------------------------------+
|1999|                            2.79|
|2000|                             3.3|
|2001|                            3.87|
|2002|                            1.24|
|2003|                            1.56|
+----+--------------------------------+

### Group A Question 2: On which day of the week in 2007 were you most likely to arrive on time flying from MCO to IAH? 




In [10]:
'''
For on time performance calculation I consider arrival delay less than equal to zero 
i.e. including flights that arrived before scheduled arrival timetime
'''

df_filtered = df[(df['year']=='2007') & (df['origin']=='MCO') & (df['dest']=='IAH') & (df['arrdelay'] <= '0')]

df_selected =df_filtered.selectExpr("cast(cancelled as int) as cancelled",\
                                    "dest", \
                                    "origin",\
                                    "year",\
                                    "dayofweek",\
                                    "arrdelay")

df_grouped = df_selected.groupby('dayofweek') \
                        .agg(F.count("arrdelay")) \
                        .withColumnRenamed('count(arrdelay)','On_time_flights_count') \
                        .sort(F.desc('On_time_flights_count'))

df_grouped.drop('On_time_flights_count').show(1)



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+
|dayofweek|
+---------+
|        7|
+---------+
only showing top 1 row

### GroupA Qusetion 3: Which 10 flights (airline, flight number, origin city, destination city, date) had the latest actual vs. scheduled arrival in 2004?



In [11]:
# To my understanding this question is asking about the top 10 most delayed flights in the year 2004.

df_filtered = df[(df['year']==2004) & (df['arrdelay']!='NA')]

df_filtered = df_filtered.withColumn("date", F.to_date(F.concat_ws("-", "year", "month", "dayofmonth")) )

df_filtered = df_filtered.selectExpr('uniquecarrier','flightnum','origin','dest','date').sort(F.desc('arrdelay'))

#get airline names
df_joined = df_filtered.join(df2, df_filtered['uniquecarrier']==df2['Code'],how='left')

#join to get city names
df_ans = df_joined.join(df1, df_joined['origin']==df1['iata'],how="left") \
                  .withColumnRenamed('city','origin_city') \
                  .selectExpr(['Description','flightnum','origin_city', 'dest','date'])


ans = df_ans.join(df1, df_ans['dest']==df1['iata'],how="left") \
                .withColumnRenamed('city','destination_city') \
                .selectExpr(['Description','flightnum','origin_city', 'destination_city','date'])

ans.show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+---------+-----------------+----------------+----------+
|         Description|flightnum|      origin_city|destination_city|      date|
+--------------------+---------+-----------------+----------------+----------+
|Northwest Airline...|      393|       Manchester|         Detroit|2004-07-14|
|Northwest Airline...|      610|          Spokane|     Minneapolis|2004-10-17|
|Northwest Airline...|     1195|     Grand Rapids|         Detroit|2004-06-12|
|Northwest Airline...|     1021|         New York|     Minneapolis|2004-07-23|
|ATA Airlines d/b/...|      926|          Seattle|         Chicago|2004-01-01|
|Northwest Airline...|     1674|            Omaha|     Minneapolis|2004-04-18|
|Atlantic Southeas...|     4164|Dallas-Fort Worth|         Houston|2004-11-19|
|Atlantic Southeas...|     4155|          Atlanta|     Little Rock|2004-11-30|
|Atlantic Southeas...|     4145|       Valparaiso|         Atlanta|2004-11-27|
|Delta Air Lines Inc.|     1297|        Arlington|  

### Group B Question 1: For each year from 1987 to 2008, which airline will get you from ORD to LAX the fastest (on average)? 

In [12]:
df_filtered = df[(df['year'] >=1987) & (df['year'] <=2008) & (df['dest'] == 'LAX') & (df['origin']== 'ORD')]

df_grouped1 = df_filtered.groupby('year','uniquecarrier') \
                         .agg(F.avg('actualelapsedtime'))

df_grouped2 = df_grouped1.groupby('year') \
                .agg(F.min('avg(actualelapsedtime)')) \
                .withColumnRenamed('year', 'Years') \
                .sort('Years')

df_joined = df_grouped1.join(df_grouped2, df_grouped1['avg(actualelapsedtime)']==df_grouped2['min(avg(actualelapsedtime))'],how="right")

df_ans = df_joined.join(df2, df_joined['uniquecarrier']==df2['Code'],how="left") #join to get airline names

df_ans.drop('uniquecarrier','avg(actualelapsedtime)','min(avg(actualelapsedtime))','Code','years') \
                  .sort('year') \
                  .show(22)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+--------------------+
|year|         Description|
+----+--------------------+
|1987|United Air Lines ...|
|1988|American Airlines...|
|1989|American Airlines...|
|1990|American Airlines...|
|1991|American Airlines...|
|1992|American Airlines...|
|1993|American Airlines...|
|1994|United Air Lines ...|
|1995|United Air Lines ...|
|1996|United Air Lines ...|
|1997|United Air Lines ...|
|1998|United Air Lines ...|
|1999|United Air Lines ...|
|2000|United Air Lines ...|
|2001|United Air Lines ...|
|2002|United Air Lines ...|
|2003|United Air Lines ...|
|2004|United Air Lines ...|
|2005|United Air Lines ...|
|2006|United Air Lines ...|
|2007|United Air Lines ...|
|2008|American Airlines...|
+----+--------------------+

### GroupB Qustion 2: For the years 2002 to 2005, what is the ratio of carrier delay to elapsed travel time for each airline? 

In [13]:
df_filtered = df[(df['year'] >=2002) & (df['year'] <=2005)]

df_filtered = df_filtered.groupby('uniquecarrier','year').agg(F.avg('actualelapsedtime'), F.avg('carrierdelay'))

def get_ratio(a,b):
    if (a is None):
        return('NA')
    elif b==0:
        return('NA')
    else:
        return(float(a/b))
udf_div = udf(get_ratio, DoubleType())

ans = df_filtered.withColumn('Ratio', udf_div('avg(carrierdelay)','avg(actualelapsedtime)')) 
df_joined = ans.join(df2, df2['Code']==ans['uniquecarrier'],how="left") \
            .drop('avg(actualelapsedtime)','avg(carrierdelay)','uniquecarrier', 'Code') 
            
df_joined= df_joined.withColumn('Ratio', F.round(ans['Ratio'],3)).selectExpr('Description','year','Ratio').sort('Description','year')
df_joined.show(500)
'''
there are missing values for carrier delay therefore we have missing values in answer.
the avg of carrier delay of those airlines can be set to any desrired value like mean or median
an example statement how it can be done is written below.
df_filtered= df_filtered.withColumn('avg(carrierdelay)',F.when(F.col('avg(carrierdelay)').isNull(),desired_value).otherwise(F.col('avg(carrierdelay)')))
'''

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----+-----+
|         Description|year|Ratio|
+--------------------+----+-----+
|ATA Airlines d/b/...|2003|0.012|
|ATA Airlines d/b/...|2004|0.012|
|ATA Airlines d/b/...|2005|0.012|
|AirTran Airways C...|2003|0.015|
|AirTran Airways C...|2004|0.014|
|AirTran Airways C...|2005| 0.02|
|Alaska Airlines Inc.|2002| null|
|Alaska Airlines Inc.|2003|0.023|
|Alaska Airlines Inc.|2004|0.029|
|Alaska Airlines Inc.|2005|0.045|
|America West Airl...|2002| null|
|America West Airl...|2003|0.021|
|America West Airl...|2004|0.023|
|America West Airl...|2005|0.019|
|American Airlines...|2002| null|
|American Airlines...|2003|0.017|
|American Airlines...|2004|0.018|
|American Airlines...|2005|0.018|
|American Eagle Ai...|2002| null|
|American Eagle Ai...|2003|0.033|
|American Eagle Ai...|2004|0.037|
|American Eagle Ai...|2005|0.033|
|Atlantic Southeas...|2003|0.042|
|Atlantic Southeas...|2004|0.038|
|Atlantic Southeas...|2005|0.057|
|         Comair Inc.|2004|0.043|
|         Coma

### Group B Question 3: What airline spent the most and least average time taxiing (in and out) at JFK in 2006? 

In [14]:
df_filtered = df[df['year']==2006 ]
df_filtered = df_filtered[(df_filtered['origin']=='JFK') | (df_filtered['dest']=='JFK')]

#taxi time at other locations other than JFK set to zero
df_filtered= df_filtered.withColumn('taxiin', F.when(F.col('dest') != 'JFK', '0').otherwise(df_filtered['taxiin']))
df_filtered = df_filtered.withColumn('taxiout', F.when(F.col('origin') != 'JFK', '0').otherwise(df_filtered['taxiout']))

#add taxi time after grouping by carrier
df_grouped = df_filtered.groupby('uniquecarrier') \
               .agg(F.avg('taxiin')+ F.avg('taxiout')) \
               .withColumnRenamed('(avg(taxiin) + avg(taxiout))','Total_Taxi_Time') \
               .sort('Total_Taxi_Time')

df_joined  = df_grouped.join(df2, df_grouped['uniquecarrier']==df2['Code'],how='left')

df_2 = df_joined.sort(F.desc('Total_Taxi_Time')) \
                .drop('uniquecarrier','Total_Taxi_Time','Code')
df_1 = df_joined.drop('uniquecarrier','Total_Taxi_Time','Code')


"{} has the least avg. taxi time and {} has the most avg taxi time".format(df_1.head(), df_2.head())


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

"Row(Description='ATA Airlines d/b/a ATA') has the least avg. taxi time and Row(Description='Expressjet Airlines Inc.') has the most avg taxi time"

### GroupC Question 1: What were the top 10 routes most likely to have a weather delay of over 10 minutes in December, 2005? Only considering routes with at least 20 flights that month. Include the origin and destination airport codes and city names in the output listing. 

In [15]:
df_filtered = df[(df['year'] ==2005) & (df['month'] =='12')]

#add aditional column with value =1 for all rows later to be used in counting
df_filtered = df_filtered.withColumn('counting', F.lit(1))

df_grouped= df_filtered.groupby('origin','dest') \
               .agg(F.avg('weatherdelay'),F.count('counting')) \
               .sort('origin').withColumnRenamed('count(counting)','counted') \
               .withColumnRenamed('avg(weatherdelay)','delay')


df_selected = df_grouped.selectExpr("cast(counted as int) >=20 as c ","cast(delay as float) >10 as d" ,"dest","origin", 'delay') \
               .sort(F.desc('delay'))

# This function is used to filter the values of dataframe that satisfied the above 2 conditions of 10 mintutes and 20 flights
def check_equal(a,b):
    if a == True & b==True:
        return(True)
    else:
        return(False)
    
udf_check = udf(check_equal, StringType())
df_checked = df_selected.withColumn('Check', udf_check('c','d'))
df_checked = df_checked.filter('Check = True')

#join to get origin city names
df_ans = df_checked.join(df1, df_checked['origin']==df1['iata'],how="left") \
                   .withColumnRenamed('city','origin_city') \
                   .selectExpr(['dest','origin','origin_city', 'delay'])

#join to get destination city names
ans = df_ans.join(df1, df_ans['dest']==df1['iata'],how="left") \
                .withColumnRenamed('city','destination_city') \
                .selectExpr(['origin','dest','origin_city', 'destination_city'])

#Filling missing value
ans= ans.withColumn('destination_city',F.when(F.col('dest')=='MQT','Marquette').otherwise(F.col('destination_city')))
ans.show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+----+------------+----------------+
|origin|dest| origin_city|destination_city|
+------+----+------------+----------------+
|   MSP| FCA| Minneapolis|       Kalispell|
|   GRB| MQT|   Green Bay|       Marquette|
|   LGA| SAV|    New York|        Savannah|
|   MEM| SFO|     Memphis|   San Francisco|
|   CID| DEN|Cedar Rapids|          Denver|
|   JAC| DEN|     Jackson|          Denver|
|   LGA| PWM|    New York|        Portland|
|   BTR| LGA| Baton Rouge|        New York|
|   LGA| TYS|    New York|       Knoxville|
|   BOS| JAX|      Boston|    Jacksonville|
+------+----+------------+----------------+
only showing top 10 rows

### GroupC Question 2: Flying Southwest, what is the year over year change in on-time arrival rate from 2000 to 2007? 

In [16]:
df_filtered1 = df[(df['uniquecarrier'] == 'WN') & (df['year']>=2000) & (df['year'] <=2007)]
df_filtered2 = df_filtered1[df_filtered['arrdelay'] <=0]

#Grouped data frame with total flight count
df_grouped1 = df_filtered1.groupby('year') \
                     .agg(F.count('arrdelay')) \
                     .sort('year') \
                     .withColumnRenamed('count(arrdelay)','total')

#Grouped data frame with delayed flight count
df_grouped2 = df_filtered2.groupby('year') \
                     .agg(F.count('arrdelay')) \
                     .sort('year') \
                     .withColumnRenamed('count(arrdelay)','no_delay') \
                     .withColumnRenamed('year','years')
    
df_joined = df_grouped1.join(df_grouped2, df_grouped1['year'] == df_grouped2['years'], how = "inner" )


def rate_percent(a,b):
    return(float((a/b)*100))

udf_rate = udf(rate_percent, DoubleType())
df_joined = df_joined.withColumn('Rate', udf_rate('no_delay','total'))
df_sorted = df_joined.sort('year')

#calculate %change by creating a new column

my_window = Window.partitionBy().orderBy("year")
df_sorted = df_sorted.withColumn("prev_value", F.lag(df_sorted.Rate).over(my_window))
ans = df_sorted.withColumn("% Change (On-time-arrival)", F.when(F.isnull(df_sorted.Rate - df_sorted.prev_value), 0)
                              .otherwise(df_sorted.Rate - df_sorted.prev_value))

ans = ans.drop('total','years','no_delay','Rate','prev_value')                 
ans.withColumn('% Change (On-time-arrival)', F.round(ans['% Change (On-time-arrival)'],2)).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+--------------------------+
|year|% Change (On-time-arrival)|
+----+--------------------------+
|2000|                       0.0|
|2001|                     12.35|
|2002|                      1.07|
|2003|                      6.57|
|2004|                     -9.46|
|2005|                      1.85|
|2006|                      0.02|
|2007|                     -2.11|
+----+--------------------------+

### Group C Question 3: What was the month-to-date on-time arrival rate for United for each date in September, 2005? 
 

In [17]:
df_mtd_delay = df[(df['uniquecarrier']=='UA') & (df['year']=='2005') & (df['month']=='9')]
df_mtd_nodelay = df_mtd_delay[df_mtd_delay['arrdelay'] <= 0]

df_mtd_delay_1 = df_mtd_delay.groupby('dayofmonth') \
                             .agg(F.count('arrdelay')) \
                             .sort('dayofmonth') \
                             .withColumnRenamed('count(arrdelay)','total')

df_mtd_nodelay_1 = df_mtd_nodelay.groupby('dayofmonth') \
                                 .agg(F.count('arrdelay')) \
                                 .sort('dayofmonth') \
                                 .withColumnRenamed('count(arrdelay)','no_delay') \
                                 .withColumnRenamed('dayofmonth','dayofmonths')

df_joined = df_mtd_delay_1.join(df_mtd_nodelay_1, df_mtd_delay_1['dayofmonth'] == df_mtd_nodelay_1['dayofmonths'], how = "inner" ) \
                          .sort('dayofmonth')

def rate_avg(a,b):
    return((float((a/b)*100)))

udf_rate = udf(rate_avg, DoubleType())
df_joined = df_joined.withColumn('Rate_percent', udf_rate('no_delay','total'))
df_joined = df_joined.selectExpr("cast(dayofmonth as int) as day","Rate_percent") \
                     .sort('day')


my_windows = Window.partitionBy().orderBy("day")
cumSum = df_joined.withColumn("cumulativeSum", F.sum(df_joined.Rate_percent).over(my_windows))

def get_avgg(x,y):
    return(float((x/y)))

udf_div = udf(get_avgg, DoubleType())
ans = cumSum.withColumn('MTD_on_time_arrival_rate', udf_div('CumulativeSum','day'))
ans = ans.drop('cumulativeSum','Rate_percent')
ans.withColumn('MTD_on_time_arrival_rate', F.round(ans['MTD_on_time_arrival_rate'],2)).show(30)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+------------------------+
|day|MTD_on_time_arrival_rate|
+---+------------------------+
|  1|                    71.2|
|  2|                   69.69|
|  3|                    72.2|
|  4|                   74.11|
|  5|                    73.4|
|  6|                   72.58|
|  7|                   72.83|
|  8|                   71.43|
|  9|                   70.52|
| 10|                   70.73|
| 11|                   70.68|
| 12|                   70.22|
| 13|                   69.28|
| 14|                   68.41|
| 15|                    67.5|
| 16|                   66.69|
| 17|                   66.75|
| 18|                    66.5|
| 19|                   64.98|
| 20|                   64.94|
| 21|                   64.98|
| 22|                   63.64|
| 23|                   62.96|
| 24|                   62.85|
| 25|                   62.28|
| 26|                   62.33|
| 27|                   62.49|
| 28|                   62.34|
| 29|                   62.28|
| 30|   

## Further Analysis

## Two questions:

#### 1. Name the city and the month where it is most likely for a flight to be delayed due to weather?
#### 2. Top 5 flight hours that have the highest average delays?
