## Flights Data Set


> Mallios Charalampos, Student - p2821912 <br />
> Department of Management Science and Technology <br />
> Msc Business Analytics <br />
> Athens University of Economics and Business <br />
> p2821912@aueb.gr

# Question 1


## Task 1
Provide a "misery index" for airports. That is, sort the airports in descending order of the probability that a flight departing from that airport has a delay. Take care of outliers: some airports may have a preposterously low number of flights. We are not interested in them. Your criterion for outliers will be the airports in the lowest 1% percentile in the number of flights.

### Answer

* First, initialize a Spark session and create an app named "Spark assignement".
* Next, read the corresponding CSV file into a Spark dataframe.
* Compute the number of flights for each ORIGIN and store the result in a dataframe.
* Compute the outlier threshold (the 1% quantile of flights per airport, which is 58 here); every airport with <= 58 flights is dropped.
* Count the delayed flights (those with DEP_DELAY > 0) for each ORIGIN and store the result in a dataframe.
* Join the two dataframes (all flights and delayed flights) into a new dataframe.
* Compute the misery index as a new column of the joined dataframe (delayed flights / all flights, as a percentage).
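The steps above can be sketched in plain Python on a handful of made-up records. This is only an illustration of the logic, not the Spark pipeline used in the cells that follow: the airport codes and delays are hypothetical, and the nearest-rank quantile rule is an assumption that may not match `approxQuantile` exactly.

```python
from collections import Counter
import math

# Hypothetical (origin, dep_delay_minutes) records; not taken from the real data set.
records = [
    ("AAA", 12.0), ("AAA", -3.0), ("AAA", 0.0), ("AAA", 45.0),
    ("BBB", 5.0), ("BBB", 7.0), ("BBB", -1.0),
    ("CCC", 30.0),  # a single flight: too few to rank
]

def nearest_rank_quantile(values, q):
    """Smallest value v such that at least a fraction q of the values are <= v."""
    ordered = sorted(values)
    rank = max(1, math.ceil(q * len(ordered)))
    return ordered[rank - 1]

# Number of flights per airport and the 1% quantile of those counts.
flights_per_origin = Counter(origin for origin, _ in records)
threshold = nearest_rank_quantile(flights_per_origin.values(), 0.01)

# Keep airports strictly above the threshold (airports with <= threshold flights are dropped).
kept = {o: n for o, n in flights_per_origin.items() if n > threshold}

# Delayed flights are those with a positive departure delay.
delayed_per_origin = Counter(o for o, d in records if d > 0 and o in kept)

# Misery index = delayed flights / all flights, as a percentage.
misery_index = {o: round(100 * delayed_per_origin[o] / n, 2) for o, n in kept.items()}
ranking = sorted(misery_index.items(), key=lambda kv: kv[1], reverse=True)
print(ranking)
```

Filtering on the computed threshold rather than on a list of airport codes keeps the outlier rule valid if the input file changes.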


In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

# create the Spark session
spark =  SparkSession.builder.appName("Spark assignement").getOrCreate()

In [2]:
spark # check that the session is initialized

In [3]:
from pyspark.sql.types import *
#read the corresponding csv input file along with the headers

flights = spark\
  .read\
  .option("header", "true")\
  .csv("515364771_T_ONTIME_REPORTING.csv")

In [4]:
# group flights by ORIGIN  and rename count aggregation into number of flights
grouped_flights = flights.groupby('ORIGIN').count().withColumnRenamed("count","Number of flights")
# then sort the resulting dataframe in ascending order
grouped_flights.orderBy('Number of flights', ascending=True).show()

+------+-----------------+
|ORIGIN|Number of flights|
+------+-----------------+
|   YNG|                2|
|   ART|               25|
|   IFP|               45|
|   CYS|               58|
|   AKN|               63|
|   BKG|               71|
|   GST|               84|
|   DLG|               84|
|   HYA|               89|
|   ADK|              104|
|   OWB|              111|
|   DRT|              114|
|   PPG|              122|
|   OGD|              126|
|   OGS|              137|
|   HGR|              138|
|   STC|              145|
|   MMH|              149|
|   FLO|              170|
|   SMX|              170|
+------+-----------------+
only showing top 20 rows



In [6]:
# define the threshold: the 1% quantile, stored in the variable outliers
outliers = grouped_flights.approxQuantile("Number of flights", [0.01], 0)
outliers

[58.0]

In [7]:
# drop the origins that had <= 58 flights
flights = flights.filter("`ORIGIN` != 'YNG'")
flights = flights.filter("`ORIGIN` != 'ART'")
flights = flights.filter("`ORIGIN` != 'IFP'")
flights = flights.filter("`ORIGIN` != 'CYS'")

# group flights by ORIGIN and rename count aggregation into number of flights
grouped_flights = flights.groupby('ORIGIN').count().withColumnRenamed("count","Number of flights")
# then sort the resulting dataframe in ascending order
grouped_flights.orderBy('Number of flights', ascending=True).show()


+------+-----------------+
|ORIGIN|Number of flights|
+------+-----------------+
|   AKN|               63|
|   BKG|               71|
|   GST|               84|
|   DLG|               84|
|   HYA|               89|
|   ADK|              104|
|   OWB|              111|
|   DRT|              114|
|   PPG|              122|
|   OGD|              126|
|   OGS|              137|
|   HGR|              138|
|   STC|              145|
|   MMH|              149|
|   FLO|              170|
|   SMX|              170|
|   EAR|              200|
|   GUC|              230|
|   PRC|              236|
|   WYS|              239|
+------+-----------------+
only showing top 20 rows



In [8]:
# delayed flights have DEP_DELAY > 0; keep only those, sorted by DEP_DELAY ascending
# note: this overwrites flights, so from here on it holds only the delayed flights
flights = flights.filter("`DEP_DELAY` > 0").orderBy('DEP_DELAY', ascending=True)
# count the delayed flights per ORIGIN
delayed_df = flights.groupby('ORIGIN').count().withColumnRenamed("count","Number of delayed flights")
delayed_df.show()

+------+-------------------------+
|ORIGIN|Number of delayed flights|
+------+-------------------------+
|   BGM|                      206|
|   INL|                      111|
|   PSE|                      256|
|   DLG|                       28|
|   MSY|                    20949|
|   PPG|                       58|
|   GEG|                     3226|
|   DRT|                       30|
|   BUR|                    10950|
|   SNA|                    14002|
|   GRB|                     1038|
|   GTF|                      334|
|   IDA|                      485|
|   GRR|                     5152|
|   LWB|                      186|
|   JLN|                      211|
|   PVU|                      202|
|   PSG|                      125|
|   EUG|                     1284|
|   PVD|                     5614|
+------+-------------------------+
only showing top 20 rows



In [9]:
# join the previous dataframes all flights(grouped_flights) and delayed flights (delayed_df) into one named all_misery
all_misery = grouped_flights.join(delayed_df, "ORIGIN")
all_misery.show()

+------+-----------------+-------------------------+
|ORIGIN|Number of flights|Number of delayed flights|
+------+-----------------+-------------------------+
|   BGM|              920|                      206|
|   DLG|               84|                       28|
|   INL|              650|                      111|
|   PSE|              792|                      256|
|   MSY|            55883|                    20949|
|   PPG|              122|                       58|
|   DRT|              114|                       30|
|   GEG|            12348|                     3226|
|   BUR|            26583|                    10950|
|   SNA|            41639|                    14002|
|   GRB|             4846|                     1038|
|   GTF|             1856|                      334|
|   IDA|             2167|                      485|
|   GRR|            17340|                     5152|
|   LWB|              576|                      186|
|   JLN|              805|                    

In [12]:
from pyspark.sql.functions import lit
from pyspark.sql.functions import format_number,round

# create a new column with the misery index: delayed flights / all flights, as a percentage
all_misery = all_misery.withColumn("misery_index", lit(all_misery['Number of delayed flights']/all_misery['Number of flights']*100))
# round to 2 decimal places for readability
all_misery = all_misery.withColumn("misery_index",round(all_misery['misery_index'],2))
# sort by misery index in descending order
all_misery.orderBy('misery_index', ascending=False).show()

+------+-----------------+-------------------------+------------+
|ORIGIN|Number of flights|Number of delayed flights|misery_index|
+------+-----------------+-------------------------+------------+
|   OGD|              126|                       83|       65.87|
|   SCK|              747|                      486|       65.06|
|   MDW|            87095|                    51131|       58.71|
|   HYA|               89|                       52|       58.43|
|   DAL|            70995|                    40544|       57.11|
|   HOU|            57841|                    31438|       54.35|
|   OWB|              111|                       56|       50.45|
|   HGR|              138|                       68|       49.28|
|   HTS|              879|                      423|       48.12|
|   PPG|              122|                       58|       47.54|
|   BQN|             1960|                      908|       46.33|
|   TTN|             2603|                     1191|       45.75|
|   MVY|  

## Task 2

After you have done that, go around your data again, but this time you will show the average and median delay for each airport. You may sort the results by either.


### Answer

* First, compute the median departure delay for each ORIGIN and store it in a dataframe.
* Next, compute the mean departure delay for each ORIGIN and store it in another dataframe.
* Finally, join the two dataframes and sort by median.
* Note that `flights` was filtered to DEP_DELAY > 0 in Task 1, so both statistics describe delayed flights only.
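As a small illustration of why both statistics are worth reporting, the sketch below computes the median and the mean per airport on made-up delays. The airport codes and values are hypothetical, and `statistics.median` interpolates between the two middle values for an even count, which may differ slightly from what `percentile_approx` returns.

```python
from collections import defaultdict
from statistics import mean, median

# Hypothetical positive departure delays in minutes, grouped by airport code.
delays = [
    ("AAA", 5.0), ("AAA", 10.0), ("AAA", 15.0), ("AAA", 250.0),
    ("BBB", 20.0), ("BBB", 30.0), ("BBB", 40.0),
]

# Collect the delays of each airport.
by_origin = defaultdict(list)
for origin, minutes in delays:
    by_origin[origin].append(minutes)

# Median and mean (rounded to 2 decimals) per airport.
stats = {
    origin: {"median": median(values), "mean": round(mean(values), 2)}
    for origin, values in by_origin.items()
}

# Sort by median, descending.
for origin, row in sorted(stats.items(), key=lambda kv: kv[1]["median"], reverse=True):
    print(origin, row)
```

In this toy data a single 250-minute delay pulls the mean of AAA far above its median, which is the same pattern seen when an airport's average delay is much larger than its median delay.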

In [13]:
from pyspark.sql import Window
import pyspark.sql.functions as F

#window function that partition by ORIGIN
delay_window = Window.partitionBy('ORIGIN')
#store median as quantile (50%)
median = F.expr('percentile_approx(DEP_DELAY, 0.5)')

#group by Origin and find median - then rename column as Median of Departure Delay
median_delay = flights.groupBy('ORIGIN').agg(median.alias('Median of Departure Delay'))
# sort the dataframe in descending 
median_delay.orderBy('Median of Departure Delay', ascending=False).show()

+------+-------------------------+
|ORIGIN|Median of Departure Delay|
+------+-------------------------+
|   RDD|                     47.0|
|   SAF|                     43.0|
|   DVL|                     43.0|
|   OTH|                     43.0|
|   ERI|                     41.0|
|   HVN|                     41.0|
|   MMH|                     38.0|
|   MKG|                     38.0|
|   HYS|                     37.0|
|   CMX|                     37.0|
|   CKB|                     36.0|
|   ASE|                     36.0|
|   PAH|                     36.0|
|   LWB|                     35.0|
|   MEI|                     35.0|
|   ORH|                     34.0|
|   BKG|                     34.0|
|   MBS|                     33.0|
|   SCE|                     33.0|
|   ACV|                     33.0|
+------+-------------------------+
only showing top 20 rows



In [14]:

# group by ORIGIN and compute the mean
meand = flights.groupby(['ORIGIN']).agg({'DEP_DELAY':'mean'})
# rename the column appropriately
mean_delay = meand.withColumnRenamed("avg(DEP_DELAY)", "Average Departure Delay")
# round to 2 decimal places
mean_delay = mean_delay.withColumn("Average Departure Delay",round(mean_delay['Average Departure Delay'],2))
# sort in descending order
mean_delay.orderBy('Average Departure Delay', ascending=False).show()



+------+-----------------------+
|ORIGIN|Average Departure Delay|
+------+-----------------------+
|   DVL|                 138.42|
|   PPG|                 106.45|
|   JMS|                 104.86|
|   CMX|                  99.82|
|   RHI|                  96.75|
|   LWB|                  95.56|
|   SLN|                  94.35|
|   SHD|                  89.89|
|   CKB|                  88.91|
|   ITH|                   88.4|
|   SAF|                  86.85|
|   EGE|                  85.35|
|   EAU|                  85.29|
|   HYS|                  84.97|
|   LBL|                  84.57|
|   ERI|                  84.28|
|   LAR|                  83.41|
|   PUB|                   83.1|
|   PIB|                  82.32|
|   MMH|                  81.98|
+------+-----------------------+
only showing top 20 rows



In [15]:
from pyspark.sql.functions import *
#join together the above dataframes into a dataframe named airport_statistics
airport_statistics = median_delay.join(mean_delay, "ORIGIN")

#order by descending median
airport_statistics.orderBy("Median of Departure Delay",ascending=False).show()




+------+-------------------------+-----------------------+
|ORIGIN|Median of Departure Delay|Average Departure Delay|
+------+-------------------------+-----------------------+
|   RDD|                     47.0|                  76.08|
|   OTH|                     43.0|                  78.13|
|   DVL|                     43.0|                 138.42|
|   SAF|                     43.0|                  86.85|
|   HVN|                     41.0|                   65.4|
|   ERI|                     41.0|                  84.28|
|   MKG|                     38.0|                  67.45|
|   MMH|                     38.0|                  81.98|
|   CMX|                     37.0|                  99.82|
|   HYS|                     37.0|                  84.97|
|   ASE|                     36.0|                  74.03|
|   CKB|                     36.0|                  88.91|
|   PAH|                     36.0|                  68.91|
|   LWB|                     35.0|                  95.5

## Task 3

Finally, enrich your airport misery index by tabulating both the probability, in descending order, that you will experience a delay at a given airport, and the average and median delay that you are likely to experience. 

### Answer

* Finally, join all the dataframes into one named 'airport_delay_all', which holds both the statistics and the misery index for each airport, sorted by misery index in descending order.

In [16]:
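# join the misery index dataframe with the airport_statistics dataframe so that everything is in one table
airport_delay_all = all_misery.join(airport_statistics, "ORIGIN").orderBy("misery_index",ascending=False)
# show() returns None, so call it separately to keep the dataframe in airport_delay_all
airport_delay_all.show()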

+------+-----------------+-------------------------+------------+-------------------------+-----------------------+
|ORIGIN|Number of flights|Number of delayed flights|misery_index|Median of Departure Delay|Average Departure Delay|
+------+-----------------+-------------------------+------------+-------------------------+-----------------------+
|   OGD|              126|                       83|       65.87|                     14.0|                   30.9|
|   SCK|              747|                      486|       65.06|                     21.0|                  38.91|
|   MDW|            87095|                    51131|       58.71|                     14.0|                  26.62|
|   HYA|               89|                       52|       58.43|                     15.0|                  61.27|
|   DAL|            70995|                    40544|       57.11|                     13.0|                  26.84|
|   HOU|            57841|                    31438|       54.35|       

# Question 2

## Task 1

Provide a "misery index" for airlines. That is, do the same thing you did for the airports, but this time we are interested in the airlines that make life difficult for passengers. Sort the airlines in descending order of probability that a flight operated by the airline has a delay. This time we do not care about outliers.

### Answer 

* Read the flights data again, together with the airlines lookup file, which provides the description of each airline.
* This time we do not care about outliers, so we keep all the values.
* Count the number of flights per carrier and store the result in a dataframe.
* Count the delayed flights (those with DEP_DELAY > 0) per airline (CARRIER) and store the result in a dataframe.
* Join the two dataframes (all flights and delayed flights) into a new dataframe.
* Compute the misery index as a new column of the joined dataframe (delayed flights / all flights, as a percentage).
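One detail of the count-then-join approach is that an inner join keeps only carriers present in both tables, so a carrier with no delayed flights at all would disappear from the result. That does not happen in this data set, where every carrier has delays, but the plain-Python sketch below shows an alternative that counts totals and delays in a single pass. The carrier codes and values are hypothetical, and treating a missing delay as "not delayed" is an assumption about how such rows should be handled.

```python
from collections import defaultdict

# Hypothetical (carrier, dep_delay_minutes) records; None marks a missing delay value.
records = [
    ("XX", 10.0), ("XX", -2.0), ("XX", None), ("XX", 3.0),
    ("YY", -5.0), ("YY", 0.0),  # this carrier is never delayed
]

totals = defaultdict(int)
delayed = defaultdict(int)
for carrier, delay in records:
    totals[carrier] += 1                 # every row counts as a flight
    if delay is not None and delay > 0:  # only positive delays count as delayed
        delayed[carrier] += 1

# Misery index per carrier; carriers without delays get 0.0 instead of vanishing.
misery_index = {c: round(100 * delayed[c] / n, 2) for c, n in totals.items()}
print(misery_index)
```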

In [17]:
from pyspark.sql.types import *

# read flights file again 
flights = spark\
  .read\
  .option("header", "true")\
  .csv("515364771_T_ONTIME_REPORTING.csv")

In [18]:
from pyspark.sql.types import *

#read the file with airlines details for future use at the end.
airlines = spark\
  .read\
  .option("header", "true")\
  .csv("Download_Lookup.asp")

In [19]:
# count the number of flights per carrier
carrier = flights.groupby('CARRIER').count().withColumnRenamed("count","Number of flights")
carrier.show()

+-------+-----------------+
|CARRIER|Number of flights|
+-------+-----------------+
|     UA|           621565|
|     NK|           176178|
|     AA|           916818|
|     EV|           202890|
|     B6|           305010|
|     DL|           949283|
|     OO|           774137|
|     F9|           120035|
|     YV|           215138|
|     MQ|           296001|
|     OH|           278457|
|     HA|            83723|
|     G4|            96221|
|     YX|           316090|
|     AS|           245761|
|     VX|            17670|
|     WN|          1352552|
|     9E|           245917|
+-------+-----------------+



In [20]:
# keep only the flights that have a delay
delayed_df_car = flights.filter("`DEP_DELAY` > 0")
# group the kept flights by carrier and count them in a new column named Number of delayed flights
delayed_carrier = delayed_df_car.groupby('CARRIER').count().withColumnRenamed("count","Number of delayed flights")
delayed_carrier.show()

+-------+-------------------------+
|CARRIER|Number of delayed flights|
+-------+-------------------------+
|     UA|                   185249|
|     NK|                    51119|
|     AA|                   317751|
|     EV|                    53898|
|     B6|                   125011|
|     DL|                   286165|
|     OO|                   203193|
|     F9|                    53321|
|     YV|                    60284|
|     MQ|                    83546|
|     OH|                   100247|
|     HA|                    21767|
|     G4|                    32886|
|     YX|                    75839|
|     AS|                    66254|
|     VX|                     5382|
|     WN|                   662363|
|     9E|                    64942|
+-------+-------------------------+



In [21]:
# join the above dataframes into a new named all_carriers
all_carriers = carrier.join(delayed_carrier, "CARRIER")


In [23]:

# compute in a new column (misery_index) the ratio delayed flights / all flights, as a percentage
all_carriers_misery = all_carriers.withColumn("misery_index", lit((all_carriers['Number of delayed flights']/all_carriers['Number of flights'])*100))
# round to 2 decimal places
all_carriers_misery = all_carriers_misery.withColumn("misery_index",round(all_carriers_misery['misery_index'],2))
# sort in descending order
all_carriers_misery.orderBy('misery_index', ascending=False).show()


+-------+-----------------+-------------------------+------------+
|CARRIER|Number of flights|Number of delayed flights|misery_index|
+-------+-----------------+-------------------------+------------+
|     WN|          1352552|                   662363|       48.97|
|     F9|           120035|                    53321|       44.42|
|     B6|           305010|                   125011|       40.99|
|     OH|           278457|                   100247|        36.0|
|     AA|           916818|                   317751|       34.66|
|     G4|            96221|                    32886|       34.18|
|     VX|            17670|                     5382|       30.46|
|     DL|           949283|                   286165|       30.15|
|     UA|           621565|                   185249|        29.8|
|     NK|           176178|                    51119|       29.02|
|     MQ|           296001|                    83546|       28.22|
|     YV|           215138|                    60284|       28

## Task 2

Then, go around your data again, but this time the criterion will be the average and median delay you may expect to have with an airline. Again we do not care about outliers.


### Answer

* First, compute the median departure delay for each carrier and store it in a dataframe.
* Next, compute the mean departure delay for each carrier and store it in another dataframe.
* Finally, join the two dataframes and sort by median.
* Both statistics are computed on `delayed_df_car`, so they describe delayed flights only.


In [24]:
from pyspark.sql import Window
import pyspark.sql.functions as F

#window function that partition by CARRIER
delay_window = Window.partitionBy('CARRIER')
#store median as quantile (50%)
median = F.expr('percentile_approx(DEP_DELAY, 0.5)')
#group by Carrier and find median
median_delay = delayed_df_car.groupBy('CARRIER').agg(median.alias('Median of Departure Delay'))
#then sort them by Descending
median_delay.orderBy('Median of Departure Delay', ascending=False).show()

+-------+-------------------------+
|CARRIER|Median of Departure Delay|
+-------+-------------------------+
|     EV|                     31.0|
|     F9|                     25.0|
|     YX|                     25.0|
|     9E|                     25.0|
|     B6|                     24.0|
|     OO|                     24.0|
|     YV|                     23.0|
|     G4|                     22.0|
|     UA|                     20.0|
|     MQ|                     20.0|
|     NK|                     19.0|
|     VX|                     18.0|
|     OH|                     17.0|
|     AA|                     16.0|
|     AS|                     14.0|
|     DL|                     13.0|
|     WN|                     12.0|
|     HA|                      7.0|
+-------+-------------------------+



In [25]:
# compute the mean for each carrier
meand = delayed_df_car.groupby(['CARRIER']).agg({'DEP_DELAY':'mean'})
# rename the aggregated column to Average Departure Delay
mean_delay = meand.withColumnRenamed("avg(DEP_DELAY)", "Average Departure Delay")
# round to 2 decimal places
mean_delay = mean_delay.withColumn("Average Departure Delay",round(mean_delay['Average Departure Delay'],2))
# sort by the renamed column in descending order
mean_delay.orderBy('Average Departure Delay', ascending=False).show()


+-------+-----------------------+
|CARRIER|Average Departure Delay|
+-------+-----------------------+
|     EV|                  61.87|
|     OO|                  54.69|
|     9E|                  53.95|
|     YV|                  51.56|
|     F9|                  50.86|
|     G4|                  49.59|
|     B6|                  47.01|
|     YX|                  46.78|
|     NK|                  46.66|
|     UA|                  45.34|
|     OH|                  40.11|
|     MQ|                  39.54|
|     AA|                  37.12|
|     VX|                  34.93|
|     DL|                  33.57|
|     AS|                  28.81|
|     WN|                  25.38|
|     HA|                  19.22|
+-------+-----------------------+



In [26]:
#store them all together in a dataframe named airline_statistics
airline_statistics = median_delay.join(mean_delay, "CARRIER")
#and sort them in descending by median
airline_statistics.orderBy("Median of Departure Delay",ascending=False).show()

+-------+-------------------------+-----------------------+
|CARRIER|Median of Departure Delay|Average Departure Delay|
+-------+-------------------------+-----------------------+
|     EV|                     31.0|                  61.87|
|     F9|                     25.0|                  50.86|
|     YX|                     25.0|                  46.78|
|     9E|                     25.0|                  53.95|
|     B6|                     24.0|                  47.01|
|     OO|                     24.0|                  54.69|
|     YV|                     23.0|                  51.56|
|     G4|                     22.0|                  49.59|
|     UA|                     20.0|                  45.34|
|     MQ|                     20.0|                  39.54|
|     NK|                     19.0|                  46.66|
|     VX|                     18.0|                  34.93|
|     OH|                     17.0|                  40.11|
|     AA|                     16.0|     

## Task 3

And finally, as you would expect,  enrich your airline misery index by tabulating both the probability, in descending order, that you will experience a delay flying with a particular airline, and the average and median delay that you are likely to experience.


### Answer

* First, join the dataframe holding the misery index with the one holding the statistics into one named 'airline_delay_all'.
* Next, join the additional dataset containing the airline details.
* To do so, rename its Code column to 'CARRIER' so that it matches the column name in our dataset.
* Also rename Description to 'Airline Name' for readability.
* After joining the two dataframes, keep only Airline Name, misery_index, Median of Departure Delay and Average Departure Delay, sorted by misery_index (the probability of delay for each airline).
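The lookup join described above can be sketched with plain dictionaries. All codes, names and numbers below are made up for illustration. The point of the sketch is that an inner join on the code silently drops any carrier without a lookup entry, so it can be worth checking for unmatched codes.

```python
# Hypothetical lookup table: carrier code -> airline name.
lookup = {"XX": "Example Air", "YY": "Sample Airways", "ZZ": "Unused Carrier"}

# Hypothetical per-carrier statistics keyed by carrier code.
stats = {
    "XX": {"misery_index": 41.5, "median_delay": 20.0, "mean_delay": 44.1},
    "YY": {"misery_index": 47.2, "median_delay": 12.0, "mean_delay": 25.0},
    "QQ": {"misery_index": 30.0, "median_delay": 15.0, "mean_delay": 33.3},  # no lookup entry
}

# Inner join on the code: keep only carriers that have a name in the lookup table.
joined = [
    {"airline_name": lookup[code], **row}
    for code, row in stats.items()
    if code in lookup
]

# Sort by misery index, descending.
joined.sort(key=lambda r: r["misery_index"], reverse=True)

# Codes that would be lost by the inner join.
unmatched = sorted(set(stats) - set(lookup))
print(joined)
print(unmatched)
```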



In [27]:
#join statistics and misery_index dataframes into one
airline_delay_all = all_carriers_misery.join(airline_statistics, "CARRIER")

In [28]:
airline_delay_all.show()

+-------+-----------------+-------------------------+------------+-------------------------+-----------------------+
|CARRIER|Number of flights|Number of delayed flights|misery_index|Median of Departure Delay|Average Departure Delay|
+-------+-----------------+-------------------------+------------+-------------------------+-----------------------+
|     UA|           621565|                   185249|        29.8|                     20.0|                  45.34|
|     NK|           176178|                    51119|       29.02|                     19.0|                  46.66|
|     AA|           916818|                   317751|       34.66|                     16.0|                  37.12|
|     EV|           202890|                    53898|       26.57|                     31.0|                  61.87|
|     B6|           305010|                   125011|       40.99|                     24.0|                  47.01|
|     DL|           949283|                   286165|       30.1

In [29]:
# rename column Code to CARRIER so that it matches the join column of our dataframe
airlines_renamed = airlines.withColumnRenamed("Code", "CARRIER")
# rename column Description for readability
airlines_renamed = airlines_renamed.withColumnRenamed("Description", "Airline Name")

In [52]:
airlines_renamed.show()

+-------+--------------------+
|CARRIER|        Airline Name|
+-------+--------------------+
|    02Q|       Titan Airways|
|    04Q|  Tradewind Aviation|
|    05Q| Comlux Aviation, AG|
|    06Q|Master Top Linhas...|
|    07Q| Flair Airlines Ltd.|
|    09Q|Swift Air, LLC d/...|
|    0BQ|                 DCA|
|    0CQ|ACM AIR CHARTER GmbH|
|    0FQ|Maine Aviation Ai...|
|    0GQ|Inter Island Airw...|
|    0HQ|Polar Airlines de...|
|     0J|          JetClub AG|
|    0JQ|     Vision Airlines|
|    0LQ|   Metropix UK, LLP.|
|    0OQ|          Open Skies|
|     0Q| Flying Service N.V.|
|    0QQ|TAG Aviation (UK)...|
|    0RQ|TAG Aviation Espa...|
|    0TQ|  Corporatejets, XXI|
|    0UQ|  Comlux Malta, Ltd.|
+-------+--------------------+
only showing top 20 rows



In [30]:
#join on common column CARRIER
airlines_delay_all_info = airline_delay_all.join(airlines_renamed, "CARRIER")
# drop CARRIER; the airline name is clearer
airlines_delay_all_info = airlines_delay_all_info.drop('CARRIER')
#select only these columns we need
airlines_delay_all_info = airlines_delay_all_info.select("Airline Name","misery_index","Median of Departure Delay","Average Departure Delay")
#sort by probability (misery_index) in descending 
airlines_delay_all_info.orderBy("misery_index",ascending=False).show()

+--------------------+------------+-------------------------+-----------------------+
|        Airline Name|misery_index|Median of Departure Delay|Average Departure Delay|
+--------------------+------------+-------------------------+-----------------------+
|Southwest Airline...|       48.97|                     12.0|                  25.38|
|Frontier Airlines...|       44.42|                     25.0|                  50.86|
|     JetBlue Airways|       40.99|                     24.0|                  47.01|
|   PSA Airlines Inc.|        36.0|                     17.0|                  40.11|
|American Airlines...|       34.66|                     16.0|                  37.12|
|       Allegiant Air|       34.18|                     22.0|                  49.59|
|      Virgin America|       30.46|                     18.0|                  34.93|
|Delta Air Lines Inc.|       30.15|                     13.0|                  33.57|
|United Air Lines ...|        29.8|                   