In [0]:
from pyspark.sql import types, Window, functions as F
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.ml.feature import StringIndexer


# Airline Cleaning and Imputation
1. `impute_null_numeric`: Takes in a dataset and a list of numeric features, then performs either mean or median imputation for the NULL values. The mean/median values are based on values from the same airport and month to impute more accurate values. 
2. `airline_field_cleaner`: Cleans up the weather fields for the master dataset. This drops errorneous and outlier values, performs 0 and median imputation for missing values, and finally converts all categorical features to numeric by either indexing or onehotencoding.

## 1. impute_null_numeric

In [0]:
def impute_null_numeric(df, vars_to_impute, origin_or_dest='origin', method='median'):
    '''
    Inputs:
    df: full flights dataset
    vars_to_impute: a list of variabe names to impute null values for. Only works for numeric variables for now
    origin_or_dest: choose either origin or dest airport. Then this will be the airport used in the groupby
    method: can be either median or mean
    
    Output:
    pyspark dataframe with nulls replaced
    '''
    
    if origin_or_dest == 'origin':
        columns_to_groupby = ['year_airlns', 'month_airlns', 'origin_airlns']
        
    else:
        columns_to_groupby = ['year_airlns', 'month_airlns', 'dest_airlns']
    
    for var in vars_to_impute:
        
        if method == 'mean':
            grp_df = df.groupby(columns_to_groupby).agg(F.mean(var).alias('impute_val')).show(10)
        
        else:
            grp_df = df.groupby(columns_to_groupby).agg(F.expr(f'percentile_approx({var}, .5)').alias('impute_val'))
        
        
        df = df.join(grp_df, on=columns_to_groupby, how='left')
        df = df.withColumn(var, F.when(F.col(var).isNull(), F.col('impute_val')).otherwise(F.col(var))).drop('impute_val')
        
    return df

## 2. airline_field_cleaner

In [0]:
def airline_field_cleaner(df, clean_null=True, remove_outliers=True, cat_var_encode_method='index'):
    '''
    Cleans every single airport column. Deals w/ NULLs and outliers.
    Must pass in the final joined df because I use that column naming.
    See google doc for reasoning behind these: https://docs.google.com/spreadsheets/d/1hj4W8B_U49jZOLH41LP1dL8OqRlD_-7m/edit#gid=764608588
    '''
    
    if clean_null:
        
        # For some vars, impute values for nulls
        # but these aren't used in the final dataset
#         df = df.na.fill({
#             'CANCELLATION_CODE_AIRLNS': 'missing',  # var is taken out of the final cleaned df
#             'ARR_DELAY_NEW_AIRLNS': 0, 
#             'ARR_DEL15_AIRLNS': 0, 
#             'ARR_DELAY_AIRLNS': 0
#             ,'ARR_DELAY_GROUP_AIRLNS': 0  # var is taken out of the final cleaned df
#         })
        
        # drop some obs
        # I previously also filtered out nulls for arr_time_airlns, but this was removed
        df = df.filter(~F.col('DEP_DEL15_AIRLNS').isNull())
        
        # drop some columns
        df = df.drop(*['CARRIER_DELAY_AIRLNS', 'WEATHER_DELAY_AIRLNS', 'NAS_DELAY_AIRLNS', 'SECURITY_DELAY_AIRLNS', 'LATE_AIRCRAFT_DELAY_AIRLNS']) 
        
        # median imputation based on origin/dest flights
        # I commented this out later b/c I guess we removed this var b/c we wouldn't know this ahead of time anyways. 
#         grp_df = df.groupby('origin_airlns', 'dest_airlns').agg(F.expr('percentile_approx(ACTUAL_ELAPSED_TIME_AIRLNS, .5)').alias('impute_val'))
#         df = df.join(grp_df, on=['origin_airlns', 'dest_airlns'], how='left')
#         df = df.withColumn('ACTUAL_ELAPSED_TIME_AIRLNS', 
#                            F.when(F.col('ACTUAL_ELAPSED_TIME_AIRLNS').isNull(), 
#                                   F.col('impute_val'))\
#                            .otherwise(F.col('ACTUAL_ELAPSED_TIME_AIRLNS')))\
#                 .drop('impute_val')
        
        # final na's to drop
        df = df.filter(~F.col('CRS_ELAPSED_TIME_AIRLNS').isNull())
        
    
    if remove_outliers:
        
        # removing extreme outliers
        # removed: .filter(F.col('ACTUAL_ELAPSED_TIME_AIRLNS') != 1604)\
        df = df\
            .filter(F.col('CRS_ELAPSED_TIME_AIRLNS') != 813)\
            .filter(F.col('CRS_ELAPSED_TIME_AIRLNS') > 0)\
            .filter(F.col('ARR_DELAY_AIRLNS') <= 2300)
        
        
    if cat_var_encode_method=='index':
        
        # Specify which columns to index (ie cast to int)
        vars_to_index = [
            'ORIGIN_AIRLNS', 
            'DEST_AIRLNS', 
            'OP_UNIQUE_CARRIER_AIRLNS' # a more granular form of origin/dest airlines
        ]

        # rename cols to drop them later
        for var in vars_to_index:
            df = df.withColumnRenamed(var, var+'_old')

        # finally, index them
        indexer = StringIndexer(inputCols=[i+'_old' for i in vars_to_index], outputCols=vars_to_index)
        df = indexer.fit(df).transform(df)
        df = df.drop(*[i+'_old' for i in vars_to_index])
        
        # drop cat vars that we won't be using
        vars_to_drop = [
            'ARR_TIME_BLK_AIRLNS', # already have the arrival time, so drop the blocks
            'ORIGIN_CITY_NAME_AIRLNS', # too granular to model with
            'DEST_CITY_NAME_AIRLNS',
            'ORIGIN_STATE_ABR_AIRLNS', # already have state_fips which is numeric
            'ORIGIN_STATE_NM_AIRLNS',
            'DEST_STATE_NM_AIRLNS',
            'DEST_STATE_ABR_AIRLNS', 
            'CANCELLATION_CODE_AIRLNS' # this is a response var, we won't know why a flight was cancelled in test data
        ]
        df = df.drop(*vars_to_drop)
        
        
    if cat_var_encode_method=='onehotencode':
        pass
        # not built out yet b/c vini advised against this and our data has so many vars already.
        # however, I think we'll need this if we use a logistic regression model. 
    
    return df

# Weather Setup and Preprocessing
Establishes dictionaries and lists to be used for weather cleaning and imputing.

## Generating dictionaries
This section generates dictionaries that are needed for use in functions. More context including data dictionary exploration linked [here](https://adb-731998097721284.4.azuredatabricks.net/?o=731998097721284#notebook/4070574709969671/command/4070574709969698).

In [0]:
# data dict var names exact from documentation
# I used the exact documentation name in order to make it easy to locate in my
# spreadsheet here: https://docs.google.com/spreadsheets/d/1xthRtOjC5-kV0LMIRa_grkxZ6sLBlpqDpNwygOH_jlA/edit?usp=sharing
var_names_dict_base = {'WND': ['direction_angle', 'direction_quality_code', 'type_code', 'speed_rate', 'speed_quality_code'],
                  'CIG': ['ceiling_height_dimension', 'ceiling_quality_code', 'ceiling_determination_code', 'CAVOK_code'],
                  'VIS' : ['distance_dimension', 'distance_quality_code', 'variability_code', 'quality_variability_code'],
                  'TMP' : ['air_temperature', 'air_temperature_quality_code'],
                  'DEW' : ['dew_point_temperature', 'dew_point_quality_code'],
                  'SLP' : ['sea_level_pressure', 'sea_level_pressure_quality_code'],
                  'GA1' : ['coverage_code', 'coverage_quality_code', 'base_height_dimension', 'base_height_quality_code', 'cloud_type_code', 'cloud_type_quality_code'],
                  'GF1' : ['total_coverage_code', 'total_opaque_coverage_code', 'quality_total_coverage_code', 'total_lowest_cloud_cover_code', 'quality_total_lowest_cloud_cover_code',
                          'low_cloud_genus_code', 'quality_low_cloud_genus_code', 'lowest_cloud_base_height_dimension', 'lowest_cloud_base_height_quality_code', 'mid_cloud_genus_code',
                          'quality_mid_cloud_genus_code', 'high_cloud_genus_code', 'quality_high_cloud_genus_code'],
                  'MA1' : ['altimeter_setting_rate', 'altimeter_quality_code', 'station_pressure_rate', 'station_pressure_quality_code'],
                  'REM' : ['remark_identifier', 'remark_length_quantity', 'remark_text'],
                  'AA1' : ['period_quantity_in_hours', 'depth_dimension', 'condition_code', 'quality_code'],
                  'AA2' : ['period_quantity_in_hours', 'depth_dimension', 'condition_code', 'quality_code'],
                  'AJ1' : ['dimension', 'condition_code', 'quality_code', 'equivalent_water_depth_dimension', 'equivalent_water_condition_code', 'equivalent_water_condition_quality_code'],
                  'AL1' : ['period_quantity', 'depth_dimension', 'condition_code', 'quality_code'],
                  'AN1' : ['period_quantity', 'depth_dimension', 'condition_code', 'quality_code'],
                  'AO1' : ['period_quantity_in_minutes', 'depth_dimension', 'condition_code', 'quality_code'],
                  'AU1' : ['intensity_and_proximity_code', 'descriptor_code', 'precipitation_code', 'obscuration_code', 'other_weather_phenomena_code', 'combination_indicator_code', 'quality_code'],
                  'AT1' : ['source_element', 'weather_type', 'weather_type_abbreviation', 'quality_code']
                 }

# dictionary with number of subfeatures for each weather variable
num_subfeatures_dict_base = {'WND' : 5,
                        'CIG' : 4,
                        'VIS' : 4,
                        'TMP' : 2,
                        'DEW' : 2,
                        'SLP' : 2,
                        'GA1' : 6,
                        'GF1' : 13,
                        'MA1' : 4,
                        'REM' : 3,
                        'AA1' : 4,
                        'AA2' : 4,
                        'AJ1' : 6,
                        'AL1' : 4,
                        'AN1' : 4,
                        'AO1' : 4,
                        'AU1' : 7,
                        'AT1' : 4
                       }

# NAs are quality codes which don't indicate missing value
# UNK means that it's not a quality code but no indicator available for missing data usually b/c categorical
missing_value_code_dict_base = {'WND' : ["999", "9","9","9999", "NA"],
                          'CIG' : ["99999", "NA", "9", "9"],
                          'VIS' : ["999999", "NA", "9", "NA"],
                          'TMP' : ["+9999", "NA"],
                          'DEW' : ["+9999", "NA"],
                          'SLP' : ["99999", "NA"],
                          'GA1' : ["99", "NA", "+99999", "NA", "99", "NA"],
                          'GF1' : ["99", "99", "NA", "99", "NA", "99", "NA", "99999", "NA", "99", "NA", "99", "NA"],
                          'MA1' : ["99999", "NA", "99999", "NA"],
                          'REM' : ["NA", "NA", "NA"],
                          'AA1' : ["99", "9999", "9", "NA"],
                          'AA2' : ["99", "9999", "9", "NA"],
                          'AJ1' : ["9999", "9", "NA", "999999", "9", "NA"],
                          'AL1' : ["99", "999", "9", "NA"],
                          'AN1' : ["999", "9999", "9", "NA"],
                          'AO1' : ["99", "9999", "9", "NA"],
                          'AU1' : ["9", "9", "99", "9", "9", "9", "NA"],
                          'AT1' : ["UNK", "UNK", "UNK", "NA"]
                          }

# dictionary with type casting for each var
type_code_dict_base = {'WND' : ['float', 'int', 'str', 'float', 'int'],
                  'CIG' : ['float', 'int', 'str', 'str'],
                  'VIS' : ['float', 'int', 'int', 'int'],
                  'TMP' : ['float', 'int'],
                  'DEW' : ['float', 'int'],
                  'SLP' : ['float', 'int'],
                  'GA1' : ['str', 'int', 'float', 'int', 'str', 'int'],
                  'GF1' : ['str', 'str', 'int', 'str', 'int', 'str', 'int', 'float', 'int', 'str', 'int', 'str', 'int'],
                  'MA1' : ['float', 'int', 'float', 'int'],
                  'AA1' : ['float', 'float', 'str', 'int'],
                  'AA2' : ['float', 'float', 'str', 'int'],
                  'AJ1' : ['float', 'str', 'int', 'float', 'int', 'int'],
                  'AL1' : ['float', 'float', 'str', 'int'],
                  'AN1' : ['float', 'float', 'str', 'int'],
                  'AO1' : ['float', 'float', 'str', 'int'],
                  'AU1' : ['str', 'str', 'str', 'str', 'str', 'str', 'int'],
                  'AT1' : ['int', 'str', 'str', 'int']
                 }

# scaling factor for each var
# 1.00 if int or not stated in documentation
scale_factor_dict_base = {'WND' : [1.00, 1.00, 1.00, 10.00, 1.00],
                     'CIG' : [1.00, 1.00, 1.00, 1.00],
                     'VIS' : [1.00, 1.00, 1.00, 1.00],
                     'TMP' : [10.00, 1.00],
                     'DEW' : [10.00, 1.00],
                     'SLP' : [10.00, 1.00],
                     'GA1' : [1.00, 1.00, 1.00, 1.00, 1.00, 1.00],
                     'GF1' : [1.00, 1.00, 1.00, 1.00, 1.00, 1.00, 1.00, 1.00, 1.00, 1.00, 1.00, 1.00, 1.00, 1.00, 1.00],
                     'MA1' : [10.00, 1.00, 10.00, 1.00],
                     'AA1' : [1.00, 10.00, 1.00, 1.00],
                     'AA2' : [1.00, 10.00, 1.00, 1.00],
                     'AJ1' : [1.00, 1.00, 1.00, 10.00, 1.00, 1.00],
                     'AL1' : [1.00, 1.00, 1.00, 1.00],
                     'AN1' : [1.00, 10.00, 1.00, 1.00],
                     'AO1' : [1.00, 10.00, 1.00, 1.00],
                     'AU1' : [1.00, 1.00, 1.00, 1.00, 1.00, 1.00, 1.00],
                     'AT1' : [1.00, 1.00, 1.00, 1.00]
                    }

# list the columns to drop after splitting
# Drops all quality codes as well as the following due to 100% null values:
# VIS_WTHR_variability_code
# AJ1_WTHR_equivalent_water_condition_code 
# GF1_WTHR_total_opaque_coverage_code
# GF1_WTHR_low_cloud_genus_code
# GF1_WTHR_mid_cloud_genus_code
# GF1_WTHR_high_cloud_genus_code
# AO1_WTHR_condition_code

quality_code_dict_base = {'WND': ['direction_quality_code', 'speed_quality_code'],
                  'CIG': ['ceiling_quality_code'],
                  'VIS' : ['variability_code', 'distance_quality_code', 'quality_variability_code'],
                  'TMP' : ['air_temperature_quality_code'],
                  'DEW' : ['dew_point_quality_code'],
                  'SLP' : ['sea_level_pressure_quality_code'],
                  'GA1' : ['coverage_quality_code', 'base_height_quality_code', 'cloud_type_quality_code'],
                  'GF1' : ['quality_total_coverage_code', 
                           'total_opaque_coverage_code', 
                           'quality_total_lowest_cloud_cover_code', 
                           'low_cloud_genus_code', 'quality_low_cloud_genus_code', 
                           'lowest_cloud_base_height_quality_code', 
                           'mid_cloud_genus_code', 'quality_mid_cloud_genus_code', 
                           'high_cloud_genus_code',  'quality_high_cloud_genus_code'], 
                  'MA1' : ['altimeter_quality_code', 'station_pressure_quality_code'],
                  'REM' : ['remark_identifier', 'remark_length_quantity', 'remark_text'],
                  'AA1' : ['quality_code'],
                  'AA2' : ['quality_code'],
                  'AJ1' : ['quality_code', 'equivalent_water_condition_code', 'equivalent_water_condition_quality_code'],
                  'AL1' : ['quality_code'],
                  'AN1' : ['quality_code'],
                  'AO1' : ['condition_code', 'quality_code'],
                  'AU1' : ['quality_code'],
                  'AT1' : ['source_element', 'quality_code']
                 }

# generate a dict of vars to keep
kept_vars_dict_base = {key: list(set(var_names_dict_base[key]) - set(quality_code_dict_base[key])) for key in var_names_dict_base}

# dict of values to impute for numerical vars  
num_impute_dict = {'WND_WTHR_direction_angle': 1, 'WND_WTHR_direction_angle_origin': 1, 'WND_WTHR_direction_angle_dest': 1,
               'WND_WTHR_speed_rate': 0, 'WND_WTHR_speed_rate_origin': 0, 'WND_WTHR_speed_rate_dest': 0,
               'CIG_WTHR_ceiling_height_dimension': 0, 'CIG_WTHR_ceiling_height_dimension_origin': 0, 'CIG_WTHR_ceiling_height_dimension_dest': 0,
               'VIS_WTHR_distance_dimension': 0, 'VIS_WTHR_distance_dimension_origin': 0, 'VIS_WTHR_distance_dimension_dest': 0,
               'TMP_WTHR_air_temperature': 0, 'TMP_WTHR_air_temperature_origin': 0, 'TMP_WTHR_air_temperature_dest': 0,
               'DEW_WTHR_dew_point_temperature': 0, 'DEW_WTHR_dew_point_temperature_origin': 0, 'DEW_WTHR_dew_point_temperature_dest': 0,
               'SLP_WTHR_sea_level_pressure': 860, 'SLP_WTHR_sea_level_pressure_origin': 860, 'SLP_WTHR_sea_level_pressure_dest': 860,
               'GA1_WTHR_base_height_dimension': 0, 'GA1_WTHR_base_height_dimension_origin': 0, 'GA1_WTHR_base_height_dimension_dest': 0,
               'GF1_WTHR_lowest_cloud_base_height_dimension': 6000, 'GF1_WTHR_lowest_cloud_base_height_dimension_origin': 6000, 'GF1_WTHR_lowest_cloud_base_height_dimension_dest': 6000,
               'MA1_WTHR_altimeter_setting_rate': 10132.5, 'MA1_WTHR_altimeter_setting_rate_origin': 10132.5, 'MA1_WTHR_altimeter_setting_rate_dest': 10132.5,
               'MA1_WTHR_station_pressure_rate': 10132.5, 'MA1_WTHR_station_pressure_rate_origin': 10132.5, 'MA1_WTHR_station_pressure_rate_dest': 10132.5,
               'AA1_WTHR_period_quantity_in_hours': 0, 'AA1_WTHR_period_quantity_in_hours_origin': 0, 'AA1_WTHR_period_quantity_in_hours_dest': 0,
               'AA1_WTHR_depth_dimension': 0, 'AA1_WTHR_depth_dimension_origin': 0, 'AA1_WTHR_depth_dimension_dest': 0,
               'AA2_WTHR_period_quantity_in_hours': 0, 'AA2_WTHR_period_quantity_in_hours_origin': 0, 'AA2_WTHR_period_quantity_in_hours_dest': 0,
               'AA2_WTHR_depth_dimension': 0, 'AA2_WTHR_depth_dimension_origin': 0, 'AA2_WTHR_depth_dimension_dest': 0,
               'AJ1_WTHR_dimension': 0, 'AJ1_WTHR_dimension_origin': 0, 'AJ1_WTHR_dimension_dest': 0,
               'AJ1_WTHR_equivalent_water_depth_dimension': 0, 'AJ1_WTHR_equivalent_water_depth_dimension_origin': 0, 'AJ1_WTHR_equivalent_water_depth_dimension_dest': 0,
               'AL1_WTHR_period_quantity': 0, 'AL1_WTHR_period_quantity_origin': 0, 'AL1_WTHR_period_quantity_dest': 0,
               'AL1_WTHR_depth_dimension': 0, 'AL1_WTHR_depth_dimension_origin': 0, 'AL1_WTHR_depth_dimension_dest': 0,
               'AN1_WTHR_period_quantity': 1, 'AN1_WTHR_period_quantity_origin': 1, 'AN1_WTHR_period_quantity_dest': 1,
               'AN1_WTHR_depth_dimension': 0, 'AN1_WTHR_depth_dimension_origin': 0, 'AN1_WTHR_depth_dimension_dest': 0,
               'AO1_WTHR_period_quantity_in_minutes': 0, 'AO1_WTHR_period_quantity_in_minutes_origin': 0, 'AO1_WTHR_period_quantity_in_minutes_dest': 0,
               'AO1_WTHR_depth_dimension': 0, 'AO1_WTHR_depth_dimension_origin': 0, 'AO1_WTHR_depth_dimension_dest': 0}

# list of values to impute for categorical vars  
cat_impute_list = ['WND_WTHR_type_code', 'WND_WTHR_type_code_origin', 'WND_WTHR_type_code_dest',
                   'CIG_WTHR_ceiling_determination_code', 'CIG_WTHR_ceiling_determination_code_origin', 'CIG_WTHR_ceiling_determination_code_dest',
                   'CIG_WTHR_CAVOK_code', 'CIG_WTHR_CAVOK_code_origin', 'CIG_WTHR_CAVOK_code_dest', 
                   'AA1_WTHR_condition_code', 'AA1_WTHR_condition_code_origin', 'AA1_WTHR_condition_code_dest',
                   'AA2_WTHR_condition_code', 'AA2_WTHR_condition_code_origin', 'AA2_WTHR_condition_code_dest',
                   'AJ1_WTHR_condition_code', 'AJ1_WTHR_condition_code_origin', 'AJ1_WTHR_condition_code_dest',
                   'AL1_WTHR_condition_code', 'AL1_WTHR_condition_code_origin', 'AL1_WTHR_condition_code_dest',
                   'AN1_WTHR_condition_code', 'AN1_WTHR_condition_code_origin', 'AN1_WTHR_condition_code_dest',
                   'AU1_WTHR_intensity_and_proximity_code', 'AU1_WTHR_intensity_and_proximity_code_origin', 'AU1_WTHR_intensity_and_proximity_code_dest',
                   'AU1_WTHR_descriptor_code', 'AU1_WTHR_descriptor_code_origin', 'AU1_WTHR_descriptor_code_dest',
                   'AU1_WTHR_precipitation_code', 'AU1_WTHR_precipitation_code_origin', 'AU1_WTHR_precipitation_code_dest',
                   'AU1_WTHR_obscuration_code', 'AU1_WTHR_obscuration_code_origin', 'AU1_WTHR_obscuration_code_dest',
                   'AU1_WTHR_other_weather_phenomena_code', 'AU1_WTHR_other_weather_phenomena_code_origin', 'AU1_WTHR_other_weather_phenomena_code_dest',
                   'AU1_WTHR_combination_indicator_code', 'AU1_WTHR_combination_indicator_code_origin', 'AU1_WTHR_combination_indicator_code_dest',
                   'AT1_WTHR_weather_type', 'AT1_WTHR_weather_type_origin', 'AT1_WTHR_weather_type_dest',
                   'AT1_WTHR_weather_type_abbreviation', 'AT1_WTHR_weather_type_abbreviation_origin', 'AT1_WTHR_weather_type_abbreviation_dest',
                    'GA1_WTHR_coverage_code', 'GA1_WTHR_coverage_code_origin', 'GA1_WTHR_coverage_code_dest',
                    'GA1_WTHR_cloud_type_code', 'GA1_WTHR_cloud_type_code_origin', 'GA1_WTHR_cloud_type_code_dest',
                    'GF1_WTHR_total_coverage_code', 'GF1_WTHR_total_coverage_code_origin', 'GF1_WTHR_total_coverage_code_dest',
                    'GF1_WTHR_total_lowest_cloud_cover_code', 'GF1_WTHR_total_lowest_cloud_cover_code_origin', 'GF1_WTHR_total_lowest_cloud_cover_code_dest']

## Weather Helper Functions and Preprocessing
1. `append_WTHR`: fixes var naming in dicts to match our join naming (e.g. 'WND' --> 'WND_WTHR') for downstream things
2. `append_feature_name`: preps some names for later when they're used to replace other names. appends the supervariable name to the subvariable so we will know where it came from (e.g. 'direction_angle' --> 'WND_WTHR_direction_angle')
3. `list_weather_vars`: takes all elements of each (1) list within dict value and adds to a bigger list. used to generate lists of which weather subvariables to drop and which ones should be kept.

In [0]:
# function because I named dict variables wrong - appends "_WTHR" to the end
def append_WTHR(dictionary):
    """Appends "_WTHR" to the input dict key (e.g. 'WND' --> 'WND_WTHR')
    Input: a dictionary
    Output: a dictionary with keys renamed to str(key)+"_WTHR"
    """
    weather_dict = dictionary
    for old_key in list(weather_dict.keys()):
        new_key = str(old_key) + '_WTHR'
        weather_dict[new_key] = weather_dict.pop(old_key)
    return weather_dict

# fixes the names
# dict base changes too 
var_names_dict = append_WTHR(var_names_dict_base)
num_subfeatures_dict = append_WTHR(num_subfeatures_dict_base)
missing_value_code_dict = append_WTHR(missing_value_code_dict_base)
type_code_dict = append_WTHR(type_code_dict_base)
scale_factor_dict = append_WTHR(scale_factor_dict_base)
quality_code_dict = append_WTHR(quality_code_dict_base)
kept_vars_dict = append_WTHR(kept_vars_dict_base)


In [0]:
# function to append feature name in front of subfeature column names for var_names_dict
def append_feature_name(dictionary):
    """ Appends the name of the key (Weather Dataset var name) in front of
    each item in its respective list of values
    e.g. 'direction_angle' --> 'WND_WTHR_direction_angle'
  
    Input: a dictionary where values are ideally a list of strings
    Output: a dictionary with each element of values list (in this case the 
    list of subvariables) appended with the key in front
    """
    input_dict = dictionary
    var_names_dict = {}
    for i in list(input_dict.keys()):
        subfeature_list = []
        for j in input_dict[str(i)]:
            j = str(i) + "_" + str(j)
            subfeature_list.append(j)
        var_names_dict[i] = subfeature_list
    return var_names_dict

# append feature name to subfeature column names
var_names_dict = append_feature_name(var_names_dict)
quality_code_dict = append_feature_name(quality_code_dict)
kept_vars_dict = append_feature_name(kept_vars_dict)

In [0]:
# function to generate list of all potential weather var names

def list_weather_vars(dict):
    """Used for making a list of all weather variables names from a dict
    (e.g. make a big list of the ones to drop)
  
    Input: dictionary where the format is key: value and value is a list of 
    strings (vs. a list of lists)
    Output: a set of all of the values contained in the dict value lists 
    """
    dict = dict
    list = []
    for i in dict:
        for j in dict[i]:
            list.append(j)
      
    return list

# generate list of all post-split weather variable quality codes (minus '_origin' and/or '_dest')
names_to_drop = set(list_weather_vars(quality_code_dict))
kept_vars = set(list_weather_vars(kept_vars_dict))
all_var_names = set(list_weather_vars(var_names_dict))

# Weather Cleaning and Imputation
1. `generate_columns`: splits weather var columns (i.e. 'WND' is split into its 5 subvariables)
2. `drop_qc_columns`: drops parent columns and quality code columns. also drops some categorical columns that contain 100% nulls
3. `impute_weather_num`: imputes values (double type) for numerical weather variables
4. `make_binary_cat_cols`: turns a categorical weather variable column into several binary columns (int type) depending on how many categories there are; drops the parent categorical column

## 1. generate_columns

In [0]:
def generate_columns(df):
    """
    Takes in a df, iterates through weather variable consisting of a few
    subvariables (e.g. a 'WND' can look like '190,1,N,0015,1' and that's 5
    subvars) and adds a column for each subvariable 
    
    Input: dataframe
    Output: dataframe with addition of weather subvariables as split columns (and dropping the supercolumns that were split)
    """
  
    df = df

    for i in df.columns:
        col_name = i
        
        try:
            # save origin or destination flag if it exists
            origin_dest = '_' + str(col_name).split('_')[-1].lower()

            if origin_dest.lower() == '_dest' or origin_dest.lower() == '_origin':
            # generate column name without origin and destination flag
                split_location = len(origin_dest) * -1
                temp_col_name = col_name[:split_location]
            else:
                temp_col_name = col_name

            split_col = F.split(df[col_name], ',')
        except IndexError:
            continue
    
        try:

            for j in range(0,num_subfeatures_dict[temp_col_name]):
                if origin_dest.lower() == '_dest' or origin_dest.lower() == '_origin':
                  # attach origin or destination flag back if it existed
                    new_col_name = str(var_names_dict[temp_col_name][j]) + str(origin_dest)
                else:
                    new_col_name = str(var_names_dict[temp_col_name][j])

                # make the new column and reassign

                try:
                    # generate new column & reassign based on variable type
                    if type_code_dict[temp_col_name][j] == 'float':
                        # casting 'floats' as doubles currently
                        if int(scale_factor_dict[temp_col_name][j]) == 1.00:
                            df = df.withColumn(new_col_name, F.when(split_col.getItem(j)==missing_value_code_dict[temp_col_name][j], F.lit(None)).otherwise(split_col.getItem(j).cast(types.DoubleType())))
                        else:
                            df = df.withColumn(new_col_name, F.when(split_col.getItem(j)==missing_value_code_dict[temp_col_name][j], F.lit(None)).otherwise(split_col.getItem(j).cast(types.DoubleType())/scale_factor_dict[temp_col_name][j]))                
                    elif type_code_dict[temp_col_name][j] == 'str':
                        df = df.withColumn(new_col_name,
                                         F.when(split_col.getItem(j)==missing_value_code_dict[temp_col_name][j], F.lit(None)).otherwise(split_col.getItem(j)).cast(types.StringType()))
                    elif type_code_dict[temp_col_name][j] == 'int':
                        df = df.withColumn(new_col_name,
                                 F.when(split_col.getItem(j)==missing_value_code_dict[temp_col_name][j], F.lit(None)).otherwise(split_col.getItem(j)).cast(types.IntegerType()))
                except KeyError:
                    continue
        
        except KeyError:
              continue  
    return df

## 2. drop_qc_columns

The following are all nulls, so dropped them in addition to dropping quality codes:<br>
VIS_WTHR_variability_code <br>
AJ1_WTHR_equivalent_water_condition_code <br>
GF1_WTHR_total_opaque_coverage_code<br>
GF1_WTHR_low_cloud_genus_code<br>
GF1_WTHR_mid_cloud_genus_code<br>
GF1_WTHR_high_cloud_genus_code

In [0]:
def drop_qc_columns(df):
    """ Drops columns that have to do with quality codes. Also drops some
    catgorical weather variable columns we know to have 100% null values.
  
    Input: dataframe with weather variables already split
    Output: dataframe with fewer columns
  
    """
    df = df

    # iterate over all vars and slice the part before 2nd underscore
    for i in df.columns:
        try:
            col_name = i
            first_segment = i.split("_")[0] + "_" + i.split("_")[1]
        except IndexError:
            continue
    
        # if the slice matches a weather variable, then save origin/destination flag
        # and split the name
        if str(first_segment) in list(var_names_dict_base.keys()):
            origin_dest = '_' + str(col_name).split('_')[-1].lower()
      
            # check for origin dest flag
            if origin_dest.lower() == '_dest' or origin_dest.lower() == '_origin':
                split_location = len(origin_dest) * -1
                temp_col_name = col_name[:split_location]
            else:
                temp_col_name = col_name
      
            # drop vars not in the list of vars to keep
            if temp_col_name not in list(kept_vars):
                df = df.drop(col_name)
            else:
                continue
      
        else:
            continue
      
    return df

## 3. impute_weather_num

In [0]:
def impute_weather_num(df):
    """For certain numerical weather variables, imputes values where
    there are nulls.
  
    Note: KeyError print statement doesn't mean something went wrong.
    Variables throwing a KeyError are categorical and not numerical.
  
    Input: dataframe
    Output: dataframe with numerical values imputed for numerical vars
  
    """
    df = df
    temp_list_full_colname = []

    # iterate over all vars and slice the part before 2nd underscore
    for i in df.columns:
        col_name = i
        try:
            first_segment = i.split("_")[0] + "_" + i.split("_")[1]
        except IndexError:
            continue
    
        # if the slice matches a weather variable, then save origin/destination flag
        # and split the name
        if str(first_segment) in list(kept_vars_dict_base.keys()):
            origin_dest = '_' + str(col_name).split('_')[-1].lower()
      
            # check for origin dest flag
            if origin_dest.lower() == '_dest' or origin_dest.lower() == '_origin':
                split_location = len(origin_dest) * -1
                temp_col_name = col_name[:split_location]
            else:
                temp_col_name = col_name
      
        # adds the column name to a list to be imputed
        temp_list_full_colname.append(str(col_name))
        num_impute_list = list(set(temp_list_full_colname))
  
    # imputes the correct value for each variable
    enumerated_num_impute_list = enumerate(num_impute_list)
    for pair in enumerated_num_impute_list:
        try:
            temp_dict = {pair[1]: num_impute_dict[pair[1]]}
            df = df.na.fill(temp_dict)
        except KeyError:
            print("KeyError with {} (categorical var)".format(pair[1]))

    return df

## 4. make_binary_cat_cols

In [0]:
def make_binary_cat_cols(df):
    """Takes a dataframe and makes certain categorical weather variables into
    binary variables (adds one column per category).
  
    Input: dataframe with columns containing categorical weather data
    Output: dataframe with binary columns in place of the categorical ones
  
    """
    df = df
    drop_list = []
        
    for i in df.columns:
        if i in cat_impute_list:
            col_name = i
            origin_dest = '_' + str(col_name).split('_')[-1].lower()
            drop_list.append(i)
      
            # get the general col name if there's an origin or dest tag at the end
            if origin_dest.lower() == '_dest' or origin_dest.lower() == '_origin':
                split_location = len(origin_dest) * -1
                temp_col_name = col_name[:split_location]
            else:
                temp_col_name = col_name
      
            # depending on which column name it is make new binary columns based on possible
            # categorical values. hardcoded based on which categorical codes show up per variable (see Categorical variables header in this notebook)
            if temp_col_name == 'WND_WTHR_type_code':
                # hardcode possible codes that are not nulls or empty
                codes = ['V', 'C', 'N', 'R', 'H']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    # generate a new name for the binary column and cast 1 if it's the value and 0 if not
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
            elif temp_col_name == 'CIG_WTHR_ceiling_determination_code':
                codes = ['M', 'C', 'W']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
            elif temp_col_name == 'CIG_WTHR_CAVOK_code':
                codes = ['Y', 'N']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
            elif temp_col_name == 'AA1_WTHR_condition_code':
                codes = ['3', '1', '2']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
            elif temp_col_name == 'AA2_WTHR_condition_code':
                codes = ['M', 'C', 'W']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
            elif temp_col_name == 'AJ1_WTHR_condition_code':
                codes = ['3', '1']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
            elif temp_col_name == 'AL1_WTHR_condition_code':
                codes = ['3', '1']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
            elif temp_col_name == 'AN1_WTHR_condition_code':
                codes = ['3']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
            elif temp_col_name == 'AU1_WTHR_intensity_and_proximity_code':
                codes = ['0', '1', '2', '3', '4']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
            elif temp_col_name == 'AU1_WTHR_descriptor_code':
                codes = ['0', '1', '2', '3', '4', '5', '6', '7', '8']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
            elif temp_col_name == 'AU1_WTHR_precipitation_code':
                codes = ['00', '01', '02', '03', '04', '05', '06', '07', '08', '09']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
            elif temp_col_name == 'AU1_WTHR_obscuration_code':
                codes = ['0', '1', '2', '3', '4', '5', '6', '7']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
            elif temp_col_name == 'AU1_WTHR_other_weather_phenomena_code':
                codes = ['0', '1', '2', '3', '4', '5']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
            elif temp_col_name == 'AU1_WTHR_combination_indicator_code':
                codes = ['1', '2', '3']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
            elif temp_col_name == 'AT1_WTHR_weather_type':
                codes = ['00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '10',
                        '13', '14', '15', '16', '17', '18', '19', '21', '22']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
            elif temp_col_name == 'AT1_WTHR_weather_type_abbreviation':
                codes = ['FG', 'BR', 'MIFG', 'FC', 'SN', 'DZ', 'RA', 'FZFG', 'TS', 'UP', 'FZDZ', 'BLSN', 'HZ', 'FZRA', 'GR', 'PL', 'DU', 'FG+']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
            elif temp_col_name == 'GA1_WTHR_coverage_code':
                codes = ['00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '10']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
            elif temp_col_name == 'GA1_WTHR_cloud_type_code':
                codes = ['00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '12', '15']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
            elif temp_col_name == 'GF1_WTHR_total_coverage_code':
                codes = ['00', '01', '02', '03', '04', '05', '06', '07', '08', '09']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
            elif temp_col_name == 'GF1_WTHR_total_lowest_cloud_cover_code':
                codes = ['00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '10']
                for code in codes:
                    print("currently on code {} in {}".format(code, temp_col_name))
                    new_col_name = col_name + '-' + code
                    df = df.withColumn(new_col_name, F.when(df[col_name]==code, F.lit(1)).otherwise(F.lit(0)).cast(types.IntegerType()))
                    print("-----generated column", new_col_name)
  
        else:
            continue
  
    df2 = df.drop(*drop_list)
  
    return df2    

d
# Derive Flight Features
1. `flight_derived_features_creation`

## 1. flight_derived_features_creation

This includes: 

FEATURE 1: **LOCAL_DEP_HOUR** (Extraction of Local Departure Hour) 

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;0 - 23

FEATURE 2: **HOLIDAY** (Holiday Indicator)

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;**0**: Not a holiday 

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;**1**: Is a holiday 

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;**2**: Near holiday


FEATURE 3: **Previous_Flight_Delay_15** (If the previous flight has delayed for 15 mins or more)

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;**0**: No Delay / Delay less than 15 minutes

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;**1**: Delay for 15 minutes or more

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;**2**: Lack of information (The scheduled departure time of previous flight is less than two hours before the scheduled departure time of current flight)


FEATURE 4: **Enough_Time_Btwn_Estimate_Arrival_and_Planned_Dep** (If there is enough time (40 minutes) between estimate arrival time of previous flight and planned departure time of current flight)

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;**0**: No Enough time 

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;**1**: Have Enough time

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;**2**: Lack of information (The scheduled departure time of previous flight is less than two hours before the scheduled departure time of current flight)


FEATURE 5: **Poor_Schedule** (If planned arrival time of Pre-Flight is later than planned departure time of Current-Flight)

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;**0**: Not a poor scheduling

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;**1**: Poor scheduling

In [0]:
def flight_derived_features_creation(df):
    # FEATURE 1: Extraction of Local Departure Hour
    df = df.withColumn("FORMATTED_CRS_DEP_TIME", F.substring(F.format_string("0000%d", "CRS_DEP_TIME_AIRLNS"), -4, 4))
    df = df.withColumn("DATE_WITH_CRS_DEP_TIME", F.concat_ws(" ", df.FL_DATE_AIRLNS, df.FORMATTED_CRS_DEP_TIME))
    df = df.withColumn("LOCAL_DEP_HOUR", F.hour(F.to_timestamp(df.DATE_WITH_CRS_DEP_TIME, "yyyy-MM-dd HHmm")))
    columns_to_drop = ['FORMATTED_CRS_DEP_TIME', 'DATE_WITH_CRS_DEP_TIME']
    df = df.drop(*columns_to_drop)


    # FEATURE 2: Create Holiday Indicator
    df = df.withColumn("HOLIDAY", F.expr("""CASE WHEN FL_DATE_AIRLNS in (
                              '2015-01-01', '2015-07-03', '2015-07-04', '2015-11-26', '2015-12-25',
                              '2016-01-01', '2016-07-04', '2016-11-24', '2016-12-25', '2016-12-26', 
                              '2017-01-01', '2017-01-02', '2017-07-04', '2017-11-23', '2017-12-25',
                              '2018-01-01', '2018-07-04', '2018-11-22', '2018-12-25', 
                              '2019-01-01', '2019-07-04', '2019-11-28', '2019-12-25') THEN 1 """ + 
         """ WHEN FL_DATE_AIRLNS in (
                              '2015-01-02', '2015-01-03','2015-01-04', '2015-01-05', '2015-01-06', '2015-07-01', '2015-07-02', '2015-07-05', '2015-07-06', '2015-11-21', '2015-11-22', '2015-11-23', '2015-11-24', '2015-11-25', '2015-11-27', '2015-11-28', '2015-11-29', '2015-11-30', '2015-12-01', '2015-12-20', '2015-12-21', '2015-12-22', '2015-12-23', '2015-12-24', '2015-12-26', '2015-12-27','2015-12-28', '2015-12-29', '2015-12-30', '2015-12-31',

                              '2016-01-02', '2016-01-03','2016-01-04', '2016-01-05', '2016-01-06', '2016-07-02', '2016-07-03', '2016-07-05', '2016-07-06', '2016-11-19', '2016-11-20', '2016-11-21', '2016-11-22', '2016-11-23', '2016-11-25', '2016-11-26', '2016-11-27', '2016-11-28', '2016-11-29', '2016-12-20', '2016-12-21', '2016-12-22', '2016-12-23', '2016-12-24', '2016-12-27', '2016-12-28','2016-12-29', '2016-12-30', '2016-12-31',

                              '2017-01-03', '2017-01-04',  '2017-01-05', '2017-01-06', '2017-01-07', '2017-07-02', '2017-07-03', '2017-07-05', '2017-07-06','2017-11-18', '2017-11-19', '2017-11-20', '2017-11-21', '2017-11-22', '2017-11-24', '2017-11-25','2017-11-26', '2017-11-27', '2017-11-28', '2017-12-20', '2017-12-21', '2017-12-22', '2017-12-23', '2017-12-24', '2017-12-26', '2017-12-27', '2017-12-28', '2017-12-29', '2017-12-30', '2017-12-31', 

                              '2018-01-02', '2018-01-03', '2018-01-04', '2018-01-05', '2018-01-06', '2018-07-02', '2018-07-03', '2018-07-05', '2018-07-06','2018-11-17','2018-11-18', '2018-11-19', '2018-11-20', '2018-11-21', '2018-11-23', '2018-11-24','2018-11-25', '2018-11-26', '2018-11-27','2018-12-20','2018-12-21', '2018-12-22', '2018-12-23', '2018-12-24', '2018-12-26', '2018-12-27','2018-12-28', '2018-12-29','2018-12-30', '2018-12-31', 

                              '2019-01-02', '2019-01-03','2019-01-04', '2019-01-05', '2019-01-06', '2019-07-02', '2019-07-03', '2019-07-05', '2019-07-06', '2019-11-23', '2019-11-24', '2019-11-25', '2019-11-26', '2019-11-27', '2019-11-29', '2019-11-30','2019-12-01', '2019-12-02', '2019-12-03', '2019-12-20', '2019-12-21', '2019-12-22','2019-12-23', '2019-12-24', '2019-12-26', '2019-12-27','2019-12-28', '2019-12-29','2019-12-30', '2019-12-31') THEN 2 """
                              "ELSE 0 END"))

    # FEATURE 3: Create Previous Flight Delay Indicator
    # FEATURE 4: Calculate the estimate arrival time for the previous flight, 
    #            Get the time between previous flight estimate arrival time and CRS departure time for the current flight
    # If time btwn estimate_arrival and crs_dep >= 40 minutes -------> 1, 
    #                                            < 40 minutes -------> 0, 
    #                          otherwise(lack of information) -------> 2   notes: The scheduled departure time of previous flight is less than two hours 
    #                                                                             before the scheduled departure time of current flight
    # FEATURE 5: Create Poor_Schedule Indicator

    utc_arrive_for_each_tail = Window.partitionBy('TAIL_NUM_AIRLNS').orderBy('utc_arrive')

    # Two_Hour_Btwn_Prev_Departure_and_Current_Departure ----> 
    # 1: Time > 2 hours    0: Within 2 hours.(The scheduled departure time of previous flight is less than two hours 
    #                                         before the scheduled departure time of current flight)
    utc_arrive_for_each_tail = Window.partitionBy('TAIL_NUM_AIRLNS').orderBy('utc_arrive')
    df = df.withColumn('Prev_Flight_Planned_Departure_UTC', F.lag('utc_dep', 1).over(utc_arrive_for_each_tail))\
         .withColumn('Time_Btwn_Prev_Departure_and_Current_Departure', (F.unix_timestamp('utc_dep') - F.unix_timestamp('Prev_Flight_Planned_Departure_UTC')) /60 /60 ) \
         .withColumn('Two_Hour_Btwn_Prev_Departure_and_Current_Departure', F.expr("CASE WHEN Time_Btwn_Prev_Departure_and_Current_Departure > 2 THEN '1'" + "ELSE '0' END"))

    df = df.withColumn('Prev_Flight_Delay_15', F.lag('DEP_DEL15_AIRLNS', 1).over(utc_arrive_for_each_tail))\
         .withColumn('Prev_Flight_Delay', F.lag('DEP_DELAY_NEW_AIRLNS', 1).over(utc_arrive_for_each_tail))\
         .withColumn('Prev_Flight_Planned_Arrive_UTC', F.lag('utc_arrive', 1).over(utc_arrive_for_each_tail))

    df = df.withColumn('Estimate_Pre_Flight_Arrival_Time', F.col("Prev_Flight_Planned_Arrive_UTC") +  F.col("Prev_Flight_Delay") * F.expr("Interval 1 Minutes"))

    # To get how many minutes between Estimate_Pre_Flight_Arrival_Time and utc_dep for the current flight
    # And if there is enough time between Estimate_Pre_Flight_Arrival_Time and utc_dep for the current flight (40 minutes)
    df = df.withColumn('Time_Btwn_Estimate_Arrival_and_Planned_Dep', (F.unix_timestamp('utc_dep') - F.unix_timestamp('Estimate_Pre_Flight_Arrival_Time')) / 60)\
         .withColumn('Enough_Time_Btwn_Estimate_Arrival_and_Planned_Dep', F.expr("CASE WHEN Time_Btwn_Estimate_Arrival_and_Planned_Dep >= 40 THEN 1 ELSE 0 END"))

    # Create Feature 'Poor_Schedule' if planned arrival time is later than planned departure time
    df = df.withColumn('time_btwn_dep_and_plannedArrival', (F.unix_timestamp('utc_dep') - F.unix_timestamp('Prev_Flight_Planned_Arrive_UTC'))/60) \
         .withColumn('Poor_Schedule', F.expr("CASE WHEN time_btwn_dep_and_plannedArrival <= 0 THEN 1 ELSE 0 END"))

    # Update df:
    # mark Enough_Time_Btwn_Estimate_Arrival_and_Planned_Dep = 2 for flights whose previous flight depart within 2 hours(Two_Hour_Btwn_Prev_Departure_and_Current_Departure == 0)
    # mark Enough_Time_Btwn_Estimate_Arrival_and_Planned_Dep = 1 for flights that do not have previous flights
    # mark Prev_Flight_Delay_15 = 0 for flights that do not have previous flights
    # mark Poor_Schedule = 0 for flights that do not have previous flights


    # Do have previous flight & previous flight depart less than 2 hours before the departure time of current flight, 
    # Enough_Time_Btwn_Estimate_Arrival_and_Planned_Dep = 2(lack of information, we don't know if there is enough time)
    # Prev_Flight_Delay_15 = 2(lack of information, we don't know if the previous flight delay or not)
    df = df.withColumn('Enough_Time_Btwn_Estimate_Arrival_and_Planned_Dep', F.when((df.Prev_Flight_Planned_Arrive_UTC.isNotNull()) & (df.Two_Hour_Btwn_Prev_Departure_and_Current_Departure == 0) , 2).otherwise(df.Enough_Time_Btwn_Estimate_Arrival_and_Planned_Dep))
    df = df.withColumn('Prev_Flight_Delay_15', F.when((df.Prev_Flight_Planned_Arrive_UTC.isNotNull()) & (df.Two_Hour_Btwn_Prev_Departure_and_Current_Departure == 0) , 2).otherwise(df.Prev_Flight_Delay_15.cast("Integer")))

    # mark Enough_Time_Btwn_Estimate_Arrival_and_Planned_Dep = 1 for flights that do not have previous flights
    # mark Prev_Flight_Delay_15 = 0 for flights that do not have previous flights
    # mark Poor_Schedule = 0 for flights that do not have previous flights
    df = df.withColumn('Enough_Time_Btwn_Estimate_Arrival_and_Planned_Dep', F.when(df.Prev_Flight_Planned_Arrive_UTC.isNull(), 1).otherwise(df.Enough_Time_Btwn_Estimate_Arrival_and_Planned_Dep))\
         .withColumn('Prev_Flight_Delay_15', F.when(df.Prev_Flight_Planned_Arrive_UTC.isNull(), 0).otherwise(df.Prev_Flight_Delay_15.cast("Integer")))\
         .withColumn('Poor_Schedule', F.when(df.Prev_Flight_Planned_Arrive_UTC.isNull(), 0).otherwise(df.Poor_Schedule.cast("Integer")))

    # Drop no-longer needed columns
    columns_to_drop = ['Time_Btwn_Estimate_Arrival_and_Planned_Dep', 'Estimate_Pre_Flight_Arrival_Time', 'Prev_Flight_Planned_Arrive_UTC', 
                     'Prev_Flight_Delay', 'Prev_Flight_Planned_Departure_UTC', 'Time_Btwn_Prev_Departure_and_Current_Departure', 'Two_Hour_Btwn_Prev_Departure_and_Current_Departure', 'time_btwn_dep_and_plannedArrival']
    df = df.drop(*columns_to_drop)

    return df


d
# CV and Sampline Functions
1. `make_cv_folds`: Take the entire data frame and split it into 5 folds of (train, set) dataframe tuples for rolling window based cross validation
2. `undersample`: Take a dataframe of training data and undersamples the majority class to be about the same size as the minority class
3. `oversample`: Take a dataframe of training data and oversamples (with replacement) the minority class to be about the same size as the majority class

## 1. make_cv_folds

In [0]:
def make_cv_folds(data):
    fold1_train = data.filter((F.to_timestamp(F.col('FL_DATE_AIRLNS')) >= '2015-01-01') & (F.to_timestamp(F.col('FL_DATE_AIRLNS')) < '2017-10-01'))
    fold1_test = data.filter((F.to_timestamp(F.col('FL_DATE_AIRLNS')) >= '2017-10-01') & (F.to_timestamp(F.col('FL_DATE_AIRLNS')) < '2018-01-01'))

    fold2_train = data.filter((F.to_timestamp(F.col('FL_DATE_AIRLNS')) >= '2015-04-01') & (F.to_timestamp(F.col('FL_DATE_AIRLNS')) < '2018-01-01'))
    fold2_test = data.filter((F.to_timestamp(F.col('FL_DATE_AIRLNS')) >= '2018-01-01') & (F.to_timestamp(F.col('FL_DATE_AIRLNS')) < '2018-04-01'))

    fold3_train = data.filter((F.to_timestamp(F.col('FL_DATE_AIRLNS')) >= '2015-07-01') & (F.to_timestamp(F.col('FL_DATE_AIRLNS')) < '2018-04-01'))
    fold3_test = data.filter((F.to_timestamp(F.col('FL_DATE_AIRLNS')) >= '2018-04-01') & (F.to_timestamp(F.col('FL_DATE_AIRLNS')) < '2018-07-01'))

    fold4_train = data.filter((F.to_timestamp(F.col('FL_DATE_AIRLNS')) >= '2015-10-01') & (F.to_timestamp(F.col('FL_DATE_AIRLNS')) < '2018-07-01'))
    fold4_test = data.filter((F.to_timestamp(F.col('FL_DATE_AIRLNS')) >= '2018-07-01') & (F.to_timestamp(F.col('FL_DATE_AIRLNS')) < '2018-10-01'))

    fold5_train = data.filter((F.to_timestamp(F.col('FL_DATE_AIRLNS')) >= '2016-01-01') & (F.to_timestamp(F.col('FL_DATE_AIRLNS')) < '2018-10-01'))
    fold5_test = data.filter((F.to_timestamp(F.col('FL_DATE_AIRLNS')) >= '2018-10-01') & (F.to_timestamp(F.col('FL_DATE_AIRLNS')) < '2019-01-01'))

    return [(fold1_train, fold1_test), (fold2_train, fold2_test), (fold3_train, fold3_test), (fold4_train, fold4_test), (fold5_train, fold5_test)]

  

## 2. undersample

In [0]:
def undersample(data, label_col='DEP_DEL15_AIRLNS'):
    delayed = data.filter(F.col(label_col) > 0)
    not_delayed = data.filter(F.col(label_col) == 0)

    delayed_count = delayed.count()
    not_delayed_count = not_delayed.count()

    sample_fraction = delayed_count * 1.0 / not_delayed_count

    sample_not_delayed = not_delayed.sample(fraction=sample_fraction, seed=1)

    return sample_not_delayed.union(delayed)
  

## 3. oversample

In [0]:
def oversample(data, over_sample_ratio=None, label_col='DEP_DEL15_AIRLNS'):
    delayed = data.filter(F.col(label_col) > 0)
    not_delayed = data.filter(F.col(label_col) == 0)

    delayed_count = delayed.count()
    not_delayed_count = not_delayed.count()

    if not over_sample_ratio:
        sample_count = not_delayed_count
    else:
        sample_count = delayed_count * over_sample_ratio
    
    not_delayed_sample_fraction = sample_count * 1.0 / not_delayed_count
    delayed_sample_fraction = sample_count * 1.0 / delayed_count

    sample_not_delayed = not_delayed.sample(fraction=not_delayed_sample_fraction, seed=1)
    sample_delayed = delayed.sample(withReplacement=True, fraction=delayed_sample_fraction, seed=1)

    return sample_delayed.union(sample_not_delayed)