# Flights Analysis (January 2008)

The flights analysis is performed in the following steps:

1. PySpark **environment setup**
2. Data source and **Spark data abstraction** (DataFrame) **setup**
3. Data set **metadata analysis**:
   1. Display **schema and size** of the DataFrame
   2. Get one or more **random samples** from the data set to better understand what the data is about
   3. Identify **data entities**, **metrics** and **dimensions**
   4. **Column/field categorization**
4. Column groups **basic profiling** to better understand our data set:
   1. **Timing related** columns basic profiling
   2. **Flight related** columns basic profiling
   3. **Issue related** columns basic profiling
5. **Answer some business questions** to improve service:
   1. **Ratio of delayed** (and non-cancelled) flights by severity
   2. **Severely delayed flights statistics** by type of delay (carrier, weather, NAS, security and late aircraft)
   3. **Top 20 origin airports** (and figures) involved in severe delays

Let's go for it:

## 1. PySpark environment setup

In [None]:
import findspark
findspark.init()

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

## 2. Data source and Spark data abstraction (DataFrame) setup

In [None]:
flightsDF = spark.read \
                 .option("inferSchema", "true") \
                 .option("header", "true") \
                 .csv("flights_jan08.csv")

## 3. Data set metadata analysis
### A. Display schema and size of the DataFrame

In [None]:
from IPython.display import display, Markdown

flightsDF.printSchema()
display(Markdown("This DataFrame has **%d rows**." % flightsDF.count()))

### B. Get one or multiple random samples from the data set

In [None]:
flightsDF.cache() # keep the DataFrame in memory so later actions don't re-read the CSV file
flightsDF.sample(withReplacement=False, fraction=0.1).take(2)
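Note that `sample(False, 0.1)` does not return exactly 10% of the rows: the fraction acts as a per-row probability, so the sample size varies from run to run (PySpark's `DataFrame.sample` also accepts a `seed` argument to make the result reproducible). The plain-Python sketch below illustrates this kind of Bernoulli sampling; `bernoulli_sample` is a hypothetical helper, not part of PySpark.

```python
import random

def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction` (sampling without replacement)."""
    rng = random.Random(seed)  # a fixed seed makes the sample reproducible
    return [row for row in rows if rng.random() < fraction]

rows = list(range(10_000))  # stand-in for 10,000 flight records
sample = bernoulli_sample(rows, 0.1, seed=42)
print(len(sample))  # close to, but usually not exactly, 1,000
```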

### C. Data entities, metrics and dimensions

I've identified the following elements:

* **Entities:** Flight (the main entity, whose facts are measured), Airport (dimension) and Air Carrier (dimension)
* **Metrics:** Departure time, scheduled departure time, arrival time, scheduled arrival time, ...
* **Dimensions:** Origin, destination, TailNum, flight number, ...

### D. Column categorization

A possible column categorization is the following:

* **Timing related columns:** *Year*, *Month*, *DayofMonth*, *DayOfWeek*, *DepTime*, *CRSDepTime*, *ArrTime* and *CRSArrTime*
* **Flight related columns:** *UniqueCarrier*, *FlightNum*, *TailNum*, *ActualElapsedTime*, *CRSElapsedTime*, *AirTime*, *Origin*, *Dest*, *Distance*, *TaxiIn* and *TaxiOut*
* **Issue related columns:** *ArrDelay*, *DepDelay*, *Cancelled*, *CancellationCode*, *Diverted*, *CarrierDelay*, *WeatherDelay*, *NASDelay*, *SecurityDelay* and *LateAircraftDelay*
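Keeping this categorization in a single Python structure makes it easy to reuse the same column lists in the profiling cells (for example `flightsDF.select(COLUMN_GROUPS["timing"])`, since `select` accepts a list of column names) and to check that no column was assigned twice. `COLUMN_GROUPS` and `find_overlaps` are hypothetical names introduced for this sketch.

```python
# Column groups as listed above, kept in one place so profiling code can loop over them
COLUMN_GROUPS = {
    "timing": ["Year", "Month", "DayofMonth", "DayOfWeek",
               "DepTime", "CRSDepTime", "ArrTime", "CRSArrTime"],
    "flight": ["UniqueCarrier", "FlightNum", "TailNum", "ActualElapsedTime",
               "CRSElapsedTime", "AirTime", "Origin", "Dest", "Distance",
               "TaxiIn", "TaxiOut"],
    "issue": ["ArrDelay", "DepDelay", "Cancelled", "CancellationCode", "Diverted",
              "CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay",
              "LateAircraftDelay"],
}

def find_overlaps(groups):
    """Return the columns listed more than once across all groups."""
    seen, overlaps = set(), set()
    for columns in groups.values():
        for column in columns:
            if column in seen:
                overlaps.add(column)
            seen.add(column)
    return overlaps

print(find_overlaps(COLUMN_GROUPS))  # set(): every column belongs to exactly one group
```

Comparing the union of the groups with `flightsDF.columns` would likewise reveal any column left uncategorized.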

## 4. Column groups basic profiling to better understand our data set
### A. Timing related columns basic profiling

In [None]:
from IPython.display import display, Markdown
from pyspark.sql.functions import when, count, col, countDistinct, desc, first, lit


print ("Summary of columns Year, Month, DayofMonth and DayOfWeek:")
flightsDF.select("Year","Month","DayofMonth","DayOfWeek").summary().show()

print("Checking for nulls in columns Year, Month, DayofMonth and DayOfWeek:")
flightsDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["Year","Month","DayofMonth","DayOfWeek"]]).show()

print("Checking number of distinct values in columns Year, Month, DayofMonth and DayOfWeek:")
flightsDF.select([countDistinct(c).alias(c) for c in ["Year","Month","DayofMonth","DayOfWeek"]]).show()

print ("Most and least frequent occurrences for DayofMonth and DayOfWeek columns:")
dayofMonthOccurrencesDF = flightsDF.groupBy("DayofMonth").agg(count(lit(1)).alias("Total"))
dayOfWeekDF = flightsDF.groupBy("DayOfWeek").agg(count(lit(1)).alias("Total"))

leastFreqDayOfMonth    = dayofMonthOccurrencesDF.orderBy(col("Total").asc()).first()
mostFreqDayOfMonth     = dayofMonthOccurrencesDF.orderBy(col("Total").desc()).first()
leastFreqDayOfWeek     = dayOfWeekDF.orderBy(col("Total").asc()).first()
mostFreqDayOfWeek      = dayOfWeekDF.orderBy(col("Total").desc()).first()

display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("leastFreqDayOfMonth", "mostFreqDayOfMonth", "leastFreqDayOfWeek", "mostFreqDayOfWeek", \
       "%d (%d occurrences)" % (leastFreqDayOfMonth["DayofMonth"], leastFreqDayOfMonth["Total"]), \
       "%d (%d occurrences)" % (mostFreqDayOfMonth["DayofMonth"], mostFreqDayOfMonth["Total"]), \
       "%d (%d occurrences)" % (leastFreqDayOfWeek["DayOfWeek"], leastFreqDayOfWeek["Total"]), \
       "%d (%d occurrences)" % (mostFreqDayOfWeek["DayOfWeek"], mostFreqDayOfWeek["Total"]))))

print ("Summary of columns DepTime, CRSDepTime, ArrTime and CRSArrTime:")
flightsDF.select("DepTime","CRSDepTime","ArrTime","CRSArrTime").summary().show()

print("Checking for nulls in columns DepTime, CRSDepTime, ArrTime and CRSArrTime:")
flightsDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["DepTime","CRSDepTime","ArrTime","CRSArrTime"]]).show()

print("Checking number of distinct values in columns DepTime, CRSDepTime, ArrTime and CRSArrTime:")
flightsDF.select([countDistinct(c).alias(c) for c in ["DepTime","CRSDepTime","ArrTime","CRSArrTime"]]).show()
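The summary statistics of DepTime, CRSDepTime, ArrTime and CRSArrTime are easy to misread. Assuming these columns follow the layout of the public airline on-time data, they hold local clock times encoded as hhmm (1343 means 13:43), not durations, so their mean and standard deviation carry little meaning. A plain-Python sketch of converting such values to minutes after midnight, assuming that encoding and "NA" for missing values (`hhmm_to_minutes` is a hypothetical helper):

```python
def hhmm_to_minutes(value):
    """Convert an hhmm clock value (int or str, e.g. 1343 or "1343") to minutes after midnight.

    Returns None for missing values (None, "" or "NA"). Assumes hhmm encoding; 2400 maps to 1440.
    """
    if value is None or str(value).strip() in ("", "NA"):
        return None
    hhmm = int(value)
    # Valid values are 0..2400 with a minutes part below 60
    if not (0 <= hhmm <= 2400 and hhmm % 100 < 60):
        raise ValueError(f"not a valid hhmm value: {value!r}")
    hours, minutes = divmod(hhmm, 100)
    return hours * 60 + minutes

print(hhmm_to_minutes(1343))  # 823
print(hhmm_to_minutes("NA"))  # None
```

Differences between converted actual and scheduled times still need extra care for flights that cross midnight.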

### B. Flight related columns basic profiling

In [None]:
from IPython.display import display, Markdown
from pyspark.sql.functions import when, count, col, countDistinct, desc, first, lit

print ("Summary of columns UniqueCarrier, FlightNum, TailNum, Origin, Dest and Distance:")
flightsDF.select("UniqueCarrier", "FlightNum", "TailNum", "Origin", "Dest", "Distance").summary().show()

print("Checking for nulls in columns UniqueCarrier, FlightNum, TailNum, Origin, Dest and Distance:")
flightsDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["UniqueCarrier", "FlightNum", "TailNum", "Origin", "Dest", "Distance"]]).show()

print("Checking number of distinct values in columns UniqueCarrier, FlightNum, TailNum, Origin, Dest and Distance:")
flightsDF.select([countDistinct(c).alias(c) for c in ["UniqueCarrier", "FlightNum", "TailNum", "Origin", "Dest", "Distance"]]).show()

print ("Most and least frequent occurrences for FlightNum, TailNum, Origin and Dest columns:")
FlightNumDF = flightsDF.groupBy("FlightNum").agg(count(lit(1)).alias("Total"))
TailNumDF   = flightsDF.groupBy("TailNum").agg(count(lit(1)).alias("Total"))
OriginDF    = flightsDF.groupBy("Origin").agg(count(lit(1)).alias("Total"))
DestDF      = flightsDF.groupBy("Dest").agg(count(lit(1)).alias("Total"))

leastFreqFlightNum    = FlightNumDF.orderBy(col("Total").asc()).first()
mostFreqFlightNum     = FlightNumDF.orderBy(col("Total").desc()).first()
leastFreqTailNum      = TailNumDF.orderBy(col("Total").asc()).first()
mostFreqTailNum       = TailNumDF.orderBy(col("Total").desc()).first()
leastFreqOrigin       = OriginDF.orderBy(col("Total").asc()).first()
mostFreqOrigin        = OriginDF.orderBy(col("Total").desc()).first()
leastFreqDest         = DestDF.orderBy(col("Total").asc()).first()
mostFreqDest          = DestDF.orderBy(col("Total").desc()).first()

display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("leastFreqFlightNum", "mostFreqFlightNum", "leastFreqTailNum", "mostFreqTailNum", \
       "%d (%d occurrences)" % (leastFreqFlightNum["FlightNum"], leastFreqFlightNum["Total"]), \
       "%d (%d occurrences)" % (mostFreqFlightNum["FlightNum"], mostFreqFlightNum["Total"]), \
       "%s (%d occurrences)" % (leastFreqTailNum["TailNum"], leastFreqTailNum["Total"]), \
       "%s (%d occurrences)" % (mostFreqTailNum["TailNum"], mostFreqTailNum["Total"]))))
display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("leastFreqOrigin", "mostFreqOrigin", "leastFreqDest", "mostFreqDest", \
       "%s (%d occurrences)" % (leastFreqOrigin["Origin"], leastFreqOrigin["Total"]), \
       "%s (%d occurrences)" % (mostFreqOrigin["Origin"], mostFreqOrigin["Total"]), \
       "%s (%d occurrences)" % (leastFreqDest["Dest"], leastFreqDest["Total"]), \
       "%s (%d occurrences)" % (mostFreqDest["Dest"], mostFreqDest["Total"]))))

print ("Summary of columns ActualElapsedTime, CRSElapsedTime, AirTime, TaxiIn and TaxiOut:")
flightsDF.select("ActualElapsedTime", "CRSElapsedTime", "AirTime", "TaxiIn", "TaxiOut").summary().show()

print("Checking for nulls in columns ActualElapsedTime, CRSElapsedTime, AirTime, TaxiIn and TaxiOut:")
flightsDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["ActualElapsedTime", "CRSElapsedTime", "AirTime", "TaxiIn", "TaxiOut"]]).show()

print("Checking number of distinct values in columns ActualElapsedTime, CRSElapsedTime, AirTime, TaxiIn and TaxiOut:")
flightsDF.select([countDistinct(c).alias(c) for c in ["ActualElapsedTime", "CRSElapsedTime", "AirTime", "TaxiIn", "TaxiOut"]]).show()
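`orderBy(...).first()` returns a single row even when several values share the lowest or highest count. For high-cardinality columns such as FlightNum or TailNum, ties for the least frequent value are likely, so the value shown in the table is just one of them. The plain-Python sketch below reports every tied value; `extremes_with_ties` is a hypothetical helper and the airport codes are illustrative, not taken from the data set.

```python
from collections import Counter

def extremes_with_ties(values):
    """Return ((min_count, [values...]), (max_count, [values...])) including every tied value."""
    counts = Counter(values)
    if not counts:
        raise ValueError("no values to count")
    lowest, highest = min(counts.values()), max(counts.values())
    least = sorted(v for v, c in counts.items() if c == lowest)
    most = sorted(v for v, c in counts.items() if c == highest)
    return (lowest, least), (highest, most)

# Illustrative origin codes, not taken from the data set
origins = ["ATL", "ORD", "ATL", "DFW", "ORD", "ATL", "SEA", "BOS"]
print(extremes_with_ties(origins))
# ((1, ['BOS', 'DFW', 'SEA']), (3, ['ATL']))
```

In PySpark the same idea would be to compute the minimum (or maximum) `Total` with `agg` and then keep every row where `col("Total")` equals it.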

### C. Issue related columns basic profiling

In [None]:
from IPython.display import display, Markdown
from pyspark.sql.functions import when, count, col, countDistinct, desc, first

print ("Summary of columns ArrDelay, DepDelay, Cancelled, CancellationCode and Diverted:")
flightsDF.select("ArrDelay", "DepDelay", "Cancelled", "CancellationCode", "Diverted").summary().show()

print("Checking for nulls in columns ArrDelay, DepDelay, Cancelled, CancellationCode and Diverted:")
flightsDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["ArrDelay", "DepDelay", "Cancelled", "CancellationCode", "Diverted"]]).show()

print("Checking number of distinct values in columns ArrDelay, DepDelay, Cancelled, CancellationCode and Diverted:")
flightsDF.select([countDistinct(c).alias(c) for c in ["ArrDelay", "DepDelay", "Cancelled", "CancellationCode", "Diverted"]]).show()

print ("Summary of columns CarrierDelay, WeatherDelay, NASDelay, SecurityDelay and LateAircraftDelay:")
flightsDF.select("CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay").summary().show()

print("Checking for nulls in columns CarrierDelay, WeatherDelay, NASDelay, SecurityDelay and LateAircraftDelay:")
flightsDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay"]]).show()

print("Checking number of distinct values in columns CarrierDelay, WeatherDelay, NASDelay, SecurityDelay and LateAircraftDelay:")
flightsDF.select([countDistinct(c).alias(c) for c in ["CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay"]]).show()
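The null checks above may understate missing data. The analysis below filters on `col("ArrDelay")!="NA"`, which suggests missing values are stored in the CSV as the literal string "NA"; with `inferSchema`, such columns are read as strings and `isNull()` does not count those cells. A plain-Python sketch that counts both `None` and "NA" markers (`count_missing` and `MISSING_MARKERS` are hypothetical names; the rows are illustrative):

```python
MISSING_MARKERS = {"NA", ""}  # assumption: how missing values are spelled in the CSV

def count_missing(rows, columns):
    """Count, per column, the cells that are None or one of MISSING_MARKERS."""
    return {
        c: sum(1 for row in rows
               if row.get(c) is None or str(row.get(c)).strip() in MISSING_MARKERS)
        for c in columns
    }

# Illustrative rows, not taken from the data set
rows = [
    {"ArrDelay": "12", "CarrierDelay": "NA", "CancellationCode": None},
    {"ArrDelay": "NA", "CarrierDelay": "NA", "CancellationCode": "B"},
    {"ArrDelay": "-3", "CarrierDelay": "0",  "CancellationCode": None},
]
print(count_missing(rows, ["ArrDelay", "CarrierDelay", "CancellationCode"]))
# {'ArrDelay': 1, 'CarrierDelay': 2, 'CancellationCode': 2}
```

In PySpark, a comparable check could use `count(when(col(c).isNull() | (col(c).cast("string") == "NA"), c))` in the same list comprehension; casting to string first avoids comparing a numeric column with a string literal.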


## 5. Answer some business questions to improve service

### A. Ratio of delayed (and non-cancelled) flights by severity

In [None]:
from pyspark.sql.functions import count, round, col, when  # note: this round shadows Python's built-in round

# Delay severity is categorized as follows (an arbitrary, made-up categorization):
#
#   "nodelay"      - delay=(-infinity,0] mins
#   "acceptable"   - delay=(0,15] mins
#   "annoying"     - delay=(15,30] mins
#   "impactful"    - delay=(30,60] mins (labelled "4.impactul" in the code below)
#   "unacceptable" - delay=(60,+infinity) mins

# 1. Let's enrich the DF with delay severity based on our categorization
totalFlights = flightsDF.count()
delayCategorizationDF = flightsDF\
   .where(col("ArrDelay")!="NA")\
   .withColumn("DelaySeverity", when(col("ArrDelay")<=0,"1.nodelay")\
                               .when((col("ArrDelay")>0) & (col("ArrDelay")<=15),"2.acceptable")\
                               .when((col("ArrDelay")>15) & (col("ArrDelay")<=30),"3.annoying")\
                               .when((col("ArrDelay")>30) & (col("ArrDelay")<=60),"4.impactul")\
                               .otherwise("5.unacceptable"))
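delayCategorizationDF.cache() # keep the enriched DataFrame in memory; it is reused in the next sections
# 2. Answer the business question. The ratio is computed over totalFlights, i.e. all rows of flightsDF
#    (including cancelled flights and rows whose ArrDelay is "NA"), so the ratios will not add up to 100%
#    if such rows exist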
delayCategorizationDF.where(col("Cancelled")==0)\
                     .select("DelaySeverity", "ArrDelay")\
                     .groupBy("DelaySeverity")\
                     .agg(count("DelaySeverity").alias("NumFlights"), \
                          (count("DelaySeverity")/totalFlights*100).alias("Ratio"))\
                     .orderBy("DelaySeverity")\
                     .select("DelaySeverity","NumFlights",round("Ratio",2).alias("RoundedRatio")).show()
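The categorization and ratio above can be reproduced in plain Python as a sanity check. This sketch swaps the chained `when` conditions for a binary search (`bisect_left`) over the inclusive upper bounds of each interval; `delay_severity`, `severity_ratios`, `THRESHOLDS` and `LABELS` are hypothetical names, the labels mirror the notebook's categories (the notebook itself spells the fourth label `4.impactul`), and the delays are illustrative.

```python
from bisect import bisect_left
from collections import Counter

# Inclusive upper bounds (in minutes) of the first four categories defined above
THRESHOLDS = [0, 15, 30, 60]
LABELS = ["1.nodelay", "2.acceptable", "3.annoying", "4.impactful", "5.unacceptable"]

def delay_severity(arr_delay):
    """Map an arrival delay in minutes to (-inf,0], (0,15], (15,30], (30,60] or (60,+inf)."""
    # bisect_left returns the first index i with THRESHOLDS[i] >= arr_delay,
    # which makes every upper bound inclusive
    return LABELS[bisect_left(THRESHOLDS, arr_delay)]

def severity_ratios(delays, total_flights):
    """Percentage of total_flights falling into each severity category, rounded to 2 decimals."""
    counts = Counter(delay_severity(d) for d in delays)
    return {label: round(counts[label] / total_flights * 100, 2) for label in LABELS}

# Illustrative arrival delays of 8 non-cancelled flights out of 10 scheduled ones
delays = [-5, 0, 7, 15, 16, 45, 61, 120]
print(severity_ratios(delays, total_flights=10))
```

As in the notebook, the denominator includes flights that have no category (here the 2 cancelled ones), so the ratios add up to 80% rather than 100%.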

### B. Severely delayed flights statistics by type of delay (carrier, weather, NAS, security and late aircraft)

In [None]:
from pyspark.sql.functions import max, min, avg, stddev
from pyspark.sql.types import IntegerType

# To get statistics of severely delayed flights, we prepare the previous DataFrame (delayCategorizationDF):
#   1. Remove cancelled flights
#   2. Keep only severely delayed flights: annoying, impactful and unacceptable
#   3. Cast the String columns holding delay information to Integer so numeric statistics can be computed
#      (for these severely delayed flights the converted columns are expected not to contain "NA";
#      this is easy to verify by counting "NA" values in those columns)
#
#
severeDelaysDF = \
  delayCategorizationDF.where((col("Cancelled")==0))\
                       .where((col("DelaySeverity")!="1.nodelay") & (col("DelaySeverity")!="2.acceptable"))\
                       .withColumn("IntArrDelay", col("ArrDelay").cast(IntegerType()))\
                       .withColumn("IntCarrierDelay", col("CarrierDelay").cast(IntegerType()))\
                       .withColumn("IntWeatherDelay", col("WeatherDelay").cast(IntegerType()))\
                       .withColumn("IntNASDelay", col("NASDelay").cast(IntegerType()))\
                       .withColumn("IntSecurityDelay", col("SecurityDelay").cast(IntegerType()))\
                       .withColumn("IntLateAircraftDelay", col("LateAircraftDelay").cast(IntegerType()))\
                       .select("DelaySeverity", "IntArrDelay","IntCarrierDelay","IntWeatherDelay",\
                               "IntNASDelay", "IntSecurityDelay", "IntLateAircraftDelay")
severeDelaysDF.cache() # optimization to make the processing faster

# Same basic stats (average, lowest, highest and standard deviation, in mins) for every delay type
delayTypes = [("Arrival", "IntArrDelay"),
              ("Carrier", "IntCarrierDelay"),
              ("Weather", "IntWeatherDelay"),
              ("NAS", "IntNASDelay"),
              ("Security", "IntSecurityDelay"),
              ("LateAircraft", "IntLateAircraftDelay")]

for delayType, delayCol in delayTypes:
    display(Markdown("**'%s' severe delays basic stats** (in mins):" % delayType))
    severeDelaysDF.groupBy("DelaySeverity")\
                  .agg(avg(delayCol).alias("AverageDelay"),\
                       min(delayCol).alias("LowestDelay"),\
                       max(delayCol).alias("HighestDelay"),\
                       stddev(delayCol).alias("StdDevDelay"))\
                  .orderBy("DelaySeverity").show()
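To make the per-severity figures easier to interpret, here is a plain-Python sketch of the same aggregation for one delay column. Spark's `stddev` is an alias of `stddev_samp`, the sample standard deviation, which corresponds to `statistics.stdev`; like Spark's aggregate functions, the sketch skips missing values. `stats_by_severity` is a hypothetical helper and the records are illustrative.

```python
from collections import defaultdict
from statistics import mean, stdev

def stats_by_severity(records):
    """records: iterable of (severity, delay_minutes or None).

    Returns {severity: (average, lowest, highest, sample_stddev)}; None delays are skipped.
    """
    groups = defaultdict(list)
    for severity, delay in records:
        if delay is not None:
            groups[severity].append(delay)
    result = {}
    for severity, values in sorted(groups.items()):
        # The sample standard deviation is undefined for a single value; returned as None here
        sd = stdev(values) if len(values) > 1 else None
        result[severity] = (mean(values), min(values), max(values), sd)
    return result

# Illustrative (severity, CarrierDelay) pairs, not taken from the data set
records = [("3.annoying", 10), ("3.annoying", 20), ("3.annoying", None),
           ("5.unacceptable", 90), ("5.unacceptable", 30), ("5.unacceptable", 60)]
print(stats_by_severity(records))
```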

### C. Top 20 origin airports (and figures) involved in severe delays

In [None]:
# Our answer to this business question will be:
#   1. List of the top 20 origin airports with the highest ratio of severely delayed ("unacceptable") flights
#      (relative to the total number of flights from that airport)
#   2. List of the top 20 origin airports with their severely delayed flights ratio by severity category
#      (unacceptable, impactful and annoying)

# To deliver these insights, we need some preparation:
#   1. Define a DataFrame with total flights per Origin airport (totalFlightsOriginDF)
#   2. Define a DataFrame aggregated by Origin and DelaySeverity to get the
#      number of delayed flights per severity category (severeDelaysOriginDF)
#   3. Join both DataFrames into a single DataFrame containing total flights per origin
#      airport and number of delayed flights by severity, to compute ratios (combinedDF)

totalFlightsOriginDF = \
   flightsDF.groupBy("Origin")\
            .agg(count("ArrDelay").alias("TotalFlights"))
severeDelaysOriginDF = \
  delayCategorizationDF.where((col("Cancelled")==0))\
                       .where((col("DelaySeverity")!="1.nodelay") & (col("DelaySeverity")!="2.acceptable"))\
                       .withColumn("IntArrDelay", col("ArrDelay").cast(IntegerType()))\
                       .select("DelaySeverity", "IntArrDelay","Origin")\
                       .groupBy("Origin", "DelaySeverity")\
                       .agg(count("IntArrDelay").alias("NumSevereDelayedFlights"))
combinedDF = \
  severeDelaysOriginDF\
     .join(totalFlightsOriginDF, "Origin")\
     .withColumn("SevereDelayedRatio", round(col("NumSevereDelayedFlights")/col("TotalFlights")*100,2))\
     .orderBy(col("SevereDelayedRatio").desc())
combinedDF.cache() # optimization to make the processing faster

display(Markdown("**Top 20 origin airports** with highest severely delayed (**unacceptable**) flights ratio (in %):"))
combinedDF.where(col("DelaySeverity")=="5.unacceptable").limit(20).show()
display(Markdown("**Top 20 origin airports with severely delayed flights ratio** by severity category (in %):"))
# Each (Origin, DelaySeverity) pair has exactly one ratio, so min() simply picks that value
combinedDF\
   .groupBy("Origin")\
   .pivot("DelaySeverity")\
   .min("SevereDelayedRatio")\
   .orderBy(col("`5.unacceptable`").desc(), col("`4.impactul`").desc(), col("`3.annoying`").desc())\
   .limit(20).show()
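The join, pivot and ordering steps above can be mirrored in plain Python to check the ranking logic. After the pivot, an airport with no flights in a given category has a null in that column, and Spark's `desc()` sorts nulls last, so such airports drop below those with any value. The sketch below reproduces that rule; `pivot_ratios` and `top_origins` are hypothetical helpers, the figures are illustrative, and the severity labels mirror the notebook's (which spells the fourth one `4.impactul`).

```python
def pivot_ratios(severe_counts, total_flights):
    """Build {origin: {severity: ratio_pct}} from per-(origin, severity) counts and per-origin totals.

    Origins missing from total_flights are dropped, as in an inner join on Origin.
    """
    table = {}
    for (origin, severity), num_delayed in severe_counts.items():
        if origin not in total_flights:
            continue
        ratio = round(num_delayed / total_flights[origin] * 100, 2)
        table.setdefault(origin, {})[severity] = ratio
    return table

def top_origins(table, order, limit=20):
    """Origins sorted by the severity columns in `order`, each descending, missing values last."""
    def sort_key(origin):
        row = table[origin]
        # (False, -ratio) sorts before (True, 0): present values first, largest ratio first
        return tuple((severity not in row, -row.get(severity, 0)) for severity in order)
    return sorted(table, key=sort_key)[:limit]

# Illustrative figures, not taken from the data set
totals = {"AAA": 100, "BBB": 50, "CCC": 200}
counts = {("AAA", "5.unacceptable"): 10, ("AAA", "3.annoying"): 5,
          ("BBB", "5.unacceptable"): 10, ("CCC", "3.annoying"): 40}
table = pivot_ratios(counts, totals)
print(top_origins(table, ["5.unacceptable", "4.impactful", "3.annoying"]))
# ['BBB', 'AAA', 'CCC']
```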

