%md
# Final Project - Flight Delays

#### Team members:
- Carla Cortez
- Redwan Hussain
- Anqi Liu
- Murray Stokely


### 1. Abstract
In the aviation industry, airline companies have taken interest in predicting flight delays because of their financial impact and to retain customer satisfaction. Our team's goal is to analyze historical flight and weather data from 2015-2021 and build a classification model to predict delays within a 2-hour window. The objective of phase 3 of this project is to further develop models by creating new features and with hyperparameter tuning. We have chosen precision and recall for our evaluation parameters as mitigating False Negatives will lead to less customers missing their flight. 

This phase consisted of: revising the data sampling step; identifying features that are time-based and graph-based; adjusting the existing logistic regression model and building one with random forest; and finally, performing grid search to find the most optimal hyperparameters. We performed downsampling due to the imbalance between flights with and without delays in the training data. During feature engineering, we created a correlation matrix to understand the relationship between various variables and delay (DEP_DEL15). Additionally, we utilized Page Rank to find flight routes with the most traffic. 

All models were trained with cross validation and tested against the held-out set (data from 2021). The logistic regression model resulted in correctly predicting delays (true positive) 39% out of all flights and missing delays (false negative) 7% as well. Precision and recall for this model are 0.504 and 0.854, respectively, while the F1 and F2 scores are 0.634 and 0.750, respectively. By comparison, the random forest model has a true negative rate of 45% and false negative rate of 54%. There were issues experienced with this model, which led to no delays being predicted, and therefore, inconclusive results. This will be corrected and finalized in the next project phase.

### 2. Data description

This project relies on three main datasets.

1. **Flights data** This is a subset of the passenger flight's on-time performance data taken from the TranStats data collection available from the U.S. Department of Transportation (DOT).  There are approximately 100 features in this data set.

2. **Weather** This provides weather information from NOAA for the same time period 2015-2021 as the flights data.

3. **Airports** This provides more detailed information about the airports in the flights dataset.

#### 2.1 EDA - Pre Join

In [None]:
import numpy as np
from matplotlib import pyplot as plt
import pandas as pd
import seaborn as sns  
sns.set(style="darkgrid")  
from pyspark.sql.functions import col,isnan,when,count
import statsmodels.api as sm
from tabulate import tabulate
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import to_utc_timestamp
data_BASE_DIR = "dbfs:/mnt/mids-w261/datasets_final_project_2022/"



In [None]:
blob_container = "w261container" # The name of your container created in https://portal.azure.com
storage_account = "mstokely" # The name of your Storage account created in https://portal.azure.com
secret_scope = "w261t21scope" # The name of the scope created in your local computer using the Databricks CLI
secret_key = "saskey" # The name of the secret key created in your local computer using the Databricks CLI
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
mount_path = "/mnt/mids-w261"

spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

##### 2.1.1 Full Flight (Main) Data

In [None]:
df_airlines_full = spark.read.parquet(f"{data_BASE_DIR}parquet_airlines_data/")
df_airlines_full.createOrReplaceTempView("df_airlines_full_tb")

__Flight Data - Response Variable Check__


Please refer to the ___Flight delayed pct by year___ and ___Flight delayed cnt___ tab below for our response variable check.


___Flight delayed pct by year___
<br>`The plot shows the delayed percentage out of all non-cancelled flights over month by year`
<br>All years except for the 2020 are showing similar sesonality trends.


___Flight delayed cnt___
<br>`The plot shows the total delayed flights by year - month`
<br>The number of flights dropped significantly starting from 2020 and is slowing recovring.

In [None]:
%sql

select
year, month,
concat(cast(year as string), case when month >= 10 then cast(month as string) else concat('0',cast(month as string)) end) as year_month,
sum(case when dep_delay>15 then 1 else 0 end) as delay_cnt,
sum(case when dep_delay>15 then 1 else 0 end)/count(dep_delay) as delay_pct
from df_airlines_full_tb
group by year, month, year_month
order by year, month;

year,month,year_month,delay_cnt,delay_pct
2015,1,201501,175414,0.1913700522134533
2015,2,201502,176200,0.2153339264589423
2015,3,201503,183702,0.1860563571432912
2015,4,201504,153274,0.1593622764078869
2015,5,201505,173236,0.176175667182609
2015,6,201506,226180,0.2283728359709934
2015,7,201507,212222,0.2055574174126471
2015,8,201508,185980,0.1838411915771909
2015,9,201509,112126,0.121081685809191
2015,10,201510,115470,0.1193300897430067


Output can only be rendered in Databricks

Output can only be rendered in Databricks

##### 2.1.2 Data To Be Excluded

Flight cancelled are not included in the "delay" analysis.

| Exclusion          | Number of Records|
|--------------------|------------------|
|Cancelled flights   |     874,041      |
|Duplicated flights  |     31,746,841   |

##### 2.1.3 Variables to be investigated

##### 2.1.3.1 Stations

With Airport Codes to Connect with All Other Tables

In [None]:
df_airport_codes = spark.read.option("header",True).csv(f"{blob_url}/airports_tz.csv")
select_columns = ['IATA', 'ICAO', 'tz_time_zone']
df_iata_icao_codes = df_airport_codes.select(select_columns).filter(col("ICAO").isNotNull() & col("IATA").isNotNull())

df_stations = spark.read.parquet(f"{data_BASE_DIR}stations_data/*")
df_unique_neighbor_stations = df_stations.filter(col('neighbor_id') == col('station_id'))
df_neighbor_stations = df_unique_neighbor_stations.join(df_iata_icao_codes, df_unique_neighbor_stations.neighbor_call ==  df_iata_icao_codes.ICAO, "inner")
select_station_cols = [
  'station_id',
  'IATA',
  'tz_time_zone'
]
df_neighbor_stations = df_neighbor_stations.select(select_station_cols)
df_neighbor_stations.createOrReplaceTempView("df_neighbor_stations_temp")

##### 2.1.3.2 Flight

In [None]:
df_airlines = spark.read.parquet(f"{data_BASE_DIR}parquet_airlines_data/")

#finding and removing duplicates
df_airlines_distinct = df_airlines.distinct()
# print("Total: ", str(df_airlines.count()), " Distinct: ", str(df_airlines_distinct.count()), " Removed duplicates: "+str(df_airlines.count() - df_airlines_distinct.count()))

##Filter cancelled flights 
df_airlines_clean =  df_airlines_distinct.filter(col('CANCELLED') == 0)

#Create new df_flights dataframe with only the required columns
select_airline_cols = ['QUARTER',
 'MONTH',
 'DAY_OF_MONTH',
 'DAY_OF_WEEK',
 'FL_DATE',
 'OP_UNIQUE_CARRIER',
 'TAIL_NUM',
 'OP_CARRIER_FL_NUM',
 'ORIGIN',
 'ORIGIN_CITY_NAME',
 'ORIGIN_STATE_ABR',
 'ORIGIN_WAC',
 'DEST',
 'DEST_CITY_NAME',
 'DEST_STATE_ABR', 
 'DEST_WAC',
 'CRS_DEP_TIME',
 'DEP_DEL15',
 'DEP_DELAY_GROUP',
 'DEP_TIME_BLK',
 'TAXI_IN',
 'CRS_ARR_TIME',
 'CRS_ELAPSED_TIME',
 'DISTANCE',
 'DISTANCE_GROUP',
 'YEAR']
df_airlines_clean =  df_airlines_clean.select(select_airline_cols)
df_airlines_clean.createOrReplaceTempView("df_airlines_clean_temp")

__2.1.3.2.1 Flight - Departure and Arrival Time__

In [None]:
%sql

select
 CRS_DEP_TIME
from
  df_airlines_clean_temp

CRS_DEP_TIME
620
620
2155
939
800
1435
1406
959
959
1153


Output can only be rendered in Databricks

In [None]:
%sql

select
 CRS_ARR_TIME
from
  df_airlines_clean_temp

CRS_ARR_TIME
710
1215
1910
1730
1335
2156
1950
2311
1335
1945


Output can only be rendered in Databricks

In [None]:
%sql

select
 CRS_DEP_TIME,
 CRS_ARR_TIME,
 CRS_ARR_TIME - CRS_DEP_TIME,
 CRS_ELAPSED_TIME
from
  df_airlines_clean_temp
where CRS_ARR_TIME < CRS_DEP_TIME

CRS_DEP_TIME,CRS_ARR_TIME,(CRS_ARR_TIME - CRS_DEP_TIME),CRS_ELAPSED_TIME
2250,15,-2235,85.0
841,835,-6,54.0
1610,14,-1596,304.0
2220,604,-1616,284.0
2220,47,-2173,147.0
2035,25,-2010,170.0
2315,13,-2302,58.0
2350,535,-1815,225.0
2155,220,-1935,205.0
1635,55,-1580,320.0


##### 2.1.3.3 Weather

We have 30,528,602 records in our weather data set representing weather readings from a specific station (at a specific location) at a specific time.

In [None]:
df_weather_full = spark.read.parquet(f"{data_BASE_DIR}parquet_weather_data/")
df_weather_full.createOrReplaceTempView("df_weather_full_tb")
df_weather_distinct = spark.sql(
  """
  select distinct 
  STATION,
  DATE,
  LATITUDE,
  LONGITUDE,
  ELEVATION,
  REPORT_TYPE,
 HourlyAltimeterSetting,
 HourlyDewPointTemperature,
 HourlyDryBulbTemperature,
 HourlyRelativeHumidity,
 HourlySkyConditions,
 HourlyStationPressure,
 HourlyVisibility,
 HourlyWetBulbTemperature,
 HourlyWindDirection,
 HourlyWindSpeed
  from df_weather_full_tb
  """)

df_weather_distinct.createOrReplaceTempView("df_weather_distinct")

__2.1.3.3.1 Weather - Sky Condition__


The code "CLR:00" indicates clear skies, and 4014537 of our 30528602 data points have this value (13%).  Propose creating a column "is_clear" with those.   4992829 of our data points have "OVC" for overcast in the description (16.4%).  Propose creating a "is_overcast" column with those.

___METAR Abbreviations___

OVC - Overcast
FEW - Few clouds

No "SN" or "TS" or "FZ" severe weather events in the dataset.

In [None]:
# df_weather_distinct.createOrReplaceTempView("df_weather_distinct_temp")
df_weather_distinct.select("HourlySkyConditions").filter(col("HourlySkyConditions").like("%OVC%")).count()

Out[58]: 110468075

In [None]:
%sql

select
 HourlySkyConditions, COUNT(*) as sum
from
  df_weather_distinct
WHERE HourlySkyConditions == "%FZ%"
GROUP BY HourlySkyConditions
ORDER BY sum DESC


HourlySkyConditions,sum


__2.1.3.3.2 Weather - Report Type__

We have several reports for 1 station at 1 time based on the check below; therefore, we're going to take the most special events first and fill the missings with the average from all the other reports to capture the most significant weather events.

In [None]:
%sql

with report_check as (
select 
station, date, 
count(report_type) as report_cnt
from df_weather_distinct
group by station, date)
select max(report_cnt) from report_check


max(report_cnt)
4


#### 2.2 Table description
a) df_flights: It contains a time series of the flight schedules from the year 2015 to 2021, including:
- Time period information:
  - Flight date and time
  - Day of the week/day of the month and year
- Flight operational data:
  - Scheduled departure/arrival time
  - Departure/arrival time
  - Departure/arrival delay measured in minutes, it is calculated as the difference between the scheduled and actual arrival/departure time.
  - Flight cancellation, in our case we will ignore the the canceled flights
- Metadata associated with the flight's origin and destination airports:
  - IATA airport code which is the airport location's unique 3 letter identifier
  - Airport state and city
- Carrier information
 
b) df_neighbor_stations: It provides metadata about weather stations with neighbor airports. It includes:
- Station unique identifier
- neighbor_call corresponds to the ICAO airport code which is defined by the International Civil Aviation Organization
- Neighbor airport name, state

c) df_weather_by_station: It contains a time series of weather information per weather station, including:
- Station unique identifier
- Time period information 
- Station metadata
- Weather metrics
- Weather date lag timestamp

d) df_iata_icao_codes: External resource that contains the mapping between the IATA and ICAO unique airport codes and the airport timezone (source: https://openflights.org/data.html)

#### 2.3 Table joins

a) df_neighbor_stations <> df_iata_icao_codes
- We will perform an inner join between the df_neighbor_stations and df_iata_icao_codes tables, using the df_neighbor_stations.neighbor_call and the df_iata_icao_codes.icao_code columns to enhance the df_neighbor_stations table with a new corresponding IATA airport code.

b) df_flights <> df_iata_icao_codes
- We will perform a left join between the df_flights and df_iata_icao_codes tables to obtain the timezone per airport by iata code. This will be used later to transform the timestamps to UTC.

b) df_neighbor_stations <> df_flights
- Both tables can be joined using the df_flights.ORIGIN and the df_flights.DEST columns, which contain the IATA airport code for the origin and destination airports, respectively,  with the 
df_neighbor_stations.iata_code that was created above. We will need to join the tables twice, once using the df_flights.ORIGIN and df_neighbor_stations.iata_code columns and the second join using df_flights.DEST and df_neighbor_stations.iata_code columns. We can perform inner joins between these two tables because we are only interested in flights with associated weather information.
- This relationship is many to many because multiple flights can be mapped to multiple neighbor stations.
- With the result of these joins, we will create a new table called df_flight_station that contains the origin/destination flight and airport information along with the associated weather station_id. This table can be stored in the Azure Blob storage.

c) df_flights_station <> df_weather_station
- We will join the df_flights_station and the df_weather_by_station tables using the df_flights_station.station_id and df_weather_by_station.station columns. To obtain the latest weather measurement two hours before the estimated flight departure, we created two columns that define the acceptable window of time (from 6 hours before the flight to 2 hours before the flight) that we should consider when selecting the latest weather measurement.

d) The final df_flights_weather_station contains the flight/weather data for the origin and destination airports. This table includes a column that contains the previous timestamp available for the weather station measurement timestamp (lag). This will be used to obtain the origin and destination lag weather measurements by merging df_flights_weather_station.ws_origin/dest_weather_lag matching the weather.DATE and station_id columns.

<img src="https://user-images.githubusercontent.com/88794396/204161781-0651e1c4-3a41-49ca-b7c0-903a295fe1d5.png?raw=true>" width=65%>

##### 2.3.1 Table Join Step 1 - Flight with Airpot Code

In [None]:
# df_airlines_clean.createOrReplaceTempView("df_airlines_clean_temp")
df_iata_icao_codes.createOrReplaceTempView("df_iata_icao_codes_temp")

df_flights_with_timezone = spark.sql(
"""
select
df_airlines_clean_temp.*,
to_timestamp(CONCAT(FL_DATE, ' ', SUBSTR(CRS_DEP_TIME, 0, LEN(CRS_DEP_TIME) - 2), ':', SUBSTR(CRS_DEP_TIME, - 2))) as CRS_DEP_TIMESTAMP,
icode1.tz_time_zone as tz_time_zone_origin,
icode2.tz_time_zone as tz_time_zone_dest
from
df_airlines_clean_temp
left join df_iata_icao_codes_temp icode1 on df_airlines_clean_temp.ORIGIN = icode1.IATA
left join df_iata_icao_codes_temp icode2 on df_airlines_clean_temp.DEST = icode2.IATA
where icode1.tz_time_zone is not null  and icode1.tz_time_zone not like '%\\N%'
""")

##### 2.3.2 Table Join Step 2 - Flight Time Adjustments
1. Convert to UTC
2. Create Time Interval

In [None]:
#Transform CRS departure timestamp to UTC
df_flights_with_timezone = df_flights_with_timezone.withColumn("CRS_DEP_TIMESTAMP", to_utc_timestamp("CRS_DEP_TIMESTAMP", col("tz_time_zone_origin")))
# df_flights_with_timezone.display()


In [None]:
#Transform CRS flight departure time to timestamp and create flight departure and arrival timestamps 2 hours and 6 hours before the scheduled departure time
df_flights_with_timezone.createOrReplaceTempView("df_flights_with_timezone_temp")
df_flights = spark.sql(
"""
select
 CRS_DEP_TIMESTAMP - INTERVAL 6 HOURS as CRS_DEP_TIMESTAMP_START,
 CRS_DEP_TIMESTAMP - INTERVAL 2 HOURS as CRS_DEP_TIMESTAMP_END,
 df_flights_with_timezone_temp.*
from
 df_flights_with_timezone_temp
""")
#df_flights.select([count(when( col(c).isNull(), c)).alias(c) for c in df_flights.columns]).display()


# Join neighbor_stations and flights data 
df_neighbor_stations.createOrReplaceTempView("df_neighbor_stations_temp")
df_flights.createOrReplaceTempView("df_flights_temp")

df_flights_station = spark.sql(
"""
select
ns1.station_id as station_id_origin,
ns2.station_id as station_id_dest,
df_flights_temp.*
from
df_flights_temp
left join df_neighbor_stations_temp ns1 on df_flights_temp.ORIGIN = ns1.IATA
left join df_neighbor_stations_temp ns2 on df_flights_temp.DEST = ns2.IATA
""")
# df_flights_station.display()

df_flights_station.createOrReplaceTempView("df_flights_station_temp")

##### 2.3.3 Table Join Step 3 - Weather Data Adjustments
1. Convert to UTC
2. Report Type Dedup

In [None]:
df_weather_adj_tz = spark.sql("""
select t1.station, 
case when (t2.tz_time_zone not like '%\\N%' and t2.tz_time_zone is not null)
  then coalesce(to_utc_timestamp(cast(t1.date as timestamp), t2.tz_time_zone), cast(t1.date as timestamp))
  else cast(t1.date as timestamp) end as date,
 t1.report_type,
 t1.latitude,
 t1.longitude,
 t1.elevation,
 t1.HourlyAltimeterSetting,
t1.HourlyDewPointTemperature,
t1.HourlyDryBulbTemperature,
t1.HourlyRelativeHumidity,
t1.HourlySkyConditions,
t1.HourlyStationPressure,
t1.HourlyVisibility,
t1.HourlyWetBulbTemperature,
t1.HourlyWindDirection,
t1.HourlyWindSpeed
from df_weather_distinct as t1
left join df_neighbor_stations_temp as t2
on cast(t1.station as float) = cast(t2.station_id as float)
""")

df_weather_adj_tz.createOrReplaceTempView("df_weather_adj_tz")

In [None]:
report_type_rank = spark.sql("""
select 'CRN05' as report_type, 5 as rank
union all
select 'FM-12' as report_type, 4 as rank
union all
select 'FM-13' as report_type, 4 as rank
union all
select 'FM-14' as report_type, 4 as rank
union all
select 'FM-15' as report_type, 4 as rank
union all
select 'FM-16' as report_type, 1 as rank
union all
select 'MESOW' as report_type, 3 as rank
union all
select 'SAO' as report_type, 2 as rank
union all
select 'SAOSP' as report_type, 4 as rank
union all
select 'SHEF' as report_type, 5 as rank
union all
select 'SOD' as report_type, 6 as rank
union all
select 'SOM' as report_type, 6 as rank
union all
select 'SURF' as report_type, 5 as rank
union all
select 'SY-MT' as report_type, 6 as rank
""")

report_type_rank.createOrReplaceTempView("report_type_rank")

In [None]:
df_weather_compress = spark.sql("""
with base as 
(select 
 t1.station,
 t1.date,
 t1.report_type,
 t1.latitude,
 t1.longitude,
 t1.elevation,
 cast(regexp_replace(t1.HourlyAltimeterSetting,'[^0-9.]+','') as float) as HourlyAltimeterSetting,
 cast(regexp_replace(t1.HourlyDewPointTemperature,'[^0-9.]+','') as float) as HourlyDewPointTemperature,
 cast(regexp_replace(t1.HourlyDryBulbTemperature,'[^0-9.]+','') as float) as HourlyDryBulbTemperature,
 cast(regexp_replace(t1.HourlyRelativeHumidity,'[^0-9.]+','') as float) as HourlyRelativeHumidity,
 case when t1.HourlySkyConditions like '%OVC%' then 1
      when t1.HourlySkyConditions like '%CLR%' then 0
 end as OCV_CLR,
 cast(regexp_replace(t1.HourlyStationPressure,'[^0-9.]+','') as float) as HourlyStationPressure,
 cast(regexp_replace(t1.HourlyVisibility,'[^0-9.]+','') as float) as HourlyVisibility,
 cast(regexp_replace(t1.HourlyWetBulbTemperature,'[^0-9.]+','') as float) as HourlyWetBulbTemperature,
 cast(regexp_replace(t1.HourlyWindDirection,'[^0-9.]+','') as float) as HourlyWindDirection,
 cast(regexp_replace(t1.HourlyWindSpeed,'[^0-9.]+','') as float) as HourlyWindSpeed,
 t2.rank, 
 min(t2.rank) over (partition by t1.station, t1.date) as top_rank
 from df_weather_adj_tz as t1
left join report_type_rank as t2
 on t1.report_type = t2.report_type
),
top_info as (
select 
  station, date, 
  max(report_type) as report_type,
  avg(LATITUDE) as LATITUDE,
  avg(LONGITUDE) as LONGITUDE,
  avg(ELEVATION) as ELEVATION,
  avg(HourlyAltimeterSetting) as HourlyAltimeterSetting,
  avg(HourlyDewPointTemperature) as HourlyDewPointTemperature,
  avg(HourlyDryBulbTemperature) as HourlyDryBulbTemperature,
  avg(HourlyRelativeHumidity) as HourlyRelativeHumidity,
  avg(OCV_CLR) as OCV_CLR,
  avg(HourlyStationPressure) as HourlyStationPressure,
  avg(HourlyVisibility) as HourlyVisibility,
  avg(HourlyWetBulbTemperature) as HourlyWetBulbTemperature,
  avg(HourlyWindDirection) as HourlyWindDirection,
  avg(HourlyWindSpeed) as HourlyWindSpeed
  from base where rank = top_rank
  group by station, date
),
other_info as (
select 
  station, date, 
  avg(LATITUDE) as LATITUDE,
  avg(LONGITUDE) as LONGITUDE,
  avg(ELEVATION) as ELEVATION,
  avg(HourlyAltimeterSetting) as HourlyAltimeterSetting,
  avg(HourlyDewPointTemperature) as HourlyDewPointTemperature,
  avg(HourlyDryBulbTemperature) as HourlyDryBulbTemperature,
  avg(HourlyRelativeHumidity) as HourlyRelativeHumidity,
  avg(OCV_CLR) as OCV_CLR,
  avg(HourlyStationPressure) as HourlyStationPressure,
  avg(HourlyVisibility) as HourlyVisibility,
  avg(HourlyWetBulbTemperature) as HourlyWetBulbTemperature,
  avg(HourlyWindDirection) as HourlyWindDirection,
  avg(HourlyWindSpeed) as HourlyWindSpeed
  from base where rank != top_rank
  group by station, date
),
info_keep as (
select distinct 
  t1.station, t1.date, 
  t1.REPORT_TYPE, 
  coalesce(t1.LATITUDE, t2.LATITUDE) as LATITUDE,
  coalesce(t1.LONGITUDE, t2.LONGITUDE) as LONGITUDE,
  coalesce(t1.ELEVATION, t2.ELEVATION) as ELEVATION,
  coalesce(t1.HourlyAltimeterSetting, t2.HourlyAltimeterSetting) as HourlyAltimeterSetting,
  coalesce(t1.HourlyDewPointTemperature, t2.HourlyDewPointTemperature) as HourlyDewPointTemperature,
  coalesce(t1.HourlyDryBulbTemperature, t2.HourlyDryBulbTemperature) as HourlyDryBulbTemperature,
  coalesce(t1.HourlyRelativeHumidity, t2.HourlyRelativeHumidity) as HourlyRelativeHumidity,
  coalesce(t1.OCV_CLR, t2.OCV_CLR) as OCV_CLR,
  coalesce(t1.HourlyStationPressure, t2.HourlyStationPressure) as HourlyStationPressure,
  coalesce(t1.HourlyVisibility, t2.HourlyVisibility) as HourlyVisibility,
  coalesce(t1.HourlyWetBulbTemperature, t2.HourlyWetBulbTemperature) as HourlyWetBulbTemperature,
  coalesce(t1.HourlyWindDirection, t2.HourlyWindDirection) as HourlyWindDirection,
  coalesce(t1.HourlyWindSpeed, t2.HourlyWindSpeed) as HourlyWindSpeed
from top_info as t1
  left join other_info as t2
  on t1.station = t2.station
  and t1.date = t2.date
),
for_filter as (
select *,
 (case when HourlyAltimeterSetting is null then 1 else 0 end)+
 (case when HourlyDewPointTemperature is null then 1 else 0 end)+
 (case when HourlyDryBulbTemperature is null then 1 else 0 end)+
 (case when HourlyRelativeHumidity is null then 1 else 0 end)+
 (case when OCV_CLR is null then 1 else 0 end)+
 (case when HourlyStationPressure is null then 1 else 0 end)+
 (case when HourlyVisibility is null then 1 else 0 end)+
 (case when HourlyWetBulbTemperature is null then 1 else 0 end)+
 (case when HourlyWindDirection is null then 1 else 0 end)+
 (case when HourlyWindSpeed is null then 1 else 0 end) as null_cnt
from info_keep
)
select *,
    LAG(DATE, 1) OVER (
        PARTITION BY STATION, to_date(DATE)
        ORDER BY DATE
    ) as weather_lag
from for_filter
where null_cnt <= 3
""")

# df_weather_compress.write.parquet(f"{blob_url}/df_weather_to_join_full_v1")
# df_weather_to_join_full = spark.read.parquet(f"{blob_url}/df_weather_to_join_full_v1")
df_weather_compress.createOrReplaceTempView("df_weather_to_join_full_temp")

##### 2.3.4 Table Join Step 4 - Join Weather to Flight
1. Join Weather Info by Station for Departure and Arrival Separately
2. Take the Latest Weather Record within the Flight Time Interval
3. Combine the Departure and Arrival Data

In [None]:
#Joining flights with weather dataset
# df_weather_to_join_full.createOrReplaceTempView("df_weather_to_join_full_temp")
# df_flights_station.createOrReplaceTempView("df_flights_station_temp")
df_flights_weather_station_origin =  spark.sql(
"""
select
df_flights_station_temp.*,
ws_origin.DATE as ws_origin_DATE,
ws_origin.LATITUDE as ws_origin_LATITUDE,
ws_origin.LONGITUDE as ws_origin_LONGITUDE,
ws_origin.ELEVATION as ws_origin_ELEVATION,
ws_origin.HourlyAltimeterSetting as ws_origin_HourlyAltimeterSetting,
ws_origin.HourlyDewPointTemperature as ws_origin_HourlyDewPointTemperature,
ws_origin.HourlyDryBulbTemperature as ws_origin_HourlyDryBulbTemperature,
ws_origin.HourlyRelativeHumidity as ws_origin_HourlyRelativeHumidity,
ws_origin.OCV_CLR as ws_origin_OCV_CLR,
ws_origin.HourlyStationPressure as ws_origin_HourlyStationPressure,
ws_origin.HourlyVisibility as ws_origin_HourlyVisibility,
ws_origin.HourlyWetBulbTemperature as ws_origin_HourlyWetBulbTemperature,
ws_origin.HourlyWindDirection as ws_origin_HourlyWindDirection,
ws_origin.HourlyWindSpeed as ws_origin_HourlyWindSpeed,
ws_origin.weather_lag as ws_origin_weather_lag
from 
df_flights_station_temp
left join df_weather_to_join_full_temp ws_origin on df_flights_station_temp.station_id_origin = ws_origin.station 
and (ws_origin.DATE BETWEEN df_flights_station_temp.CRS_DEP_TIMESTAMP_START AND df_flights_station_temp.CRS_DEP_TIMESTAMP_END)
""")



In [None]:
df_flights_weather_station_origin.createOrReplaceTempView("df_flights_weather_station_origin_temp")
# Aggregate data to obtain only the latest observation in the timeframe
df_flights_weather_station_origin_agg = spark.sql(
"""
SELECT
   t1.*
FROM
    df_flights_weather_station_origin_temp as t1
where
  t1.ws_origin_DATE = (select max(s1.ws_origin_DATE) from df_flights_weather_station_origin_temp as s1
                      where t1.station_id_origin = s1.station_id_origin
                      and t1.CRS_DEP_TIMESTAMP = s1.CRS_DEP_TIMESTAMP
                      and t1.TAIL_NUM = s1.TAIL_NUM
                      and t1.OP_CARRIER_FL_NUM = s1.OP_CARRIER_FL_NUM)

""")

In [None]:
df_flights_weather_station_dest =  spark.sql(
"""
select
df_flights_station_temp.station_id_dest,
df_flights_station_temp.CRS_DEP_TIMESTAMP,
df_flights_station_temp.TAIL_NUM,
df_flights_station_temp.OP_CARRIER_FL_NUM,
ws.DATE as ws_dest_DATE,
ws.LATITUDE as ws_dest_LATITUDE,
ws.LONGITUDE as ws_dest_LONGITUDE,
ws.ELEVATION as ws_dest_ELEVATION,
ws.HourlyAltimeterSetting as ws_dest_HourlyAltimeterSetting,
ws.HourlyDewPointTemperature as ws_dest_HourlyDewPointTemperature,
ws.HourlyDryBulbTemperature as ws_dest_HourlyDryBulbTemperature,
ws.HourlyRelativeHumidity as ws_dest_HourlyRelativeHumidity,
ws.OCV_CLR as ws_dest_OCV_CLR,
ws.HourlyStationPressure as ws_dest_HourlyStationPressure,
ws.HourlyVisibility as ws_dest_HourlyVisibility,
ws.HourlyWetBulbTemperature as ws_dest_HourlyWetBulbTemperature,
ws.HourlyWindDirection as ws_dest_HourlyWindDirection,
ws.HourlyWindSpeed as ws_dest_HourlyWindSpeed,
ws.weather_lag as ws_dest_weather_lag
from 
df_flights_station_temp
left join df_weather_to_join_full_temp ws on df_flights_station_temp.station_id_dest = ws.station 
and (ws.DATE BETWEEN df_flights_station_temp.CRS_DEP_TIMESTAMP_START AND df_flights_station_temp.CRS_DEP_TIMESTAMP_END)
""")


In [None]:
df_flights_weather_station_dest.createOrReplaceTempView("df_flights_weather_station_dest_temp")
df_flights_weather_station_dest_agg = spark.sql(
"""
SELECT
   t1.*
FROM
    df_flights_weather_station_dest_temp as t1
where
  t1.ws_dest_DATE = (select max(s1.ws_dest_DATE) from df_flights_weather_station_dest_temp as s1
                      where t1.station_id_dest = s1.station_id_dest
                      and t1.CRS_DEP_TIMESTAMP = s1.CRS_DEP_TIMESTAMP
                      and t1.TAIL_NUM = s1.TAIL_NUM
                      and t1.OP_CARRIER_FL_NUM = s1.OP_CARRIER_FL_NUM)

""")


In [None]:
##joining origin and destination flight, weather and station data
df_flights_weather_station_origin_agg.createOrReplaceTempView("df_flights_weather_station_origin_agg_temp")
df_flights_weather_station_dest_agg.createOrReplaceTempView("df_flights_weather_station_dest_agg_temp")
df_flights_weather_station_origin_dest= spark.sql(
"""
select 
fws1.*,
ws_dest_DATE,
ws_dest_LATITUDE,
ws_dest_LONGITUDE,
ws_dest_ELEVATION,
ws_dest_HourlyAltimeterSetting,
ws_dest_HourlyDewPointTemperature,
ws_dest_HourlyDryBulbTemperature,
ws_dest_HourlyRelativeHumidity,
ws_dest_OCV_CLR,
ws_dest_HourlyStationPressure,
ws_dest_HourlyVisibility,
ws_dest_HourlyWetBulbTemperature,
ws_dest_HourlyWindDirection,
ws_dest_HourlyWindSpeed,
ws_dest_weather_lag
from 
df_flights_weather_station_origin_agg_temp fws1
left join df_flights_weather_station_dest_agg_temp fws2
  on fws1.station_id_dest = fws2.station_id_dest
  and fws1.CRS_DEP_TIMESTAMP = fws2.CRS_DEP_TIMESTAMP
  and fws1.TAIL_NUM = fws2.TAIL_NUM
  and fws1.OP_CARRIER_FL_NUM = fws2.OP_CARRIER_FL_NUM
""")

# df_flights_weather_station_origin_dest.write.mode("overwrite").parquet(f"{blob_url}/flights_weather_station")

#### 2.3.5 Table Join Data sizes
- Data sizes
  
  - Join Weather to Flight origin and destination
    - df_weather_compress: Cleaned/Deduped weather data 30,528,602 records
    - df_flights_station: Cleaned/Deduped flight data containing airport and station metadata 41,551,808 records
    
  - Join Origin and Destination including flight data and latest weather measurement 2 hours prior CRS departure time
    - df_flights_weather_station_dest_agg 41,551,808 records
    - df_flights_weather_station_origin_agg 41,551,808 records

- Total join time

  - It took a total of 3.38 hours to perform the final join

- Cluster specification

  - 1-4 Workers: 16-64 GB Memory, 4-16 Cores
  - 1 Driver: 16 GB Memory, 4 Cores
  - Runtime: 11.3.x-cpu-ml-scala2.12

#### 2.4 Data Cleaning and Validation

We are joining several complex data sets and we expect to find a number of issues that require data cleaning.  Some of the levels of different categorical variables may need to be merged, we will need to handle missing values, and normalize other numeric quantities.

##### 2.4.1 NULL Values

We will drop all columns with more than 50% NULL values, which are identified through data summarization.

##### 2.4.2 Outliers

We will carefully report any outlier data that we believe should be excluded, and only exclude it after a thorough investigation.

##### 2.4.3 Normalization and Scaling

We will normalize variables should be treated numerically like weather information.

##### 2.4.4 Class Imbalance

We have about a 5:1 imbalance between our "no delay" and "delay" classes.  For some of our modelling work, we may need to boost the number of training examples from our "delay" category to build a predictive model.

#### 2.5 EDA - Post Join

In [None]:
df_flights_weather_station_origin_dest = spark.read.parquet(f"{blob_url}/flights_weather_station_v2")
df_flights_weather_station_origin_dest.display()

station_id_origin,station_id_dest,CRS_DEP_TIMESTAMP_START,CRS_DEP_TIMESTAMP_END,QUARTER,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,OP_UNIQUE_CARRIER,TAIL_NUM,OP_CARRIER_FL_NUM,ORIGIN,ORIGIN_CITY_NAME,ORIGIN_STATE_ABR,ORIGIN_WAC,DEST,DEST_CITY_NAME,DEST_STATE_ABR,DEST_WAC,CRS_DEP_TIME,DEP_DEL15,DEP_DELAY_GROUP,DEP_TIME_BLK,TAXI_IN,CRS_ARR_TIME,CRS_ELAPSED_TIME,DISTANCE,DISTANCE_GROUP,YEAR,CRS_DEP_TIMESTAMP,tz_time_zone_origin,tz_time_zone_dest,ws_origin_DATE,ws_origin_LATITUDE,ws_origin_LONGITUDE,ws_origin_ELEVATION,ws_origin_HourlyAltimeterSetting,ws_origin_HourlyDewPointTemperature,ws_origin_HourlyDryBulbTemperature,ws_origin_HourlyRelativeHumidity,ws_origin_OCV_CLR,ws_origin_HourlyStationPressure,ws_origin_HourlyVisibility,ws_origin_HourlyWetBulbTemperature,ws_origin_HourlyWindDirection,ws_origin_HourlyWindSpeed,ws_origin_weather_lag,ws_dest_DATE,ws_dest_LATITUDE,ws_dest_LONGITUDE,ws_dest_ELEVATION,ws_dest_HourlyAltimeterSetting,ws_dest_HourlyDewPointTemperature,ws_dest_HourlyDryBulbTemperature,ws_dest_HourlyRelativeHumidity,ws_dest_OCV_CLR,ws_dest_HourlyStationPressure,ws_dest_HourlyVisibility,ws_dest_HourlyWetBulbTemperature,ws_dest_HourlyWindDirection,ws_dest_HourlyWindSpeed,ws_dest_weather_lag
78543011640,,2015-01-03T15:55:00.000+0000,2015-01-03T19:55:00.000+0000,1,1,3,6,2015-01-03,B6,N187JB,2035,STT,"Charlotte Amalie, VI",VI,4,SJU,"San Juan, PR",PR,3,1755,0.0,-1,1700-1759,18.0,1836,41.0,68.0,1,2015,2015-01-03T21:55:00.000+0000,America/St_Thomas,America/Puerto_Rico,2015-01-03T19:53:00.000+0000,18.3363,-64.98,6.1,30.100000381469727,67.0,83.0,59.0,0.0,30.09000015258789,10.0,72.0,100.0,18.0,2015-01-03T18:53:00.000+0000,,,,,,,,,,,,,,,
72259003927,,2015-01-14T13:00:00.000+0000,2015-01-14T17:00:00.000+0000,1,1,14,3,2015-01-14,AA,N633AA,1180,DFW,"Dallas/Fort Worth, TX",TX,74,SJU,"San Juan, PR",PR,3,1300,0.0,0,1300-1359,6.0,1924,264.0,2165.0,9,2015,2015-01-14T19:00:00.000+0000,America/Chicago,America/Puerto_Rico,2015-01-14T16:53:00.000+0000,32.8978,-97.0189,170.7,30.38999938964844,23.0,38.0,55.0,1.0,29.739999771118164,10.0,32.0,20.0,9.0,2015-01-14T15:53:00.000+0000,,,,,,,,,,,,,,,
91182022521,,2015-02-08T18:50:00.000+0000,2015-02-08T22:50:00.000+0000,1,2,8,7,2015-02-08,UA,N772UA,201,HNL,"Honolulu, HI",HI,2,GUM,"Guam, TT",TT,5,1450,0.0,-1,1400-1459,6.0,1840,470.0,3801.0,11,2015,2015-02-09T00:50:00.000+0000,Pacific/Honolulu,Pacific/Guam,2015-02-08T21:53:00.000+0000,21.324,-157.9294,2.1,29.979999542236328,51.0,79.0,38.0,,29.959999084472656,10.0,63.0,210.0,7.0,2015-02-08T20:53:00.000+0000,,,,,,,,,,,,,,,
72243012960,,2015-02-18T10:39:00.000+0000,2015-02-18T14:39:00.000+0000,1,2,18,3,2015-02-18,UA,N13248,1576,IAH,"Houston, TX",TX,74,SJU,"San Juan, PR",PR,3,1039,0.0,0,1000-1059,7.0,1650,251.0,2007.0,9,2015,2015-02-18T16:39:00.000+0000,America/Chicago,America/Puerto_Rico,2015-02-18T13:53:00.000+0000,29.98,-95.36,29.0,30.31999969482422,33.0,35.0,93.0,0.0,30.209999084472656,10.0,34.0,0.0,0.0,2015-02-18T12:53:00.000+0000,,,,,,,,,,,,,,,
72530094846,,2015-02-22T08:25:00.000+0000,2015-02-22T12:25:00.000+0000,1,2,22,7,2015-02-22,UA,N66803,1688,ORD,"Chicago, IL",IL,41,SJU,"San Juan, PR",PR,3,825,0.0,0,0800-0859,7.0,1510,285.0,2072.0,9,2015,2015-02-22T14:25:00.000+0000,America/Chicago,America/Puerto_Rico,2015-02-22T12:00:00.000+0000,41.96019,-87.93162,201.8,,7.0,17.0,64.0,,29.56999969482422,9.9399995803833,15.0,330.0,15.0,2015-02-22T11:51:00.000+0000,,,,,,,,,,,,,,,
91182022521,,2015-03-25T18:05:00.000+0000,2015-03-25T22:05:00.000+0000,1,3,25,3,2015-03-25,UA,N215UA,201,HNL,"Honolulu, HI",HI,2,GUM,"Guam, TT",TT,5,1405,0.0,-1,1400-1459,5.0,1755,470.0,3801.0,11,2015,2015-03-26T00:05:00.000+0000,Pacific/Honolulu,Pacific/Guam,2015-03-25T21:53:00.000+0000,21.324,-157.9294,2.1,30.09000015258789,63.0,80.0,56.0,,30.06999969482422,10.0,69.0,60.0,10.0,2015-03-25T20:53:00.000+0000,,,,,,,,,,,,,,,
72243012960,,2015-03-30T09:29:00.000+0000,2015-03-30T13:29:00.000+0000,1,3,30,1,2015-03-30,UA,N37465,1210,IAH,"Houston, TX",TX,74,SJU,"San Juan, PR",PR,3,1029,0.0,0,1000-1059,9.0,1548,259.0,2007.0,9,2015,2015-03-30T15:29:00.000+0000,America/Chicago,America/Puerto_Rico,2015-03-30T12:53:00.000+0000,29.98,-95.36,29.0,30.200000762939453,65.0,67.0,93.0,,30.09000015258789,8.0,66.0,0.0,0.0,2015-03-30T11:53:00.000+0000,,,,,,,,,,,,,,,
78551011624,,2015-03-30T13:37:00.000+0000,2015-03-30T17:37:00.000+0000,1,3,30,1,2015-03-30,B6,N309JB,341,STX,"Christiansted, VI",VI,4,SJU,"San Juan, PR",PR,3,1537,0.0,0,1500-1559,5.0,1621,44.0,94.0,1,2015,2015-03-30T19:37:00.000+0000,America/St_Thomas,America/Puerto_Rico,2015-03-30T16:53:00.000+0000,17.6997,-64.8125,18.6,30.059999465942383,65.0,85.0,51.0,,30.0,10.0,72.0,120.0,15.0,2015-03-30T15:53:00.000+0000,,,,,,,,,,,,,,,
72259003927,,2015-05-18T12:50:00.000+0000,2015-05-18T16:50:00.000+0000,2,5,18,1,2015-05-18,AA,N5CGAA,1180,DFW,"Dallas/Fort Worth, TX",TX,74,SJU,"San Juan, PR",PR,3,1350,0.0,0,1300-1359,5.0,1928,278.0,2165.0,9,2015,2015-05-18T18:50:00.000+0000,America/Chicago,America/Puerto_Rico,2015-05-18T16:44:00.000+0000,32.8978,-97.0189,170.7,30.100000381469727,71.0,83.0,67.0,,29.459999084472656,10.0,75.0,,3.0,2015-05-18T15:53:00.000+0000,,,,,,,,,,,,,,,
78543011640,,2015-05-19T06:12:00.000+0000,2015-05-19T10:12:00.000+0000,2,5,19,2,2015-05-19,B6,N307JB,1035,STT,"Charlotte Amalie, VI",VI,4,SJU,"San Juan, PR",PR,3,812,0.0,-1,0800-0859,4.0,849,37.0,68.0,1,2015,2015-05-19T12:12:00.000+0000,America/St_Thomas,America/Puerto_Rico,2015-05-19T09:53:00.000+0000,18.3363,-64.98,6.1,29.989999771118164,71.0,76.0,85.0,0.0,29.979999542236328,10.0,73.0,80.0,5.0,2015-05-19T08:53:00.000+0000,,,,,,,,,,,,,,,


In [None]:
df_flights_weather_station_origin_dest.createOrReplaceTempView("joined_table_temp")

___Delay vs Non-Delay by State___
<br>`The plot shows the number of delayed and non-delay flights by departure state`
<br>No obvious difference between delay vs non-delay flights.


___Delay vs Non-Delay by Dew Point Temp___
<br>`The plot shows the number of delayed and non-delay flights by Dew Point Temperature`
<br>The relative steep increase from 10 to 30 could be observed from non-delay which is not that obvious for delayed flights.


___Delay vs Non-Delay by Dry Buld Temp___
<br>`The plot shows the number of delayed and non-delay flights by Dry Bulb Temperature`
<br>The trends are not significantly differ between delayed vs non-delayed flights.


___Delay vs Non-Delay by Relative Humidity___
<br>`The plot shows the number of delayed and non-delay flights by Relative Humidity`
<br>The overal trends are similar across delayed vs non-delayed flights, but the relative changes are not always consistent.


___Delay vs Non-Delay by Pressure___
<br>`The plot shows the number of delayed and non-delay flights by Station Pressure`
<br>The trends are not significantly differ between delayed vs non-delayed flights.

In [None]:
%sql

select 
year, quarter, month,
origin_state_abr, 
ws_origin_HourlyDewPointTemperature,
ws_origin_HourlyDryBulbTemperature,
ws_origin_HourlyRelativeHumidity,
ws_origin_HourlyStationPressure,
ws_origin_HourlyVisibility,
ws_origin_HourlyWetBulbTemperature,
ws_origin_HourlyWindSpeed,
ws_origin_OCV_CLR,
sum(dep_del15) as delay_cnt,
sum(1-dep_del15) as non_deplay_cnt
from joined_table_temp
group by year, quarter, month,
origin_state_abr, 
ws_origin_HourlyDewPointTemperature,
ws_origin_HourlyDryBulbTemperature,
ws_origin_HourlyRelativeHumidity,
ws_origin_HourlyStationPressure,
ws_origin_HourlyVisibility,
ws_origin_HourlyWetBulbTemperature,
ws_origin_HourlyWindSpeed,
ws_origin_OCV_CLR;

year,quarter,month,origin_state_abr,ws_origin_HourlyDewPointTemperature,ws_origin_HourlyDryBulbTemperature,ws_origin_HourlyRelativeHumidity,ws_origin_HourlyStationPressure,ws_origin_HourlyVisibility,ws_origin_HourlyWetBulbTemperature,ws_origin_HourlyWindSpeed,ws_origin_OCV_CLR,delay_cnt,non_deplay_cnt
2016,3,7,AK,48.0,66.0,52.0,29.81999969482422,10.0,56.0,7.0,1.0,0.0,3.0
2015,2,6,AK,20.0,27.0,75.0,29.93000030517578,10.0,25.0,17.0,1.0,0.0,1.0
2017,3,7,AK,52.0,52.0,100.0,30.059999465942383,10.0,52.0,6.0,1.0,0.0,2.0
2017,3,9,CO,43.0,45.0,93.0,24.549999237060547,10.0,44.0,7.0,1.0,1.0,7.0
2019,1,2,MN,6.0,10.0,84.0,29.489999771118164,3.0,9.0,6.0,1.0,3.0,7.0
2021,2,4,IL,47.0,51.0,86.0,28.8799991607666,9.0,49.0,8.0,1.0,3.0,59.0
2017,2,6,CO,54.0,62.0,75.0,24.770000457763672,10.0,57.0,10.0,1.0,1.0,6.0
2021,2,4,MN,28.0,42.0,58.0,29.07999992370605,10.0,36.0,15.0,1.0,7.0,23.0
2015,4,11,MO,40.0,58.0,51.0,29.40999984741211,10.0,49.0,14.0,1.0,0.0,9.0
2020,2,6,IL,44.0,78.0,30.0,29.600000381469727,10.0,59.0,14.0,0.0,0.0,1.0


Output can only be rendered in Databricks

Output can only be rendered in Databricks

Output can only be rendered in Databricks

Output can only be rendered in Databricks

Output can only be rendered in Databricks

Output can only be rendered in Databricks

#### 2.6 DownSampling

We count the ratio between delayed vs non-delayed flights and apply it back to the majority which is the non-delayed flights to get a more balanced data for both EDA and modelling.

In [None]:
numeric_cols_keep = [
#  'QUARTER',
 'MONTH',
 'DAY_OF_MONTH',
 'DAY_OF_WEEK',
#  'OP_CARRIER_FL_NUM',
#  'ORIGIN_WAC',
#  'DEST_WAC',
#  'CRS_DEP_TIME',
#  'DEP_DEL15', # response variable
#  'DEP_DELAY_GROUP',
#  'TAXI_IN',
#  'CRS_ARR_TIME',
 'CRS_ELAPSED_TIME',
 'DISTANCE',
 'DISTANCE_GROUP',
 'YEAR',
 #'DIVERTED',
#  'ws_origin_LATITUDE',
#  'ws_origin_LONGITUDE',
#  'ws_origin_ELEVATION',
 'origin_weather_Avg_HourlyAltimeterSetting',
 'origin_weather_Avg_HourlyDewPointTemperature',
 'origin_weather_Avg_HourlyDryBulbTemperature',
 'origin_weather_Avg_HourlyRelativeHumidity',
 'origin_weather_Avg_HourlyStationPressure',
 'origin_weather_Avg_HourlyVisibility',
 'origin_weather_Avg_HourlyWetBulbTemperature',
 'origin_weather_Avg_HourlyWindDirection',
 'origin_weather_Avg_HourlyWindSpeed',
#  'ws_dest_LATITUDE',
#  'ws_dest_LONGITUDE',
#  'ws_dest_ELEVATION',
 'dest_weather_Avg_HourlyAltimeterSetting',
 'dest_weather_Avg_HourlyDewPointTemperature',
 'dest_weather_Avg_HourlyDryBulbTemperature',
 'dest_weather_Avg_HourlyRelativeHumidity',
 'dest_weather_Avg_HourlyStationPressure',
 'dest_weather_Avg_HourlyVisibility',
 'dest_weather_Avg_HourlyWetBulbTemperature',
 'dest_weather_Avg_HourlyWindDirection',
 'dest_weather_Avg_HourlyWindSpeed',
 'AIRPORT_ORIGIN_PAGERANK'
]


categorical_cols_keep = [
 'OP_CARRIER_FL_NUM', # treat as categorical
#  'station_id_origin',
#  'station_id_dest',
#  'FL_DATE',
 'OP_UNIQUE_CARRIER',
 'TAIL_NUM',
 'ORIGIN',
 'ORIGIN_CITY_NAME',
 'ORIGIN_STATE_ABR',
 'DEST',
 'DEST_CITY_NAME',
 'DEST_STATE_ABR',
#  'DEP_TIME_BLK',
#  'tz_time_zone_origin',
#  'tz_time_zone_dest',
 'FL_DATE_IS_HOLIDAY' # treat as boolean
]


df_train_mean = df_flights_weather_station.where("year<=2020").select(*[mean(c).alias(c) for c in numeric_cols_keep])
df_train_mean.createOrReplaceTempView("df_train_mean")

df_delay_full = df_flights_weather_station.where("dep_del15 == 1")
df_non_delay_full = df_flights_weather_station.where("dep_del15 == 0")
delay_record_count = df_delay_full.count()
nondelay_record_count = df_non_delay_full.count()
ratio = int(nondelay_record_count / delay_record_count)
print("Delayed: ", delay_record_count, " Non-Delayed: ", nondelay_record_count, " Pct: ", delay_record_count/(delay_record_count + nondelay_record_count), " Ratio: ", ratio)

# Downsample the majority DF (the df_non_delay_full)
sampled_majority_df = df_non_delay_full.sample(False, 1/ratio)
sampled_cnt = sampled_majority_df.count()
print("sampled_majority_df: ", sampled_cnt, " delay record count: ", delay_record_count, " sum: ", sampled_cnt + delay_record_count)
downsampled_df = sampled_majority_df.unionAll(df_delay_full)

original_rows = df_flights_weather_station.count()
downsampled_rows = downsampled_df.count()

print("Records in original DF: ", original_rows, " Downsampled DF: ", downsampled_rows, " Ratio: ", downsampled_rows/original_rows)
downsampled_df.createOrReplaceTempView("df_downsampled")

Delayed:  7079662  Non-Delayed:  34353725  Pct:  0.1708685316988447  Ratio:  4
sampled_majority_df:  8589980  delay record count:  7079662  sum:  15669642
Records in original DF:  42286711  Downsampled DF:  15669642  Ratio:  0.3705571237261749


In [None]:
downsampled_df.dtypes

Out[7]: [('_utc_dept_ts', 'timestamp'),
 ('_utc_dept_minus2_ts', 'timestamp'),
 ('_utc_dept_actual_ts', 'timestamp'),
 ('_utc_arr_ts', 'timestamp'),
 ('_utc_arr_actual_ts', 'timestamp'),
 ('_dep_time_str', 'string'),
 ('_local_dept_ts', 'timestamp'),
 ('_local_dept_minus2_ts', 'timestamp'),
 ('_local_dept_actual_ts', 'timestamp'),
 ('_local_at_src_airport_arr_ts', 'timestamp'),
 ('_local_at_src_airport_arr_actual_ts', 'timestamp'),
 ('QUARTER', 'int'),
 ('MONTH', 'int'),
 ('DAY_OF_MONTH', 'int'),
 ('DAY_OF_WEEK', 'int'),
 ('FL_DATE', 'string'),
 ('OP_UNIQUE_CARRIER', 'string'),
 ('OP_CARRIER_AIRLINE_ID', 'int'),
 ('OP_CARRIER', 'string'),
 ('TAIL_NUM', 'string'),
 ('OP_CARRIER_FL_NUM', 'int'),
 ('ORIGIN_AIRPORT_ID', 'int'),
 ('ORIGIN_AIRPORT_SEQ_ID', 'int'),
 ('ORIGIN_CITY_MARKET_ID', 'int'),
 ('ORIGIN', 'string'),
 ('ORIGIN_CITY_NAME', 'string'),
 ('ORIGIN_STATE_ABR', 'string'),
 ('ORIGIN_STATE_FIPS', 'int'),
 ('ORIGIN_STATE_NM', 'string'),
 ('ORIGIN_WAC', 'int'),
 ('DEST_AIRPORT_ID',

In [None]:
downsampled_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in numeric_cols_keep]).toPandas()

Unnamed: 0,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,CRS_ELAPSED_TIME,DISTANCE,DISTANCE_GROUP,YEAR,origin_weather_Avg_HourlyAltimeterSetting,origin_weather_Avg_HourlyDewPointTemperature,origin_weather_Avg_HourlyDryBulbTemperature,...,dest_weather_Avg_HourlyAltimeterSetting,dest_weather_Avg_HourlyDewPointTemperature,dest_weather_Avg_HourlyDryBulbTemperature,dest_weather_Avg_HourlyRelativeHumidity,dest_weather_Avg_HourlyStationPressure,dest_weather_Avg_HourlyVisibility,dest_weather_Avg_HourlyWetBulbTemperature,dest_weather_Avg_HourlyWindDirection,dest_weather_Avg_HourlyWindSpeed,AIRPORT_ORIGIN_PAGERANK
0,0,0,0,22,0,0,0,2466,7421,4526,...,3697,8311,5440,8651,62990,4346,69481,598828,10449,0


#### 2.7 Feature Dictionary Post Join
|Feature                                |  Description                                                                      |
|---------------------------------------|-----------------------------------------------------------------------------------|      
|  **Categorical**                                                                                                            |
|  OP_CARRIER_FL_NUM                    |  Carrier-Flight identifier                                                        |
|  station_id_origin                    |  Origin station unique identifier                                                 |
|  station_id_dest                      |  Destination station unique identifier                                            |
|  FL_DATE                              |  Flight date (YYYY-MM-DD)                                                         |
|  OP_UNIQUE_CARRIER                    |  Flight Carrier unique identifier                                                 |
|  TAIL_NUM                             |  Tail Number Flight tracker                                                       |
|  ORIGIN                               |  Origin Airport IATA code                                                         |                          
|  ORIGIN_CITY_NAME                     |  Origin Airport City Name                                                         | 
|  ORIGIN_STATE_ABR                     |  Origin Airport State abbreviation                                                | 
|  DEST                                 |  Destination Airport IATA code                                                    |
|  DEST_CITY_NAME                       |  Destination Airport City Name                                                    |
|  DEST_STATE_ABR                       |  Destination Airport State abbreviation                                           |
|  FL_DATE_IS_HOLIDAY                   |  FLIGHT DATE IS US HOLIDAY 1 = true, 0 = false                                    |
|  **Numerical**                                                                                                            |
|  MONTH                                |  Flight Month                                                                     |
|  DAY_OF_MONTH                         |  Flight Day of the month                                                          |
|  DAY_OF_WEEK                          |  Flight Day of week                                                               |
|  CRS_ELAPSED_TIME                     |  CRS estimated flight elapsed time                                                |
|  DISTANCE                             |  Distance between airports                                                        |
|  DISTANCE_GROUP                       |  Distance Intervals, every 250 Miles, for Flight Segment                          | 
|  YEAR                                 |  Flight year                                                                      |
|  ws_origin_LATITUDE                   |  Origin Weather station Latitude                                                  |
|  ws_origin_LONGITUDE                  |  Origin Weather station Longitude                                                 |
|  ws_origin_ELEVATION                  |  Origin Weather station elevation                                                 |
|  ws_origin_HourlyAltimeterSetting     |  Origin Weather station Hourly Altimeter measurement                              |
|  ws_origin_HourlyDewPointTemperature  |  Origin Weather station Hourly Dew Point Temperature measurement                  |
|  ws_origin_HourlyDryBulbTemperature   |  Origin Weather station Hourly Dry Bulb Temperature measurement                   |
|  ws_origin_HourlyRelativeHumidity     |  Origin Weather station Hourly Relative Humidity measurement                      |
|  ws_origin_OCV_CLR                    |  Origin Weather station Hourly Sky Code, OCV(overcloud) = 1 CLR(clear) = 0        |
|  ws_origin_HourlyStationPressure      |  Origin Weather station Hourly Station Pressure measurement                       |
|  ws_origin_HourlyVisibility           |  Origin Weather station Hourly Visibility measurement                             |
|  ws_origin_HourlyWetBulbTemperature   |  Origin Weather station Hourly Wet Bulb Temperature measurement                   |
|  ws_origin_HourlyWindDirection        |  Origin Weather station Hourly Wind Direction measurement                         |
|  ws_origin_HourlyWindSpeed            |  Origin Weather station Hourly Wind Speed measurement                             |
|  ws_dest_LATITUDE                     |  Destination Weather station Latitude                                             |
|  ws_dest_LONGITUDE                    |  Destination Weather station Longitude                                            |
|  ws_dest_ELEVATION                    |  Destination Weather station elevation                                            |
|  ws_dest_HourlyAltimeterSetting       |  Destination Weather station Hourly Altimeter measurement                         |
|  ws_dest_HourlyDewPointTemperature    |  Destination Weather station Hourly Dew Point Temperature measurement             |
|  ws_dest_HourlyDryBulbTemperature     |  Destination Weather station Hourly Dry Bulb Temperature measurement              |
|  ws_dest_HourlyRelativeHumidity       |  Destination Weather station Hourly Relative Humidity measurement                 |
|  ws_dest_OCV_CLR                      |  Destination Weather station Hourly Sky Code, OCV(overcloud) = 1 CLR(clear) = 0   |
|  ws_dest_HourlyStationPressure        |  Destination Weather station Hourly Station Pressure measurement                  |
|  ws_dest_HourlyVisibility             |  Destination Weather station Hourly Visibility measurement                        |
|  ws_dest_HourlyWetBulbTemperature     |  Destination Weather station Hourly Wet Bulb Temperature measurement              |
|  ws_dest_HourlyWindDirection          |  Destination Weather station Hourly Wind Direction measurement                    |
|  ws_dest_HourlyWindSpeed              |  Destination Weather station Hourly Wind Speed measurement                        |
|  AIRPORT_ORIGIN_PAGERANK              |  Graph feature to determine Origin Airport pagerank per year                      |

##### 2.7.1 Time-based feature - Flight day is holiday

In [None]:
df_holiday = spark.read.option("header",True).csv(f"{blob_url}/us_holiday_dates.csv")

df_holiday.createOrReplaceTempView("df_holiday_temp")
df_flights_weather_station.createOrReplaceTempView("df_flights_weather_station_temp")

df_flights_weather_station = spark.sql(
"""
select 
df_flights_weather_station_temp.*,
CASE WHEN df_holiday_temp.Holiday IS NULL 
            THEN 0 
            ELSE 1 
END AS FL_DATE_IS_HOLIDAY
from df_flights_weather_station_temp
left join df_holiday_temp on to_date(df_flights_weather_station_temp.FL_DATE) = to_date(df_holiday_temp.date)
"""
)


##### 2.7.1.1 EDA - Delay vs Non-Delay by Holiday

The plot shows the total number of delays per date and identifies the date as a holiday or not-holiday. We can observe that the highest number of delays occur close to a holiday

In [None]:
%sql
select 
YEAR,
to_date(FL_DATE),
FL_DATE_IS_HOLIDAY, 
sum(dep_del15) as delay_cnt,
sum(1-dep_del15) as non_deplay_cnt
from df_downsampled
where year<=2020
group by
YEAR,
to_date(FL_DATE),
FL_DATE_IS_HOLIDAY;

<img src="https://user-images.githubusercontent.com/88794396/204213115-4ab80a70-9a26-44ee-bfdb-566533ee6412.png?raw=true>" width=100%>

##### 2.7.2 Graph-based feature - Origin Airport Pagerank
We are using pagerank to create a feature that identifies the traffic per origin airports per year.

In [None]:
def pageRankPerYear(df, rank_year):
    df.createOrReplaceTempView("df_temp")
    G=nx.Graph()
    df_airport_edges = df.select(col("ORIGIN"), col("DEST")).filter(col("YEAR") == rank_year).toPandas()
    G = nx.from_pandas_edgelist(df_airport_edges, 'ORIGIN', 'DEST')
    pr=nx.pagerank(G, alpha=0.15, max_iter=10)
    data = [(k, v, rank_year) for k, v in pr.items()]
    columns= ['AIRPORT_CODE', 'PAGERANK', 'YEAR']
    newDataframe = spark.createDataFrame(data, columns)
    return newDataframe

df_years = df_flights_weather_station.select('YEAR').distinct()
df_pagerank_2015 = pageRankPerYear(df_flights_weather_station, '2015')
df_pagerank_2016 = pageRankPerYear(df_flights_weather_station, '2016')
df_pagerank_2017 = pageRankPerYear(df_flights_weather_station, '2017')
df_pagerank_2018 = pageRankPerYear(df_flights_weather_station, '2018')
df_pagerank_2019 = pageRankPerYear(df_flights_weather_station, '2019')
df_pagerank_2020 = pageRankPerYear(df_flights_weather_station, '2020')
df_pagerank_2021 = pageRankPerYear(df_flights_weather_station, '2021')
df_final_pagerank = df_pagerank_2015.union(df_pagerank_2016)
df_final_pagerank = df_final_pagerank.union(df_pagerank_2017)
df_final_pagerank = df_final_pagerank.union(df_pagerank_2018)
df_final_pagerank = df_final_pagerank.union(df_pagerank_2019)
df_final_pagerank = df_final_pagerank.union(df_pagerank_2020)
df_final_pagerank = df_final_pagerank.union(df_pagerank_2021)

df_flights_weather_station.createOrReplaceTempView("df_flights_weather_station_temp")
df_final_pagerank.createOrReplaceTempView("df_page_rank_v1_temp")

df_flights_weather_station = spark.sql(
"""
select
df_flights_weather_station_temp.*,
df_page_rank_v1_temp.PAGERANK as AIRPORT_ORIGIN_PAGERANK
from
df_flights_weather_station_temp
left join df_page_rank_v1_temp on df_flights_weather_station_temp.ORIGIN = df_page_rank_v1_temp.AIRPORT_CODE and df_flights_weather_station_temp.YEAR = df_page_rank_v1_temp.YEAR
""")

##### 2.7.2.1 EDA - Airport Page Rank

The graph below shows the top ten airports with the highest pagerank results per year. We can observe that DFW (Dallas/Fort Worth International Airport) ranks as number 1 in 2015, 2018, 2019 and 2020 and ATL(Hartsfield-Jackson Atlanta International Airport (ATL)) ranks as number 1 in 2026 and 2017.

In [None]:
%sql
SELECT rank_filter.* FROM (
  SELECT 
      df_page_rank_v1.AIRPORT_CODE,
      df_page_rank_v1.YEAR,
      rank() OVER (
          PARTITION BY YEAR
          ORDER BY PAGERANK DESC
      ) as RANK
  FROM df_page_rank_v1
) rank_filter WHERE RANK <= 10


<img src="https://user-images.githubusercontent.com/88794396/204217090-5b53689e-ed6c-4d9e-810b-723e857cd102.png?raw=true>" width=100%>

### 3. Machine Learning Algorithms

#### 3.1 Logistic Regression

Our first ML algorithm will be Logistic Regression.  We believe that with a variety of regression variables available to us that we can explore the feature space and build a logistic regression model that uses the source and destination city, airline, weather, seasonality, recent delay history, and many other robust features described above to classify all flights into two categories -- those with >= 15 minute delay of departure, and those without.

##### 3.1.1 Example Simple Logistic Regression Model

$$logit(p_i) = \beta_0 + \beta_1 x_{1,i} + \beta_2 x_{2,i} + ... + \beta_n x_{n,i}$$

Where features will be things like "airline carrier", "source city", "destination city", "raining at source city", "raining at destination city", "is winter", "is thunderstorm", etc.

We have a large number of features to choose from, and so we will explore adding an L2 regularization term to avoid overfitting on large set of features.

#### 3.2 Random Forests

We will also look to fit a Random Forest model to this data set.  We believe a random forest will be useful because it is robust to inclusion of irrelevant features, and invariant under scaling and transformation of many of the feature values.

#### 3.3 Hyperparameter Tuning

We choose grid search for our experiments below - different than random search that for parameters, not all the values are tested and values tested are selected at random. 
<br>Since we don't have any expectation before the experiements, we'd like to set the specific values to be tested so to have a better understanding of how parameters affect the model's performance.

### 4. Machine Learning Pipelines

#### 4.1 Pipeline Description

a) Data Engineering:
- Step 1	
  - Ingest provided parquet files containing flights, weather, and station information and create corresponding dataframes.
  - Ingest external source airport code data in parquet format and create dataframe.
  - Perform EDA.
  - Data Cleaning.
  - Create a final dataframe with the result of the joined dataframes and store it in Azure Blob Storage in parquet format.

- Step 2
  - Ingest final_dataset from  Azure Blob Storage.
  - Select features
  - Split the data into Train, Validation, and Test dataset.
  - Perform normalization on the Train, Validation, and Test dataset using the Train dataset.
  - Store the normalized Train, Validation, and Test dataset Azure Blob Storage in parquet format.

b) Model Training
- Ingest the Train and Validation datasets from  Azure Blob Storage
- Build baseline model
- Build and Train the classification model
- Perform cross-validation and hyperparameter tuning

c) Model Evaluation
- Ingest the Test dataset from Azure Blob Storage
- Run the trained model on the Test dataset
- Evaluate model
- Store predictions in Azure Blob Storage

#### 4.2 Pipeline Block Diagram
<img src="https://github.com/carla-cortez/261/blob/main/ml_diagram.png?raw=true>" width=75%>

#### 4.3 Data Split Plan

The time serires data from 2015 to 2021 will be split in Train, Validation and Test datasets. We will remove the data from the year 2020 as it is considered an outlier.
- Train dataset: 2015 - 2018
- Validation dataset: 2019
- Test dataset: 2021

For cross-validation:
We will use a rolling window cross validation technique for time series. Having a fold for each year of data. For example:
- train on data from 2015 to predict 2016.  
- train on data from 2016 to predict 2017.
- train on data from 2017 to predict 2018.

#### 4.4 Loss Function

We will aim to minimize our Log Loss function:

$$ Log Loss = \sum_{(x,y) \in D} -y log(y') - (1-y)log(1-y')$$

#### 4.5 Experiments
We are coming at this problem from the business perspective of our ability to predict a delay so that we can minimize the impact of our effected passengers.  For this reason, the cost of a False Negative is more important to us than the cost of a False Positive.

##### 4.5.1 Data Setup

__Load Data__

In [None]:
import networkx as nx
from pyspark.sql.functions import udf, col, mean, isnan, when, count, max
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.linalg import Vectors, _convert_to_vector, VectorUDT
from scipy.sparse import csc_matrix
from pyspark.ml.stat import Correlation
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

blob_container = "projectcontainer" # The name of your container created in https://portal.azure.com
storage_account = "rhussain" # The name of your Storage account created in https://portal.azure.com
secret_scope = "redwanscope2" # The name of the scope created in your local computer using the Databricks CLI
secret_key = "w261key2" # The name of the secret key created in your local computer using the Databricks CLI 
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
mount_path = "/mnt/mids-w261-joined"

In [None]:
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

In [None]:
df_2015 = spark.read.parquet(f"{mount_path}/YEAR=2015/")
df_2016 = spark.read.parquet(f"{mount_path}/YEAR=2016/")
df_2017 = spark.read.parquet(f"{mount_path}/YEAR=2017/")
df_2018 = spark.read.parquet(f"{mount_path}/YEAR=2018/")
df_2019 = spark.read.parquet(f"{mount_path}/YEAR=2019/")
df_2020 = spark.read.parquet(f"{mount_path}/YEAR=2020/")
df_2021 = spark.read.parquet(f"{mount_path}/YEAR=2021/")

df_2015.createOrReplaceTempView("df_2015")
df_2016.createOrReplaceTempView("df_2016")
df_2017.createOrReplaceTempView("df_2017")
df_2018.createOrReplaceTempView("df_2018")
df_2019.createOrReplaceTempView("df_2019")
df_2020.createOrReplaceTempView("df_2020")
df_2021.createOrReplaceTempView("df_2021")

df_flights_weather_station = spark.sql("""
(select * from df_2015)
union all 
(select * from df_2016)
union all 
(select * from df_2017)
union all 
(select * from df_2018)
union all 
(select * from df_2019)
union all 
(select * from df_2020)
union all 
(select * from df_2021)
""")

df_flights_weather_station.createOrReplaceTempView("df_flights_weather_station")
df_flights_weather_station = spark.sql("""
select *,
cast(left(FL_DATE,4) AS INT) as YEAR
from df_flights_weather_station
""")



__Fill NA__

We're taking the mean value from the training data, and use that to fill the NA for the whole dataset including both training and test to avoid data leakage.

In [None]:
df_model = spark.sql("""
select 
 t1.DEP_DEL15,
 t1.MONTH,
 t1.DAY_OF_MONTH,
 t1.DAY_OF_WEEK,
 coalesce(t1.CRS_ELAPSED_TIME, t2.CRS_ELAPSED_TIME) as CRS_ELAPSED_TIME,
 t1.DISTANCE,
 t1.DISTANCE_GROUP,
 t1.YEAR,
 cast(t1.OP_CARRIER_FL_NUM as string) as OP_CARRIER_FL_NUM,
 t1.OP_UNIQUE_CARRIER,
 t1.TAIL_NUM,
 t1.ORIGIN,
 t1.ORIGIN_CITY_NAME,
 t1.ORIGIN_STATE_ABR,
 t1.DEST,
 t1.DEST_CITY_NAME,
 t1.DEST_STATE_ABR,
 t1.FL_DATE_IS_HOLIDAY,
 coalesce(t1.origin_weather_Avg_HourlyAltimeterSetting, t2.origin_weather_Avg_HourlyAltimeterSetting) as origin_weather_Avg_HourlyAltimeterSetting,
 coalesce(t1.origin_weather_Avg_HourlyDewPointTemperature, t2.origin_weather_Avg_HourlyDewPointTemperature) as origin_weather_Avg_HourlyDewPointTemperature,
 coalesce(t1.origin_weather_Avg_HourlyDryBulbTemperature, t2.origin_weather_Avg_HourlyDryBulbTemperature) as origin_weather_Avg_HourlyDryBulbTemperature,
 coalesce(t1.origin_weather_Avg_HourlyRelativeHumidity, t2.origin_weather_Avg_HourlyRelativeHumidity) as origin_weather_Avg_HourlyRelativeHumidity,
 coalesce(t1.origin_weather_Avg_HourlyStationPressure, t2.origin_weather_Avg_HourlyStationPressure) as origin_weather_Avg_HourlyStationPressure,
 coalesce(t1.origin_weather_Avg_HourlyVisibility, t2.origin_weather_Avg_HourlyVisibility) as origin_weather_Avg_HourlyVisibility,
 coalesce(t1.origin_weather_Avg_HourlyWetBulbTemperature, t2.origin_weather_Avg_HourlyWetBulbTemperature) as origin_weather_Avg_HourlyWetBulbTemperature,
 coalesce(t1.origin_weather_Avg_HourlyWindDirection, t2.origin_weather_Avg_HourlyWindDirection) as origin_weather_Avg_HourlyWindDirection,
 coalesce(t1.origin_weather_Avg_HourlyWindSpeed, t2.origin_weather_Avg_HourlyWindSpeed) as origin_weather_Avg_HourlyWindSpeed,
 coalesce(t1.dest_weather_Avg_HourlyAltimeterSetting, t2.dest_weather_Avg_HourlyAltimeterSetting) as dest_weather_Avg_HourlyAltimeterSetting,
 coalesce(t1.dest_weather_Avg_HourlyDewPointTemperature, t2.dest_weather_Avg_HourlyDewPointTemperature) as dest_weather_Avg_HourlyDewPointTemperature,
 coalesce(t1.dest_weather_Avg_HourlyDryBulbTemperature, t2.dest_weather_Avg_HourlyDryBulbTemperature) as dest_weather_Avg_HourlyDryBulbTemperature,
 coalesce(t1.dest_weather_Avg_HourlyRelativeHumidity, t2.dest_weather_Avg_HourlyRelativeHumidity) as dest_weather_Avg_HourlyRelativeHumidity,
 coalesce(t1.dest_weather_Avg_HourlyStationPressure, t2.dest_weather_Avg_HourlyStationPressure) as dest_weather_Avg_HourlyStationPressure,
 coalesce(t1.dest_weather_Avg_HourlyVisibility, t2.dest_weather_Avg_HourlyVisibility) as dest_weather_Avg_HourlyVisibility,
 coalesce(t1.dest_weather_Avg_HourlyWetBulbTemperature, t2.dest_weather_Avg_HourlyWetBulbTemperature) as dest_weather_Avg_HourlyWetBulbTemperature,
 coalesce(t1.dest_weather_Avg_HourlyWindDirection, t2.dest_weather_Avg_HourlyWindDirection) as dest_weather_Avg_HourlyWindDirection,
 coalesce(t1.dest_weather_Avg_HourlyWindSpeed, t2.dest_weather_Avg_HourlyWindSpeed) as dest_weather_Avg_HourlyWindSpeed,
 coalesce(t1.AIRPORT_ORIGIN_PAGERANK, t2.AIRPORT_ORIGIN_PAGERANK) as AIRPORT_ORIGIN_PAGERANK
from df_downsampled as t1, df_train_mean as t2
""")

##### 4.5.1.1 Training Data

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.linalg import Vectors, _convert_to_vector, VectorUDT
from pyspark.sql.functions import udf, col
from scipy.sparse import csc_matrix

trainDF = df_model.where("year<=2020")

str_features = categorical_cols_keep[:-1]
str_features_output = [i+'_Index' for i in str_features]

ohe_features = ['FL_DATE_IS_HOLIDAY']
ohe_features_output = ['FL_DATE_IS_HOLIDAY_Index']

indexer = StringIndexer(inputCols=str_features, outputCols=str_features_output, handleInvalid = 'keep')
ohe_indexer = OneHotEncoder(inputCols=ohe_features, outputCols=ohe_features_output)

trainDF = indexer.fit(trainDF).transform(trainDF)
trainDF = ohe_indexer.fit(trainDF).transform(trainDF)

def dense_to_sparse(vector):
    return _convert_to_vector(csc_matrix(vector.toArray()).T)

inputs = indexer.getOutputCols()+ohe_indexer.getOutputCols()+numeric_cols_keep

assembler = VectorAssembler(inputCols=inputs, outputCol="features")
trainDF_VA = assembler.transform(trainDF)

to_sparse = udf(dense_to_sparse, VectorUDT())
trainDF_VA_sparse = trainDF_VA.withColumn("features", to_sparse(col("features")))

scaler = MinMaxScaler(inputCol="features", outputCol="features_scaled")
trainDF_VA = scaler.fit(trainDF_VA_sparse).transform(trainDF_VA_sparse)

##### 4.5.1.2 Pearson Correlation Plot - Training Data 

Since we're going to start with logistic regression, we'd like to check the pearson correlation first before running the experiments.
<br>Based on the plots below, we do see a few variables are highly correlated, which are as expected.
<br>To improve the model performance, we'll keep only one of those highly correlated variables (like quarter & month), and focus on those variables that are highly positive or negative correlated with the response variables.

In [None]:
matrix = Correlation.corr(trainDF_VA, "features_scaled")
# matrix.display()
mtx = matrix.collect()[0][0]
corrmatrix = mtx.toArray().tolist()
# print(corrmatrix)
fig, ax = plt.subplots(figsize=(100,100)) 
sns.heatmap(corrmatrix, annot=True, fmt=".4g", cmap='viridis', ax=ax)

<img src="https://user-images.githubusercontent.com/88794396/204198035-3b71188d-5e46-4376-a0b2-f9b7e977427b.png?raw=true>" width=65%>

##### 4.5.1.3 Holdout Data

In [None]:
heldOutDF = df_model.where("year==2021")

heldOutDF = indexer.fit(heldOutDF).transform(heldOutDF)
heldOutDF = ohe_indexer.fit(heldOutDF).transform(heldOutDF)

heldOutDF_VA = assembler.transform(heldOutDF)
heldOutDF_VA_sparse = heldOutDF_VA.withColumn("features", to_sparse(col("features")))

heldOutDF_VA = scaler.fit(trainDF_VA_sparse).transform(heldOutDF_VA_sparse)

##### 4.5.1.4 Checkpoint

In [None]:
trainDF_VA.write.mode("overwrite").parquet(f"{blob_url}/trainDF_VA")
heldOutDF_VA.write.mode("overwrite").parquet(f"{blob_url}/heldOutDF_VA")

##### 4.5.1.5 Prepare Data for Modelling

In [None]:
import os
import sys
import itertools
from multiprocessing.pool import ThreadPool

import numpy as np

from pyspark import keyword_only, since, SparkContext, inheritable_thread_target
from pyspark.ml import Estimator, Transformer, Model
from pyspark.ml.common import inherit_doc, _py2java, _java2py
from pyspark.ml.evaluation import Evaluator
from pyspark.ml.param import Params, Param, TypeConverters
from pyspark.ml.param.shared import HasCollectSubModels, HasParallelism, HasSeed
from pyspark.ml.util import DefaultParamsReader, DefaultParamsWriter, MetaAlgorithmReadWrite, \
    MLReadable, MLReader, MLWritable, MLWriter, JavaMLReader, JavaMLWriter
from pyspark.ml.wrapper import JavaParams, JavaEstimator, JavaWrapper
from pyspark.sql.functions import col, lit, rand, UserDefinedFunction
from pyspark.sql.types import BooleanType

__all__ = ['ParamGridBuilder', 'CrossValidator', 'CrossValidatorModel', 'TrainValidationSplit',
           'TrainValidationSplitModel']


def _parallelFitTasks(est, train, eva, validation, epm, collectSubModel=False):
    """
    Creates a list of callables which can be called from different threads to fit and evaluate
    an estimator in parallel. Each callable returns an `(index, metric)` pair.

    Parameters
    ----------
    est : :py:class:`pyspark.ml.baseEstimator`
        he estimator to be fit.
    train : :py:class:`pyspark.sql.DataFrame`
        DataFrame, training data set, used for fitting.
    eva : :py:class:`pyspark.ml.evaluation.Evaluator`
        used to compute `metric`
    validation : :py:class:`pyspark.sql.DataFrame`
        DataFrame, validation data set, used for evaluation.
    epm : :py:class:`collections.abc.Sequence`
        Sequence of ParamMap, params maps to be used during fitting & evaluation.
    collectSubModel : bool
        Whether to collect sub model.

    Returns
    -------
    tuple
        (int, float, subModel), an index into `epm` and the associated metric value.
    """
    modelIter = est.fitMultiple(train, epm)

    def singleTask():
        index, model = next(modelIter)
        # TODO: duplicate evaluator to take extra params from input
        #  Note: Supporting tuning params in evaluator need update method
        #  `MetaAlgorithmReadWrite.getAllNestedStages`, make it return
        #  all nested stages and evaluators
        metric = eva.evaluate(model.transform(validation, epm[index]))
        return index, metric, model if collectSubModel else None

    return [singleTask] * len(epm)


class ParamGridBuilder(object):
    r"""
    Builder for a param grid used in grid search-based model selection.


    .. versionadded:: 1.4.0

    Examples
    --------
    >>> from pyspark.ml.classification import LogisticRegression
    >>> lr = LogisticRegression()
    >>> output = ParamGridBuilder() \
    ...     .baseOn({lr.labelCol: 'l'}) \
    ...     .baseOn([lr.predictionCol, 'p']) \
    ...     .addGrid(lr.regParam, [1.0, 2.0]) \
    ...     .addGrid(lr.maxIter, [1, 5]) \
    ...     .build()
    >>> expected = [
    ...     {lr.regParam: 1.0, lr.maxIter: 1, lr.labelCol: 'l', lr.predictionCol: 'p'},
    ...     {lr.regParam: 2.0, lr.maxIter: 1, lr.labelCol: 'l', lr.predictionCol: 'p'},
    ...     {lr.regParam: 1.0, lr.maxIter: 5, lr.labelCol: 'l', lr.predictionCol: 'p'},
    ...     {lr.regParam: 2.0, lr.maxIter: 5, lr.labelCol: 'l', lr.predictionCol: 'p'}]
    >>> len(output) == len(expected)
    True
    >>> all([m in expected for m in output])
    True
    """

    def __init__(self):
        self._param_grid = {}

    @since("1.4.0")
    def addGrid(self, param, values):
        """
        Sets the given parameters in this grid to fixed values.

        param must be an instance of Param associated with an instance of Params
        (such as Estimator or Transformer).
        """
        if isinstance(param, Param):
            self._param_grid[param] = values
        else:
            raise TypeError("param must be an instance of Param")

        return self


    @since("1.4.0")
    def baseOn(self, *args):
        """
        Sets the given parameters in this grid to fixed values.
        Accepts either a parameter dictionary or a list of (parameter, value) pairs.
        """
        if isinstance(args[0], dict):
            self.baseOn(*args[0].items())
        else:
            for (param, value) in args:
                self.addGrid(param, [value])

        return self


    @since("1.4.0")
    def build(self):
        """
        Builds and returns all combinations of parameters specified
        by the param grid.
        """
        keys = self._param_grid.keys()
        grid_values = self._param_grid.values()

        def to_key_value_pairs(keys, values):
            return [(key, key.typeConverter(value)) for key, value in zip(keys, values)]

        return [dict(to_key_value_pairs(keys, prod)) for prod in itertools.product(*grid_values)]



class _ValidatorParams(HasSeed):
    """
    Common params for TrainValidationSplit and CrossValidator.
    """

    estimator = Param(Params._dummy(), "estimator", "estimator to be cross-validated")
    estimatorParamMaps = Param(Params._dummy(), "estimatorParamMaps", "estimator param maps")
    evaluator = Param(
        Params._dummy(), "evaluator",
        "evaluator used to select hyper-parameters that maximize the validator metric")

    @since("2.0.0")
    def getEstimator(self):
        """
        Gets the value of estimator or its default value.
        """
        return self.getOrDefault(self.estimator)

    @since("2.0.0")
    def getEstimatorParamMaps(self):
        """
        Gets the value of estimatorParamMaps or its default value.
        """
        return self.getOrDefault(self.estimatorParamMaps)

    @since("2.0.0")
    def getEvaluator(self):
        """
        Gets the value of evaluator or its default value.
        """
        return self.getOrDefault(self.evaluator)

    @classmethod
    def _from_java_impl(cls, java_stage):
        """
        Return Python estimator, estimatorParamMaps, and evaluator from a Java ValidatorParams.
        """

        # Load information from java_stage to the instance.
        estimator = JavaParams._from_java(java_stage.getEstimator())
        evaluator = JavaParams._from_java(java_stage.getEvaluator())
        if isinstance(estimator, JavaEstimator):
            epms = [estimator._transfer_param_map_from_java(epm)
                    for epm in java_stage.getEstimatorParamMaps()]
        elif MetaAlgorithmReadWrite.isMetaEstimator(estimator):
            # Meta estimator such as Pipeline, OneVsRest
            epms = _ValidatorSharedReadWrite.meta_estimator_transfer_param_maps_from_java(
                estimator, java_stage.getEstimatorParamMaps())
        else:
            raise ValueError('Unsupported estimator used in tuning: ' + str(estimator))

        return estimator, epms, evaluator

    def _to_java_impl(self):
        """
        Return Java estimator, estimatorParamMaps, and evaluator from this Python instance.
        """

        gateway = SparkContext._gateway
        cls = SparkContext._jvm.org.apache.spark.ml.param.ParamMap

        estimator = self.getEstimator()
        if isinstance(estimator, JavaEstimator):
            java_epms = gateway.new_array(cls, len(self.getEstimatorParamMaps()))
            for idx, epm in enumerate(self.getEstimatorParamMaps()):
                java_epms[idx] = self.getEstimator()._transfer_param_map_to_java(epm)
        elif MetaAlgorithmReadWrite.isMetaEstimator(estimator):
            # Meta estimator such as Pipeline, OneVsRest
            java_epms = _ValidatorSharedReadWrite.meta_estimator_transfer_param_maps_to_java(
                estimator, self.getEstimatorParamMaps())
        else:
            raise ValueError('Unsupported estimator used in tuning: ' + str(estimator))

        java_estimator = self.getEstimator()._to_java()
        java_evaluator = self.getEvaluator()._to_java()
        return java_estimator, java_epms, java_evaluator


class _ValidatorSharedReadWrite:

    @staticmethod
    def meta_estimator_transfer_param_maps_to_java(pyEstimator, pyParamMaps):
        pyStages = MetaAlgorithmReadWrite.getAllNestedStages(pyEstimator)
        stagePairs = list(map(lambda stage: (stage, stage._to_java()), pyStages))
        sc = SparkContext._active_spark_context

        paramMapCls = SparkContext._jvm.org.apache.spark.ml.param.ParamMap
        javaParamMaps = SparkContext._gateway.new_array(paramMapCls, len(pyParamMaps))

        for idx, pyParamMap in enumerate(pyParamMaps):
            javaParamMap = JavaWrapper._new_java_obj("org.apache.spark.ml.param.ParamMap")
            for pyParam, pyValue in pyParamMap.items():
                javaParam = None
                for pyStage, javaStage in stagePairs:
                    if pyStage._testOwnParam(pyParam.parent, pyParam.name):
                        javaParam = javaStage.getParam(pyParam.name)
                        break
                if javaParam is None:
                    raise ValueError('Resolve param in estimatorParamMaps failed: ' + str(pyParam))
                if isinstance(pyValue, Params) and hasattr(pyValue, "_to_java"):
                    javaValue = pyValue._to_java()
                else:
                    javaValue = _py2java(sc, pyValue)
                pair = javaParam.w(javaValue)
                javaParamMap.put([pair])
            javaParamMaps[idx] = javaParamMap
        return javaParamMaps

    @staticmethod
    def meta_estimator_transfer_param_maps_from_java(pyEstimator, javaParamMaps):
        pyStages = MetaAlgorithmReadWrite.getAllNestedStages(pyEstimator)
        stagePairs = list(map(lambda stage: (stage, stage._to_java()), pyStages))
        sc = SparkContext._active_spark_context
        pyParamMaps = []
        for javaParamMap in javaParamMaps:
            pyParamMap = dict()
            for javaPair in javaParamMap.toList():
                javaParam = javaPair.param()
                pyParam = None
                for pyStage, javaStage in stagePairs:
                    if pyStage._testOwnParam(javaParam.parent(), javaParam.name()):
                        pyParam = pyStage.getParam(javaParam.name())
                if pyParam is None:
                    raise ValueError('Resolve param in estimatorParamMaps failed: ' +
                                     javaParam.parent() + '.' + javaParam.name())
                javaValue = javaPair.value()
                if sc._jvm.Class.forName("org.apache.spark.ml.util.DefaultParamsWritable") \
                        .isInstance(javaValue):
                    pyValue = JavaParams._from_java(javaValue)
                else:
                    pyValue = _java2py(sc, javaValue)
                pyParamMap[pyParam] = pyValue
            pyParamMaps.append(pyParamMap)
        return pyParamMaps

    @staticmethod
    def is_java_convertible(instance):
        allNestedStages = MetaAlgorithmReadWrite.getAllNestedStages(instance.getEstimator())
        evaluator_convertible = isinstance(instance.getEvaluator(), JavaParams)
        estimator_convertible = all(map(lambda stage: hasattr(stage, '_to_java'), allNestedStages))
        return estimator_convertible and evaluator_convertible

    @staticmethod
    def saveImpl(path, instance, sc, extraMetadata=None):
        numParamsNotJson = 0
        jsonEstimatorParamMaps = []
        for paramMap in instance.getEstimatorParamMaps():
            jsonParamMap = []
            for p, v in paramMap.items():
                jsonParam = {'parent': p.parent, 'name': p.name}
                if (isinstance(v, Estimator) and not MetaAlgorithmReadWrite.isMetaEstimator(v)) \
                        or isinstance(v, Transformer) or isinstance(v, Evaluator):
                    relative_path = f'epm_{p.name}{numParamsNotJson}'
                    param_path = os.path.join(path, relative_path)
                    numParamsNotJson += 1
                    v.save(param_path)
                    jsonParam['value'] = relative_path
                    jsonParam['isJson'] = False
                elif isinstance(v, MLWritable):
                    raise RuntimeError(
                        "ValidatorSharedReadWrite.saveImpl does not handle parameters of type: "
                        "MLWritable that are not Estimaor/Evaluator/Transformer, and if parameter "
                        "is estimator, it cannot be meta estimator such as Validator or OneVsRest")
                else:
                    jsonParam['value'] = v
                    jsonParam['isJson'] = True
                jsonParamMap.append(jsonParam)
            jsonEstimatorParamMaps.append(jsonParamMap)

        skipParams = ['estimator', 'evaluator', 'estimatorParamMaps']
        jsonParams = DefaultParamsWriter.extractJsonParams(instance, skipParams)
        jsonParams['estimatorParamMaps'] = jsonEstimatorParamMaps

        DefaultParamsWriter.saveMetadata(instance, path, sc, extraMetadata, jsonParams)
        evaluatorPath = os.path.join(path, 'evaluator')
        instance.getEvaluator().save(evaluatorPath)
        estimatorPath = os.path.join(path, 'estimator')
        instance.getEstimator().save(estimatorPath)

    @staticmethod
    def load(path, sc, metadata):
        evaluatorPath = os.path.join(path, 'evaluator')
        evaluator = DefaultParamsReader.loadParamsInstance(evaluatorPath, sc)
        estimatorPath = os.path.join(path, 'estimator')
        estimator = DefaultParamsReader.loadParamsInstance(estimatorPath, sc)

        uidToParams = MetaAlgorithmReadWrite.getUidMap(estimator)
        uidToParams[evaluator.uid] = evaluator

        jsonEstimatorParamMaps = metadata['paramMap']['estimatorParamMaps']

        estimatorParamMaps = []
        for jsonParamMap in jsonEstimatorParamMaps:
            paramMap = {}
            for jsonParam in jsonParamMap:
                est = uidToParams[jsonParam['parent']]
                param = getattr(est, jsonParam['name'])
                if 'isJson' not in jsonParam or ('isJson' in jsonParam and jsonParam['isJson']):
                    value = jsonParam['value']
                else:
                    relativePath = jsonParam['value']
                    valueSavedPath = os.path.join(path, relativePath)
                    value = DefaultParamsReader.loadParamsInstance(valueSavedPath, sc)
                paramMap[param] = value
            estimatorParamMaps.append(paramMap)

        return metadata, estimator, evaluator, estimatorParamMaps

    @staticmethod
    def validateParams(instance):
        estiamtor = instance.getEstimator()
        evaluator = instance.getEvaluator()
        uidMap = MetaAlgorithmReadWrite.getUidMap(estiamtor)

        for elem in [evaluator] + list(uidMap.values()):
            if not isinstance(elem, MLWritable):
                raise ValueError(f'Validator write will fail because it contains {elem.uid} '
                                 f'which is not writable.')

        estimatorParamMaps = instance.getEstimatorParamMaps()
        paramErr = 'Validator save requires all Params in estimatorParamMaps to apply to ' \
                   f'its Estimator, An extraneous Param was found: '
        for paramMap in estimatorParamMaps:
            for param in paramMap:
                if param.parent not in uidMap:
                    raise ValueError(paramErr + repr(param))

    @staticmethod
    def getValidatorModelWriterPersistSubModelsParam(writer):
        if 'persistsubmodels' in writer.optionMap:
            persistSubModelsParam = writer.optionMap['persistsubmodels'].lower()
            if persistSubModelsParam == 'true':
                return True
            elif persistSubModelsParam == 'false':
                return False
            else:
                raise ValueError(
                    f'persistSubModels option value {persistSubModelsParam} is invalid, '
                    f"the possible values are True, 'True' or False, 'False'")
        else:
            return writer.instance.subModels is not None


_save_with_persist_submodels_no_submodels_found_err = \
    'When persisting tuning models, you can only set persistSubModels to true if the tuning ' \
    'was done with collectSubModels set to true. To save the sub-models, try rerunning fitting ' \
    'with collectSubModels set to true.'


@inherit_doc
class CrossValidatorReader(MLReader):

    def __init__(self, cls):
        super(CrossValidatorReader, self).__init__()
        self.cls = cls

    def load(self, path):
        metadata = DefaultParamsReader.loadMetadata(path, self.sc)
        if not DefaultParamsReader.isPythonParamsInstance(metadata):
            return JavaMLReader(self.cls).load(path)
        else:
            metadata, estimator, evaluator, estimatorParamMaps = \
                _ValidatorSharedReadWrite.load(path, self.sc, metadata)
            cv = CrossValidator(estimator=estimator,
                                estimatorParamMaps=estimatorParamMaps,
                                evaluator=evaluator)
            cv = cv._resetUid(metadata['uid'])
            DefaultParamsReader.getAndSetParams(cv, metadata, skipParams=['estimatorParamMaps'])
            return cv


@inherit_doc
class CrossValidatorWriter(MLWriter):

    def __init__(self, instance):
        super(CrossValidatorWriter, self).__init__()
        self.instance = instance

    def saveImpl(self, path):
        _ValidatorSharedReadWrite.validateParams(self.instance)
        _ValidatorSharedReadWrite.saveImpl(path, self.instance, self.sc)


@inherit_doc
class CrossValidatorModelReader(MLReader):

    def __init__(self, cls):
        super(CrossValidatorModelReader, self).__init__()
        self.cls = cls

    def load(self, path):
        metadata = DefaultParamsReader.loadMetadata(path, self.sc)
        if not DefaultParamsReader.isPythonParamsInstance(metadata):
            return JavaMLReader(self.cls).load(path)
        else:
            metadata, estimator, evaluator, estimatorParamMaps = \
                _ValidatorSharedReadWrite.load(path, self.sc, metadata)
            numFolds = metadata['paramMap']['numFolds']
            bestModelPath = os.path.join(path, 'bestModel')
            bestModel = DefaultParamsReader.loadParamsInstance(bestModelPath, self.sc)
            avgMetrics = metadata['avgMetrics']
            persistSubModels = ('persistSubModels' in metadata) and metadata['persistSubModels']

            if persistSubModels:
                subModels = [[None] * len(estimatorParamMaps)] * numFolds
                for splitIndex in range(numFolds):
                    for paramIndex in range(len(estimatorParamMaps)):
                        modelPath = os.path.join(
                            path, 'subModels', f'fold{splitIndex}', f'{paramIndex}')
                        subModels[splitIndex][paramIndex] = \
                            DefaultParamsReader.loadParamsInstance(modelPath, self.sc)
            else:
                subModels = None

            cvModel = CrossValidatorModel(bestModel, avgMetrics=avgMetrics, subModels=subModels)
            cvModel = cvModel._resetUid(metadata['uid'])
            cvModel.set(cvModel.estimator, estimator)
            cvModel.set(cvModel.estimatorParamMaps, estimatorParamMaps)
            cvModel.set(cvModel.evaluator, evaluator)
            DefaultParamsReader.getAndSetParams(
                cvModel, metadata, skipParams=['estimatorParamMaps'])
            return cvModel


@inherit_doc
class CrossValidatorModelWriter(MLWriter):

    def __init__(self, instance):
        super(CrossValidatorModelWriter, self).__init__()
        self.instance = instance

    def saveImpl(self, path):
        _ValidatorSharedReadWrite.validateParams(self.instance)
        instance = self.instance
        persistSubModels = _ValidatorSharedReadWrite \
            .getValidatorModelWriterPersistSubModelsParam(self)
        extraMetadata = {'avgMetrics': instance.avgMetrics,
                         'persistSubModels': persistSubModels}
        _ValidatorSharedReadWrite.saveImpl(path, instance, self.sc, extraMetadata=extraMetadata)
        bestModelPath = os.path.join(path, 'bestModel')
        instance.bestModel.save(bestModelPath)
        if persistSubModels:
            if instance.subModels is None:
                raise ValueError(_save_with_persist_submodels_no_submodels_found_err)
            subModelsPath = os.path.join(path, 'subModels')
            for splitIndex in range(instance.getNumFolds()):
                splitPath = os.path.join(subModelsPath, f'fold{splitIndex}')
                for paramIndex in range(len(instance.getEstimatorParamMaps())):
                    modelPath = os.path.join(splitPath, f'{paramIndex}')
                    instance.subModels[splitIndex][paramIndex].save(modelPath)


class _CrossValidatorParams(_ValidatorParams):
    """
    Params for :py:class:`CrossValidator` and :py:class:`CrossValidatorModel`.

    .. versionadded:: 3.0.0
    """

    numFolds = Param(Params._dummy(), "numFolds", "number of folds for cross validation",
                     typeConverter=TypeConverters.toInt)

    foldCol = Param(Params._dummy(), "foldCol", "Param for the column name of user " +
                    "specified fold number. Once this is specified, :py:class:`CrossValidator` " +
                    "won't do random k-fold split. Note that this column should be integer type " +
                    "with range [0, numFolds) and Spark will throw exception on out-of-range " +
                    "fold numbers.", typeConverter=TypeConverters.toString)

    def __init__(self, *args):
        super(_CrossValidatorParams, self).__init__(*args)
        self._setDefault(numFolds=3, foldCol="")

    @since("1.4.0")
    def getNumFolds(self):
        """
        Gets the value of numFolds or its default value.
        """
        return self.getOrDefault(self.numFolds)

    @since("3.1.0")
    def getFoldCol(self):
        """
        Gets the value of foldCol or its default value.
        """
        return self.getOrDefault(self.foldCol)


class CustomCrossValidator(Estimator, _CrossValidatorParams, HasParallelism, HasCollectSubModels,
                     MLReadable, MLWritable):
    """
    Modifies CrossValidator allowing custom train and test dataset to be passed into the function
    Bypass generation of train/test via numFolds
    instead train and test set is user define
    """
    
    splitWord = Param(Params._dummy(), "splitWord", "Tuple to split train and test set e.g. ('train', 'test')",
                      typeConverter=TypeConverters.toListString)
    cvCol = Param(Params._dummy(), "cvCol", "Column name to filter train and test list",
                      typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, *, estimator=None, estimatorParamMaps=None, evaluator=None, seed=None, parallelism=1, collectSubModels=False, 
                 splitWord = ('train', 'test'), cvCol = 'cv'):
        """
        __init__(self, \\*, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\
                 seed=None, parallelism=1, collectSubModels=False, foldCol="")
        """
        super(CustomCrossValidator, self).__init__()
        self._setDefault(parallelism=1)
        kwargs = self._input_kwargs
        self._set(**kwargs)

    @keyword_only
    @since("1.4.0")
    def setParams(self, *, estimator=None, estimatorParamMaps=None, evaluator=None, seed=None, parallelism=1, collectSubModels=False, 
                 splitWord = ('train', 'test'), cvCol = 'cv'):
        """
        Sets params for cross validator.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)


    @since("2.0.0")
    def setEstimator(self, value):
        """
        Sets the value of :py:attr:`estimator`.
        """
        return self._set(estimator=value)


    @since("2.0.0")
    def setEstimatorParamMaps(self, value):
        """
        Sets the value of :py:attr:`estimatorParamMaps`.
        """
        return self._set(estimatorParamMaps=value)


    @since("2.0.0")
    def setEvaluator(self, value):
        """
        Sets the value of :py:attr:`evaluator`.
        """
        return self._set(evaluator=value)


    @since("1.4.0")
    def setNumFolds(self, value):
        """
        Sets the value of :py:attr:`numFolds`.
        """
        return self._set(numFolds=value)


    @since("3.1.0")
    def setFoldCol(self, value):
        """
        Sets the value of :py:attr:`foldCol`.
        """
        return self._set(foldCol=value)


    def setSeed(self, value):
        """
        Sets the value of :py:attr:`seed`.
        """
        return self._set(seed=value)


    def setParallelism(self, value):
        """
        Sets the value of :py:attr:`parallelism`.
        """
        return self._set(parallelism=value)


    def setCollectSubModels(self, value):
        """
        Sets the value of :py:attr:`collectSubModels`.
        """
        return self._set(collectSubModels=value)


    def _fit(self, dataset):
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)
        eva = self.getOrDefault(self.evaluator)
        nFolds = len(dataset)
        seed = self.getOrDefault(self.seed)
        metrics = [0.0] * numModels
        matrix_metrics = [[0 for x in range(nFolds)] for y in range(len(epm))]

        # pool = ThreadPool(processes=min(self.getParallelism(), numModels))
        if self.getParallelism() < numModels:
            min_value = self.getParallelism()
        else:
            min_value = numModels
        pool = ThreadPool(processes=min_value)

        for i in range(nFolds):
            validation = dataset[list(dataset.keys())[i]].filter(col(self.getOrDefault(self.cvCol))==(self.getOrDefault(self.splitWord))[0]).cache()
            train = dataset[list(dataset.keys())[i]].filter(col(self.getOrDefault(self.cvCol))==(self.getOrDefault(self.splitWord))[1]).cache()

            print('fold {} start...'.format(i+1))
            tasks = _parallelFitTasks(est, train, eva, validation, epm)
            for j, metric, subModel in pool.imap_unordered(lambda f: f(), tasks):
                #print(j, metric)
                matrix_metrics[j][i] = metric
                metrics[j] += (metric / nFolds)
            print('fold {} end'.format(i+1))
            #print(metrics)
            validation.unpersist()
            train.unpersist()

        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)

#         for i in range(len(metrics)):
#             print(epm[i], 'Detailed Score {}'.format(matrix_metrics[i]), 'Avg Score {}'.format(metrics[i]))

        print('Detailed Score {}'.format(matrix_metrics[bestIndex]),
              'Avg Score {}'.format(metrics[bestIndex]))

        ### Do not bother to train on full dataset, just the latest train supplied
        # bestModel = est.fit(dataset, epm[bestIndex])
        bestModel = est.fit(train, epm[bestIndex])
        return self._copyValues(CrossValidatorModel(bestModel, metrics))

    def copy(self, extra=None):
        """
        Creates a copy of this instance with a randomly generated uid
        and some extra params. This copies creates a deep copy of
        the embedded paramMap, and copies the embedded and extra parameters over.


        .. versionadded:: 1.4.0

        Parameters
        ----------
        extra : dict, optional
            Extra parameters to copy to the new instance

        Returns
        -------
        :py:class:`CrossValidator`
            Copy of this instance
        """
        if extra is None:
            extra = dict()
        newCV = Params.copy(self, extra)
        if self.isSet(self.estimator):
            newCV.setEstimator(self.getEstimator().copy(extra))
        # estimatorParamMaps remain the same
        if self.isSet(self.evaluator):
            newCV.setEvaluator(self.getEvaluator().copy(extra))
        return newCV


    @since("2.3.0")
    def write(self):
        """Returns an MLWriter instance for this ML instance."""
        if _ValidatorSharedReadWrite.is_java_convertible(self):
            return JavaMLWriter(self)
        return CrossValidatorWriter(self)


    @classmethod
    @since("2.3.0")
    def read(cls):
        """Returns an MLReader instance for this class."""
        return CrossValidatorReader(cls)


    @classmethod
    def _from_java(cls, java_stage):
        """
        Given a Java CrossValidator, create and return a Python wrapper of it.
        Used for ML persistence.
        """

        estimator, epms, evaluator = super(CrossValidator, cls)._from_java_impl(java_stage)
        numFolds = java_stage.getNumFolds()
        seed = java_stage.getSeed()
        parallelism = java_stage.getParallelism()
        collectSubModels = java_stage.getCollectSubModels()
        foldCol = java_stage.getFoldCol()
        # Create a new instance of this stage.
        py_stage = cls(estimator=estimator, estimatorParamMaps=epms, evaluator=evaluator,
                       numFolds=numFolds, seed=seed, parallelism=parallelism,
                       collectSubModels=collectSubModels, foldCol=foldCol)
        py_stage._resetUid(java_stage.uid())
        return py_stage

    def _to_java(self):
        """
        Transfer this instance to a Java CrossValidator. Used for ML persistence.

        Returns
        -------
        py4j.java_gateway.JavaObject
            Java object equivalent to this instance.
        """

        estimator, epms, evaluator = super(CrossValidator, self)._to_java_impl()

        _java_obj = JavaParams._new_java_obj("org.apache.spark.ml.tuning.CrossValidator", self.uid)
        _java_obj.setEstimatorParamMaps(epms)
        _java_obj.setEvaluator(evaluator)
        _java_obj.setEstimator(estimator)
        _java_obj.setSeed(self.getSeed())
        _java_obj.setNumFolds(self.getNumFolds())
        _java_obj.setParallelism(self.getParallelism())
        _java_obj.setCollectSubModels(self.getCollectSubModels())
        _java_obj.setFoldCol(self.getFoldCol())

        return _java_obj



class CrossValidatorModel(Model, _CrossValidatorParams, MLReadable, MLWritable):
    """

    CrossValidatorModel contains the model with the highest average cross-validation
    metric across folds and uses this model to transform input data. CrossValidatorModel
    also tracks the metrics for each param map evaluated.

    .. versionadded:: 1.4.0
    """

    def __init__(self, bestModel, avgMetrics=None, subModels=None):
        super(CrossValidatorModel, self).__init__()
        #: best model from cross validation
        self.bestModel = bestModel
        #: Average cross-validation metrics for each paramMap in
        #: CrossValidator.estimatorParamMaps, in the corresponding order.
        self.avgMetrics = avgMetrics or []
        #: sub model list from cross validation
        self.subModels = subModels

    def _transform(self, dataset):
        return self.bestModel.transform(dataset)

    def copy(self, extra=None):
        """
        Creates a copy of this instance with a randomly generated uid
        and some extra params. This copies the underlying bestModel,
        creates a deep copy of the embedded paramMap, and
        copies the embedded and extra parameters over.
        It does not copy the extra Params into the subModels.

        .. versionadded:: 1.4.0

        Parameters
        ----------
        extra : dict, optional
            Extra parameters to copy to the new instance

        Returns
        -------
        :py:class:`CrossValidatorModel`
            Copy of this instance
        """
        if extra is None:
            extra = dict()
        bestModel = self.bestModel.copy(extra)
        avgMetrics = list(self.avgMetrics)
        subModels = [
            [sub_model.copy() for sub_model in fold_sub_models]
            for fold_sub_models in self.subModels
        ]
        return self._copyValues(CrossValidatorModel(bestModel, avgMetrics, subModels), extra=extra)


    @since("2.3.0")
    def write(self):
        """Returns an MLWriter instance for this ML instance."""
        if _ValidatorSharedReadWrite.is_java_convertible(self):
            return JavaMLWriter(self)
        return CrossValidatorModelWriter(self)


    @classmethod
    @since("2.3.0")
    def read(cls):
        """Returns an MLReader instance for this class."""
        return CrossValidatorModelReader(cls)


    @classmethod
    def _from_java(cls, java_stage):
        """
        Given a Java CrossValidatorModel, create and return a Python wrapper of it.
        Used for ML persistence.
        """
        sc = SparkContext._active_spark_context
        bestModel = JavaParams._from_java(java_stage.bestModel())
        avgMetrics = _java2py(sc, java_stage.avgMetrics())
        estimator, epms, evaluator = super(CrossValidatorModel, cls)._from_java_impl(java_stage)

        py_stage = cls(bestModel=bestModel, avgMetrics=avgMetrics)
        params = {
            "evaluator": evaluator,
            "estimator": estimator,
            "estimatorParamMaps": epms,
            "numFolds": java_stage.getNumFolds(),
            "foldCol": java_stage.getFoldCol(),
            "seed": java_stage.getSeed(),
        }
        for param_name, param_val in params.items():
            py_stage = py_stage._set(**{param_name: param_val})

        if java_stage.hasSubModels():
            py_stage.subModels = [[JavaParams._from_java(sub_model)
                                   for sub_model in fold_sub_models]
                                  for fold_sub_models in java_stage.subModels()]

        py_stage._resetUid(java_stage.uid())
        return py_stage

    def _to_java(self):
        """
        Transfer this instance to a Java CrossValidatorModel. Used for ML persistence.

        Returns
        -------
        py4j.java_gateway.JavaObject
            Java object equivalent to this instance.
        """

        sc = SparkContext._active_spark_context
        _java_obj = JavaParams._new_java_obj("org.apache.spark.ml.tuning.CrossValidatorModel",
                                             self.uid,
                                             self.bestModel._to_java(),
                                             _py2java(sc, self.avgMetrics))
        estimator, epms, evaluator = super(CrossValidatorModel, self)._to_java_impl()

        params = {
            "evaluator": evaluator,
            "estimator": estimator,
            "estimatorParamMaps": epms,
            "numFolds": self.getNumFolds(),
            "foldCol": self.getFoldCol(),
            "seed": self.getSeed(),
        }
        for param_name, param_val in params.items():
            java_param = _java_obj.getParam(param_name)
            pair = java_param.w(param_val)
            _java_obj.set(pair)

        if self.subModels is not None:
            java_sub_models = [[sub_model._to_java() for sub_model in fold_sub_models]
                               for fold_sub_models in self.subModels]
            _java_obj.setSubModels(java_sub_models)
        return _java_obj

In [None]:
trainDF_VA = trainDF_VA.select(['DEP_DEL15','features_scaled', 'YEAR'])
trainDF_VA = trainDF_VA.withColumnRenamed("DEP_DEL15", "label")
trainDF_VA = trainDF_VA.withColumnRenamed("features_scaled", "features")

# create dictionary of dataframes for custom cv fn to loop through
# assign train and test based on time series split 

d = {}

d['df1'] = trainDF_VA.filter(trainDF_VA.YEAR <= 2016)\
                   .withColumn('cv', F.when(trainDF_VA.YEAR == 2015, 'train')
                                         .otherwise('test'))

d['df2'] = trainDF_VA.filter(F.col('YEAR').between(2015, 2017))\
                   .withColumn('cv', F.when(trainDF_VA.YEAR <= 2016, 'train')
                                         .otherwise('test'))

d['df3'] = trainDF_VA.filter(F.col('YEAR').between(2016, 2018))\
                   .withColumn('cv', F.when(trainDF_VA.YEAR <= 2017, 'train')
                                         .otherwise('test'))

d['df4'] = trainDF_VA.filter(F.col('YEAR').between(2017, 2019))\
                   .withColumn('cv', F.when(trainDF_VA.YEAR <= 2018, 'train')
                                         .otherwise('test'))

d['df5'] = trainDF_VA.filter(F.col('YEAR').between(2019, 2020))\
                   .withColumn('cv', F.when(trainDF_VA.YEAR <= 2019, 'train')
                                         .otherwise('test'))

heldOutDF_VA = heldOutDF_VA.select(['DEP_DEL15','features_scaled'])
heldOutDF_VA = heldOutDF_VA.withColumnRenamed("DEP_DEL15", "label")
heldOutDF_VA = heldOutDF_VA.withColumnRenamed("features_scaled", "features")

##### 4.5.2 Baseline

Across our entire data set, we see that 17.4% of flights are delayed more than 15 minutes from their scheduled departure.  We set as our baseline predictor a model that always predicts a delay.  We compute the precision and recall of this "Always Predict Delay" model, and will measure our logistic regression and other models against this baseline.

In [None]:
%sql

select
--year, month,
-- concat(cast(year as string), case when month >= 10 then cast(month as string) else concat('0',cast(month as string)) end) as year_month,
-- sum(case when dep_delay>15 then 1 else 0 end) as delay_cnt,
sum(case when dep_delay>15 then 1 else 0 end)/count(dep_delay) as delay_pct
from df_airlines_full_tb
where year <= 2018
and cancelled = 0;

delay_pct
0.174117145151199


For a given observed delay percentage D, assumed to be 17.4% here, these metrics for an "Always predict delay" strategy are computed as

Precision = $$\frac{17.4}{17.4 + (100-17.4)} = \frac{17.4}{100} = .174$$
Recall = $$\frac{17.4}{17.4 + 0} = 1$$
F1 = $$\frac{2 * 17.4}{2 * 17.4 + (100-17.4) + 0} = \frac{34.8}{117.4} = .296$$

We intend to update these metrics with our logistic regression model and other models in later phases of the project.

| Model | Precision | Recall | F1 |
|---|---|---|---|
| Always predict delay | .174 | 1.0 | .296 |
| Never predict delay  | 0 | 0 | 0 |

##### 4.5.3 Logistic Regression

We use pyspark.mk.classification.LogisticRegression to build a model based on our training data to predict membership of the "delay" or "no delay" class.  
To do this, we setup a pipeline that uses one-hot encoding to vectorize our categorical variables, and then use the VectorAssembler to prepare it for use by the LogisticRegression model.

In [None]:
lr = LogisticRegression()

pipeline = Pipeline(stages=[lr])
paramGrid = (ParamGridBuilder()
    .addGrid(lr.maxIter, [10, 15])
    .addGrid(lr.regParam, [0.3, 0.7])
    .addGrid(lr.elasticNetParam, [0.2, 0.8])
    .build())

evaluator = MulticlassClassificationEvaluator(metricName='fMeasureByLabel', beta=2)

cv = CustomCrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator,
     splitWord = ('train', 'test'), cvCol = 'cv', parallelism=10)

lr_cv_model = cv.fit(d)

# make predictions
lr_predictions = lr_cv_model.transform(heldOutDF_VA)

display(lr_predictions.groupby('label', 'prediction').count())

<img src="https://user-images.githubusercontent.com/88794396/204198075-7eac77da-4ce2-4136-9e94-eb372f3b27f5.png?raw=true>" width=65%>

##### 4.5.4 Random Forest

In [None]:
# set up grid search: estimator, set of params, and evaluator
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
grid = (ParamGridBuilder()
    .addGrid(rf.numTrees, [1, 3, 5])
    .addGrid(rf.maxDepth, [1, 10, 20])
    .addGrid(rf.maxBins, [60, 100, 500])
    .build())

evaluator = BinaryClassificationEvaluator()

# run cross validation & return the crossvalidation AUROC for 'test' set
cv = CustomCrossValidator(estimator=rf, estimatorParamMaps=grid, evaluator=evaluator,
     splitWord = ('train', 'test'), cvCol = 'cv', parallelism=10)

rf_cv_model = cv.fit(d)

# make predictions
rf_predictions = rf_cv_model.transform(heldOutDF_VA)

display(rf_predictions.groupby('label', 'prediction').count())

fold 1 start...
fold 1 end
fold 2 start...
fold 2 end
fold 3 start...
fold 3 end
fold 4 start...


<img src="https://user-images.githubusercontent.com/88794396/204198057-23057d2a-07bd-4245-8562-9b92adf4fdd8.png?raw=true>" width=65%>

### 5. Results

|   | Baseline Experiment | Train | Test | Metrics - Precision | Metrics - Recall | Metrics - F1 | Metrics - F2 |
|---|---|---|---|---|---|---|---|
| Baseline | Always Predict Delay | 14,272,964 | 3,504,568 | 0.174 | 1.0 | 0.296 | 0.513 |
| Baseline | Always Predict NoDelay | 14,272,964 | 3,504,568 | 0.0 | 0.0 | 0.0 | 0.0 |
| Experiment 1 | Logistic Regression | 13,377,810 | 2,221,195 | 0.504 | 0.854 | 0.634 | 0.750 |
| Experiment 2 | Random Forest | 13,377,810 | 2,221,195 | 0.0 | 0.0 | 0.0 | 0.0 |


<br>

__Metrics__

Precision and recall are defined in terms of True positive (TP), False positive (FP), True Negative (TN), and False Negative (FN) classifications.

Precision = $$\frac{TP}{TP+FP}$$

Recall = $$\frac{TP}{TP+FN}$$

F1 = $$\frac{2TP}{2TP+FP+FN}$$


<br>
Assume we care more for FN, which means we weight recall higher than precision; therefore, we choose beta as 2.

F_beta = $$\frac{(1+\beta^2)TP}{(1+\beta^2)TP + \beta^2FN + FP}$$

F2 = $$\frac{5TP}{5TP + 4FN + FP}$$

### 6. Gap Analysis

___Current State___

True positives and false negatives are important to the business problem because the former represents successful flight delay detection and the latter represents undetected delays; both have an impact on whether the customers miss their flight, which has other financial implications to the airline company. By nature of this, we are evaluating precision and recall, with more of an emphasis on recall, and therefore, the F2 score (or F_beta, where beta = 2).

As of Phase 3, the logistic regression model produces the best result. The Random Forest model did not work as expected. This is due to setting the evaluator to F_beta (beta = 2), causing the model to only predict negative results. 

According to the leaderboard, many teams that are using precision-recall scored between 0.6 and 0.8 for both metrics. The highest observed scores from other teams are 0.93 for weighted precision and weighted recall. Given that recall is more relevant for our team's goal, our current model can be considered reliable and we are also in good standing when benchmarked against our peers.

___Future State___

For our next experiment, we plan to use the default binary classification evaluator which tunes the parameters based on AUROC - the model's ability to differentiate between 2 classes. This will allow us to produce a model that can successfully evaluate both positive and negative predictions and potentially allow us to improve upon the results from logistic regression.

### 7. Conclusion

<br>___Project Focus___

The focus of our project is to accurately predict a flight delay by analyzing flight and weather data from 2015-2021. This analysis is important in order to proactively account for these delays and plan accordingly, allowing us to reduce costs associated with rerouting flights and the impact on our passengers.

<br>___Hypothesis___

Our main hypothesis states that a combination of weather conditions two hours before flight departure and route features can accurately forecast a flight delay greater than 15 minutes. 


<br>___Results___

From our simple baseline predictor model, which was set to always predict a delay, we can observe that 17.4% of flights are delayed more than 15 minutes. 

Our preliminary results, which were produced with a simplified logistic regression model using only the Month feature, show that each month has a delayed flight percentage of 5-25%. These values show that we must add additional features to our model to improve the predictive performance.

In our new experiment, we enhanced our logistic regression model by using revised training data that has been downsampled and utilizes more features - we now have a balanced dataset with a new time feature (holiday) and graph feature (pagerank) added. This yielded significantly improved results. After hyperparameter tuning, our best model produced an F2 score as high as 0.75

<br>___Future / Thoughts___

For our next steps, we will work on conducting more experiments, which include the following: performing regularization on our current features, adjusting our evaluator for Random Forest, hyperparameter tuning and analyzing the results.
Finally, we will improve our predictive power of our selected model.

### 8. Project Timeline

<img src="https://github.com/carla-cortez/261/blob/main/gantt_diagram.png?raw=true>" width=75%>