#### Reading data from FLIGHTDELAY TABLE AND CREATING SPARK DATAFRAME

In [0]:
df = sqlContext.table("flightdelay")

In [0]:
df.count()

In [0]:
df.printSchema()

### NUMBER OF FLIGHTS DELAYED DUE TO WEATHER EACH YEAR FROM 2012 to 2019

In [0]:
%sql

SELECT YEAR, COUNT(weatherDelay) AS NUMBER_OF_FLIGHTS
FROM flightdelay
WHERE weatherDelay is not null
AND weatherDelay > 0
AND TAIL_NUMBER is not null
GROUP BY YEAR
ORDER BY YEAR;

YEAR,NUMBER_OF_FLIGHTS
2012,52525
2013,67281
2014,71556
2015,62096
2016,50143
2017,49408
2018,85055
2019,82237


In [0]:
# Monthly Flights

In [0]:
%sql

SELECT Month, COUNT(weatherDelay) AS NUMBER_OF_FLIGHTS
FROM flightdelay
WHERE weatherDelay is not null
AND weatherDelay > 0
AND TAIL_NUMBER is not null
GROUP BY Month
ORDER BY Month;

Month,NUMBER_OF_FLIGHTS
1,52832
2,41676
3,34042
4,37427
5,46399
6,65288
7,61765
8,52290
9,28522
10,22105


In [0]:
# Weekly Flights 

In [0]:
%sql

SELECT DayOfWeek, COUNT(weatherDelay) AS NUMBER_OF_FLIGHTS
FROM flightdelay
WHERE weatherDelay is not null
AND weatherDelay > 0
AND TAIL_NUMBER is not null
GROUP BY DayOfWeek
ORDER BY DayOfWeek;

DayOfWeek,NUMBER_OF_FLIGHTS
1,82465
2,72370
3,74372
4,83080
5,77387
6,58330
7,72297


### AVERAGE DELAY TIME DUE TO WEATHER EACH YEAR FROM 2012 to 2019

In [0]:
%sql

SELECT YEAR, AVG(weatherDelay) AS AVERAGE_DELAY_TIME_In_MINUTES
FROM flightdelay
WHERE weatherDelay is not null
AND weatherDelay > 0
AND TAIL_NUMBER is not null
GROUP BY YEAR
ORDER BY YEAR;

YEAR,AVERAGE_DELAY_TIME_In_MINUTES
2012,43.40428367444074
2013,42.139757137973575
2014,43.93916652691598
2015,48.158883019840246
2016,51.97301717089125
2017,56.66610670336787
2018,57.83404855681618
2019,64.23508882862944


### AVERAGE DELAY TIME DUE TO WEATHER EACH MONTH ACROSS ALL YEARS FROM 2012 to 2019

In [0]:
%sql

SELECT MONTH, AVG(weatherDelay) AS AVERAGE_DELAY_TIME_In_MINUTES
FROM flightdelay
WHERE weatherDelay is not null
AND weatherDelay > 0
AND TAIL_NUMBER is not null
GROUP BY MONTH
ORDER BY MONTH;

MONTH,AVERAGE_DELAY_TIME_In_MINUTES
1,52.07580632949728
2,52.142000191957
3,49.74361083367605
4,52.94845966815401
5,54.65219077997371
6,50.565555691704446
7,53.00917995628592
8,50.59607955632052
9,50.44001121940958
10,49.86532458719746


### AVERAGE DELAY TIME DUE TO WEATHER EACH DAY OF WEEK

In [0]:
%sql

SELECT DAYOFWEEK, AVG(weatherDelay) AS AVERAGE_DELAY_TIME_In_MINUTES
FROM flightdelay
WHERE weatherDelay is not null
AND weatherDelay > 0
AND TAIL_NUMBER is not null
GROUP BY DAYOFWEEK
ORDER BY DAYOFWEEK;

DAYOFWEEK,AVERAGE_DELAY_TIME_In_MINUTES
1,50.44120535984963
2,51.612795357192205
3,52.489525627924486
4,51.5269017814155
5,52.13652163800121
6,52.750985770615465
7,50.70619804417888


#### AVERAGE NUMBER OF FLIGHT DELAYS IN DEPARTURE FOR EACH STATE FOR THE TIME PERIOD 2012 - 2019

In [0]:
%sql

SELECT ORIGINSTATE, AVG(NUM_FLIGHTS_DELAYED) AS AVG_NUM_FLIGHTS_DELAYED 
FROM (SELECT YEAR, ORIGINSTATE, COUNT(DepDelay) AS NUM_FLIGHTS_DELAYED 
FROM FLIGHTDELAY 
WHERE DepDelay > 0
GROUP BY YEAR, ORIGINSTATE) AS T1
GROUP BY ORIGINSTATE
ORDER BY ORIGINSTATE

ORIGINSTATE,AVG_NUM_FLIGHTS_DELAYED
AK,8697.0
AL,9431.875
AR,7655.125
AZ,70881.75
CA,290930.0
CO,106844.875
CT,7228.375
DE,152.66666666666666
FL,171462.875
GA,143709.75


In [0]:
import plotly.express as px
test_df = sqlContext.sql("SELECT ORIGINSTATE, AVG(NUM_FLIGHTS_DELAYED) AS AVG_NUM_FLIGHTS_DELAYED FROM (SELECT YEAR, ORIGINSTATE, COUNT(DepDelay) AS NUM_FLIGHTS_DELAYED FROM FLIGHTDELAY WHERE DepDelay > 0 GROUP BY YEAR, ORIGINSTATE) AS T1 GROUP BY ORIGINSTATE");
fig = px.choropleth(test_df.toPandas(),locations='ORIGINSTATE',locationmode="USA-states",scope="usa",color='AVG_NUM_FLIGHTS_DELAYED',color_continuous_scale="Viridis_r")
fig.show()

#### AVERAGE NUMBER OF FLIGHT DELAYS IN DEPARTURE DUE TO WEATHER FOR EACH STATE FOR THE TIME PERIOD 2012 - 2019

In [0]:
%sql

SELECT ORIGINSTATE, AVG(NUM_FLIGHTS_DELAYED) AS AVG_NUM_FLIGHTS_DELAYED 
FROM (SELECT YEAR, ORIGINSTATE, COUNT(*) AS NUM_FLIGHTS_DELAYED 
FROM FLIGHTDELAY 
WHERE weatherDelay > 0
GROUP BY YEAR, ORIGINSTATE) AS T1
GROUP BY ORIGINSTATE
ORDER BY ORIGINSTATE

ORIGINSTATE,AVG_NUM_FLIGHTS_DELAYED
AK,260.0
AL,286.125
AR,328.125
AZ,559.125
CA,2809.375
CO,2912.25
CT,192.25
DE,6.0
FL,6363.375
GA,5514.25


In [0]:
import plotly.express as px
test_df = sqlContext.sql("SELECT ORIGINSTATE, AVG(NUM_FLIGHTS_DELAYED) AS AVG_NUM_FLIGHTS_DELAYED FROM (SELECT YEAR, ORIGINSTATE, COUNT(*) AS NUM_FLIGHTS_DELAYED FROM FLIGHTDELAY WHERE weatherDelay > 0 GROUP BY YEAR, ORIGINSTATE) AS T1 GROUP BY ORIGINSTATE");
fig = px.choropleth(test_df.toPandas(),
                    locations='ORIGINSTATE',
                    locationmode="USA-states",
                    scope="usa",
                    color='AVG_NUM_FLIGHTS_DELAYED',
                    color_continuous_scale=[
                        (0.0, "#e5e5e5"),   (0.001449275362, "#e5e5e5"), 
                        (0.01449275362, "#ffe5f0"),   (0.1086956522, "#ffe5f0"), 
                        (0.1086956522, "#facfdf"), (0.1449275362, "#facfdf"), 
                        (0.1449275362, "#f3b8ce"),  (0.3623188406, "#f3b8ce"), 
                        (0.3623188406, "#eca2bf"), (0.7246376812, "#eca2bf"), 
                        (0.7246376812, "#e37fb1"), (1, "#e37fb1") 
                    ])
fig.show()

In [0]:
%sql

SELECT ORIGINSTATE, AVG(NUM_FLIGHTS_DELAYED) AS AVG_NUM_FLIGHTS_DELAYED 
FROM (SELECT YEAR, ORIGINSTATE, COUNT(*) AS NUM_FLIGHTS_DELAYED 
FROM FLIGHTDELAY 
WHERE weatherDelay > 0
GROUP BY YEAR, ORIGINSTATE) AS T1
GROUP BY ORIGINSTATE
ORDER BY AVG_NUM_FLIGHTS_DELAYED DESC
LIMIT 10

ORIGINSTATE,AVG_NUM_FLIGHTS_DELAYED
TX,10242.5
IL,9395.375
FL,6363.375
GA,5514.25
NY,3436.375
CO,2912.25
CA,2809.375
NC,2544.5
VA,2072.125
MI,1652.5


In [0]:
%sql

SELECT IATA_CODE_Reporting_Airline, AVG(NUM_FLIGHTS_DELAYED) AS AVG_Number_Delayed 
FROM (SELECT YEAR, IATA_CODE_Reporting_Airline, COUNT(*) AS NUM_FLIGHTS_DELAYED 
FROM FLIGHTDELAY 
WHERE weatherDelay > 0
GROUP BY YEAR, IATA_CODE_Reporting_Airline) AS T1
GROUP BY IATA_CODE_Reporting_Airline
ORDER BY AVG_Number_Delayed DESC
LIMIT 10

IATA_CODE_Reporting_Airline,AVG_Number_Delayed
MQ,10675.5
AA,10669.375
DL,9621.625
WN,8541.875
UA,6729.75
OO,5797.25
OH,5310.0
YX,3421.0
EV,3218.25
9E,2720.333333333333


In [0]:
%sql

SELECT IATA_CODE_Reporting_Airline, AVG(ArrDelayMinutes)/8 AS AVG_Delayed 
FROM (SELECT YEAR, ArrDelayMinutes, IATA_CODE_Reporting_Airline, COUNT(*) AS NUM_FLIGHTS_DELAYED 
FROM FLIGHTDELAY 
WHERE weatherDelay > 0
GROUP BY YEAR, ArrDelayMinutes, IATA_CODE_Reporting_Airline) AS T1
GROUP BY IATA_CODE_Reporting_Airline
ORDER BY AVG_Delayed DESC
LIMIT 10

IATA_CODE_Reporting_Airline,AVG_Delayed
OO,45.77235186156266
DL,42.56454093705632
9E,38.79352226720648
UA,36.124892826521865
AA,35.899268050028425
YX,34.27450980392157
MQ,34.07468114786768
YV,31.22275121555916
EV,28.55715192168238
OH,26.73546511627907


In [0]:
%sql

SELECT IATA_CODE_Reporting_Airline, AVG(ArrDelayMinutes)/8 AS AVG_Delayed 
FROM (SELECT YEAR, ArrDelayMinutes, IATA_CODE_Reporting_Airline, COUNT(*) AS NUM_FLIGHTS_DELAYED 
FROM FLIGHTDELAY 
WHERE weatherDelay > 0
AND IATA_CODE_Reporting_Airline not in ("FL","HA")
GROUP BY YEAR, ArrDelayMinutes, IATA_CODE_Reporting_Airline) AS T1
GROUP BY IATA_CODE_Reporting_Airline
ORDER BY AVG_Delayed ASC
LIMIT 10

IATA_CODE_Reporting_Airline,AVG_Delayed
VX,16.726471727343146
F9,17.309152542372882
AS,17.356530548628427
US,21.02975663716814
NK,22.25915141430949
G4,24.463809946714036
WN,25.51110154905336
B6,25.619305657604706
OH,26.73546511627907
EV,28.55715192168238


In [0]:
%sql

SELECT IATA_CODE_Reporting_Airline, AVG(NUM_FLIGHTS_DELAYED) AS AVG_Number_Delayed 
FROM (SELECT YEAR, IATA_CODE_Reporting_Airline, COUNT(*) AS NUM_FLIGHTS_DELAYED 
FROM FLIGHTDELAY 
WHERE weatherDelay > 0
GROUP BY YEAR, IATA_CODE_Reporting_Airline) AS T1
GROUP BY IATA_CODE_Reporting_Airline
ORDER BY AVG_Number_Delayed ASC
LIMIT 10

IATA_CODE_Reporting_Airline,AVG_Number_Delayed
HA,417.625
FL,514.0
F9,586.875
AS,912.375
NK,1149.0
VX,1763.0
G4,1896.0
B6,1981.25
YV,2253.5
US,2595.25


In [0]:
%sql

SELECT ORIGINSTATE, AVG(NUM_FLIGHTS_DELAYED) AS AVG_Number_Delayed 
FROM (SELECT YEAR, ORIGINSTATE, COUNT(*) AS NUM_FLIGHTS_DELAYED 
FROM FLIGHTDELAY 
WHERE weatherDelay > 0
GROUP BY YEAR, ORIGINSTATE) AS T1
GROUP BY ORIGINSTATE
ORDER BY AVG_Number_Delayed DESC
LIMIT 10

ORIGINSTATE,AVG_Number_Delayed
TX,10242.5
IL,9395.375
FL,6363.375
GA,5514.25
NY,3436.375
CO,2912.25
CA,2809.375
NC,2544.5
VA,2072.125
MI,1652.5


In [0]:
%sql

SELECT ORIGIN, AVG(NUM_FLIGHTS_DELAYED) AS AVG_Number_Delayed 
FROM (SELECT YEAR, ORIGIN, COUNT(*) AS NUM_FLIGHTS_DELAYED 
FROM FLIGHTDELAY 
WHERE weatherDelay > 0
GROUP BY YEAR, ORIGIN) AS T1
GROUP BY ORIGIN
ORDER BY AVG_Number_Delayed DESC
LIMIT 10

ORIGIN,AVG_Number_Delayed
ORD,8172.0
DFW,5867.375
ATL,5342.75
DEN,2558.25
MCO,2033.75
IAH,2024.375
CLT,1966.625
MIA,1468.75
JFK,1445.125
LGA,1439.5


In [0]:
%sql

SELECT ORIGIN, AVG(NUM_FLIGHTS_DELAYED) AS AVG_Number_Delayed 
FROM (SELECT YEAR, ORIGIN, COUNT(*) AS NUM_FLIGHTS_DELAYED 
FROM FLIGHTDELAY 
WHERE weatherDelay > 0
GROUP BY YEAR, ORIGIN) AS T1
GROUP BY ORIGIN
ORDER BY AVG_Number_Delayed asc
LIMIT 10

ORIGIN,AVG_Number_Delayed
LBF,1.0
PSE,1.0
GST,1.0
SPN,1.0
HYA,1.0
DLG,1.3333333333333333
IPL,1.5
RDD,1.75
MMH,2.0
ADK,2.0


In [0]:
%sql
SELECT ORIGIN, AVG(ArrDelay)/360 AS AVG_Arr_Delay 
FROM (SELECT YEAR, ORIGIN, COUNT(*) AS ArrDelay
FROM FLIGHTDELAY 
WHERE weatherDelay > 0
GROUP BY YEAR, ORIGIN) AS T1
GROUP BY ORIGIN
ORDER BY AVG_Arr_Delay DESC
LIMIT 10

ORIGIN,AVG_Arr_Delay
ORD,22.7
DFW,16.29826388888889
ATL,14.840972222222222
DEN,7.10625
MCO,5.649305555555555
IAH,5.623263888888889
CLT,5.462847222222222
MIA,4.079861111111111
JFK,4.014236111111111
LGA,3.998611111111111


In [0]:
%sql

SELECT DEST, AVG(ArrDelay)/360 AS AVG_Arr_Delay 
FROM (SELECT YEAR, DEST, COUNT(*) AS ArrDelay
FROM FLIGHTDELAY 
WHERE weatherDelay > 0
GROUP BY YEAR, DEST) AS T1
GROUP BY DEST
ORDER BY AVG_Arr_Delay DESC
LIMIT 10

DEST,AVG_Arr_Delay
DFW,9.942361111111111
ORD,9.555555555555555
ATL,9.094097222222222
LAX,6.007986111111111
DEN,5.626736111111111
SFO,5.585416666666666
IAH,4.976041666666666
LGA,4.763888888888889
CLT,4.025
EWR,3.9590277777777776


In [0]:
%sql

SELECT DEST, ORIGIN, COUNT(*) AS TOTAL_COUNT 
FROM FLIGHTDELAY 
WHERE weatherDelay > 0
GROUP BY DEST, ORIGIN 
ORDER BY TOTAL_COUNT DESC
LIMIT 10




DEST,ORIGIN,TOTAL_COUNT
LGA,ORD,2534
DFW,ORD,1979
ATL,ORD,1864
LAX,ORD,1859
MSP,ORD,1759
ORD,LGA,1702
LAX,JFK,1653
SFO,ORD,1611
DCA,ORD,1419
DTW,ORD,1396


In [0]:
%sql

SELECT DEST, ORIGIN, COUNT(*) AS TOTAL_COUNT 
FROM FLIGHTDELAY 
WHERE weatherDelay > 0
GROUP BY DEST, ORIGIN 
ORDER BY TOTAL_COUNT DESC
LIMIT 10

DEST,ORIGIN,TOTAL_COUNT
LGA,ORD,2534
DFW,ORD,1979
ATL,ORD,1864
LAX,ORD,1859
MSP,ORD,1759
ORD,LGA,1702
LAX,JFK,1653
SFO,ORD,1611
DCA,ORD,1419
DTW,ORD,1396


#### Seeing Arrival Delay of the flights which had delay in departure due to weather

In [0]:
%sql 

SELECT  WEATHERDELAY, AVG(ArrDelay) AS AVG_ARR_DELAY
FROM FLIGHTDELAY
WHERE WEATHERDELAY > 0
AND ArrDelay > 0
GROUP BY WEATHERDELAY

WEATHERDELAY,AVG_ARR_DELAY
305.0,338.37704918032784
596.0,658.4444444444445
692.0,708.1111111111111
934.0,934.0
299.0,339.1774193548387
496.0,536.3333333333334
147.0,176.53863134657837
184.0,210.1034482758621
170.0,206.622641509434
810.0,813.7777777777778


In [0]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

Selecting columns which can be helpful in predicting the arrival delay time of the flights which had delay in departure due to weather

In [0]:
from pyspark.sql.functions import corr
for col in df.columns:
    print(col, df.select(corr(col, 'ArrDelayMinutes')).collect())

In [0]:
raw_data = df.na.drop(subset=["Tail_Number"]) \
              .filter("WeatherDelay>0") \
              .select(
                'Month', 
                'DayOfWeek',
                'Origin', 
                'Dest', 
                'DepDelay',
                'WeatherDelay',
                'ArrDelayMinutes',
                'AirTime',
                'Distance'
            )

In [0]:
string_columns = ['Origin', 'Dest']
indexed_columns = ['Origin_cat', 'Dest_cat']

In [0]:
indexer = StringIndexer(inputCols=string_columns, outputCols=indexed_columns)
indexed_data = indexer.fit(raw_data).transform(raw_data)

In [0]:
indexed_data.columns

In [0]:
assembler = VectorAssembler(
    inputCols=['Month', 'DayOfWeek', 'WeatherDelay', 'DepDelay', 'Origin_cat', 'Dest_cat', 'AirTime', 'Distance'],
    outputCol='features'
)

In [0]:
output_data = assembler.transform(indexed_data)

In [0]:
final_data = output_data.select(['features', 'ArrDelayMinutes'])
final_data.show()

In [0]:
train_data, test_data = final_data.randomSplit([0.8, 0.2])

In [0]:
lr = LinearRegression(labelCol='ArrDelayMinutes', maxIter=1000, tol=1e-05, regParam=0.3, elasticNetParam=0.8)
weatherDelayModel = lr.fit(train_data)

In [0]:
print("Coefficients: {} Intercept: {}".format(weatherDelayModel.coefficients,weatherDelayModel.intercept))

In [0]:
weatherDelayModel_results = weatherDelayModel.evaluate(test_data)
print(f'RSquared Error {weatherDelayModel_results.r2}\nAccuracy {weatherDelayModel_results.r2:.2%}')