In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

%matplotlib inline

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('Read CSV File into DataFrame').getOrCreate()

In [4]:
from pyspark.sql import SQLContext
from pyspark import SparkContext, SparkConf

In [5]:
sqlContext = SQLContext(spark)



In [6]:
spark

In [7]:
print(spark.catalog.listTables())

[]


In [7]:
airport_df=spark.read.csv(r"C:\Users\16478\Desktop\dataset\airports.csv", sep=',',inferSchema=True, header=True)

In [8]:
airport_df.show(1)

+---------+--------------------+---------+-----+-------+--------+---------+
|IATA_CODE|             AIRPORT|     CITY|STATE|COUNTRY|LATITUDE|LONGITUDE|
+---------+--------------------+---------+-----+-------+--------+---------+
|      ABE|Lehigh Valley Int...|Allentown|   PA|    USA|40.65236| -75.4404|
+---------+--------------------+---------+-----+-------+--------+---------+
only showing top 1 row



In [8]:
flight_df=spark.read.csv(r"C:\Users\16478\Desktop\dataset\flights.csv", sep=',',inferSchema=True, header=True)

In [11]:
flight_df.show(1)

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

In [13]:
airline_df=spark.read.csv(r"C:\Users\16478\Desktop\dataset\airlines.csv", sep=',',inferSchema=True, header=True)

In [142]:
airline_df.show(1)

+---------+--------------------+
|IATA_CODE|             AIRLINE|
+---------+--------------------+
|       UA|United Air Lines ...|
+---------+--------------------+
only showing top 1 row



In [13]:
#Check the data shape for 3 files 

print("Airline Dataset Shape -",(airline_df.count(), len(airline_df.columns)))
print("Fligh Dataset Shape -",(flight_df.count(), len(flight_df.columns)))
print("Aiport Dataset Shape -",(airport_df.count(), len(airport_df.columns)))

Airline Dataset Shape - (14, 2)
Fligh Dataset Shape - (5819079, 31)
Aiport Dataset Shape - (322, 7)


In [14]:
print(spark.catalog.listTables())

[]


In [14]:
flight_df.createOrReplaceTempView('flights')

In [143]:
airline_df.createOrReplaceTempView('airlines')

In [144]:
airport_df.createOrReplaceTempView('airports')

In [18]:
print(spark.catalog.listTables())

[Table(name='airlines', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='airports', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


In [19]:
#Columns
print(flight_df.columns)
print('******'*20)
print(airport_df.columns)
print('******'*20)
print(airline_df.columns)

['YEAR', 'MONTH', 'DAY', 'DAY_OF_WEEK', 'AIRLINE', 'FLIGHT_NUMBER', 'TAIL_NUMBER', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT', 'SCHEDULED_DEPARTURE', 'DEPARTURE_TIME', 'DEPARTURE_DELAY', 'TAXI_OUT', 'WHEELS_OFF', 'SCHEDULED_TIME', 'ELAPSED_TIME', 'AIR_TIME', 'DISTANCE', 'WHEELS_ON', 'TAXI_IN', 'SCHEDULED_ARRIVAL', 'ARRIVAL_TIME', 'ARRIVAL_DELAY', 'DIVERTED', 'CANCELLED', 'CANCELLATION_REASON', 'AIR_SYSTEM_DELAY', 'SECURITY_DELAY', 'AIRLINE_DELAY', 'LATE_AIRCRAFT_DELAY', 'WEATHER_DELAY']
************************************************************************************************************************
['IATA_CODE', 'AIRPORT', 'CITY', 'STATE', 'COUNTRY', 'LATITUDE', 'LONGITUDE']
************************************************************************************************************************
['IATA_CODE', 'AIRLINE']


In [42]:
#Data Querying

In [43]:
q1="SELECT AIRLINE, FLIGHT_NUMBER, TAIL_NUMBER, ORIGIN_AIRPORT, DESTINATION_AIRPORT, SCHEDULED_DEPARTURE FROM flights LIMIT 10"

In [44]:
flights_q1=spark.sql(q1)

In [45]:
flights_q1.show()

+-------+-------------+-----------+--------------+-------------------+-------------------+
|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|
+-------+-------------+-----------+--------------+-------------------+-------------------+
|     AS|           98|     N407AS|           ANC|                SEA|                  5|
|     AA|         2336|     N3KUAA|           LAX|                PBI|                 10|
|     US|          840|     N171US|           SFO|                CLT|                 20|
|     AA|          258|     N3HYAA|           LAX|                MIA|                 20|
|     AS|          135|     N527AS|           SEA|                ANC|                 25|
|     DL|          806|     N3730B|           SFO|                MSP|                 25|
|     NK|          612|     N635NK|           LAS|                MSP|                 25|
|     US|         2013|     N584UW|           LAX|                CLT|                 30|

In [46]:
q2= "SELECT ORIGIN_AIRPORT, DESTINATION_AIRPORT, COUNT(*) as COUNT FROM flights GROUP BY ORIGIN_AIRPORT, DESTINATION_AIRPORT"

In [47]:
flights_q2=spark.sql(q2)

In [48]:
flights_q2.show()

+--------------+-------------------+-----+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|COUNT|
+--------------+-------------------+-----+
|           BQN|                MCO|  441|
|           PHL|                MCO| 4869|
|           MCI|                IAH| 1698|
|           SPI|                ORD|  998|
|           SNA|                PHX| 3846|
|           LBB|                DEN|  618|
|           ORD|                PDX| 2149|
|           EWR|                STT|  239|
|           ATL|                GSP| 2470|
|           MCI|                MKE|  612|
|           PBI|                DCA|  978|
|           SMF|                BUR| 2092|
|           MDW|                MEM|  628|
|           LAS|                LIT|  334|
|           TPA|                ACY|  335|
|           DSM|                EWR|  191|
|           FSD|                ATL|  329|
|           SJC|                LIH|  190|
|           CLE|                SJU|   43|
|           CPR|                DEN|  956|
+----------

In [49]:
pd_q2=flights_q2.toPandas()

In [50]:
print(pd_q2.head())

  ORIGIN_AIRPORT DESTINATION_AIRPORT  COUNT
0            BQN                 MCO    441
1            PHL                 MCO   4869
2            MCI                 IAH   1698
3            SPI                 ORD    998
4            SNA                 PHX   3846


In [17]:
#Converting flight suration from min to hrs

flight_table=spark.table("flights")

In [18]:
flight_table=flight_table.withColumn('duration_hrs', flight_table.AIR_TIME/60.)

In [22]:
flight_table.select('duration_hrs').show(2)

+-----------------+
|     duration_hrs|
+-----------------+
|2.816666666666667|
|4.383333333333334|
+-----------------+
only showing top 2 rows



In [23]:
long_flight_filter=flight_table.filter("DISTANCE > 1000")

In [24]:
long_flight_filter.show(2)

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+-----------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|     duration_hrs|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--

In [58]:
pd_long_flight_filter=long_flight_filter.toPandas()

In [59]:
print(pd_long_flight_filter.head())

   YEAR  MONTH  DAY  DAY_OF_WEEK AIRLINE  FLIGHT_NUMBER TAIL_NUMBER  \
0  2015      1    1            4      AS             98      N407AS   
1  2015      1    1            4      AA           2336      N3KUAA   
2  2015      1    1            4      US            840      N171US   
3  2015      1    1            4      AA            258      N3HYAA   
4  2015      1    1            4      AS            135      N527AS   

  ORIGIN_AIRPORT DESTINATION_AIRPORT  SCHEDULED_DEPARTURE  ...  ARRIVAL_DELAY  \
0            ANC                 SEA                    5  ...          -22.0   
1            LAX                 PBI                   10  ...           -9.0   
2            SFO                 CLT                   20  ...            5.0   
3            LAX                 MIA                   20  ...           -9.0   
4            SEA                 ANC                   25  ...          -21.0   

   DIVERTED  CANCELLED  CANCELLATION_REASON  AIR_SYSTEM_DELAY  SECURITY_DELAY  \
0    

In [15]:
#Select 

select_flight=flight_table.select('TAIL_NUMBER', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT','AIRLINE')

In [62]:
select_flight.show(2)

+-----------+--------------+-------------------+-------+
|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|AIRLINE|
+-----------+--------------+-------------------+-------+
|     N407AS|           ANC|                SEA|     AS|
|     N3KUAA|           LAX|                PBI|     AA|
+-----------+--------------+-------------------+-------+
only showing top 2 rows



In [63]:
#Filter

filter_1=flight_table.ORIGIN_AIRPORT=="LAX"

filter_2=flight_table.DESTINATION_AIRPORT=="PBI"

join_filter=select_flight.filter(filter_1).filter(filter_2)


In [65]:
join_filter.show(5)

+-----------+--------------+-------------------+-------+
|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|AIRLINE|
+-----------+--------------+-------------------+-------+
|     N3KUAA|           LAX|                PBI|     AA|
|     N3HPAA|           LAX|                PBI|     AA|
|     N3ERAA|           LAX|                PBI|     AA|
|     N3JHAA|           LAX|                PBI|     AA|
|     N3FKAA|           LAX|                PBI|     AA|
+-----------+--------------+-------------------+-------+
only showing top 5 rows



In [16]:
speed=(flight_table.DISTANCE/(flight_table.AIR_TIME/60)).alias("speed")

select_speed=flight_table.select('TAIL_NUMBER', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT', speed)

In [67]:
select_speed.show(3)

+-----------+--------------+-------------------+-----------------+
|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|            speed|
+-----------+--------------+-------------------+-----------------+
|     N407AS|           ANC|                SEA|514.0828402366864|
|     N3KUAA|           LAX|                PBI|531.5589353612166|
|     N171US|           SFO|                CLT|517.8947368421052|
+-----------+--------------+-------------------+-----------------+
only showing top 3 rows



In [19]:
#Data Types conversion

flight_table = flight_table.withColumn("MONTH", flight_table.MONTH.cast("integer"))
flight_table = flight_table.withColumn("DAY_OF_WEEK", flight_table.DAY_OF_WEEK.cast("integer"))
flight_table = flight_table.withColumn("AIR_TIME", flight_table.AIR_TIME.cast("integer"))
flight_table = flight_table.withColumn("DISTANCE", flight_table.DISTANCE.cast("double"))
flight_table = flight_table.withColumn("ARRIVAL_DELAY", flight_table.ARRIVAL_DELAY.cast("integer"))


In [69]:
# How to get shortest flight from ANC 
flight_table.filter(flight_table.ORIGIN_AIRPORT == 'ANC').groupBy().min('DISTANCE').show()
# How to get shortest flight from LAX
flight_table.filter(flight_table.ORIGIN_AIRPORT == 'LAX').groupBy().min('DISTANCE').show()
# How to get shortest flight from SFO
flight_table.filter(flight_table.ORIGIN_AIRPORT == 'SFO').groupBy().min('DISTANCE').show()

+-------------+
|min(DISTANCE)|
+-------------+
|        160.0|
+-------------+

+-------------+
|min(DISTANCE)|
+-------------+
|         86.0|
+-------------+

+-------------+
|min(DISTANCE)|
+-------------+
|         77.0|
+-------------+



In [70]:
# How to get shortest flight from ANC 
flight_table.filter(flight_table.ORIGIN_AIRPORT == 'ANC').groupBy().max('DISTANCE').show()
# How to get shortest flight from LAX
flight_table.filter(flight_table.ORIGIN_AIRPORT == 'LAX').groupBy().max('DISTANCE').show()
# How to get shortest flight from SFO
flight_table.filter(flight_table.ORIGIN_AIRPORT == 'SFO').groupBy().max('DISTANCE').show()

+-------------+
|max(DISTANCE)|
+-------------+
|       3417.0|
+-------------+

+-------------+
|max(DISTANCE)|
+-------------+
|       2615.0|
+-------------+

+-------------+
|max(DISTANCE)|
+-------------+
|       2704.0|
+-------------+



In [71]:
# How to get shortest flight from ANC with respect to time in air 
flight_table.filter(flight_table.ORIGIN_AIRPORT == 'ANC').groupBy().min('AIR_TIME').show()
# How to get shortest flight from LAX
flight_table.filter(flight_table.ORIGIN_AIRPORT == 'LAX').groupBy().min('AIR_TIME').show()
# How to get shortest flight from SFO
flight_table.filter(flight_table.ORIGIN_AIRPORT == 'SFO').groupBy().min('AIR_TIME').show()

+-------------+
|min(AIR_TIME)|
+-------------+
|           27|
+-------------+

+-------------+
|min(AIR_TIME)|
+-------------+
|           14|
+-------------+

+-------------+
|min(AIR_TIME)|
+-------------+
|            8|
+-------------+



In [72]:
# How to get longest flight from ANC with respect to time in air 
flight_table.filter(flight_table.ORIGIN_AIRPORT == 'ANC').groupBy().max('AIR_TIME').show()
# How to get longest flight from LAX
flight_table.filter(flight_table.ORIGIN_AIRPORT == 'LAX').groupBy().max('AIR_TIME').show()
# How to get longest flight from SFO
flight_table.filter(flight_table.ORIGIN_AIRPORT == 'SFO').groupBy().max('AIR_TIME').show()

+-------------+
|max(AIR_TIME)|
+-------------+
|          426|
+-------------+

+-------------+
|max(AIR_TIME)|
+-------------+
|          409|
+-------------+

+-------------+
|max(AIR_TIME)|
+-------------+
|          389|
+-------------+



In [73]:
tail_number_filter=flight_table.groupBy("TAIL_NUMBER")
origin_count_filter=flight_table.groupBy("ORIGIN_AIRPORT")

In [74]:
print("Tail Number Count")
tail_number_filter.count().show(5)
print("Airport Origin Count")
origin_count_filter.count().show(5)

Tail Number Count
+-----------+-----+
|TAIL_NUMBER|count|
+-----------+-----+
|     N38451|  946|
|     N567AA| 1458|
|     N623NK| 1669|
|     N442AS| 1210|
|     N902DE| 1704|
+-----------+-----+
only showing top 5 rows

Airport Origin Count
+--------------+-----+
|ORIGIN_AIRPORT|count|
+--------------+-----+
|           BGM|  262|
|           PSE|  749|
|           INL|  574|
|           MSY|38804|
|           PPG|  107|
+--------------+-----+
only showing top 5 rows



In [75]:
#Checking Data by month

In [20]:
import pyspark.sql.functions as fun

In [21]:
flight_table = flight_table.withColumn("DEPARTURE_DELAY", flight_table.DEPARTURE_DELAY.cast("integer"))

In [22]:
month=flight_table.groupBy('MONTH','DESTINATION_AIRPORT')

In [23]:
avg_Delays=month.avg('DEPARTURE_DELAY').show(5)

+-----+-------------------+--------------------+
|MONTH|DESTINATION_AIRPORT|avg(DEPARTURE_DELAY)|
+-----+-------------------+--------------------+
|    1|                ACY|    8.33433734939759|
|    1|                EYW|   6.551440329218107|
|    1|                OME|                0.05|
|    1|                RDM|   9.701863354037267|
|    1|                TWF|  -4.711864406779661|
+-----+-------------------+--------------------+
only showing top 5 rows



In [81]:
sd_Delay=month.agg(fun.stddev('DEPARTURE_DELAY')).show(5)

+-----+-------------------+----------------------------+
|MONTH|DESTINATION_AIRPORT|stddev_samp(DEPARTURE_DELAY)|
+-----+-------------------+----------------------------+
|    1|                ACY|           32.81265545745788|
|    1|                EYW|           63.65052137951282|
|    1|                OME|          19.546923118225806|
|    1|                RDM|          34.260918828983726|
|    1|                TWF|           7.985009333135624|
+-----+-------------------+----------------------------+
only showing top 5 rows



In [24]:
airport_table=airport_df.withColumnRenamed("IATA_CODE", "DESTINATION_AIRPORT")

In [84]:
#joining flight datase and airport dataset

In [25]:
air_manager=flight_table.join(airport_table,on='DESTINATION_AIRPORT', how = 'leftouter')

In [87]:
#check joined table

In [32]:
air_manager.show(2)

+-------------------+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+-----------------+--------------------+---------------+-----+-------+--------+----------+
|DESTINATION_AIRPORT|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|     duration_hrs|             AIRPORT|           CITY|STATE|COUNTRY|LATITUDE| LONGITUDE|
+-------------------+----+--

In [89]:
air_manager.select('FLIGHT_NUMBER', 'AIRPORT', 'CITY', 'STATE', 'COUNTRY', 'LATITUDE', 'LONGITUDE').show(5)

+-------------+--------------------+---------------+-----+-------+--------+----------+
|FLIGHT_NUMBER|             AIRPORT|           CITY|STATE|COUNTRY|LATITUDE| LONGITUDE|
+-------------+--------------------+---------------+-----+-------+--------+----------+
|           98|Seattle-Tacoma In...|        Seattle|   WA|    USA|47.44898|-122.30931|
|         2336|Palm Beach Intern...|West Palm Beach|   FL|    USA|26.68316| -80.09559|
|          840|Charlotte Douglas...|      Charlotte|   NC|    USA|35.21401| -80.94313|
|          258|Miami Internation...|          Miami|   FL|    USA|25.79325| -80.29056|
|          135|Ted Stevens Ancho...|      Anchorage|   AK|    USA|61.17432|-149.99619|
+-------------+--------------------+---------------+-----+-------+--------+----------+
only showing top 5 rows



### Highest Flight Departure Airport

In [19]:
start = sqlContext.sql("""select ORIGIN_AIRPORT, sum(SCHEDULED_DEPARTURE) as Flight
                                    from flights group by ORIGIN_AIRPORT order by sum(SCHEDULED_DEPARTURE) DESC limit 10""")
start.show()

+--------------+---------+
|ORIGIN_AIRPORT|   Flight|
+--------------+---------+
|           ATL|508100303|
|           ORD|395696913|
|           DFW|336914905|
|           DEN|272395117|
|           LAX|263070698|
|           IAH|203396933|
|           SFO|202297656|
|           PHX|197875290|
|           LAS|177168732|
|           MSP|154002871|
+--------------+---------+



## Airlines with Highest Delays

In [21]:
airlines_max_delays= sqlContext.sql("""select AIRLINE, sum(DEPARTURE_DELAY) as Delay
                                    from flights group by AIRLINE order by sum(DEPARTURE_DELAY) DESC limit 10""")
airlines_max_delays.show()

+-------+--------+
|AIRLINE|   Delay|
+-------+--------+
|     WN|13186520|
|     UA| 7355348|
|     DL| 6427294|
|     AA| 6369435|
|     EV| 4857338|
|     OO| 4517510|
|     B6| 3026467|
|     MQ| 2837908|
|     NK| 1840887|
|     F9| 1205449|
+-------+--------+



## Airports with Most Flights

In [31]:
MostFlightsByAirports = sqlContext.sql("""with destination as (select DESTINATION_AIRPORT as Airport, sum(SCHEDULED_DEPARTURE) as Out_Flights 
                                    from flights group by DESTINATION_AIRPORT),
                                    origin as (select ORIGIN_AIRPORT as Airport, sum(SCHEDULED_ARRIVAL) as In_Flights 
                                    from flights group by ORIGIN_AIRPORT)
                                    select origin.Airport, (destination.Out_Flights+origin.In_Flights) as Total_Flights
                                    from origin, destination 
                                    where origin.Airport = destination.Airport
                                    order by (origin.In_Flights + destination.Out_Flights) DESC
                                    limit 15""")
MostFlightsByAirports.show()

+-------+-------------+
|Airport|Total_Flights|
+-------+-------------+
|    ATL|    965658136|
|    ORD|    801952811|
|    DFW|    668106497|
|    LAX|    557565193|
|    DEN|    554937783|
|    SFO|    418551870|
|    PHX|    416744400|
|    IAH|    412111092|
|    LAS|    376167370|
|    SEA|    316385217|
|    MSP|    315194016|
|    MCO|    312460292|
|    BOS|    308644686|
|    DTW|    305500112|
|    EWR|    289299779|
+-------+-------------+



## Flights vs Distance

In [32]:
distanceQuery = sqlContext.sql("""with table1 as 
                                    (select least(Origin_airport, Destination_airport) as Airport1, 
                                    greatest(Destination_airport, Origin_airport) as Airport2, 
                                    mean(Distance) as Distance,
                                    sum(SCHEDULED_DEPARTURE) as Flights
                                    from flights
                                    group by least(Origin_airport, Destination_airport), greatest(Destination_airport, Origin_airport)
                                    order by 1,2)
                                    select t.*
                                    from table1 t
                                    where Flights>0
                                    order by Distance DESC
                                    limit 15;""")

distanceQuery.show(15)

+--------+--------+--------+-------+
|Airport1|Airport2|Distance|Flights|
+--------+--------+--------+-------+
|     HNL|     JFK|  4983.0| 799195|
|   12173|   12478|  4983.0|  68140|
|     EWR|     HNL|  4962.0| 979435|
|   11618|   12173|  4962.0|  90845|
|     HNL|     IAD|  4817.0| 409615|
|   12173|   12264|  4817.0|  11310|
|     ATL|     HNL|  4502.0| 899217|
|   10397|   12173|  4502.0|  83414|
|     HNL|     ORD|  4243.0| 841710|
|   12173|   13930|  4243.0|  77885|
|     OGG|     ORD|  4184.0| 294230|
|   13830|   13930|  4184.0|  12800|
|   12173|   13487|  3972.0|   5824|
|     HNL|     MSP|  3972.0| 131195|
|     HNL|     IAH|  3904.0| 963965|
+--------+--------+--------+-------+



## Using Machine Learning model to predict Delay

In [26]:
model=flight_table.select('MONTH', 'DAY_OF_WEEK', 'AIRLINE', 'TAIL_NUMBER', 'DESTINATION_AIRPORT', 'AIR_TIME', 'DISTANCE', 'ARRIVAL_DELAY')

In [27]:
model.count()

5819079

In [43]:
#Removing the null values

In [28]:
model= model.filter("ARRIVAL_DELAY is not NULL and AIRLINE is not NULL and AIR_TIME is not NULL and TAIL_NUMBER is not NULL")

In [29]:
model.count()

5714008

In [30]:
model= model.withColumn("is_late", model.ARRIVAL_DELAY > 0)

In [31]:
model = model.withColumn("is_late", model.is_late.cast("integer"))

In [32]:
model = model.withColumnRenamed("is_late", 'label')

In [33]:
model.show(1)

+-----+-----------+-------+-----------+-------------------+--------+--------+-------------+-----+
|MONTH|DAY_OF_WEEK|AIRLINE|TAIL_NUMBER|DESTINATION_AIRPORT|AIR_TIME|DISTANCE|ARRIVAL_DELAY|label|
+-----+-----------+-------+-----------+-------------------+--------+--------+-------------+-----+
|    1|          4|     AS|     N407AS|                SEA|     169|  1448.0|          -22|    0|
+-----+-----------+-------+-----------+-------------------+--------+--------+-------------+-----+
only showing top 1 row



In [31]:
#Check if dataset is balalnced or not

In [34]:
print('Labels Count - ')
model.groupBy('label').count().show()

Labels Count - 
+-----+-------+
|label|  count|
+-----+-------+
|    1|2086896|
|    0|3627112|
+-----+-------+



In [35]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

In [36]:
airline_indexer = StringIndexer(inputCol="AIRLINE", outputCol="airline_index")

In [37]:
airline_encoder = OneHotEncoder(inputCol="airline_index", outputCol="airline_encoded")

In [38]:
dest_indexer = StringIndexer(inputCol="DESTINATION_AIRPORT", outputCol="dest_index")

In [39]:
dest_encoder = OneHotEncoder(inputCol="dest_index", outputCol="dest_encoded")

In [40]:
tail_indexer = StringIndexer(inputCol="TAIL_NUMBER", outputCol="tail_index")

In [41]:
tail_encoder = OneHotEncoder(inputCol="tail_index", outputCol="tail_encoded")

In [42]:
from pyspark.ml.feature import VectorAssembler

In [43]:
vec_assembler = VectorAssembler(inputCols=["MONTH", "DAY_OF_WEEK", "AIR_TIME", "DISTANCE", "airline_encoded", "dest_encoded", "tail_encoded"], outputCol="features")

In [44]:
from pyspark.ml import Pipeline

In [45]:
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, airline_indexer, airline_encoder, tail_indexer, tail_encoder, vec_assembler])

In [46]:
piped_data = flights_pipe.fit(model).transform(model)

In [47]:
train_data, test_data = piped_data.randomSplit([.7, .3],seed=1)

In [48]:
print('Row count in train data :',  train_data.count())
print('Row count in test data :',  test_data.count())

Row count in train data : 3999011
Row count in test data : 1714997


## Using Logistic Regression as the Classifier

In [49]:
from pyspark.ml.classification import LogisticRegression
import pyspark.ml.evaluation as evals

In [56]:
lr = LogisticRegression(labelCol="label",featuresCol="features")

In [57]:
best_lr = lr.fit(train_data)

In [59]:
pred=best_lr.transform(test_data)

In [60]:
pred.show(5)

+-----+-----------+-------+-----------+-------------------+--------+--------+-------------+-----+----------+-----------------+-------------+---------------+----------+-------------------+--------------------+--------------------+--------------------+----------+
|MONTH|DAY_OF_WEEK|AIRLINE|TAIL_NUMBER|DESTINATION_AIRPORT|AIR_TIME|DISTANCE|ARRIVAL_DELAY|label|dest_index|     dest_encoded|airline_index|airline_encoded|tail_index|       tail_encoded|            features|       rawPrediction|         probability|prediction|
+-----+-----------+-------+-----------+-------------------+--------+--------+-------------+-----+----------+-----------------+-------------+---------------+----------+-------------------+--------------------+--------------------+--------------------+----------+
|    1|          1|     AA|     N001AA|                EGE|     112|   721.0|           18|    1|     287.0|(628,[287],[1.0])|          2.0| (13,[2],[1.0])|    3871.0|(4895,[3871],[1.0])|(5540,[0,1,2,3,6,...|[0.768

In [None]:
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

In [61]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [74]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
auc=evaluator.evaluate(pred)

In [75]:
print('AUC-ROC = {:.2f}%'.format(auc * 100))

AUC-ROC = 62.70%


In [71]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [72]:
accuracy_evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
accuracy = accuracy_evaluator.evaluate(pred)
print('Accuracy = {:.2f}%'.format(accuracy * 100))

Accuracy = 65.58%


Question 1: What were the top 5 origin and destination airports for the total number of flights in 2015?

In [98]:
air_manager.createOrReplaceTempView('air_manager')

In [106]:
flight_count= "SELECT AIRPORT, COUNT(*) as COUNT FROM air_manager WHERE AIRPORT IS NOT NULL GROUP BY AIRPORT ORDER BY COUNT DESC LIMIT 5"

In [107]:
top_5_origin_airports=spark.sql(flight_count)

In [108]:
top_5_origin_airports.show()

+--------------------+------+
|             AIRPORT| COUNT|
+--------------------+------+
|Hartsfield-Jackso...|346904|
|Chicago O'Hare In...|285906|
|Dallas/Fort Worth...|239582|
|Denver Internatio...|196010|
|Los Angeles Inter...|194696|
+--------------------+------+



In [212]:
top_5_origin_airports.write.csv(r"C:\Users\16478\Desktop\dataset\tableau\top_5_origin_airports.csv", header=True, mode="overwrite")

Question 2: What were the top 5 origin airports for the number of cancelled flights in 2015?

In [112]:
cancelled_count="SELECT AIRPORT, SUM(CANCELLED) as COUNT FROM air_manager WHERE AIRPORT IS NOT NULL GROUP BY AIRPORT ORDER BY COUNT DESC LIMIT 5"

In [113]:
top_5_airports_cancel=spark.sql(cancelled_count)

In [114]:
top_5_airports_cancel.show()

+--------------------+-----+
|             AIRPORT|COUNT|
+--------------------+-----+
|Chicago O'Hare In...| 9273|
|Dallas/Fort Worth...| 6749|
|LaGuardia Airport...| 4418|
|Newark Liberty In...| 3350|
|Hartsfield-Jackso...| 2715|
+--------------------+-----+



In [213]:
top_5_airports_cancel.write.csv(r"C:\Users\16478\Desktop\dataset\tableau\top_5_airports_cancel.csv", header=True, mode="overwrite")

Question 3: What were the top 5 origin airports for the percentage of cancelled flights in 2015? (Percentage here is with respect to the total flights departing from each respective airport.)

In [115]:
cancelled_flights_per_airport="SELECT AIRPORT, SUM(CANCELLED) as COUNT FROM air_manager WHERE AIRPORT IS NOT NULL GROUP BY AIRPORT"
cancelled_flights_per_airport=spark.sql(cancelled_flights_per_airport)
cancelled_flights_per_airport.show(2)

+--------------------+-----+
|             AIRPORT|COUNT|
+--------------------+-----+
|Melbourne Interna...|    4|
|San Diego Interna...|  679|
+--------------------+-----+
only showing top 2 rows



In [116]:
flights_per_airport="SELECT AIRPORT, COUNT(FLIGHT_NUMBER) as TOTAL_FLIGHTS FROM air_manager WHERE AIRPORT IS NOT NULL GROUP BY AIRPORT"
flights_per_airport=spark.sql(flights_per_airport)
flights_per_airport.show(2)

+--------------------+-------------+
|             AIRPORT|TOTAL_FLIGHTS|
+--------------------+-------------+
|Melbourne Interna...|         1332|
|San Diego Interna...|        70207|
+--------------------+-------------+
only showing top 2 rows



In [117]:
join_cancelled_flights=cancelled_flights_per_airport.join(flights_per_airport,on='AIRPORT', how = 'leftouter')
join_cancelled_flights.show(2)

+--------------------+-----+-------------+
|             AIRPORT|COUNT|TOTAL_FLIGHTS|
+--------------------+-----+-------------+
|Melbourne Interna...|    4|         1332|
|San Diego Interna...|  679|        70207|
+--------------------+-----+-------------+
only showing top 2 rows



In [119]:
from pyspark.sql.functions import col

join_cancelled_flights = join_cancelled_flights.withColumn("percentage", (col("COUNT") / col("TOTAL_FLIGHTS")) * 100)

In [120]:
join_cancelled_flights.show(2)

+--------------------+-----+-------------+------------------+
|             AIRPORT|COUNT|TOTAL_FLIGHTS|        percentage|
+--------------------+-----+-------------+------------------+
|Melbourne Interna...|    4|         1332|0.3003003003003003|
|San Diego Interna...|  679|        70207|0.9671400287720597|
+--------------------+-----+-------------+------------------+
only showing top 2 rows



In [121]:
join_cancelled_flights.createOrReplaceTempView('percentage')

In [217]:
percentage_cancelled="SELECT AIRPORT,ROUND(percentage,4) FROM percentage WHERE AIRPORT IS NOT NULL ORDER BY percentage DESC LIMIT 5"
percentage_cancelled=spark.sql(percentage_cancelled)
percentage_cancelled.show()

+--------------------+--------------------+
|             AIRPORT|round(percentage, 4)|
+--------------------+--------------------+
|Ithaca Tompkins R...|             11.4286|
|Mammoth Yosemite ...|             10.2564|
|Devils Lake Regio...|              7.7947|
|        Adak Airport|              7.2917|
|Muskegon County A...|              7.1964|
+--------------------+--------------------+



In [218]:
percentage_cancelled.write.csv(r"C:\Users\16478\Desktop\dataset\tableau\percentage_cancelled.csv", header=True, mode="overwrite")

Question 4a: What is the flight number, airline, origin and destination airports and distance of the flight with the longest air time?

In [147]:
airline_df.show(2)

+-------------------+--------------------+
|DESTINATION_AIRPORT|             AIRLINE|
+-------------------+--------------------+
|                 UA|United Air Lines ...|
|                 AA|American Airlines...|
+-------------------+--------------------+
only showing top 2 rows



In [158]:
airline_df=airline_df.withColumnRenamed("DESTINATION_AIRPORT", "AIRLINE")

In [None]:
airline_df=airline_df.withColumnRenamed("AIRLINE", "AIRLINE_NAME")

In [159]:
airline_df.show(2)

+-------+--------------------+
|AIRLINE|        AIRLINE_NAME|
+-------+--------------------+
|     UA|United Air Lines ...|
|     AA|American Airlines...|
+-------+--------------------+
only showing top 2 rows



In [163]:
joined_df = air_manager.join(airline_df, on="AIRLINE")

In [165]:
joined_df.createOrReplaceTempView('air_manager_1')

In [209]:
dataset = ("SELECT * FROM air_manager_1")
dataset =spark.sql(dataset)

In [210]:
dataset.write.csv(r"C:\Users\16478\Desktop\dataset\tableau\dataset.csv", header=True, mode="overwrite")

In [166]:
longest_flight = ("SELECT flight_number, AIRLINE_NAME, origin_airport, destination_airport, distance FROM air_manager_1 ORDER BY AIR_TIME DESC LIMIT 1")
longest_flight=spark.sql(longest_flight)
longest_flight.show()

+-------------+--------------------+--------------+-------------------+--------+
|flight_number|        AIRLINE_NAME|origin_airport|destination_airport|distance|
+-------------+--------------------+--------------+-------------------+--------+
|           51|Hawaiian Airlines...|           JFK|                HNL|  4983.0|
+-------------+--------------------+--------------+-------------------+--------+



In [200]:
longest_flight.write.csv(r"C:\Users\16478\Desktop\dataset\tableau\longest_flight.csv", header=True, mode="overwrite")

Question 4b: What is the flight number, airline, origin and destination airports of the flight with the longest distance

In [168]:
longest_distance = ("SELECT flight_number, AIRLINE_NAME, origin_airport, destination_airport FROM air_manager_1 ORDER BY distance DESC LIMIT 1")
longest_distance=spark.sql(longest_distance)
longest_distance.show()

+-------------+--------------------+--------------+-------------------+
|flight_number|        AIRLINE_NAME|origin_airport|destination_airport|
+-------------+--------------------+--------------+-------------------+
|          420|Delta Air Lines Inc.|           JFK|                HNL|
+-------------+--------------------+--------------+-------------------+



In [199]:
longest_distance.write.csv(r"C:\Users\16478\Desktop\dataset\tableau\longest_distance.csv", header=True, mode="overwrite")

Question 5: How many flights were there for each month listed in the Flights dataframe?

In [173]:
flights_per_month=("SELECT MONTH,COUNT(*) as COUNT FROM air_manager_1 GROUP BY MONTH")
flights_per_month=spark.sql(flights_per_month)
flights_per_month.show()

+-----+------+
|MONTH| COUNT|
+-----+------+
|    1|469968|
|    2|429191|
|    3|504312|
|    4|485151|
|    5|496993|
|    6|503897|
|    7|520718|
|    8|510536|
|    9|464946|
|   10|486165|
|   11|467972|
|   12|479230|
+-----+------+



In [219]:
flights_per_month.write.csv(r"C:\Users\16478\Desktop\dataset\tableau\flights_per_month.csv", header=True, mode="overwrite")

Question 6: What were the top 5 airlines by number of flights in 2015?

In [177]:
top_5_airlines_num=("SELECT AIRLINE_NAME,COUNT(*) as COUNT FROM air_manager_1 GROUP BY AIRLINE_NAME LIMIT 5")
top_5_airlines_num=spark.sql(top_5_airlines_num)
top_5_airlines_num.show()

+--------------------+------+
|        AIRLINE_NAME| COUNT|
+--------------------+------+
|Skywest Airlines ...|588353|
|American Eagle Ai...|294632|
|      Virgin America| 61903|
|United Air Lines ...|515723|
|Frontier Airlines...| 90836|
+--------------------+------+



In [220]:
top_5_airlines_num.write.csv(r"C:\Users\16478\Desktop\dataset\tableau\top_5_airlines_num.csv", header=True, mode="overwrite")

Question 7: How many flights were there for each airline each month?

In [180]:
flights_per_airline_month=("SELECT MONTH,AIRLINE_NAME,COUNT(*) as COUNT FROM air_manager_1 GROUP BY AIRLINE_NAME,MONTH ORDER BY AIRLINE_NAME,MONTH ASC")
flights_per_airline_month=spark.sql(flights_per_airline_month)
flights_per_airline_month.show()

+-----+--------------------+-----+
|MONTH|        AIRLINE_NAME|COUNT|
+-----+--------------------+-----+
|    1|Alaska Airlines Inc.|13257|
|    2|Alaska Airlines Inc.|12194|
|    3|Alaska Airlines Inc.|14276|
|    4|Alaska Airlines Inc.|13974|
|    5|Alaska Airlines Inc.|14682|
|    6|Alaska Airlines Inc.|15075|
|    7|Alaska Airlines Inc.|15821|
|    8|Alaska Airlines Inc.|16095|
|    9|Alaska Airlines Inc.|14271|
|   10|Alaska Airlines Inc.|14467|
|   11|Alaska Airlines Inc.|13950|
|   12|Alaska Airlines Inc.|14459|
|    1|American Airlines...|44059|
|    2|American Airlines...|39835|
|    3|American Airlines...|45966|
|    4|American Airlines...|44770|
|    5|American Airlines...|44710|
|    6|American Airlines...|44360|
|    7|American Airlines...|81434|
|    8|American Airlines...|79748|
+-----+--------------------+-----+
only showing top 20 rows



In [221]:
flights_per_airline_month.write.csv(r"C:\Users\16478\Desktop\dataset\tableau\flights_per_airline_month.csv", header=True, mode="overwrite")

Question 8: How many flights were there in total for each day of the week during the year 2015?

In [187]:
flights_per_airline_day=("SELECT DAY_OF_WEEK,AIRLINE_NAME,COUNT(*) as COUNT FROM air_manager_1 GROUP BY AIRLINE_NAME,DAY_OF_WEEK ORDER BY AIRLINE_NAME,DAY_OF_WEEK ASC")
flights_per_airline_day=spark.sql(flights_per_airline_day)
flights_per_airline_day.show()

+-----------+--------------------+------+
|DAY_OF_WEEK|        AIRLINE_NAME| COUNT|
+-----------+--------------------+------+
|          1|Alaska Airlines Inc.| 25082|
|          2|Alaska Airlines Inc.| 24165|
|          3|Alaska Airlines Inc.| 24395|
|          4|Alaska Airlines Inc.| 25477|
|          5|Alaska Airlines Inc.| 25135|
|          6|Alaska Airlines Inc.| 23574|
|          7|Alaska Airlines Inc.| 24693|
|          1|American Airlines...|106225|
|          2|American Airlines...|103401|
|          3|American Airlines...|106503|
|          4|American Airlines...|108168|
|          5|American Airlines...|106374|
|          6|American Airlines...| 92264|
|          7|American Airlines...|103049|
|          1|American Eagle Ai...| 43872|
|          2|American Eagle Ai...| 42970|
|          3|American Eagle Ai...| 43450|
|          4|American Eagle Ai...| 44314|
|          5|American Eagle Ai...| 43869|
|          6|American Eagle Ai...| 34246|
+-----------+--------------------+

In [222]:
flights_per_airline_day.write.csv(r"C:\Users\16478\Desktop\dataset\tableau\flights_per_airline_day.csv", header=True, mode="overwrite")

Question 9: How many flights were there for each day of the week during each month of 2015?

In [185]:
flights_per_dayofweek_month=("SELECT MONTH,DAY_OF_WEEK,AIRLINE_NAME,COUNT(*) as COUNT FROM air_manager_1 GROUP BY AIRLINE_NAME,DAY_OF_WEEK,MONTH ORDER BY AIRLINE_NAME,MONTH,DAY_OF_WEEK ASC")
flights_per_dayofweek_month=spark.sql(flights_per_dayofweek_month)
flights_per_dayofweek_month.show()

+-----+-----------+--------------------+-----+
|MONTH|DAY_OF_WEEK|        AIRLINE_NAME|COUNT|
+-----+-----------+--------------------+-----+
|    1|          1|Alaska Airlines Inc.| 1711|
|    1|          2|Alaska Airlines Inc.| 1636|
|    1|          3|Alaska Airlines Inc.| 1655|
|    1|          4|Alaska Airlines Inc.| 2192|
|    1|          5|Alaska Airlines Inc.| 2245|
|    1|          6|Alaska Airlines Inc.| 2078|
|    1|          7|Alaska Airlines Inc.| 1740|
|    2|          1|Alaska Airlines Inc.| 1780|
|    2|          2|Alaska Airlines Inc.| 1675|
|    2|          3|Alaska Airlines Inc.| 1701|
|    2|          4|Alaska Airlines Inc.| 1806|
|    2|          5|Alaska Airlines Inc.| 1814|
|    2|          6|Alaska Airlines Inc.| 1662|
|    2|          7|Alaska Airlines Inc.| 1756|
|    3|          1|Alaska Airlines Inc.| 2348|
|    3|          2|Alaska Airlines Inc.| 2246|
|    3|          3|Alaska Airlines Inc.| 1812|
|    3|          4|Alaska Airlines Inc.| 1898|
|    3|      

In [223]:
flights_per_dayofweek_month.write.csv(r"C:\Users\16478\Desktop\dataset\tableau\flights_per_dayofweek_month.csv", header=True, mode="overwrite")

<class 'pyspark.sql.dataframe.DataFrame'>
