## Intro - Northwoods Airlines Analysis - Databricks

The client, Northwoods Airlines, has requested a POV on both Snowflake and Databricks as part of an investigation into the benefits of these platforms.

This is the **Databricks** Analysis.

The data owners have provided datasets for airports, airlines, and flights, and shared them over [Google Drive](https://drive.google.com/drive/folders/18Mkt2Ku3gIxenT-zjYi68kcufpcvNwbv).

## Assumptions

- The data is good. No major cleaning needs to be performed.

## Reports

The following reports are typical of the industry reporting and insight gathering that provide a competitive advantage.

- Total Number Of Flights By Airline and Airport, Month Granularity
- On Time Percentage Per Airline For The Year 2015
- Airlines With The Largest Number Of Delays
- Cancellation Reasons By Airport
- Delay Reasons By Airport
- Airline With The Most Unique Routes

## Data Prep

### Step 1 - Importing Customer Data

The customer provided data for flights, airlines and airports.

In [0]:
# Import data files provided by the customer and save them as dataframes.

df_airlines = spark.read.csv("/FileStore/tables/airlines.csv", header="true", inferSchema="true")

df_airports = spark.read.csv("/FileStore/tables/airports.csv", header="true", inferSchema="true")

df_flights = spark.read.csv("/FileStore/tables/partition_01.csv", header="true", inferSchema="true")

df_partition_02 = spark.read.csv("/FileStore/tables/partition_02.csv", header="true", inferSchema="true")
df_flights = df_flights.union(df_partition_02)

df_partition_03 = spark.read.csv("/FileStore/tables/partition_03.csv", header="true", inferSchema="true")
df_flights = df_flights.union(df_partition_03)

df_partition_04 = spark.read.csv("/FileStore/tables/partition_04.csv", header="true", inferSchema="true")
df_flights = df_flights.union(df_partition_04)

df_partition_05 = spark.read.csv("/FileStore/tables/partition_05.csv", header="true", inferSchema="true")
df_flights = df_flights.union(df_partition_05)

df_partition_06 = spark.read.csv("/FileStore/tables/partition_06.csv", header="true", inferSchema="true")
df_flights = df_flights.union(df_partition_06)

df_partition_07 = spark.read.csv("/FileStore/tables/partition_07.csv", header="true", inferSchema="true")
df_flights = df_flights.union(df_partition_07)

df_partition_08 = spark.read.csv("/FileStore/tables/partition_08.csv", header="true", inferSchema="true")
df_flights = df_flights.union(df_partition_08)

df_flights = df_flights.persist()
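
The eight partition reads above all follow one pattern, so the paths can be generated and handed to a single read. The sketch below is one possible way to do that, not the notebook's actual method: `partition_paths` and `load_flights` are hypothetical helper names, and it assumes the partitions are numbered 01 through 08 under `/FileStore/tables` as in the cell above and that `spark` is the session Databricks provides.

```python
def partition_paths(count, base="/FileStore/tables"):
    """Build the CSV paths partition_01.csv .. partition_<count>.csv."""
    return [f"{base}/partition_{i:02d}.csv" for i in range(1, count + 1)]


def load_flights(spark, paths):
    """Read every partition in one call; DataFrameReader.csv accepts a list of paths."""
    return spark.read.csv(paths, header=True, inferSchema=True)


paths = partition_paths(8)
print(paths[0])   # /FileStore/tables/partition_01.csv
print(paths[-1])  # /FileStore/tables/partition_08.csv
# In the notebook (assumed usage): df_flights = load_flights(spark, paths).persist()
```

Reading all files in one call infers a single schema across the partitions, whereas `union` matches columns by position, so the single read avoids depending on every file having the same column order.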

### Step 2 - Merging Airport Data to Flight Data

We are building a wide table for reporting. First, let's combine the airport and flight data.

In [0]:
# Merging airports to flights.

# Assuming ALL analysis is expected with regards to origin airport unless specified otherwise
# df_flights(origin_airport,destination_airport)
# df_airports(iata_code)

# Origin airport dataframe
df_origin_airport = (
    df_airports
       .withColumnRenamed('IATA_CODE', 'ORIGIN_AIRPORT_IATA_CODE')
       .withColumnRenamed('AIRPORT', 'ORIGIN_AIRPORT_NAME')
       .withColumnRenamed('CITY','ORIGIN_AIRPORT_CITY')
       .withColumnRenamed('STATE', 'ORIGIN_AIRPORT_STATE')
       .withColumnRenamed('COUNTRY','ORIGIN_AIRPORT_COUNTRY')
       .withColumnRenamed('LATITUDE', 'ORIGIN_AIRPORT_LATITUDE')
       .withColumnRenamed('LONGITUDE','ORIGIN_AIRPORT_LONGITUDE') )

# Join Flights To Origin Airport
df_flights = df_flights.join(df_origin_airport, df_flights.ORIGIN_AIRPORT ==  df_origin_airport.ORIGIN_AIRPORT_IATA_CODE,"inner")

# todo - can drop ORIGIN_AIRPORT_IATA_CODE

# Destination airport dataframe
df_destination_airport = (
    df_airports
       .withColumnRenamed('IATA_CODE', 'DESTINATION_AIRPORT_IATA_CODE')
       .withColumnRenamed('AIRPORT', 'DESTINATION_AIRPORT_NAME')
       .withColumnRenamed('CITY','DESTINATION_AIRPORT_CITY')
       .withColumnRenamed('STATE', 'DESTINATION_AIRPORT_STATE')
       .withColumnRenamed('COUNTRY','DESTINATION_AIRPORT_COUNTRY')
       .withColumnRenamed('LATITUDE', 'DESTINATION_AIRPORT_LATITUDE')
       .withColumnRenamed('LONGITUDE','DESTINATION_AIRPORT_LONGITUDE') )

# Join Flights To Destination Airport
df_flights = df_flights.join(df_destination_airport, df_flights.DESTINATION_AIRPORT ==  df_destination_airport.DESTINATION_AIRPORT_IATA_CODE,"inner")

# todo - can drop DESTINATION_AIRPORT_IATA_CODE
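
The origin and destination blocks above differ only in the column prefix. As an illustrative sketch (the names `AIRPORT_COLUMNS`, `airport_renames`, and `with_prefix` are hypothetical, and the column list is taken from the renames in the cell above), the mapping can be built once and applied for either role:

```python
# Airports column -> suffix used in the wide table (AIRPORT becomes ..._NAME).
AIRPORT_COLUMNS = {
    "IATA_CODE": "IATA_CODE",
    "AIRPORT": "NAME",
    "CITY": "CITY",
    "STATE": "STATE",
    "COUNTRY": "COUNTRY",
    "LATITUDE": "LATITUDE",
    "LONGITUDE": "LONGITUDE",
}


def airport_renames(role):
    """Map each airports column to its role-prefixed name, e.g. ORIGIN_AIRPORT_CITY."""
    return {old: f"{role}_AIRPORT_{suffix}" for old, suffix in AIRPORT_COLUMNS.items()}


def with_prefix(df_airports, role):
    """Apply the renames one by one with DataFrame.withColumnRenamed."""
    for old, new in airport_renames(role).items():
        df_airports = df_airports.withColumnRenamed(old, new)
    return df_airports


print(airport_renames("ORIGIN")["AIRPORT"])         # ORIGIN_AIRPORT_NAME
print(airport_renames("DESTINATION")["IATA_CODE"])  # DESTINATION_AIRPORT_IATA_CODE
```

In the notebook this would be called as `with_prefix(df_airports, "ORIGIN")` and `with_prefix(df_airports, "DESTINATION")` before each join.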

### Step 3 - Merging Airline Data to Flight Data

Lastly, let's add the airline data to our table.

In [0]:
# Merging airlines to flights

# df_flights(airline)
# df_airlines(iata_code)

# Airline dataframe
df_airlines = (
    df_airlines
       .withColumnRenamed('IATA_CODE', 'AIRLINE_IATA_CODE')
       .withColumnRenamed('AIRLINE', 'AIRLINE_NAME') )

# Join Flights To Airlines
df_flights = df_flights.join(df_airlines, df_flights.AIRLINE ==  df_airlines.AIRLINE_IATA_CODE,"inner")

# todo - can drop AIRLINE_IATA_CODE

### Step 4 - How Many Years Of Data Were Provided?

Looking at the flight data provided, we see that it only covers the year 2015.

```python
  display(df_flights.groupBy("YEAR").count())
```

Because of this, we suppress the year from the results and show it in the header info instead.

In [0]:
# Let's find out how many years of data we have in the set provided

display(df_flights.groupBy("YEAR").count())

# Because we only have the year 2015, let's suppress it from the results and show it in the header info instead.

YEAR,count
2015,3920766


## Reporting

### Total Number Of Flights By Airline and Airport, 2015

This report returns the monthly number of flights by airline and airport for the available data.

In [0]:
# Report 1 - Total number of flights by airline and airport on a monthly basis

import pyspark.sql.functions as F

df_report_1 = (
  df_flights
  .groupBy("AIRLINE_NAME","ORIGIN_AIRPORT_NAME","MONTH")
  .count()
  .withColumnRenamed("AIRLINE_NAME", "Airline")
  .withColumnRenamed("ORIGIN_AIRPORT_NAME", "Airport")
  .withColumnRenamed("MONTH", "Month")
  .withColumnRenamed("count", "Flights")
  .orderBy("AIRLINE_NAME","ORIGIN_AIRPORT_NAME","MONTH", ascending=True)
  .persist()
  .withColumn("Flights", F.format_number("Flights", 0)) 
)

display(df_report_1)

Airline,Airport,Month,Flights
Alaska Airlines Inc.,Adak Airport,1,9
Alaska Airlines Inc.,Adak Airport,2,8
Alaska Airlines Inc.,Adak Airport,3,9
Alaska Airlines Inc.,Adak Airport,4,9
Alaska Airlines Inc.,Adak Airport,5,9
Alaska Airlines Inc.,Adak Airport,6,8
Alaska Airlines Inc.,Adak Airport,7,9
Alaska Airlines Inc.,Adak Airport,8,9
Alaska Airlines Inc.,Albuquerque International Sunport,1,31
Alaska Airlines Inc.,Albuquerque International Sunport,2,28


### Airline On Time Percentage, 2015

This report returns the percentage of the flights that were on-time by airline.

- We define an on-time flight as a flight with zero delay.
- A zero-delay flight is a flight with **ARRIVAL_DELAY = 0**.
- Flight data without **ARRIVAL_DELAY** is discarded. These records are missing either **SCHEDULED_DELAY** or **ARRIVAL_TIME**.
- Flight data with **ARRIVAL_DELAY < 0** is discarded as well. These are flights that arrived early, so the percentage is computed only over flights that arrived exactly on schedule or late.
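
To make the definition above concrete, here is a small plain-Python sketch of the same calculation. The sample delays are made up for illustration, and `on_time_percentage` is a hypothetical helper, not part of the notebook.

```python
def on_time_percentage(arrival_delays):
    """Share of flights with ARRIVAL_DELAY = 0 among flights with ARRIVAL_DELAY >= 0.

    Missing delays (None) and early arrivals (negative values) are discarded,
    mirroring the rules listed above. Returns None when no flight qualifies.
    """
    kept = [d for d in arrival_delays if d is not None and d >= 0]
    if not kept:
        return None
    on_time = sum(1 for d in kept if d == 0)
    return round(on_time / len(kept) * 100.0, 2)


# Made-up sample: one missing value, two early, one on time, three late.
sample = [None, -5, -12, 0, 3, 20, 45]
print(on_time_percentage(sample))  # 25.0
```

Because early arrivals are dropped from both counts, the result is the share of exactly-on-schedule flights among flights that were on schedule or late, so it can look low compared with a definition that counts early arrivals as on time.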

In [0]:
# Report 2 - On time percentage of each airline for the year 2015

from pyspark.sql.functions import col
import pyspark.sql.functions as func

# Count the flights that arrived exactly on time (arrival_delay = 0)
df_flights_onTime = (
  df_flights[["AIRLINE_NAME"]]
  .filter("YEAR = 2015")
  .filter("arrival_delay IS NOT NULL")
  .filter("arrival_delay >= 0")
  .filter("arrival_delay = 0")
  .groupBy("AIRLINE_NAME").count()  
  .withColumnRenamed("count", "onTime") 
)

# Count all flights that arrived on time or late; this is the denominator
df_flights_delayed = (
  df_flights[["AIRLINE_NAME"]]
  .filter("YEAR = 2015")
  .filter("arrival_delay IS NOT NULL")
  .filter("arrival_delay >= 0")
  .groupBy("AIRLINE_NAME").count()  
  .withColumnRenamed("AIRLINE_NAME", "AIRLINE_NAME_DROP")
  .withColumnRenamed("count", "Total") 
)

df_report_2 = (
  df_flights_onTime
  .join(df_flights_delayed, df_flights_onTime.AIRLINE_NAME ==  df_flights_delayed.AIRLINE_NAME_DROP,"inner")
  .drop(col("AIRLINE_NAME_DROP"))
  .withColumn("On-Time (%)", col("onTime")/col("Total") * 100.0)
  .drop(col("onTime"))
  .drop(col("Total"))
  .orderBy("AIRLINE_NAME") 
)

df_report_2 = (
  df_report_2
  .withColumn("On-Time (%)", func.round(df_report_2["On-Time (%)"], 2))
)

display(df_report_2)

AIRLINE_NAME,On-Time (%)
Alaska Airlines Inc.,7.23
American Airlines Inc.,4.96
American Eagle Airlines Inc.,4.46
Atlantic Southeast Airlines,5.68
Delta Air Lines Inc.,6.21
Frontier Airlines Inc.,4.29
Hawaiian Airlines Inc.,9.86
JetBlue Airways,4.36
Skywest Airlines Inc.,6.05
Southwest Airlines Co.,5.56


### Airlines With Largest Number Of Delays

This report returns the five airlines with the largest number of delays. A flight counts as delayed when **DEPARTURE_DELAY > 0**.

In [0]:
# Airlines with the largest number of delays

import pyspark.sql.functions as F

df_report_3 = (
  df_flights
  .filter("DEPARTURE_DELAY > 0")
  .groupBy("AIRLINE_NAME")
  .count()
  .withColumnRenamed("AIRLINE_NAME", "Airline")
  .withColumnRenamed("count", "Delays")
  .orderBy("count", ascending=False)
  .persist()
  .withColumn("Delays", F.format_number("Delays", 0)) 
)

display(df_report_3.limit(5))

Airline,Delays
Southwest Airlines Co.,395439
Delta Air Lines Inc.,200207
United Air Lines Inc.,185714
American Airlines Inc.,155094
Atlantic Southeast Airlines,125056


### Cancellation Reasons By Airport

This report returns the number of cancellations by reason for each airport.

In [0]:
# Cancellation reasons by airport

import pyspark.sql.functions as F

df_report_4 = (
  df_flights
  .groupBy("ORIGIN_AIRPORT_NAME","CANCELLATION_REASON")
  .count()
  .withColumnRenamed("ORIGIN_AIRPORT_NAME", "Airport")
  .withColumnRenamed("CANCELLATION_REASON", "Cancellation Reason")
  .withColumnRenamed("count", "Cancelations")
  .orderBy("ORIGIN_AIRPORT_NAME","CANCELLATION_REASON", ascending=True)
  .persist()
  .withColumn("Cancelations", F.format_number("Cancelations", 0)) )

# todo - can coalesce unknown for null

display(df_report_4)

Airport,Cancellation Reason,Cancelations
Aberdeen Regional Airport,,480
Aberdeen Regional Airport,A,5
Aberdeen Regional Airport,B,1
Abilene Regional Airport,,1657
Abilene Regional Airport,A,8
Abilene Regional Airport,B,70
Abraham Lincoln Capital Airport,,1102
Abraham Lincoln Capital Airport,A,15
Abraham Lincoln Capital Airport,B,34
Abraham Lincoln Capital Airport,C,3
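
The todo in the cell above suggests labeling null reasons. A minimal plain-Python sketch of that idea follows; the sample rows are invented, `count_cancellations` is a hypothetical helper, and the label `Unknown` is an assumption. In PySpark the same substitution is usually written with `F.coalesce`.

```python
from collections import Counter


def count_cancellations(rows, null_label="Unknown"):
    """Count rows per (airport, reason), replacing a missing reason with null_label."""
    return Counter(
        (airport, reason if reason is not None else null_label)
        for airport, reason in rows
    )


# Invented sample rows: (airport name, cancellation reason code or None).
rows = [
    ("Example Airport", None),
    ("Example Airport", "A"),
    ("Example Airport", "A"),
    ("Example Airport", "B"),
]
counts = count_cancellations(rows)
print(counts[("Example Airport", "A")])        # 2
print(counts[("Example Airport", "Unknown")])  # 1
```

If a null reason turns out to mean that the flight was not cancelled, which the large empty-reason counts in the output above suggest but the source data does not confirm, filtering those rows out before grouping may be the better choice.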


### Delay Reasons By Airport

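This report provides the total delay by reason for each airport. Each delay-reason column is summed per origin airport, so the figures are totals of the recorded delay values, not counts of delayed flights.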

In [0]:
# Delay reasons by airport

import pyspark.sql.functions as F

df_report_5 = (
  df_flights
  .groupBy("ORIGIN_AIRPORT_NAME")
  .sum("AIR_SYSTEM_DELAY","SECURITY_DELAY","AIRLINE_DELAY","LATE_AIRCRAFT_DELAY","WEATHER_DELAY")
  .withColumnRenamed("ORIGIN_AIRPORT_NAME", "Airport")
  .withColumnRenamed("sum(AIR_SYSTEM_DELAY)", "Air System Delay")
  .withColumnRenamed("sum(SECURITY_DELAY)", "Security Delay")
  .withColumnRenamed("sum(AIRLINE_DELAY)", "Airline Delay")
  .withColumnRenamed("sum(LATE_AIRCRAFT_DELAY)", "Late Aircraft Delay")
  .withColumnRenamed("sum(WEATHER_DELAY)", "Weather Delay")
  .orderBy("ORIGIN_AIRPORT_NAME")
  .persist()
  .withColumn("Air System Delay", F.format_number("Air System Delay", 0))
  .withColumn("Security Delay", F.format_number("Security Delay", 0))
  .withColumn("Airline Delay", F.format_number("Airline Delay", 0))
  .withColumn("Late Aircraft Delay", F.format_number("Late Aircraft Delay", 0))
  .withColumn("Weather Delay", F.format_number("Weather Delay", 0)) 
)

display(df_report_5)

Airport,Air System Delay,Security Delay,Airline Delay,Late Aircraft Delay,Weather Delay
Aberdeen Regional Airport,1066,9,3949,1986,101
Abilene Regional Airport,3823,46,5646,6000,3769
Abraham Lincoln Capital Airport,4619,44,3061,6759,477
Adak Airport,173,485,43,188,32
Akron-Canton Regional Airport,11321,0,11981,16260,2724
Albany International Airport,9144,93,17693,18323,2521
Albert J. Ellis Airport,1406,0,3335,5288,0
Albuquerque International Sunport,27369,75,44070,72720,6503
Alexandria International Airport,7099,106,12146,12309,2321
Alpena County Regional Airport,344,0,757,3563,228


### Airline With The Largest Number Of Unique Routes

This report returns the airline offering the largest number of unique routes.

In [0]:
# Airline with the most unique routes

from pyspark.sql.functions import concat, col, lit
import pyspark.sql.functions as F

df_report_6 = (
  df_flights
  .withColumn("ROUTE",concat(col("ORIGIN_AIRPORT"),lit('-'),col("DESTINATION_AIRPORT")))
)

df_report_6 = df_report_6[["AIRLINE_NAME","ROUTE"]]

df_report_6 = (
  df_report_6
  .groupBy("AIRLINE_NAME","ROUTE")
  .count()
  .orderBy("AIRLINE_NAME", ascending=True)
  .drop("count","ROUTE")
  .groupBy("AIRLINE_NAME")
  .count()
  .withColumnRenamed("AIRLINE_NAME", "Airline")
  .withColumnRenamed("count", "Unique Routes")
  .orderBy("count", ascending=False)
  .persist()
  .withColumn("Unique Routes", F.format_number("Unique Routes", 0)) 
)

display(df_report_6.limit(1))

Airline,Unique Routes
Atlantic Southeast Airlines,1351
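
As a cross-check on what "unique route" means here, the sketch below reproduces the logic in plain Python: a route is the ordered pair of origin and destination, so A-B and B-A count separately, and repeated flights on the same route count once. The sample flights and the helper name `unique_routes` are invented for illustration.

```python
from collections import defaultdict


def unique_routes(flights):
    """Return {airline: number of distinct (origin, destination) pairs}."""
    routes = defaultdict(set)
    for airline, origin, destination in flights:
        routes[airline].add((origin, destination))
    return {airline: len(pairs) for airline, pairs in routes.items()}


# Invented sample: airline X flies AAA-BBB twice and BBB-AAA once.
flights = [
    ("X", "AAA", "BBB"),
    ("X", "AAA", "BBB"),
    ("X", "BBB", "AAA"),
    ("Y", "AAA", "CCC"),
]
result = unique_routes(flights)
print(result)                       # {'X': 2, 'Y': 1}
print(max(result, key=result.get))  # X
```

In PySpark, the two-step `groupBy` in the cell above could likely be collapsed into a single aggregation with `F.countDistinct("ROUTE")` grouped by `AIRLINE_NAME`, which should give the same counts.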
