# Group Assigment: C

In [1]:
import findspark
findspark.init()

In [2]:
findspark.find()
import pyspark
findspark.find()

'/opt/spark-2.4.4-bin-hadoop2.7'

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

conf = pyspark.SparkConf().setAppName('appName').setMaster('local[4]')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

In [4]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window, WindowSpec

## Introduction to the Flights dataset

According to a 2010 report made by the US Federal Aviation Administration, the economic price of domestic flight delays entails a yearly cost of 32.9 billion dollars to passengers, airlines and other parts of the economy. More than half of that amount comes from passengers' pockets, as they do not only waste time waiting for their planes to leave, but also miss connecting flights, spend money on food and have to sleep on hotel rooms while they're stranded.

The report, focusing on data from year 2007, estimated that air transportation delays put a 4 billion dollar dent in the country's gross domestic product that year. Full report can be found 
<a href="http://www.isr.umd.edu/NEXTOR/pubs/TDI_Report_Final_10_18_10_V3.pdf">here</a>.

But which are the causes for these delays?

In order to answer this question, we are going to analyze the provided dataset, containing up to 1.936.758 different internal flights in the US for 2008 and their causes for delay, diversion and cancellation; if any.

The data comes from the U.S. Department of Transportation's (DOT) Bureau of Transportation Statistics (BTS)

This dataset is composed by the following variables:
1. **Year** 2008
2. **Month** 1
3. **DayofMonth** 1-31
4. **DayOfWeek** 1 (Monday) - 7 (Sunday)
5. **DepTime** actual departure time (local, hhmm)
6. **CRSDepTime** scheduled departure time (local, hhmm)
7. **ArrTime** actual arrival time (local, hhmm)
8. **CRSArrTime** scheduled arrival time (local, hhmm)
9. **UniqueCarrie**r unique carrier code
10. **FlightNum** flight number
11. **TailNum** plane tail number: aircraft registration, unique aircraft identifier
12. **ActualElapsedTime** in minutes
13. **CRSElapsedTime** in minutes
14. **AirTime** in minutes
15. **ArrDelay** arrival delay, in minutes: A flight is counted as "on time" if it operated less than 15 minutes later the scheduled time shown in the carriers' Computerized Reservations Systems (CRS).
16. **DepDelay** departure delay, in minutes
17. **Origin** origin IATA airport code
18. **Dest** destination IATA airport code
19. **Distance** in miles
20. **TaxiIn** taxi in time, in minutes
21. **TaxiOut** taxi out time in minutes
22. **Cancelled** *was the flight cancelled
23. **CancellationCode** reason for cancellation (A = carrier, B = weather, C = NAS, D = security)
24. **Diverted** 1 = yes, 0 = no
25. **CarrierDelay** in minutes: Carrier delay is within the control of the air carrier. Examples of occurrences that may determine carrier delay are: aircraft cleaning, aircraft damage, awaiting the arrival of connecting passengers or crew, baggage, bird strike, cargo loading, catering, computer, outage-carrier equipment, crew legality (pilot or attendant rest), damage by hazardous goods, engineering inspection, fueling, handling disabled passengers, late crew, lavatory servicing, maintenance, oversales, potable water servicing, removal of unruly passenger, slow boarding or seating, stowing carry-on baggage, weight and balance delays.
26. **WeatherDelay** in minutes: Weather delay is caused by extreme or hazardous weather conditions that are forecasted or manifest themselves on point of departure, enroute, or on point of arrival.
27. **NASDelay** in minutes: Delay that is within the control of the National Airspace System (NAS) may include: non-extreme weather conditions, airport operations, heavy traffic volume, air traffic control, etc.
28. **SecurityDelay** in minutes: Security delay is caused by evacuation of a terminal or concourse, re-boarding of aircraft because of security breach, inoperative screening equipment and/or long lines in excess of 29 minutes at screening areas.
29. **LateAircraftDelay** in minutes: Arrival delay at an airport due to the late arrival of the same aircraft at a previous airport. The ripple effect of an earlier delay at downstream airports is referred to as delay propagation

Read the CSV file using Spark's default delimiter (","). The first line contains the headers so it is not part of the data. Hence we set the header option to true.

In [5]:
flightsDF = spark.read.option("inferSchema", "true").option("header", "true").csv("flights_jan08.csv")
airportsDF = spark.read.option("inferSchema", "true").option("header", "true").csv("Airports.csv")
flightsDF.cache()
airportsDF.cache()

DataFrame[IATA_CODE: string, LINK: string, LOCATION: string, TYPE: string, NAME: string, TERMINALS: int, RUNWAYS: int, BUILD_DATE: int, CITY_POPULATION: int]

## Your topic: Arrival Delay related to the morphology of the arrival airport

We want to check what happens in each of the cities, and if there is a relation between the city and the delay that goes beyond the airports. For that purpose, **you have to get a small dataset with features of the airport**, such as the number of terminals, the year when it was built, the average length of the arriving runway, the number of runways it has, and so on. No need for a lot of features, just 4 or 5 are fine. Once you have it, answer the following questions:

* Is there a relation between the year it was built and the number of flights arriving to it?
* Are modern airports located mostly in large cities or not necessarily?
* Do airports with more terminals have larger delays, or is the opposite true? What about the runways?
* Is there any threshold in the average arriving flights per terminal, so that above that value the delays tend to increase a lot?
* Discretize the arrival delay as in the reference notebook, and relate it to the average number of arriving flights per runway. Is there a relation? Support your conclusions with data.

## Business Questions:

### 1. Is there a relation between the year it was built and the number of flights arriving to it?

In [None]:
DestDF = flightsDF.join(airportsDF,col('Dest')==col('IATA_CODE'))\
                      .withColumn('YEARS_BUILT',(2008-col('BUILD_DATE')))\
                      .withColumn('AGE_CATEGORY', when(col("YEARS_BUILT")<=40,"MODERN")\
                            .when((col("YEARS_BUILT")>40) & (col("YEARS_BUILT")<=70),"MIDDLE-AGE")\
                            .when((col("YEARS_BUILT")>70),"OLD"))\
                      .withColumn('CITY_SIZE', when(col("CITY_POPULATION")<=550000,"SMALL")\
                            .when((col("CITY_POPULATION")>550000) & (col("CITY_POPULATION")<=1600000),"MEDIUM")\
                            .when((col("CITY_POPULATION")>1600000),"LARGE"))
DestDF.cache()


In [11]:
oldDF = DestDF.select(col('Dest').alias('AIRPORT'),'BUILD_DATE','YEARS_BUILT','AGE_CATEGORY')\
                  .groupBy('AIRPORT','BUILD_DATE','YEARS_BUILT','CATEGORY')\
                  .agg(count(lit(1)).alias('ARRIVALS'))\
                  .withColumn('RATIO', round(col('ARRIVALS')/col('YEARS_BUILT'),2))\
                  .orderBy('RATIO',ascending=False)
oldDF.cache()

oldDF2= oldDF.groupBy('CATEGORY')\
             .agg(round(avg('YEARS_BUILT'),1).alias('Avg_YEARS_BUILT'),\
                  count(lit(1)).alias('TOTAL_AIRPORTS'),\
                  sum('ARRIVALS').alias('TOTAL_ARRIVALS'),\
                  round(avg('ARRIVALS'),1).alias('Avg_ARRIVALS'))\
             .withColumn('ARRIVALS PROPORTION %', round(col('TOTAL_ARRIVALS')/(sum('TOTAL_ARRIVALS').over(Window.partitionBy()))*100,1))\
             .orderBy('Avg_ARRIVALS',ascending=False)

oldDF2.show()

+----------+---------------+--------------+--------------+------------+---------------------+
|  CATEGORY|Avg_YEARS_BUILT|TOTAL_AIRPORTS|TOTAL_ARRIVALS|Avg_ARRIVALS|ARRIVALS PROPORTION %|
+----------+---------------+--------------+--------------+------------+---------------------+
|       OLD|           78.8|            32|         43143|      1348.2|                 43.1|
|MIDDLE-AGE|           60.8|            41|         50164|      1223.5|                 50.2|
|    MODERN|           21.8|             8|          6693|       836.6|                  6.7|
+----------+---------------+--------------+--------------+------------+---------------------+



We aggregated all the destination airports from the 2008 US Fligths Dataset and created 3 main categories based on the airport's built year:

1. OLD: Older than 70 years.
2. MIDDLE-AGE: Between 40 and 70 years.
3. MODERN: Younger than 40 years.

#### Quick insights: 
- More than 90% of the airports are older than 40 years.
- If we only consider the proportion of arrivals per AGE CATEGORY, MIDDLE-AGE airports cover 50% of the total arrivals.

To be fair evaluating the relation between the year an airport was built and the number of flights arriving to it, We calculated the AVERAGE ARRIVALS per AGE CATEGORY to deal with the skewness of the number of airports per category:

#### Based on this measure we can affirm that there is relation positive relation:    
- The older the airport, the higher the number of arrivals on average: The 32 "OLD" airports got on average the highest number of arrivals for 2008 with 1.348 flights. 

### 2. Are modern airports located mostly in large cities or not necessarily?

In [12]:
popDF = DestDF.groupBy('CITY_SIZE')\
                  .pivot('CATEGORY')\
                  .agg(countDistinct('Dest'))\
                  .select('CITY_SIZE',col('MODERN').alias('MODERN AIRPORTS'))\
                  .orderBy('MODERN AIRPORTS', ascending = False)\
                  .withColumn('Proportion %', round(col('MODERN AIRPORTS')/(sum('MODERN AIRPORTS').over(Window.partitionBy()))*100,1))
popDF.show()

popDF2 = DestDF.groupBy('CATEGORY')\
                  .pivot('CITY_SIZE')\
                  .agg(countDistinct('Dest'))
                  
popDF2= popDF2.withColumn('LARGE CITY Prop%', round(col('LARGE')/(sum('LARGE').over(Window.partitionBy()))*100,1))\
            .withColumn('MEDIUM CITY Prop%', round(col('MEDIUM')/(sum('MEDIUM').over(Window.partitionBy()))*100,1))\
            .withColumn('SMALL CITY Prop%', round(col('SMALL')/(sum('SMALL').over(Window.partitionBy()))*100,1))

popDF2.orderBy(col("LARGE CITY Prop%").desc(),col("MEDIUM CITY Prop%").desc(),col("SMALL CITY Prop%").desc()).show()


+---------+---------------+------------+
|CITY_SIZE|MODERN AIRPORTS|Proportion %|
+---------+---------------+------------+
|   MEDIUM|              4|        50.0|
|    SMALL|              3|        37.5|
|    LARGE|              1|        12.5|
+---------+---------------+------------+

+----------+-----+------+-----+----------------+-----------------+----------------+
|  CATEGORY|LARGE|MEDIUM|SMALL|LARGE CITY Prop%|MEDIUM CITY Prop%|SMALL CITY Prop%|
+----------+-----+------+-----+----------------+-----------------+----------------+
|       OLD|    3|     7|   22|            75.0|             31.8|            40.0|
|    MODERN|    1|     4|    3|            25.0|             18.2|             5.5|
|MIDDLE-AGE| null|    11|   30|            null|             50.0|            54.5|
+----------+-----+------+-----+----------------+-----------------+----------------+



### 3. Do airports with more terminals have larger delays, or is the opposite true? What about the runways?

#### 3.1. Terminal Delays

In [13]:
TerminalDF=DestDF.where(col("ArrDelay")!="NA")\
              .where(col("Cancelled")==0)\
              .groupBy('TERMINALS')\
              .agg(count(lit(1)).alias("NUM FLIGHTS"),\
                   countDistinct('DEST').alias('NUM AIRPORTS'),\
                  round(avg("ArrDelay"),1).alias("AVG FLIGHT DELAY(Min)")) 
TerminalDF.cache()

TerminalDF=TerminalDF.withColumn('AVG FLIGHT DELAY/TERMINAL(Min)', round(col('AVG FLIGHT DELAY(Min)')/col('TERMINALS'),1))\
          .orderBy(col('AVG FLIGHT DELAY/TERMINAL(Min)').desc())
TerminalDF.show()

+---------+-----------+------------+---------------------+------------------------------+
|TERMINALS|NUM FLIGHTS|NUM AIRPORTS|AVG FLIGHT DELAY(Min)|AVG FLIGHT DELAY/TERMINAL(Min)|
+---------+-----------+------------+---------------------+------------------------------+
|        1|      37580|          44|                  4.2|                           4.2|
|        2|      38712|          21|                  6.8|                           3.4|
|        4|       1929|           2|                 11.9|                           3.0|
|        3|      13174|          10|                  6.2|                           2.1|
|        9|       3283|           1|                 11.2|                           1.2|
|        7|       1678|           1|                  3.0|                           0.4|
|        5|       2342|           2|                 -0.6|                          -0.1|
+---------+-----------+------------+---------------------+------------------------------+



#### 3.2. Runway Delays

In [14]:
RunwayDF=DestDF.where(col("ArrDelay")!="NA")\
              .where(col("Cancelled")==0)\
              .groupBy('RUNWAYS')\
              .agg(count(lit(1)).alias("NUM FLIGHTS"),\
                   countDistinct('Dest').alias('NUM AIRPORTS'),\
                  round(avg("ArrDelay"),1).alias("AVG FLIGHT DELAY(Min)")) 
RunwayDF.cache()

RunwayDF=RunwayDF.withColumn('AVG FLIGHT DELAY/RUNWAY(Min)', round(col('AVG FLIGHT DELAY(Min)')/col('RUNWAYS'),1))\
          .orderBy(col('AVG FLIGHT DELAY/RUNWAY(Min)').desc())
RunwayDF.show()

+-------+-----------+------------+---------------------+----------------------------+
|RUNWAYS|NUM FLIGHTS|NUM AIRPORTS|AVG FLIGHT DELAY(Min)|AVG FLIGHT DELAY/RUNWAY(Min)|
+-------+-----------+------------+---------------------+----------------------------+
|      1|       3556|           3|                  9.0|                         9.0|
|      2|      25026|          29|                  5.6|                         2.8|
|      4|      31074|          14|                  7.1|                         1.8|
|      3|      30779|          30|                  4.4|                         1.5|
|      5|       6117|           3|                  4.6|                         0.9|
|      6|       2146|           2|                  4.7|                         0.8|
+-------+-----------+------------+---------------------+----------------------------+



### 4. Is there any threshold in the average arriving flights per terminal, so that above that value the delays tend to increase a lot?

In [15]:
ThresholdDF=DestDF.where(col("ArrDelay")!="NA")\
                  .where(col("Cancelled")==0)\
                  .select(col('Dest').alias('AIRPORT'),'TERMINALS','ArrDelay')\
                  .groupBy('AIRPORT','TERMINALS')\
                  .agg(count(lit(1)).alias('NUM FLIGHTS'),\
                       round((count(lit(1))/col('TERMINALS')),1).alias('AVG FLIGHTS/TERM'),\
                       round(avg('ArrDelay')/col('TERMINALS'),1).alias('AVG FLIGHT DELAY/TERMINAL'))\
                  .orderBy(col('AVG FLIGHTS/TERM').asc(),col('AVG FLIGHT DELAY/TERMINAL').asc())

ThresholdDF.cache()
ThresholdDF.show(5)

print("Linear Correlation Average Flight per Terminal vs Average Delay per Terminal")
ThresholdDF.select(round(corr(col("AVG FLIGHT DELAY/TERMINAL"),col("AVG FLIGHTS/TERM")),2).alias("Correlation")).show()

TermDF=TerminalDF.withColumn('AVG FLIGHTS/TERM', round(col('NUM FLIGHTS')/(col('TERMINALS')*col('NUM AIRPORTS')),1))\
          .select('TERMINALS','NUM AIRPORTS','NUM FLIGHTS', 'AVG FLIGHTS/TERM', 'AVG FLIGHT DELAY/TERMINAL(Min)')\
          .orderBy(col('AVG FLIGHTS/TERM').asc())

TermDF.show()
print("Linear Correlation Average Flight per Terminal vs Average Delay per Terminal")
TermDF.select(round(corr(col("AVG FLIGHT DELAY/TERMINAL(Min)"),col("AVG FLIGHTS/TERM")),2).alias("Correlation")).show()


+-------+---------+-----------+----------------+-------------------------+
|AIRPORT|TERMINALS|NUM FLIGHTS|AVG FLIGHTS/TERM|AVG FLIGHT DELAY/TERMINAL|
+-------+---------+-----------+----------------+-------------------------+
|    ATL|        2|          1|             0.5|                     -8.5|
|    SYR|        2|          1|             0.5|                     -0.5|
|    DCA|        3|          2|             0.7|                     -2.7|
|    RIC|        1|          1|             1.0|                     13.0|
|    MYR|        1|          1|             1.0|                     18.0|
+-------+---------+-----------+----------------+-------------------------+
only showing top 5 rows

Linear Correlation Average Flight per Terminal vs Average Delay per Terminal
+-----------+
|Correlation|
+-----------+
|      -0.12|
+-----------+

+---------+------------+-----------+----------------+------------------------------+
|TERMINALS|NUM AIRPORTS|NUM FLIGHTS|AVG FLIGHTS/TERM|AVG FLIGHT DEL

### 5. Discretize the arrival delay as in the reference notebook, and relate it to the average number of arriving flights per runway. Is there a relation? Support your conclusions with data.

In [16]:
RunDF=RunwayDF.withColumn('AVG FLIGHTS/RUNWAY', round(col('NUM FLIGHTS')/(col('RUNWAYS')*col('NUM AIRPORTS')),1))\
          .select('RUNWAYS','NUM AIRPORTS','NUM FLIGHTS', 'AVG FLIGHTS/RUNWAY', 'AVG FLIGHT DELAY/RUNWAY(Min)')\
          .orderBy(col('AVG FLIGHTS/RUNWAY').asc())

TotalFlightsDF = DestDF.groupBy("RUNWAYS").agg(count("ArrDelay").alias("TotalFlights"))

DelayDF = DestDF.where(col("ArrDelay")!="NA")\
                               .withColumn("DelaySeverity", when(col("ArrDelay")<=0,"1.nodelay")\
                               .when((col("ArrDelay")>0) & (col("ArrDelay")<=15),"2.acceptable")\
                               .when((col("ArrDelay")>15) & (col("ArrDelay")<=30),"3.annoying")\
                               .when((col("ArrDelay")>30) & (col("ArrDelay")<=60),"4.impactul")\
                               .otherwise("5.unacceptable"))

DelayDF.cache() 

SevereDelaysDF = DelayDF.where((col("Cancelled")==0))\
                           .where((col("DelaySeverity")!="1.nodelay") & (col("DelaySeverity")!="2.acceptable"))\
                           .withColumn("IntArrDelay", col("ArrDelay").cast(IntegerType()))\
                           .select("DelaySeverity", "IntArrDelay","RUNWAYS")\
                           .groupBy("RUNWAYS", "DelaySeverity")\
                           .agg(count("IntArrDelay").alias("NumSevereDelayedFlights"))

combinedDF = SevereDelaysDF.join(TotalFlightsDF, "RUNWAYS")\
                             .withColumn("SevereDelayedRatio", round(col("NumSevereDelayedFlights")/col("TotalFlights")*100,1))\
                             .orderBy(col("SevereDelayedRatio").desc())
combinedDF.cache()


combinedDF=combinedDF.groupBy("RUNWAYS")\
          .pivot("DelaySeverity")\
          .min("SevereDelayedRatio")\
          .orderBy(col("`5.unacceptable`").desc(), col("`4.impactul`").desc(), col("`3.annoying`").desc())

FinalDF= RunDF.join(combinedDF,"RUNWAYS")\
              .withColumn("Severe Delays %", col("`5.unacceptable`")+col("`4.impactul`")+col("`3.annoying`"))\
              .select('RUNWAYS', 'AVG FLIGHTS/RUNWAY',"Severe Delays %","`5.unacceptable`","`4.impactul`","`3.annoying`")\
              .orderBy(col('AVG FLIGHTS/RUNWAY').asc())

print("Runways with % severe delayed flights by category:")
FinalDF.show()


Runways with % severe delayed flights by category:
+-------+------------------+------------------+--------------+----------+----------+
|RUNWAYS|AVG FLIGHTS/RUNWAY|   Severe Delays %|5.unacceptable|4.impactul|3.annoying|
+-------+------------------+------------------+--------------+----------+----------+
|      6|             178.8|              19.3|           4.8|       5.7|       8.8|
|      3|             342.0|              17.4|           4.1|       5.6|       7.7|
|      5|             407.8|              16.6|           4.8|       5.3|       6.5|
|      2|             431.5|18.299999999999997|           4.1|       6.0|       8.2|
|      4|             554.9|              20.5|           5.4|       6.5|       8.6|
|      1|            1185.3|22.700000000000003|           5.9|       7.7|       9.1|
+-------+------------------+------------------+--------------+----------+----------+



Any busy airport with 1-2 runways has to make those runways work as tightly as possible with little room for error. Otherwise they end up with delays. 