In [0]:
# imports
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler, MinMaxScaler, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import StringType
from pyspark.sql.types import FloatType
import numpy as np
import time

from datetime import date
import pandas as pd
import holidays
from pandas.tseries.offsets import DateOffset

In [0]:
# set up storage
# Identifiers of the Azure Blob Storage container and of the Databricks secret
# that grants access to it. Only names are stored here; the secret value itself
# is fetched at runtime below.
blob_container = "container1" # name of the container created in https://portal.azure.com
storage_account = "w261sec6group3" # name of the storage account created in https://portal.azure.com
secret_scope = "w261sec6group3_scope" # name of the secret scope created with the Databricks CLI
secret_key = "w261sec6group3_key" # name of the secret key created with the Databricks CLI
# Base URL that the read/write calls in this notebook prefix their paths with.
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
# NOTE(review): mount_path is not referenced by any cell visible in this notebook.
mount_path = "/mnt/mids-w261"

# Register the secret under the container's `fs.azure.sas.*` Spark setting so
# that paths under blob_url can be read and written.
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

In [0]:
# read joined dataset from phase 2
# NOTE: the name df_final is reassigned further down, when the feature
# engineering functions are applied to the same parquet dataset.
df_final = spark.read.parquet(f"{blob_url}/final_dataset_2022")

In [0]:
# Preview the joined dataset before any new features are added.
display(df_final)

quarter,month,day_of_month,day_of_week,fl_date,op_unique_carrier,op_carrier_airline_id,op_carrier,tail_num,op_carrier_fl_num,origin_airport_id,origin_airport_seq_id,origin_city_market_id,origin,origin_city_name,origin_state_abr,origin_state_fips,origin_state_nm,origin_wac,dest_airport_id,dest_airport_seq_id,dest_city_market_id,dest,dest_city_name,dest_state_abr,dest_state_fips,dest_state_nm,dest_wac,crs_dep_time,dep_time,dep_delay,dep_delay_new,dep_del15,dep_delay_group,dep_time_blk,taxi_out,wheels_off,wheels_on,taxi_in,crs_arr_time,arr_time,arr_delay,arr_delay_new,arr_del15,arr_delay_group,arr_time_blk,cancelled,diverted,crs_elapsed_time,actual_elapsed_time,air_time,flights,distance,distance_group,year,IATA,station_id,name,icao,lat,lon,altitude,station_lon,station_lat,timezone,type,crs_dep_hour,flight_hour,flight_time,flight_time_utc,flight_time_utc_hour,flight_time_utc_year,flight_time_utc_month,flight_time_utc_date,is_holiday,weather_station,weather_station_name,metar_hour,avg_hourly_dew_point_temp,avg_hourly_dry_bulb_temp,avg_hourly_relative_humidity,avg_hourly_station_pressure,avg_hourly_visibility,avg_hourly_wind_direction,avg_hourly_wind_speed,avg_hourly_precipitation_ordinal,pagerank
1,2,10,4,2022-02-10,G4,20368,G4,202NV,2322,10676,1067602,30676,BLV,"Belleville, IL",IL,17,Illinois,41,14761,1476107,34761,SFB,"Sanford, FL",FL,12,Florida,33,815,758,-17.0,0.0,0.0,-2,0800-0859,9.0,807,1055.0,4.0,1123,1059.0,-24.0,0.0,0.0,-2.0,1100-1159,0.0,0.0,128.0,121.0,108.0,1.0,835.0,4,2022,BLV,72433813802,Scott AFB/Midamerica Airport,KBLV,38.5452,-89.835197,459,-89.85,38.55,America/Chicago,large_airport,8.0,815,2022-02-10T08:15:00.000+0000,2022-02-10T14:15:00.000+0000,14,2022,2,10,0,72433813802,"BELLEVILLE SCOTT AFB, IL US",2022-02-10T11:00:00.000+0000,27.0,44.0,52.0,29.56999969482422,10.0,250.0,8.0,0.0,0.4886393716979927
1,2,17,4,2022-02-17,G4,20368,G4,202NV,2479,11423,1142307,31423,DSM,"Des Moines, IA",IA,19,Iowa,61,14761,1476107,34761,SFB,"Sanford, FL",FL,12,Florida,33,900,844,-16.0,0.0,0.0,-2,0900-0959,13.0,857,1226.0,5.0,1235,1231.0,-4.0,0.0,0.0,-1.0,1200-1259,0.0,0.0,155.0,167.0,149.0,1.0,1124.0,5,2022,DSM,72546014933,Des Moines International Airport,KDSM,41.53400039672852,-93.66310119628906,958,-93.653,41.534,America/Chicago,large_airport,9.0,900,2022-02-17T09:00:00.000+0000,2022-02-17T15:00:00.000+0000,15,2022,2,17,0,72546014933,"DES MOINES INTERNATIONAL AIRPORT, IA US",2022-02-17T12:00:00.000+0000,-1.0,14.0,51.0,29.149999618530277,10.0,360.0,20.0,0.0,1.2973277850148917
1,2,17,4,2022-02-17,G4,20368,G4,202NV,2400,13871,1387102,33316,OMA,"Omaha, NE",NE,31,Nebraska,65,14761,1476107,34761,SFB,"Sanford, FL",FL,12,Florida,33,1914,1909,-5.0,0.0,0.0,-1,1900-1959,13.0,1922,2246.0,7.0,2302,2253.0,-9.0,0.0,0.0,-1.0,2300-2359,0.0,0.0,168.0,164.0,144.0,1.0,1195.0,5,2022,OMA,72550014942,Eppley Airfield,KOMA,41.3032,-95.894096,984,-95.899,41.31,America/Chicago,large_airport,19.0,1914,2022-02-17T19:14:00.000+0000,2022-02-18T01:14:00.000+0000,1,2022,2,18,1,72550014942,"OMAHA EPPLEY AIRFIELD, NE US",2022-02-17T22:00:00.000+0000,2.0,16.0,54.0,29.1299991607666,10.0,160.0,8.0,0.0,1.3371360153936729
1,2,27,7,2022-02-27,G4,20368,G4,202NV,2564,12917,1291703,31066,LCK,"Columbus, OH",OH,39,Ohio,44,14986,1498603,34986,SRQ,"Sarasota/Bradenton, FL",FL,12,Florida,33,1847,1832,-15.0,0.0,0.0,-1,1800-1859,11.0,1843,2039.0,5.0,2106,2044.0,-22.0,0.0,0.0,-2.0,2100-2159,0.0,0.0,139.0,132.0,116.0,1.0,856.0,4,2022,LCK,72428513812,Rickenbacker International Airport,KLCK,39.813801,-82.927803,744,-82.933,39.817,America/New_York,large_airport,18.0,1847,2022-02-27T18:47:00.000+0000,2022-02-27T23:47:00.000+0000,23,2022,2,27,0,72428513812,"COLUMBUS RICKENBACKER, OH US",2022-02-27T20:00:00.000+0000,24.0,36.0,62.0,29.3799991607666,10.0,270.0,5.0,0.0,0.3549416968201481
1,3,2,3,2022-03-02,G4,20368,G4,202NV,2541,14986,1498603,34986,SRQ,"Sarasota/Bradenton, FL",FL,12,Florida,33,14512,1451202,34512,RFD,"Rockford, IL",IL,17,Illinois,41,1602,1637,35.0,35.0,1.0,2,1600-1659,23.0,1700,1830.0,5.0,1753,1835.0,42.0,42.0,1.0,2.0,1700-1759,0.0,0.0,171.0,178.0,150.0,1.0,1085.0,5,2022,SRQ,72211512871,Sarasota Bradenton International Airport,KSRQ,27.39539909362793,-82.55439758300781,30,-82.559,27.401,America/New_York,large_airport,16.0,1602,2022-03-02T16:02:00.000+0000,2022-03-02T21:02:00.000+0000,21,2022,3,2,0,72211512871,"SARASOTA BRADENTON AIRPORT, FL US",2022-03-02T18:00:00.000+0000,50.0,67.0,55.0,30.06999969482422,10.0,340.0,7.0,0.0,2.280104758807622
1,3,2,3,2022-03-02,G4,20368,G4,202NV,2541,14986,1498603,34986,SRQ,"Sarasota/Bradenton, FL",FL,12,Florida,33,14512,1451202,34512,RFD,"Rockford, IL",IL,17,Illinois,41,1602,1637,35.0,35.0,1.0,2,1600-1659,23.0,1700,1830.0,5.0,1753,1835.0,42.0,42.0,1.0,2.0,1700-1759,0.0,0.0,171.0,178.0,150.0,1.0,1085.0,5,2022,SRQ,72211512871,Sarasota Bradenton International Airport,KSRQ,27.39539909362793,-82.55439758300781,30,-82.559,27.401,America/New_York,large_airport,16.0,1602,2022-03-02T16:02:00.000+0000,2022-03-02T21:02:00.000+0000,21,2022,3,2,0,72211512871,"SARASOTA BRADENTON AIRPORT, FL US",2022-03-02T18:00:00.000+0000,50.0,67.0,55.0,30.06999969482422,10.0,340.0,7.0,0.0,2.280104758807622
1,3,4,5,2022-03-04,G4,20368,G4,202NV,2557,14696,1469608,34696,SBN,"South Bend, IN",IN,18,Indiana,42,14986,1498603,34986,SRQ,"Sarasota/Bradenton, FL",FL,12,Florida,33,1941,1932,-9.0,0.0,0.0,-1,1900-1959,9.0,1941,2138.0,6.0,2208,2144.0,-24.0,0.0,0.0,-2.0,2200-2259,0.0,0.0,147.0,132.0,117.0,1.0,1009.0,5,2022,SBN,72535014848,South Bend Regional Airport,KSBN,41.70869827270508,-86.31729888916016,799,-86.316,41.707,America/Indiana/Indianapolis,large_airport,19.0,1941,2022-03-04T19:41:00.000+0000,2022-03-05T00:41:00.000+0000,0,2022,3,5,0,72535014848,"SOUTH BEND AIRPORT, IN US",2022-03-04T21:00:00.000+0000,19.0,37.0,48.0,29.40999984741211,10.0,120.0,14.0,0.0,0.6917703135198011
1,3,4,5,2022-03-04,G4,20368,G4,202NV,2557,14696,1469608,34696,SBN,"South Bend, IN",IN,18,Indiana,42,14986,1498603,34986,SRQ,"Sarasota/Bradenton, FL",FL,12,Florida,33,1941,1932,-9.0,0.0,0.0,-1,1900-1959,9.0,1941,2138.0,6.0,2208,2144.0,-24.0,0.0,0.0,-2.0,2200-2259,0.0,0.0,147.0,132.0,117.0,1.0,1009.0,5,2022,SBN,72535014848,South Bend Regional Airport,KSBN,41.70869827270508,-86.31729888916016,799,-86.316,41.707,America/Indiana/Indianapolis,large_airport,19.0,1941,2022-03-04T19:41:00.000+0000,2022-03-05T00:41:00.000+0000,0,2022,3,5,0,72535014848,"SOUTH BEND AIRPORT, IN US",2022-03-04T21:00:00.000+0000,19.0,37.0,48.0,29.40999984741211,10.0,120.0,14.0,0.0,0.6917703135198011
1,3,5,6,2022-03-05,G4,20368,G4,202NV,2861,14986,1498603,34986,SRQ,"Sarasota/Bradenton, FL",FL,12,Florida,33,11721,1172105,31721,FNT,"Flint, MI",MI,26,Michigan,43,1630,1633,3.0,3.0,0.0,0,1600-1659,13.0,1646,1911.0,6.0,1922,1917.0,-5.0,0.0,0.0,-1.0,1900-1959,0.0,0.0,172.0,164.0,145.0,1.0,1075.0,5,2022,SRQ,72211512871,Sarasota Bradenton International Airport,KSRQ,27.39539909362793,-82.55439758300781,30,-82.559,27.401,America/New_York,large_airport,16.0,1630,2022-03-05T16:30:00.000+0000,2022-03-05T21:30:00.000+0000,21,2022,3,5,0,72211512871,"SARASOTA BRADENTON AIRPORT, FL US",2022-03-05T18:00:00.000+0000,53.0,79.0,41.0,30.170000076293945,10.0,,,0.0,2.280104758807622
1,3,5,6,2022-03-05,G4,20368,G4,202NV,2861,14986,1498603,34986,SRQ,"Sarasota/Bradenton, FL",FL,12,Florida,33,11721,1172105,31721,FNT,"Flint, MI",MI,26,Michigan,43,1630,1633,3.0,3.0,0.0,0,1600-1659,13.0,1646,1911.0,6.0,1922,1917.0,-5.0,0.0,0.0,-1.0,1900-1959,0.0,0.0,172.0,164.0,145.0,1.0,1075.0,5,2022,SRQ,72211512871,Sarasota Bradenton International Airport,KSRQ,27.39539909362793,-82.55439758300781,30,-82.559,27.401,America/New_York,large_airport,16.0,1630,2022-03-05T16:30:00.000+0000,2022-03-05T21:30:00.000+0000,21,2022,3,5,0,72211512871,"SARASOTA BRADENTON AIRPORT, FL US",2022-03-05T18:00:00.000+0000,53.0,79.0,41.0,30.170000076293945,10.0,,,0.0,2.280104758807622


### Adding Weather Lag Features

In [0]:
def getLagWeatherData(df_main, df_weather, interval_time, join_type="inner"):
    '''This function adds weather features with specified time lag
    to the main dataframe.

    For each lag (in hours) in `interval_time`, every flight is matched to
    the weather row of its station whose 'metar_hour' has the same year,
    month, day of month and hour as 'flight_time_utc' minus that lag. The
    matched values are appended as columns named '<feature>_lag<hours>',
    e.g. 'avg_hourly_wind_speed_lag6'.

    ### Parameters
    1. df_main : DataFrame
        - The dataframe for which new time-lagged weather features will
        be added. The dataframe must have features 'flight_time_utc'
        and 'station_id' already present.
    2. df_weather : DataFrame
        - The dataframe that includes weather information. The dataframe
        must have features 'metar_hour' and 'weather_station' already
        present, plus the eight 'avg_hourly_*' columns listed below.
    3. interval_time : List
        - A list of hour values for which the weather data will be lagged.
        An empty list returns df_main unchanged.
    4. join_type : str, optional
        - 'inner' (default, the historical behaviour) drops every flight
        that has no weather row at one of the requested lags; 'left'
        keeps such flights and leaves their lagged columns null.

    ### Returns
    - df_main
        - The main dataframe that has all the time-lagged weather data
        added. If df_weather holds more than one row per station and
        hour, the matching flights are duplicated by the join.

    ### Raises
    - ValueError
        - If join_type is neither 'inner' nor 'left'.

    ### Side effects
    - Registers (and overwrites) the temporary views 'weather' and 'main'
      and relies on the notebook-provided `sqlContext`.
    '''

    join_keywords = {"inner": "JOIN", "left": "LEFT JOIN"}
    normalized_join_type = str(join_type).lower()
    if normalized_join_type not in join_keywords:
        raise ValueError(
            "join_type must be 'inner' or 'left', got {!r}".format(join_type))
    join_keyword = join_keywords[normalized_join_type]

    # weather columns that are copied over for every lag
    weather_features = [
        'avg_hourly_dew_point_temp',
        'avg_hourly_dry_bulb_temp',
        'avg_hourly_relative_humidity',
        'avg_hourly_station_pressure',
        'avg_hourly_visibility',
        'avg_hourly_wind_direction',
        'avg_hourly_wind_speed',
        'avg_hourly_precipitation_ordinal',
    ]

    # temporary flight-side join keys, removed again after each join
    join_cols_flight = ['flight_time_join_hour', 'flight_time_join_date',
                        'flight_time_join_year', 'flight_time_join_month']

    # parse the metar datetime data into the join keys
    metar_hour = F.col("metar_hour")
    df_weather = df_weather.withColumn('weather_join_hour', F.hour(metar_hour)) \
                           .withColumn('weather_join_date', F.date_format(metar_hour, "d")) \
                           .withColumn('weather_join_year', F.year(metar_hour)) \
                           .withColumn('weather_join_month', F.month(metar_hour))

    # create a temporary weather data view for SQL operations
    df_weather.createOrReplaceTempView("weather")

    # iterate through the list of interval hours
    for interval in interval_time:
        # flight time shifted back by the current lag; looking at weather
        # from before the departure avoids leaking future observations
        lagged_time = F.col("flight_time_utc") - F.expr('INTERVAL {} HOURS'.format(interval))

        # the day key uses the same date_format("d") as the weather side so
        # that both sides of the comparison have the same type
        df_main = df_main.withColumn('flight_time_join_hour', F.hour(lagged_time)) \
                         .withColumn('flight_time_join_date', F.date_format(lagged_time, "d")) \
                         .withColumn('flight_time_join_year', F.year(lagged_time)) \
                         .withColumn('flight_time_join_month', F.month(lagged_time))

        # create a temporary main data view for SQL operations
        df_main.createOrReplaceTempView("main")

        # name the lagged columns with their final, lag-specific names
        # directly in the query (back-ticks keep any suffix a valid alias)
        lag_select = ",\n            ".join(
            "b.{0} AS `{0}_lag{1}`".format(feature, interval)
            for feature in weather_features)

        # join the main and weather data by flight time and weather station
        df_main = sqlContext.sql("""
            SELECT a.*,
            {lag_select}
            FROM main a
            {join_keyword} weather b
            ON a.station_id = b.weather_station
            AND a.flight_time_join_hour = b.weather_join_hour
            AND a.flight_time_join_date = b.weather_join_date
            AND a.flight_time_join_year = b.weather_join_year
            AND a.flight_time_join_month = b.weather_join_month
            """.format(lag_select=lag_select, join_keyword=join_keyword))

        # drop intermediate flight columns used for joining
        df_main = df_main.drop(*join_cols_flight)

    # the weather-side join keys are never selected into df_main, so there
    # is nothing else to clean up here
    return df_main

### Adding Previous Flight Delay Data

In [0]:
def getPastFlightDelay(df_main):
    '''This function adds the arrival delay information of the incoming
    flight for the plane to be used for each departure.

    The new column 'arr_del15_lag1' holds the 'arr_del15' value of the
    previous flight of the same 'tail_num', ordered by 'flight_time_utc'.
    It is null for the first flight of every plane.

    The lag is computed in a single pass over the data. The previous
    implementation computed it in a side table and joined that table back
    on ('flight_time_utc', 'tail_num'), which multiplied every group of
    rows sharing the same key (n duplicates became n * n rows).

    ### Parameters
    1. df_main : DataFrame
        - The dataframe for which the new arrival flight delay feature will
        be added. The dataframe must have features 'flight_time_utc',
        'tail_num' and 'arr_del15' already present.

    ### Returns
    - df_main
        - The main dataframe that has the arrival flight delay information
        added. As before, rows whose 'tail_num' or 'flight_time_utc' is
        null are not part of the result.

    ### Side effects
    - Registers (and overwrites) the temporary view 'main' and relies on
      the notebook-provided `sqlContext`.
    '''

    # create a temporary main data view for SQL operations
    df_main.createOrReplaceTempView('main')

    # NOTE(review): rows that share the same (tail_num, flight_time_utc) are
    # ordered arbitrarily among themselves by the window, so one duplicate
    # can receive the value of the other; de-duplicate upstream if needed.
    # NOTE(review): confirm that the previous flight's arrival delay is
    # already known at prediction time, otherwise this feature leaks.
    df_with_lag = sqlContext.sql('''
        SELECT m.*,
        LAG(m.arr_del15, 1) OVER
        (PARTITION BY m.tail_num ORDER BY m.flight_time_utc)
        AS arr_del15_lag1
        FROM main m
        WHERE m.tail_num IS NOT NULL
        AND m.flight_time_utc IS NOT NULL
        ''')

    return df_with_lag

### Graph Based Feature - Previous Airport Node

In [0]:
def getPastAirport(df_main):
    '''This function adds the origin of the previous flight
    for the plane to be used for each departure.

    The new column 'origin_lag1' holds the 'origin' value of the previous
    flight of the same 'tail_num', ordered by 'flight_time_utc'. It is null
    for the first flight of every plane.

    The lag is computed in a single pass over the data. The previous
    implementation computed it in a side table and joined that table back
    on ('flight_time_utc', 'tail_num'), which multiplied every group of
    rows sharing the same key (n duplicates became n * n rows).

    ### Parameters
    1. df_main : DataFrame
        - The dataframe for which the new previous flight origin feature will
        be added. The dataframe must have features 'flight_time_utc',
        'tail_num' and 'origin' already present.

    ### Returns
    - df_main
        - The main dataframe that has the previous flight origin information
        added. As before, rows whose 'tail_num' or 'flight_time_utc' is
        null are not part of the result.

    ### Side effects
    - Registers (and overwrites) the temporary view 'main' and relies on
      the notebook-provided `sqlContext`.
    '''
    # create a temporary main data view for SQL operations
    df_main.createOrReplaceTempView('main')

    # NOTE(review): rows that share the same (tail_num, flight_time_utc) are
    # ordered arbitrarily among themselves by the window, so one duplicate
    # can receive the value of the other; de-duplicate upstream if needed.
    df_with_lag = sqlContext.sql('''
        SELECT m.*,
        LAG(m.origin, 1) OVER
        (PARTITION BY m.tail_num ORDER BY m.flight_time_utc)
        AS origin_lag1
        FROM main m
        WHERE m.tail_num IS NOT NULL
        AND m.flight_time_utc IS NOT NULL
        ''')

    return df_with_lag

### PageRank

In [0]:
# importing libraries
import pyspark.sql.functions as F
from pyspark.sql.functions import col, isnan, when, sum, max, min, avg, count, mean, when, desc
from pyspark.sql.types import StringType,BooleanType,DateType,IntegerType
import time

In [0]:
def create_airport_delay_data(flight_df):
    '''Aggregate flights per (origin, dest) route.

    ### Parameters
    1. flight_df : DataFrame
        - Flight data with the features 'origin', 'dest' and 'dep_del15'.

    ### Returns
    - DataFrame
        - One row per (origin, dest) pair with
        'num_delayed_flights' (sum of 'dep_del15' cast to integer) and
        'num_flights' (number of rows of the route with a non-null origin).
    '''
    # The cast has to be part of the dataframe that is aggregated: calling
    # withColumn without keeping its result (as was done before) is a no-op
    # because Spark dataframes are immutable.
    delay_data_df = flight_df.select(
        'origin',
        'dest',
        F.col('dep_del15').cast('int').alias('dep_del15'))

    # F.sum / F.count are used explicitly so the aggregation does not depend
    # on which `sum` / `count` name happens to be bound in the notebook.
    delay_data_agg_df = delay_data_df.groupBy('origin', 'dest') \
                                     .agg(F.sum('dep_del15').alias('num_delayed_flights'),
                                          F.count(F.col('origin')).alias('num_flights'))
    return delay_data_agg_df

In [0]:
def create_airport_delay_graph(delay_data_agg_df):
    '''Build a cached airport graph from per-route aggregates.

    ### Parameters
    1. delay_data_agg_df : DataFrame
        - One row per route with the features 'origin' and 'dest'; every
        other column is carried along as an edge attribute.

    ### Returns
    - GraphFrame
        - Vertices are the distinct airports (column 'id') that appear as
        origin or destination; edges are the routes ('src' -> 'dst').
        The graph is cached before it is returned.
    '''
    # NOTE(review): GraphFrame is not imported in the cells visible here;
    # presumably it is provided elsewhere in the notebook - confirm.

    # airport vertices: every airport seen on either end of a route
    origin_ids = delay_data_agg_df.select('origin').distinct().withColumnRenamed('origin', 'id')
    dest_ids = delay_data_agg_df.select('dest').distinct().withColumnRenamed('dest', 'id')
    vertices = origin_ids.union(dest_ids).distinct()

    # route edges, keeping the flight / delayed-flight counts as attributes
    edges = delay_data_agg_df.withColumnRenamed('origin', 'src')
    edges = edges.withColumnRenamed('dest', 'dst')

    # create graph
    return GraphFrame(vertices, edges).cache()

In [0]:
def join_pagerank(filtered_df, pagerank_graph):
    '''Attach the vertex attributes of a graph to each flight.

    ### Parameters
    1. filtered_df : DataFrame
        - Flight data with the feature 'origin'.
    2. pagerank_graph
        - Object exposing a `vertices` dataframe with the column 'id'.

    ### Returns
    - DataFrame
        - Left join of the flights with the vertices on origin == id, so
        flights whose origin is not a vertex are kept with null values.
    '''
    vertices = pagerank_graph.vertices
    same_airport = filtered_df['origin'] == vertices['id']
    return filtered_df.join(vertices, on=same_airport, how='left')

In [0]:
def pagerank(flight_df, flight_data_time_dict=None, reset_probability=0.15, max_iter=5):
    '''Compute a monthly airport PageRank and store it per flight.

    For every (year, month) combination of `flight_data_time_dict`, a graph
    of airports is built from the flights of the PREVIOUS calendar month,
    PageRank is run on it and the resulting vertex values are left-joined
    to the flights of the current month by origin airport. The columns
    'tail_num', 'flight_time_utc', 'id' and 'pagerank' are written as
    parquet to '{blob_url}/pagerank-per-flight-monthly/{year}-{month}'.
    A month is skipped (nothing is written) when the previous month has no
    flights.

    ### Parameters
    1. flight_df : DataFrame
        - Flight data with the features 'flight_time_utc_year',
        'flight_time_utc_month', 'origin', 'dest', 'dep_del15', 'tail_num'
        and 'flight_time_utc'.
    2. flight_data_time_dict : dict, optional
        - {'year': [...], 'month': [...]} to iterate over. Defaults to the
        years 2015-2021 and all twelve months.
    3. reset_probability : float, optional
        - `resetProbability` passed to GraphFrame.pageRank (default 0.15).
    4. max_iter : int, optional
        - `maxIter` passed to GraphFrame.pageRank (default 5).

    ### Returns
    - None. Results are written to blob storage; progress is printed.
    '''
    # subset data (every column listed exactly once, so the projection does
    # not contain duplicate column names)
    subset_cols = ['flight_time_utc_year', 'flight_time_utc_month',
                   'origin', 'dest', 'dep_del15',
                   'tail_num', 'flight_time_utc']
    flight_df = flight_df.select(*subset_cols)

    # create pageranks for airports on a monthly basis
    if flight_data_time_dict is None:
        flight_data_time_dict = {"year": [2015, 2016, 2017, 2018, 2019, 2020, 2021],
                                 "month": list(range(1, 13))}

    for year in flight_data_time_dict['year']:
        for month in flight_data_time_dict['month']:
            iter_start_time = time.time()
            print(f"Computing page rank for {year}-{month}")

            # Jan -> look at Dec of prev. year
            if month == 1:
                prev_year, prev_month = year - 1, 12
            else:
                prev_year, prev_month = year, month - 1

            # NOTE: filters are lazy, so the time printed here only covers
            # building the query, not running it.
            start = time.time()
            print("Filtering data...", end="")
            filtered_df = flight_df.filter(f"flight_time_utc_year = {year}") \
                                   .filter(f"flight_time_utc_month = {month}")
            print(f"Finished in {time.time() - start}")

            # pagerank based on delays and # of flights from previous month
            start = time.time()
            print("Grouping Data...", end="")
            filtered_pr_df = flight_df.filter(f"flight_time_utc_year = {prev_year}") \
                                      .filter(f"flight_time_utc_month = {prev_month}")
            # fetching a single row is enough to know whether the previous
            # month has data; a full count() is not needed
            if not filtered_pr_df.take(1):
                print(f"Empty for {prev_year}-{prev_month}, skipping iteration {year}-{month}")
                print()
                continue
            delay_data_agg_df = create_airport_delay_data(filtered_pr_df)
            print(f"Finished in {time.time() - start}")

            # create graph of airports with number of delays and flights
            start = time.time()
            print("Creating Graph...", end="")
            airport_delay_graph = create_airport_delay_graph(delay_data_agg_df)
            print(f"Finished in {time.time() - start}")

            # run pagerank GraphFrame function
            start = time.time()
            print("Running Pagerank...", end="")
            pagerank_graph = airport_delay_graph.pageRank(
                resetProbability=reset_probability, maxIter=max_iter).cache()
            print(f"Finished in {time.time() - start}")

            # join the pagerank results to flight data
            start = time.time()
            print('Joining Pagerank Results...', end="")
            result = join_pagerank(filtered_df, pagerank_graph)
            print(f"Finished in {time.time() - start}")

            # record the flight's airport pagerank at that time
            result_subset = result.select("tail_num", "flight_time_utc", "id", "pagerank")
            start = time.time()
            print("Writing results...", end="")
            result_subset.write.mode("overwrite").parquet(f"{blob_url}/pagerank-per-flight-monthly/{year}-{month}")
            print(f"Finished in {time.time() - start}")

            # Clean up: release the two graphs that were cached for this
            # month. The plain dataframes above were never cached.
            pagerank_graph.unpersist()
            airport_delay_graph.unpersist()
            print(f"Total time for iteration {year}-{month}: {time.time()-iter_start_time}")
            print()

Combine pagerank results and join to dataset

In [0]:
def create_pagerank_df(flight_data_time_dict=None):
    '''Combine the monthly per-flight PageRank files into one dataset.

    Reads '{blob_url}/pagerank-per-flight-monthly/{year}-{month}' for every
    (year, month) combination, unions the results and writes them to
    '{blob_url}/flights_with_airport_pagerank'. Months without an output
    folder are skipped; this happens for every month that the PageRank
    computation skipped because its previous month had no flights.

    ### Parameters
    1. flight_data_time_dict : dict, optional
        - {'year': [...], 'month': [...]} to iterate over. When omitted,
        a notebook-level variable of the same name is used if it exists
        (the previous behaviour), otherwise the years 2015-2021 and all
        twelve months.

    ### Returns
    - None. The combined dataset is written to blob storage.

    ### Raises
    - ValueError
        - If none of the monthly folders could be read.
    '''
    # local import: keeps the dependency next to its only use
    from pyspark.sql.utils import AnalysisException

    if flight_data_time_dict is None:
        # backward compatible: fall back to the notebook-level variable
        flight_data_time_dict = globals().get('flight_data_time_dict')
    if flight_data_time_dict is None:
        flight_data_time_dict = {"year": [2015, 2016, 2017, 2018, 2019, 2020, 2021],
                                 "month": list(range(1, 13))}

    union_df = None
    processed = 0
    for year in flight_data_time_dict['year']:
        for month in flight_data_time_dict['month']:
            month_path = f"{blob_url}/pagerank-per-flight-monthly/{year}-{month}"
            try:
                curr_df = spark.read.parquet(month_path)
            except AnalysisException as read_error:
                # e.g. the folder does not exist because the month was skipped
                print(f'skipped {year}-{month}: {read_error}')
                continue

            # the first readable month seeds the union, whichever it is
            union_df = curr_df if union_df is None else union_df.union(curr_df)

            # count only the new month and keep a running total instead of
            # re-counting the whole, growing union on every iteration
            processed += curr_df.count()
            print(f'processed: {processed}')

    if union_df is None:
        raise ValueError("no monthly pagerank output could be read")

    print(f'final length: {processed}')
    union_df.write.mode('overwrite').parquet(f"{blob_url}/flights_with_airport_pagerank")

### Holiday Feature

In [0]:
def generate_holiday_df():
    '''Build the list of holiday dates with a +/- 3 day window.

    Every holiday (holidays whose name marks them as "observed" are left
    out) contributes seven rows: the holiday itself and the three days
    before and after it. The long-format table is written as CSV to
    '{blob_url}/holiday_dates_range' and returned.

    ### Returns
    - pandas.DataFrame
        - Columns 'holiday_name', 'variable' (which of the seven dates the
        row is, e.g. 'holiday_date' or 'start_holiday_range_date_3') and
        'value' (the date), sorted by 'value'.
    '''
    # NOTE(review): `us_holidays` is not defined in the cells visible here;
    # it is read as an iterable of (date, name) pairs - confirm.
    holiday_pairs = [(entry[0], entry[1]) for entry in us_holidays]

    us_holidays_df = pd.DataFrame({
        'holiday_name': [name for _, name in holiday_pairs],
        'holiday_date': [day for day, _ in holiday_pairs],
    })
    us_holidays_df = us_holidays_df.sort_values('holiday_date')

    # keep the actual holiday only, not the day it is observed on.
    # NOTE(review): matched case-insensitively because the label is spelled
    # "(Observed)" or "(observed)" depending on the holidays release - confirm.
    is_observed = us_holidays_df['holiday_name'].str.contains("observed", case=False)
    us_holidays_df = us_holidays_df.loc[~is_observed].copy()

    # convert once to datetimes so the day arithmetic is vectorised and the
    # .dt accessor is guaranteed to be available, also for an empty table
    holiday_ts = pd.to_datetime(us_holidays_df['holiday_date'])
    for days in (3, 2, 1):
        us_holidays_df[f'start_holiday_range_date_{days}'] = (holiday_ts - pd.Timedelta(days=days)).dt.date
    for days in (1, 2, 3):
        us_holidays_df[f'end_holiday_range_date_{days}'] = (holiday_ts + pd.Timedelta(days=days)).dt.date

    # turn from wide to long format, will join the flight data and create a holiday boolean flag
    date_columns = ['holiday_date',
                    'start_holiday_range_date_3', 'start_holiday_range_date_2', 'start_holiday_range_date_1',
                    'end_holiday_range_date_1', 'end_holiday_range_date_2', 'end_holiday_range_date_3']
    us_holidays_df = pd.melt(us_holidays_df, id_vars=['holiday_name'], value_vars=date_columns)
    us_holidays_df = us_holidays_df.sort_values('value')

    # the Spark copy has to be created before it can be written
    us_holidays_spark_df = spark.createDataFrame(us_holidays_df)
    us_holidays_spark_df.write.mode('overwrite').csv(f"{blob_url}/holiday_dates_range")

    return us_holidays_df

### Adding Flight Route Delay Information

In [0]:
def getFrequentDelayRoutes(df_main, delay_threshold=0.3, missing_value=2):
    '''This function flags flights whose route was frequently delayed
    in the previous month.

    For every (year, month, origin, dest) the share of delayed departures
    (average of 'dep_del15') is computed. Each flight then receives the flag
    of its own route for the month before 'flight_time_utc'.

    ### Parameters
    1. df_main : DataFrame
        - The dataframe for which the new feature will be added. It must
        have the features 'flight_time_utc', 'flight_time_utc_year',
        'flight_time_utc_month', 'origin', 'dest' and 'dep_del15'.
    2. delay_threshold : float, optional
        - A route is flagged when its average 'dep_del15' is strictly
        greater than this value (default 0.3).
    3. missing_value : int, optional
        - Value used when the route has no flights in the previous month
        (default 2).

    ### Returns
    - df_main
        - The main dataframe with the new feature 'frequent_delay':
        1 = frequently delayed, 0 = not, `missing_value` = unknown.

    ### Side effects
    - Registers (and overwrites) the temporary views 'main' and 'delay'
      and relies on the notebook-provided `sqlContext`.
    '''
    # year / month of the month preceding each flight, used as join keys
    previous_month_time = F.col("flight_time_utc") - F.expr("INTERVAL 1 MONTH")
    df_main = df_main.withColumn('flight_time_join_month', F.month(previous_month_time)) \
                     .withColumn('flight_time_join_year', F.year(previous_month_time))
    df_main.createOrReplaceTempView('main')

    # share of delayed departures per route and month
    df_flight_delay_avg = sqlContext.sql('''
        SELECT flight_time_utc_year, flight_time_utc_month, origin, dest, AVG(dep_del15) AS avg_dep_delay
        FROM main
        GROUP BY flight_time_utc_year, flight_time_utc_month, origin, dest
        ''')

    # a null average (no usable dep_del15 values) falls into the 0 branch
    df_frequent_delay = df_flight_delay_avg.withColumn(
        'frequent_delay',
        F.when(F.col('avg_dep_delay') > delay_threshold, 1).otherwise(0))
    df_frequent_delay.createOrReplaceTempView('delay')

    # left join so that flights without previous-month data are kept
    df_joined = sqlContext.sql('''
        SELECT m.*, d.frequent_delay
        FROM main m
        LEFT JOIN delay d
        ON m.flight_time_join_year = d.flight_time_utc_year
        AND m.flight_time_join_month = d.flight_time_utc_month
        AND m.origin = d.origin
        AND m.dest = d.dest
        ''')

    df_joined = df_joined.fillna(missing_value, subset=['frequent_delay'])

    # drop intermediate columns used for joining
    return df_joined.drop('flight_time_join_month', 'flight_time_join_year')

### Adding Total Flights in the Past Month Information

In [0]:
def getGetPastNumFlights(df_main, fill_year=2022, fill_month=2):
    """Attach the previous calendar month's total flight count to every flight.

    Flights are counted per (UTC year, UTC month) and each flight is
    left-joined to the count of the month preceding its `flight_time_utc`.
    Flights whose previous month is not present in the data end up with a
    null `total_flights`; those nulls are replaced with the `total_flights`
    value carried by flights of (`fill_year`, `fill_month`).

    Side effect: registers (or replaces) the temp views `main` and `total`.

    Args:
        df_main: Spark DataFrame containing `flight_time_utc`,
            `flight_time_utc_year` and `flight_time_utc_month`.
        fill_year: UTC year of the flights whose `total_flights` is reused as
            the fill value. Defaults to 2022.
        fill_month: UTC month of the flights whose `total_flights` is reused
            as the fill value. Defaults to 2.

    Returns:
        `df_main` with one extra column, `total_flights`.

    Raises:
        ValueError: If no flight exists for (`fill_year`, `fill_month`) or its
            `total_flights` is itself null, so no fill value can be derived.
    """
    # Year/month of the calendar month before each flight; used only as join keys.
    previous_month = F.col("flight_time_utc") - F.expr("INTERVAL 1 MONTH")
    df_main = df_main.withColumn('flight_time_join_month', F.month(previous_month))
    df_main = df_main.withColumn('flight_time_join_year', F.year(previous_month))
    df_main.createOrReplaceTempView('main')

    df_total_flights = sqlContext.sql('''
        SELECT flight_time_utc_year, flight_time_utc_month, COUNT(*) AS total_flights
        FROM main
        GROUP BY flight_time_utc_year, flight_time_utc_month
        ''')
    df_total_flights.createOrReplaceTempView('total')

    df_joined = sqlContext.sql('''
        SELECT m.*, t.total_flights
        FROM main m
        LEFT JOIN total t
        ON m.flight_time_join_year = t.flight_time_utc_year
        AND m.flight_time_join_month = t.flight_time_utc_month
        ''')

    # The helper join keys are not features; remove them again.
    df_joined = df_joined.drop('flight_time_join_year', 'flight_time_join_month')

    # Every flight in the reference month carries the same total_flights, so
    # one row is enough; selecting the single column avoids collecting a full
    # wide row to the driver.
    reference_row = (
        df_joined
        .filter((F.col('flight_time_utc_year') == fill_year) & (F.col('flight_time_utc_month') == fill_month))
        .select('total_flights')
        .first()
    )
    if reference_row is None or reference_row['total_flights'] is None:
        raise ValueError(
            f"Cannot derive a fill value for total_flights: no usable flights found "
            f"for flight_time_utc_year={fill_year}, flight_time_utc_month={fill_month}"
        )

    return df_joined.fillna(reference_row['total_flights'], subset=['total_flights'])

In [0]:
# Inputs: the joined phase-2 flights dataset and the cleaned weather table.
df_baseline = spark.read.parquet(f"{blob_url}/final_dataset_2022")
df_weather = spark.read.parquet(f"{blob_url}/weather_cleaned_1_hour_2022_agg")

# The weather-lag step also needs the weather table (lags 6 and 12 yield the
# *_lag6 / *_lag12 columns); every later step takes and returns the flights frame.
df_final = getLagWeatherData(df_baseline, df_weather, [6, 12])
for add_feature in (getPastFlightDelay, getPastAirport, getFrequentDelayRoutes, getGetPastNumFlights):
    df_final = add_feature(df_final)

df_final.display()

quarter,month,day_of_month,day_of_week,fl_date,op_unique_carrier,op_carrier_airline_id,op_carrier,tail_num,op_carrier_fl_num,origin_airport_id,origin_airport_seq_id,origin_city_market_id,origin,origin_city_name,origin_state_abr,origin_state_fips,origin_state_nm,origin_wac,dest_airport_id,dest_airport_seq_id,dest_city_market_id,dest,dest_city_name,dest_state_abr,dest_state_fips,dest_state_nm,dest_wac,crs_dep_time,dep_time,dep_delay,dep_delay_new,dep_del15,dep_delay_group,dep_time_blk,taxi_out,wheels_off,wheels_on,taxi_in,crs_arr_time,arr_time,arr_delay,arr_delay_new,arr_del15,arr_delay_group,arr_time_blk,cancelled,diverted,crs_elapsed_time,actual_elapsed_time,air_time,flights,distance,distance_group,year,IATA,station_id,name,icao,lat,lon,altitude,station_lon,station_lat,timezone,type,crs_dep_hour,flight_hour,flight_time,flight_time_utc,flight_time_utc_hour,flight_time_utc_year,flight_time_utc_month,flight_time_utc_date,is_holiday,weather_station,weather_station_name,metar_hour,avg_hourly_dew_point_temp,avg_hourly_dry_bulb_temp,avg_hourly_relative_humidity,avg_hourly_station_pressure,avg_hourly_visibility,avg_hourly_wind_direction,avg_hourly_wind_speed,avg_hourly_precipitation_ordinal,pagerank,avg_hourly_dew_point_temp_lag6,avg_hourly_dry_bulb_temp_lag6,avg_hourly_relative_humidity_lag6,avg_hourly_station_pressure_lag6,avg_hourly_visibility_lag6,avg_hourly_wind_direction_lag6,avg_hourly_wind_speed_lag6,avg_hourly_precipitation_ordinal_lag6,avg_hourly_dew_point_temp_lag12,avg_hourly_dry_bulb_temp_lag12,avg_hourly_relative_humidity_lag12,avg_hourly_station_pressure_lag12,avg_hourly_visibility_lag12,avg_hourly_wind_direction_lag12,avg_hourly_wind_speed_lag12,avg_hourly_precipitation_ordinal_lag12,arr_del15_lag1,origin_lag1,frequent_delay,total_flights
1,1,3,1,2022-01-03,G4,20368,G4,219NV,314,10135,1013506,30135,ABE,"Allentown/Bethlehem/Easton, PA",PA,42,Pennsylvania,23,14082,1408202,34082,PGD,"Punta Gorda, FL",FL,12,Florida,33,800,747,-13.0,0.0,0.0,-1,0800-0859,13.0,800,1032.0,7.0,1044,1039.0,-5.0,0.0,0.0,-1.0,1000-1059,0.0,0.0,164.0,172.0,152.0,1.0,1018.0,5,2022,ABE,72517014737,Lehigh Valley International Airport,KABE,40.652099609375,-75.44080352783203,393,-75.448,40.65,America/New_York,medium_airport,8.0,800,2022-01-03T08:00:00.000+0000,2022-01-03T13:00:00.000+0000,13,2022,1,3,1,72517014737,"ALLENTOWN LEHIGH VALLEY INTERNATIONAL AIRPORT, PA US",2022-01-03T10:00:00.000+0000,16.0,28.0,60.0,29.700000762939453,10.0,10.0,11.0,0.0,,18.5,29.0,64.5,29.68000030517578,10.0,40.0,13.0,0.0,21.5,30.5,69.0,29.68000030517578,10.0,320.0,10.0,0.0,0.0,PGD,2,489468
1,1,2,7,2022-01-02,G4,20368,G4,219NV,314,10135,1013506,30135,ABE,"Allentown/Bethlehem/Easton, PA",PA,42,Pennsylvania,23,14082,1408202,34082,PGD,"Punta Gorda, FL",FL,12,Florida,33,1336,1331,-5.0,0.0,0.0,-1,1300-1359,11.0,1342,1621.0,5.0,1620,1626.0,6.0,6.0,0.0,0.0,1600-1659,0.0,0.0,164.0,175.0,159.0,1.0,1018.0,5,2022,ABE,72517014737,Lehigh Valley International Airport,KABE,40.652099609375,-75.44080352783203,393,-75.448,40.65,America/New_York,medium_airport,13.0,1336,2022-01-02T13:36:00.000+0000,2022-01-02T18:36:00.000+0000,18,2022,1,2,1,72517014737,"ALLENTOWN LEHIGH VALLEY INTERNATIONAL AIRPORT, PA US",2022-01-02T15:00:00.000+0000,42.0,48.0,80.0,29.31999969482422,10.0,320.0,16.0,0.0,,51.0,56.0,84.0,29.21999931335449,10.0,270.0,7.0,0.0,50.0,51.0,96.0,29.15999984741211,4.0,230.0,3.0,0.0,0.0,PIE,2,489468
1,1,2,7,2022-01-02,NK,20416,NK,N503NK,3104,10158,1015804,30158,ACY,"Atlantic City, NJ",NJ,34,New Jersey,21,13303,1330303,32467,MIA,"Miami, FL",FL,12,Florida,33,1030,1024,-6.0,0.0,0.0,-1,1000-1059,10.0,1034,1302.0,8.0,1312,1310.0,-2.0,0.0,0.0,-1.0,1300-1359,0.0,0.0,162.0,166.0,148.0,1.0,998.0,4,2022,ACY,72407093730,Atlantic City International Airport,KACY,39.45759963989258,-74.57720184326172,75,-74.567,39.452,America/New_York,medium_airport,10.0,1030,2022-01-02T10:30:00.000+0000,2022-01-02T15:30:00.000+0000,15,2022,1,2,1,72407093730,"ATLANTIC CITY INTERNATIONAL AIRPORT, NJ US",2022-01-02T12:00:00.000+0000,58.0,61.0,90.0,29.59000015258789,10.0,250.0,13.0,0.0,,57.0,57.5,98.5,29.600000381469727,10.0,240.0,9.0,0.0,60.0,61.0,97.0,29.51000022888184,10.0,260.0,11.0,1.0,0.0,FLL,2,489468
1,1,2,7,2022-01-02,DL,19790,DL,N996AT,2120,10397,1039707,30397,ATL,"Atlanta, GA",GA,13,Georgia,34,12264,1226402,30852,IAD,"Washington, DC",VA,51,Virginia,38,1013,1036,23.0,23.0,1.0,1,1000-1059,11.0,1047,1203.0,3.0,1158,1206.0,8.0,8.0,0.0,0.0,1100-1159,0.0,0.0,105.0,90.0,76.0,1.0,534.0,3,2022,ATL,72219013874,Hartsfield Jackson Atlanta International Airport,KATL,33.6367,-84.428101,1026,-84.442,33.63,America/New_York,large_airport,10.0,1013,2022-01-02T10:13:00.000+0000,2022-01-02T15:13:00.000+0000,15,2022,1,2,1,72219013874,"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2022-01-02T12:00:00.000+0000,57.0,60.0,90.0,28.809999465942383,10.0,186.66666666666663,7.0,0.6666666666666666,,60.0,62.0,93.0,28.799999237060547,10.0,180.0,9.0,1.0,64.0,73.0,74.0,28.65999984741211,10.0,210.0,14.0,0.0,0.0,GSO,2,489468
1,1,2,7,2022-01-02,WN,19393,WN,N8650F,1829,10397,1039707,30397,ATL,"Atlanta, GA",GA,13,Georgia,34,12264,1226402,30852,IAD,"Washington, DC",VA,51,Virginia,38,1640,1859,139.0,139.0,1.0,9,1600-1659,18.0,1917,2023.0,5.0,1820,2028.0,128.0,128.0,1.0,8.0,1800-1859,0.0,0.0,100.0,89.0,66.0,1.0,534.0,3,2022,ATL,72219013874,Hartsfield Jackson Atlanta International Airport,KATL,33.6367,-84.428101,1026,-84.442,33.63,America/New_York,large_airport,16.0,1640,2022-01-02T16:40:00.000+0000,2022-01-02T21:40:00.000+0000,21,2022,1,2,1,72219013874,"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2022-01-02T18:00:00.000+0000,54.0,57.0,90.0,28.71999931335449,10.0,0.0,0.0,0.0,,54.0,57.5,88.5,28.75,8.5,30.0,2.5,1.0,60.0,62.0,93.0,28.799999237060547,10.0,180.0,9.0,1.0,1.0,SAN,2,489468
1,1,1,6,2022-01-01,WN,19393,WN,N8518R,1742,10397,1039707,30397,ATL,"Atlanta, GA",GA,13,Georgia,34,12264,1226402,30852,IAD,"Washington, DC",VA,51,Virginia,38,1055,1053,-2.0,0.0,0.0,-1,1000-1059,15.0,1108,1226.0,4.0,1235,1230.0,-5.0,0.0,0.0,-1.0,1200-1259,0.0,0.0,100.0,97.0,78.0,1.0,534.0,3,2022,ATL,72219013874,Hartsfield Jackson Atlanta International Airport,KATL,33.6367,-84.428101,1026,-84.442,33.63,America/New_York,large_airport,10.0,1055,2022-01-01T10:55:00.000+0000,2022-01-01T15:55:00.000+0000,15,2022,1,1,1,72219013874,"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2022-01-01T12:00:00.000+0000,67.5,72.0,85.5,28.809999465942383,8.5,220.0,14.0,1.0,,65.0,71.5,80.0,28.8700008392334,10.0,210.0,12.0,0.0,66.0,71.0,84.0,28.84000015258789,10.0,220.0,10.0,0.0,0.0,MCO,2,489468
1,1,3,1,2022-01-03,DL,19790,DL,N814DN,2228,10397,1039707,30397,ATL,"Atlanta, GA",GA,13,Georgia,34,12264,1226402,30852,IAD,"Washington, DC",VA,51,Virginia,38,1700,1759,59.0,59.0,1.0,3,1700-1759,16.0,1815,1934.0,15.0,1836,1949.0,73.0,73.0,1.0,4.0,1800-1859,0.0,0.0,96.0,110.0,79.0,1.0,534.0,3,2022,ATL,72219013874,Hartsfield Jackson Atlanta International Airport,KATL,33.6367,-84.428101,1026,-84.442,33.63,America/New_York,large_airport,17.0,1700,2022-01-03T17:00:00.000+0000,2022-01-03T22:00:00.000+0000,22,2022,1,3,1,72219013874,"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2022-01-03T19:00:00.000+0000,26.0,35.0,70.0,29.18000030517578,10.0,330.0,10.0,0.0,,27.0,40.0,60.0,29.15999984741211,10.0,320.0,15.0,0.0,29.0,36.0,76.0,29.1200008392334,10.0,330.0,13.0,0.0,1.0,RDU,2,489468
1,1,2,7,2022-01-02,DL,19790,DL,N832DN,2228,10397,1039707,30397,ATL,"Atlanta, GA",GA,13,Georgia,34,12264,1226402,30852,IAD,"Washington, DC",VA,51,Virginia,38,1700,1730,30.0,30.0,1.0,2,1700-1759,10.0,1740,1847.0,5.0,1836,1852.0,16.0,16.0,1.0,1.0,1800-1859,0.0,0.0,96.0,82.0,67.0,1.0,534.0,3,2022,ATL,72219013874,Hartsfield Jackson Atlanta International Airport,KATL,33.6367,-84.428101,1026,-84.442,33.63,America/New_York,large_airport,17.0,1700,2022-01-02T17:00:00.000+0000,2022-01-02T22:00:00.000+0000,22,2022,1,2,1,72219013874,"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2022-01-02T19:00:00.000+0000,54.0,57.0,90.0,28.71999931335449,10.0,130.0,6.0,0.0,,54.0,59.0,83.0,28.739999771118164,10.0,330.0,6.0,0.0,59.0,62.0,90.0,28.81999969482422,10.0,200.0,6.0,0.0,0.0,STL,2,489468
1,1,1,6,2022-01-01,DL,19790,DL,N928AT,2120,10397,1039707,30397,ATL,"Atlanta, GA",GA,13,Georgia,34,12264,1226402,30852,IAD,"Washington, DC",VA,51,Virginia,38,1012,1011,-1.0,0.0,0.0,-1,1000-1059,18.0,1029,1148.0,3.0,1156,1151.0,-5.0,0.0,0.0,-1.0,1100-1159,0.0,0.0,104.0,100.0,79.0,1.0,534.0,3,2022,ATL,72219013874,Hartsfield Jackson Atlanta International Airport,KATL,33.6367,-84.428101,1026,-84.442,33.63,America/New_York,large_airport,10.0,1012,2022-01-01T10:12:00.000+0000,2022-01-01T15:12:00.000+0000,15,2022,1,1,1,72219013874,"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2022-01-01T12:00:00.000+0000,67.5,72.0,85.5,28.809999465942383,8.5,220.0,14.0,1.0,,65.0,71.5,80.0,28.8700008392334,10.0,210.0,12.0,0.0,66.0,71.0,84.0,28.84000015258789,10.0,220.0,10.0,0.0,0.0,BWI,2,489468
1,1,2,7,2022-01-02,YX,20452,YX,N739YX,3415,10397,1039707,30397,ATL,"Atlanta, GA",GA,13,Georgia,34,12264,1226402,30852,IAD,"Washington, DC",VA,51,Virginia,38,1922,1928,6.0,6.0,0.0,0,1900-1959,13.0,1941,2048.0,15.0,2109,2103.0,-6.0,0.0,0.0,-1.0,2100-2159,0.0,0.0,107.0,95.0,67.0,1.0,534.0,3,2022,ATL,72219013874,Hartsfield Jackson Atlanta International Airport,KATL,33.6367,-84.428101,1026,-84.442,33.63,America/New_York,large_airport,19.0,1922,2022-01-02T19:22:00.000+0000,2022-01-03T00:22:00.000+0000,0,2022,1,3,1,72219013874,"ATLANTA HARTSFIELD JACKSON INTERNATIONAL AIRPORT, GA US",2022-01-02T21:00:00.000+0000,55.0,57.0,93.0,28.63999938964844,1.0,130.0,7.5,0.0,,54.0,57.0,90.0,28.71999931335449,10.0,0.0,0.0,0.0,57.0,60.0,90.0,28.809999465942383,10.0,186.66666666666663,7.0,0.6666666666666666,0.0,EWR,2,489468


### Feature Reselection

In [0]:
# --- Baseline features ---------------------------------------------------
# Columns the baseline model used, grouped by how they are listed here.
CATEGORICAL_COLS = [
    'quarter', 'month', 'day_of_month', 'day_of_week',
    'op_unique_carrier', 'op_carrier_airline_id', 'tail_num', 'op_carrier_fl_num',
    'origin_airport_id', 'origin_city_market_id', 'origin', 'origin_city_name', 'origin_state_abr',
    'dest_airport_id', 'dest_city_market_id', 'dest', 'dest_city_name', 'dest_state_abr',
    'crs_elapsed_time', 'year', 'IATA', 'station_id', 'icao', 'crs_dep_hour',
    'flight_time_utc_hour', 'flight_time_utc_year', 'flight_time_utc_month', 'flight_time_utc_date',
    'weather_station', 'avg_hourly_wind_direction',
]

NUMERICAL_COLS = [
    'avg_hourly_dew_point_temp',
    'avg_hourly_dry_bulb_temp',
    'avg_hourly_relative_humidity',
    'avg_hourly_station_pressure',
    'avg_hourly_visibility',
    'avg_hourly_wind_speed',
    'avg_hourly_precipitation_ordinal',
]

# Label column, kept as a one-element list so it concatenates with the lists above.
TARGET = ['dep_del15']

# --- Engineered features -------------------------------------------------
_WEATHER_LAG6_COLS = [
    'avg_hourly_dew_point_temp_lag6',
    'avg_hourly_dry_bulb_temp_lag6',
    'avg_hourly_relative_humidity_lag6',
    'avg_hourly_station_pressure_lag6',
    'avg_hourly_visibility_lag6',
    'avg_hourly_wind_direction_lag6',
    'avg_hourly_wind_speed_lag6',
    'avg_hourly_precipitation_ordinal_lag6',
]
_WEATHER_LAG12_COLS = [
    'avg_hourly_dew_point_temp_lag12',
    'avg_hourly_dry_bulb_temp_lag12',
    'avg_hourly_relative_humidity_lag12',
    'avg_hourly_station_pressure_lag12',
    'avg_hourly_visibility_lag12',
    'avg_hourly_wind_direction_lag12',
    'avg_hourly_wind_speed_lag12',
    'avg_hourly_precipitation_ordinal_lag12',
]
# NOTE(review): 'frequent_delay' is added to df_final by getFrequentDelayRoutes
# but is not listed below -- confirm the omission is intentional.
_OTHER_ENGINEERED_COLS = ['arr_del15_lag1', 'origin_lag1', 'is_holiday', 'pagerank', 'total_flights', 'type']

FEATURE_ENG_COLS = _WEATHER_LAG6_COLS + _WEATHER_LAG12_COLS + _OTHER_ENGINEERED_COLS

# Every column of interest: baseline features, engineered features, then the label.
USE_COLS = [*CATEGORICAL_COLS, *NUMERICAL_COLS, *FEATURE_ENG_COLS, *TARGET]

# Redundant or low-explanatory features to be dropped during feature reselection.
DROP_COLS = [
    'month', 'day_of_month',
    'op_carrier_airline_id', 'op_carrier_fl_num',
    'origin_airport_id', 'origin_city_market_id', 'origin_city_name',
    'dest_airport_id', 'dest_city_market_id', 'dest_city_name',
    'year', 'IATA', 'station_id', 'icao', 'weather_station', 'flight_time_utc_date',
]


In [0]:
# Persist the feature-engineered frame as parquet, replacing any earlier output at this path.
df_final.write.parquet(f"{blob_url}/final_feature_eng_2022", mode='overwrite')