In [0]:
# Importing Airline dataset from DBFS to spark environment

df_airlines = spark.read.csv("dbfs:/FileStore/tables/airlines.csv", header="true", inferSchema="true")


In [0]:
# Analysing loaded Airlines dataframe 

df_airlines.display()

IATA_CODE,AIRLINE
UA,United Air Lines Inc.
AA,American Airlines Inc.
US,US Airways Inc.
F9,Frontier Airlines Inc.
B6,JetBlue Airways
OO,Skywest Airlines Inc.
AS,Alaska Airlines Inc.
NK,Spirit Air Lines
WN,Southwest Airlines Co.
DL,Delta Air Lines Inc.


In [0]:
# Importing Airports dataset from DBFS to spark environment

df_airports = spark.read.csv("dbfs:/FileStore/tables/airports.csv", header="true", inferSchema="true")

In [0]:
# Analysing loaded Airports dataframe 

df_airports.display()

IATA_CODE,AIRPORT,CITY,STATE,COUNTRY,LATITUDE,LONGITUDE
ABE,Lehigh Valley International Airport,Allentown,PA,USA,40.65236,-75.4404
ABI,Abilene Regional Airport,Abilene,TX,USA,32.41132,-99.6819
ABQ,Albuquerque International Sunport,Albuquerque,NM,USA,35.04022,-106.60919
ABR,Aberdeen Regional Airport,Aberdeen,SD,USA,45.44906,-98.42183
ABY,Southwest Georgia Regional Airport,Albany,GA,USA,31.53552,-84.19447
ACK,Nantucket Memorial Airport,Nantucket,MA,USA,41.25305,-70.06018
ACT,Waco Regional Airport,Waco,TX,USA,31.61129,-97.23052
ACV,Arcata Airport,Arcata/Eureka,CA,USA,40.97812,-124.10862
ACY,Atlantic City International Airport,Atlantic City,NJ,USA,39.45758,-74.57717
ADK,Adak Airport,Adak,AK,USA,51.87796,-176.64603


In [0]:
# Importing flight dataset from DBFS to spark environment

df_flight = spark.read.csv("dbfs:/FileStore/tables/flights", header="true", inferSchema="true")

In [0]:
# Analysing loaded flight dataframe 

df_flight.display()

YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
2015,7,1,3,MQ,3029,N935MQ,DFW,CID,1148,1204.0,16.0,12.0,1216.0,124,119.0,103.0,685,1359.0,4.0,1352,1403.0,11.0,0,0,,,,,,
2015,7,1,3,MQ,3283,N690MQ,DFW,MLI,1149,1221.0,32.0,12.0,1233.0,122,116.0,101.0,691,1414.0,3.0,1351,1417.0,26.0,0,0,,0.0,0.0,26.0,0.0,0.0
2015,7,1,3,DL,884,N986AT,TPA,LGA,1149,1259.0,70.0,11.0,1310.0,161,146.0,128.0,1010,1518.0,7.0,1430,1525.0,55.0,0,0,,0.0,0.0,6.0,49.0,0.0
2015,7,1,3,B6,265,N593JB,SMF,LGB,1149,1151.0,2.0,15.0,1206.0,80,85.0,66.0,387,1312.0,4.0,1309,1316.0,7.0,0,0,,,,,,
2015,7,1,3,B6,249,N203JB,DCA,TPA,1149,1206.0,17.0,15.0,1221.0,137,129.0,112.0,814,1413.0,2.0,1406,1415.0,9.0,0,0,,,,,,
2015,7,1,3,AS,603,N477AS,LAS,SEA,1150,1150.0,0.0,21.0,1211.0,145,146.0,116.0,867,1407.0,9.0,1415,1416.0,1.0,0,0,,,,,,
2015,7,1,3,AA,1191,N4YSAA,DFW,BNA,1149,1152.0,3.0,21.0,1213.0,112,119.0,93.0,631,1346.0,5.0,1341,1351.0,10.0,0,0,,,,,,
2015,7,1,3,NK,561,N635NK,DEN,LAS,1148,1142.0,-6.0,12.0,1154.0,110,101.0,82.0,628,1216.0,7.0,1238,1223.0,-15.0,0,0,,,,,,
2015,7,1,3,HA,328,N486HA,HNL,OGG,1148,1158.0,10.0,12.0,1210.0,39,40.0,22.0,100,1232.0,6.0,1227,1238.0,11.0,0,0,,,,,,
2015,7,1,3,MQ,3099,N510MQ,BUF,ORD,1148,1141.0,-7.0,23.0,1204.0,110,128.0,74.0,473,1218.0,31.0,1238,1249.0,11.0,0,0,,,,,,


In [0]:
# Registering tables so they are accessible via SQL Context for all the aforementioned dataframes 

# Airlines
df_airlines.createOrReplaceTempView("airlines")

# Airports
df_airports.createOrReplaceTempView("airports")

# Flights

df_flight.createOrReplaceTempView("flight")

In [0]:
# Importing necessary pyspark libraries

import pyspark.sql.functions as F


# Total number of flights by airline and airport on a monthly basis

nbr_flights = df_flight.groupBy("YEAR", "MONTH", "AIRLINE", "ORIGIN_AIRPORT", "DESTINATION_AIRPORT")\
                       .agg({"FLIGHT_NUMBER": "count"})\
                       .withColumnRenamed('count(FLIGHT_NUMBER)', 'NUMBER_OF_FLIGHTS')

nbr_flights.display()


YEAR,MONTH,AIRLINE,ORIGIN_AIRPORT,DESTINATION_AIRPORT,NUMBER_OF_FLIGHTS
2015,7,AA,MSP,PHL,112
2015,7,DL,MSP,BIL,91
2015,7,OO,ORD,BHM,8
2015,7,DL,ATL,BHM,315
2015,7,WN,RDU,BNA,89
2015,7,B6,PBI,LGA,93
2015,7,F9,IAD,MIA,31
2015,7,B6,JFK,LAS,120
2015,7,AA,BNA,LAX,60
2015,7,WN,DEN,BNA,123


In [0]:
# On-time percentage of each airline for the year 2015

# Calculating total number of flights per airline

tot_flights_airline = df_flight.filter(df_flight.YEAR == 2015)\
                       .groupBy("AIRLINE")\
                       .agg({"FLIGHT_NUMBER": "count"})\
                       .withColumnRenamed('count(FLIGHT_NUMBER)', 'TOTAL_FLIGHTS')
                       
# tot_flights_airline.display()

# Calculating On time flights per airline

tot_flights_airline_ontime = df_flight.filter((df_flight.YEAR == 2015) & (df_flight.DEPARTURE_DELAY == 0) & (df_flight.ARRIVAL_DELAY == 0))\
                               .groupBy("AIRLINE")\
                               .agg({"FLIGHT_NUMBER": "count"})\
                               .withColumnRenamed('count(FLIGHT_NUMBER)', 'TOTAL_ONTIME_FLIGHTS')\
                               .withColumnRenamed("AIRLINE", "ONTIME_AIRLINE")

# tot_flights_airline_ontime.display()

# On-time percentage calculation 

joined_df = tot_flights_airline.join(tot_flights_airline_ontime, tot_flights_airline.AIRLINE == tot_flights_airline_ontime.ONTIME_AIRLINE, 'inner')\
                .select("AIRLINE", "TOTAL_FLIGHTS", "TOTAL_ONTIME_FLIGHTS")\
                .withColumn("ONTIME_PERCENTAGE", ((F.col("TOTAL_ONTIME_FLIGHTS") / F.col("TOTAL_FLIGHTS")) * 100))

percent_flight_by_airline = joined_df.select("AIRLINE", "ONTIME_PERCENTAGE")

percent_flight_by_airline.display()


AIRLINE,ONTIME_PERCENTAGE
UA,0.1274999195814727
NK,0.1282924019479499
AA,0.1282709081580297
EV,0.1690179275218788
B6,0.1318715705010002
DL,0.1996928590585224
OO,0.1878913353381792
F9,0.1407280557486563
MQ,0.1870092790863669
HA,0.3663358660257404


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Airlines with the largest number of delays

# Calculating delayed flights per airline

airline_delayed = df_flight.filter((df_flight.DEPARTURE_DELAY > 0) & (df_flight.ARRIVAL_DELAY > 0))\
                               .groupBy("AIRLINE")\
                               .agg({"FLIGHT_NUMBER": "count"})\
                               .withColumnRenamed('count(FLIGHT_NUMBER)', 'TOTAL_DELAYED_FLIGHTS')\
                               .withColumn("row_number", row_number().over(Window.orderBy(F.desc("TOTAL_DELAYED_FLIGHTS"))))\
                               .withColumnRenamed('AIRLINE', 'IATA_CODE')
                               
# airline_delayed.display()

tot_flights_airline_delayed = airline_delayed.filter(airline_delayed.row_number == 1)\
                                    .join(df_airlines, airline_delayed.IATA_CODE == df_airlines.IATA_CODE, 'inner')\
                                    .select("AIRLINE", 'TOTAL_DELAYED_FLIGHTS')\
                                    .sort(F.col("TOTAL_DELAYED_FLIGHTS").desc())

tot_flights_airline_delayed.display()



AIRLINE,TOTAL_DELAYED_FLIGHTS
Southwest Airlines Co.,279223


In [0]:
# Cancellation reasons by airport

# Mapping Cancellation reason code with actual cancellation reason

map_df = sqlContext.createDataFrame([("Airline/Carrier", "A"),
                                   ("Weather", "B"),
                                   ("National Air System", "C"),
                                   ("Security", "D")], 
                                   ('CANCELLATION_REASON_DESC', 'CODE'))

flight_cancelled = df_flight.filter((df_flight.CANCELLED == 1) & (df_flight.CANCELLATION_REASON.isNotNull()))\
                                 .join(df_airports, df_flight.ORIGIN_AIRPORT == df_airports.IATA_CODE, 'inner')\
                                 .join(map_df, df_flight.CANCELLATION_REASON == map_df.CODE, 'inner')\
                                 .select("FLIGHT_NUMBER", "AIRPORT", "CANCELLATION_REASON_DESC")
                               
flight_cancelled.display()


FLIGHT_NUMBER,AIRPORT,CANCELLATION_REASON_DESC
2147,LaGuardia Airport (Marine Air Terminal),Airline/Carrier
4137,Norman Y. Mineta San José International Airport,Airline/Carrier
590,Chicago O'Hare International Airport,Airline/Carrier
6371,Seattle-Tacoma International Airport,Airline/Carrier
834,Bob Hope Airport (Hollywood Burbank Airport),Airline/Carrier
1804,George Bush Intercontinental Airport,Airline/Carrier
3578,Los Angeles International Airport,Airline/Carrier
3280,El Paso International Airport,Airline/Carrier
1017,Austin-Bergstrom International Airport,Airline/Carrier
4846,Fort Wayne International Airport,Airline/Carrier


In [0]:
# Delay reasons by airport

from pyspark.sql.functions import expr


# Creating dataframe with required columns
# Creating a data from year, month and day columns

flight_delay_df = df_flight.filter((df_flight.SECURITY_DELAY.isNotNull())\
                                        & (df_flight.AIR_SYSTEM_DELAY.isNotNull())\
                                        & (df_flight.AIRLINE_DELAY.isNotNull())\
                                        & (df_flight.LATE_AIRCRAFT_DELAY.isNotNull())\
                                        & (df_flight.WEATHER_DELAY.isNotNull()))\
                                 .join(df_airports, df_flight.ORIGIN_AIRPORT == df_airports.IATA_CODE, 'inner')\
                                 .withColumn("DATE", expr("make_date(YEAR, MONTH, DAY)"))\
                                 .select("DATE", "AIRPORT", "SECURITY_DELAY", "AIR_SYSTEM_DELAY", "AIRLINE_DELAY", "LATE_AIRCRAFT_DELAY", "WEATHER_DELAY")
                               
# flight_delay_df.display()    

cols = flight_delay_df.columns[2:]
flight_delay_reasons = flight_delay_df.selectExpr('DATE', 'AIRPORT', "stack({}, {})".format(len(cols), ', '.join(("'{}', {}".format(i, i) for i in cols))))\
                               .withColumnRenamed('col0', 'DELAY_REASON')\
                               .withColumnRenamed('col1', 'DELAY_TIME')\
                               .where("DELAY_TIME <> 0")

flight_delay_reasons.display()



DATE,AIRPORT,DELAY_REASON,DELAY_TIME
2015-07-01,Dallas/Fort Worth International Airport,AIRLINE_DELAY,26
2015-07-01,Tampa International Airport,AIRLINE_DELAY,6
2015-07-01,Tampa International Airport,LATE_AIRCRAFT_DELAY,49
2015-07-01,Mobile Regional Airport,LATE_AIRCRAFT_DELAY,98
2015-07-01,Buffalo Niagara International Airport,LATE_AIRCRAFT_DELAY,15
2015-07-01,George Bush Intercontinental Airport,AIR_SYSTEM_DELAY,2
2015-07-01,George Bush Intercontinental Airport,WEATHER_DELAY,74
2015-07-01,George Bush Intercontinental Airport,WEATHER_DELAY,136
2015-07-01,George Bush Intercontinental Airport,AIR_SYSTEM_DELAY,8
2015-07-01,George Bush Intercontinental Airport,LATE_AIRCRAFT_DELAY,34


In [0]:
# Airline with the most unique routes

sql_query = "SELECT airlines.AIRLINE, COUNT(DISTINCT ORIGIN_AIRPORT, DESTINATION_AIRPORT) AS UNIQUE_ROUTES FROM flight\
             JOIN airlines on flight.AIRLINE = airlines.IATA_CODE\
             GROUP BY airlines.AIRLINE\
             ORDER BY UNIQUE_ROUTES DESC\
             LIMIT 1"

unique_routes = spark.sql(sql_query)

unique_routes.display()


AIRLINE,UNIQUE_ROUTES
Atlantic Southeast Airlines,1351


In [0]:
# Configurations for writing data to Snowflake database

# snowflake connection options
options = {
  "sfUrl": "https://gp09164.us-central1.gcp.snowflakecomputing.com/",
  "sfUser": "",
  "sfPassword": "",
  "sfDatabase": "USER_NIKHIL",
  "sfSchema": "DBO",
  "sfWarehouse": "VIEW_WH"
}

In [0]:
# Writing data for the created reports to the snowflake database

spark.range(5).write \
  .format("snowflake") \
  .options(**options) \
  .option("dbtable", "TEST") \
  .save()
