In [0]:
from pyspark.ml.evaluation import Evaluator

# Import libraries for running the helper functions
from pyspark.sql.functions import min, max, col, expr, when, udf, create_map, lit, explode, array
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, unix_timestamp, isnan, count, concat, desc
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType, DoubleType, StringType
from operator import itemgetter
from pyspark import SparkFiles
import os
from timezonefinder import TimezoneFinder
import pytz
from datetime import datetime, date, timedelta
import numpy as np
import sys
from graphframes import *

# Additional imports for the tree pipeline
from sparkdl.xgboost import XgboostClassifier
from pyspark.ml.evaluation import Evaluator
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, VectorIndexer, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Additional imports for displaying and plotting results
import time
import pandas as pd
import seaborn as sns
from pyspark.mllib.evaluation import MulticlassMetrics
import matplotlib.pyplot as plt
from matplotlib import gridspec
from statsmodels.tsa.stattools import acf, pacf
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

In [0]:
def add_airport_icao_coords(us_airport_locs, airlines):
    """Attach airport reference data for both endpoints of every flight.

    The airport table is inner-joined to the flights table twice: first on
    ``ORIGIN == IATA`` and then on ``DEST == IATA``. After each join the
    columns ICAO, IATA, latitude, longitude and tz_database_timezone are
    renamed with an ``origin_`` / ``destination_`` prefix (the timezone becomes
    ``*_local_timezone``). Because both joins are inner joins, flights whose
    origin or destination code has no match in the airport table are dropped.

    Args:
        us_airport_locs: airport reference DataFrame, joined on its IATA column.
        airlines: flights DataFrame with ORIGIN and DEST columns.

    Returns:
        The flights DataFrame with the renamed airport columns added and the
        two helper columns ``origin_IATA`` / ``destination_IATA`` removed.
    """
    # (column as it arrives from the join, name it should carry afterwards)
    origin_renames = (
        ('ICAO', 'origin_ICAO'),
        ('IATA', 'origin_IATA'),
        ('latitude', 'origin_latitude'),
        ('longitude', 'origin_longitude'),
        ('tz_database_timezone', 'origin_local_timezone'),
    )
    destination_renames = (
        ('ICAO', 'destination_ICAO'),
        ('IATA', 'destination_IATA'),
        ('latitude', 'destination_latitude'),
        ('longitude', 'destination_longitude'),
        ('tz_database_timezone', 'destination_local_timezone'),
    )

    # Alias both inputs so the join conditions are unambiguous
    flights = airlines.alias("df1_a")
    airports = us_airport_locs.alias("df2_a")

    # First pass: airport details for the ORIGIN airport
    with_origin = flights.join(airports, flights.ORIGIN == airports.IATA, 'inner')
    for joined_name, final_name in origin_renames:
        with_origin = with_origin.withColumnRenamed(joined_name, final_name)

    # Re-alias the intermediate result before joining the airport table again
    with_origin = with_origin.alias("df3_a")

    # Second pass: airport details for the DEST airport
    with_both = with_origin.join(airports, with_origin.DEST == airports.IATA, 'inner')
    for joined_name, final_name in destination_renames:
        with_both = with_both.withColumnRenamed(joined_name, final_name)

    # The renamed IATA columns duplicate ORIGIN / DEST, so discard them
    return with_both.drop('origin_IATA', 'destination_IATA')

def nearest_origin_station(airlines, stations):
    """Tag each flight with the weather station matched to its origin airport.

    Args:
        airlines: flights DataFrame carrying an ``origin_ICAO`` column.
        stations: station lookup DataFrame; its ``neighbor_call`` column is the
            join key and its ``station_id`` column becomes
            ``origin_weather_station``.

    Returns:
        The joined DataFrame without ``neighbor_call``, ``origin_ICAO`` and
        ``distance_to_neighbor``.
    """
    # Alias both sides so the join condition is unambiguous
    flights = airlines.alias("df1_a")
    station_lookup = stations.alias("df2_a")

    # Inner join on purpose: flights without a matching origin station are
    # discarded, since the analysis is about the effect of weather on flights.
    same_airport = flights.origin_ICAO == station_lookup.neighbor_call

    return (
        flights.join(station_lookup, same_airport, 'inner')
               .withColumnRenamed('station_id', 'origin_weather_station')
               .drop('neighbor_call', 'origin_ICAO', 'distance_to_neighbor')
    )

def nearest_destination_station(airlines, stations):
    """Add the closest weather station to each destination airport to the flights table.

    Args:
        airlines: flights DataFrame carrying a ``destination_ICAO`` column.
        stations: station lookup DataFrame; its ``neighbor_call`` column is the
            join key and its ``station_id`` column becomes
            ``destination_weather_station``.

    Returns:
        The joined DataFrame without the join key (``destination_call``),
        ``destination_ICAO`` and ``distance_to_neighbor``.
    """
    # Rename the join key up front to avoid a Spark column-name clash
    stations = stations.withColumnRenamed('neighbor_call', 'destination_call')

    # Alias both sides so the join condition is unambiguous
    df3_a = airlines.alias("df3_a")
    df2_a = stations.alias("df2_a")

    # Inner join on purpose: flights without a matching destination station are
    # discarded, since the analysis is about the effect of weather on flights.
    airlines_2 = df3_a.join(df2_a, df3_a.destination_ICAO == df2_a.destination_call, 'inner')
    airlines_2 = airlines_2.withColumnRenamed('station_id', 'destination_weather_station')

    # Drop the helper columns. The key was renamed to 'destination_call' above,
    # so that is the name to drop here ('destination_neighbor_call' never exists
    # in this frame, so dropping it left the key in the output).
    airlines_2 = airlines_2.drop('destination_call', 'destination_ICAO', 'distance_to_neighbor')
    return airlines_2

def add_cutoff_window(airlines):
    """Add UTC timestamps for the scheduled departure and the 2 h / 3 h cutoffs before it.

    Reads the columns ``FL_DATE``, ``CRS_DEP_TIME`` and ``origin_local_timezone``
    and appends ``utc_scheduled_departure``, ``utc_2hrs_before`` and
    ``utc_3hrs_before``.

    Args:
        airlines: flights DataFrame containing the three columns listed above.

    Returns:
        The input DataFrame with the three UTC timestamp columns added.
    """
    # Local wall-clock departure: FL_DATE followed by CRS_DEP_TIME zero-padded
    # to four digits (HHmm), e.g. '2019-03-10' + '0330'.
    local_departure = F.to_timestamp(
        F.concat(airlines.FL_DATE, F.lpad(airlines.CRS_DEP_TIME, 4, '0')),
        format='yyyy-MM-ddHHmm')

    # Convert to UTC once, then take the offsets in UTC. Subtracting the hours
    # from the local wall-clock value and converting each result separately
    # gives a window that is an hour too short or too long whenever a daylight
    # saving transition falls between the cutoff and the departure.
    # NOTE(review): assumes the Spark session time zone has no DST of its own
    # (e.g. UTC) - confirm for the target cluster.
    airlines = airlines.withColumn('scheduled_departure', local_departure)\
                       .withColumn('utc_scheduled_departure', F.to_utc_timestamp(col('scheduled_departure'), col('origin_local_timezone')))\
                       .withColumn('utc_2hrs_before', col('utc_scheduled_departure') + expr('INTERVAL -2 HOURS'))\
                       .withColumn('utc_3hrs_before', col('utc_scheduled_departure') + expr('INTERVAL -3 HOURS'))\
                       .drop('scheduled_departure')

    return airlines

In [0]:
def parse_concatenated_fields(weather):
    """Reduce the weather table to U.S. rows and unpack its comma-packed fields.

    Rows are kept when ``NAME`` contains 'US', and only the first 16 columns of
    the input are carried forward. The packed columns WND, CIG, VIS, TMP, DEW
    and SLP are split on ',' into individually named columns, rows matching the
    missing-value codes in the final filter are removed, and a fixed list of
    columns is returned.

    Args:
        weather: raw weather DataFrame containing NAME and the packed columns
            among its first 16 columns.

    Returns:
        DataFrame with STATION, DATE, LATITUDE, LONGITUDE, ELEVATION, NAME,
        QUALITY_CONTROL, wind_obs_type, wind_obs_speed, ceil_height, vis_dist,
        air_temp and dew_pt_temp.
    """
    # Keep U.S. observations only, then the leading 16 columns
    us_rows = weather.filter(weather.NAME.contains('US'))
    weather = us_rows.select(weather.columns[0:16])

    # One split expression per packed column
    packed_fields = ('WND', 'CIG', 'VIS', 'TMP', 'DEW', 'SLP')
    pieces = {field: F.split(weather[field], ',') for field in packed_fields}

    # (new column, packed source column, position inside it, cast type or None)
    layout = [
        ('wind_dir', 'WND', 0, 'int'),
        ('wind_dir_qual', 'WND', 1, 'int'),
        ('wind_obs_type', 'WND', 2, None),
        ('wind_obs_speed', 'WND', 3, 'int'),
        ('wind_obs_speed_qual', 'WND', 4, 'int'),
        ('ceil_height', 'CIG', 0, 'int'),
        ('ceil_qual', 'CIG', 1, 'int'),
        ('ceil_ok', 'CIG', 3, None),
        ('vis_dist', 'VIS', 0, 'int'),
        ('vis_dist_qual', 'VIS', 1, 'int'),
        ('vis_dist_var', 'VIS', 2, None),
        ('vis_dist_var_qual', 'VIS', 3, 'int'),
        ('air_temp', 'TMP', 0, 'int'),
        ('air_temp_qual', 'TMP', 1, None),
        ('dew_pt_temp', 'DEW', 0, 'int'),
        ('dew_pt_qual', 'DEW', 1, None),
        ('sea_level_p', 'SLP', 0, 'int'),
        ('sea_level_p_qual', 'SLP', 1, 'int'),
    ]

    # Materialise every parsed piece as its own column
    for new_name, field, position, cast_to in layout:
        parsed = pieces[field].getItem(position)
        if cast_to is not None:
            parsed = parsed.cast(cast_to)
        weather = weather.withColumn(new_name, parsed)

    # The packed originals are no longer needed
    weather = weather.drop('WND', 'CIG', 'VIS', 'TMP', 'DEW', 'SLP')

    # Remove rows carrying missing-value codes in the fields of interest
    usable = weather.filter("wind_obs_type <> '9' and wind_obs_speed <> 9999 and ceil_height<>99999 and vis_dist<>999999 and air_temp<>'+9999' and dew_pt_temp<>'+9999'")

    return usable.select("STATION","DATE","LATITUDE","LONGITUDE","ELEVATION","NAME","QUALITY_CONTROL","wind_obs_type","wind_obs_speed","ceil_height","vis_dist","air_temp","dew_pt_temp")

In [0]:
def airport_importance(df, num_iter):
    """Score airports with PageRank and attach the scores to every flight.

    A graph is built with one vertex per distinct ORIGIN/DEST value and one
    edge per flight row. PageRank is run on it and the resulting score is
    joined back twice, once for the origin and once for the destination.

    Args:
        df: Spark DataFrame with ORIGIN, DEST, utc_scheduled_departure,
            OUTCOME and DISTANCE columns.
        num_iter: integer passed to PageRank as ``maxIter`` (the example
            usage runs with 5).

    Returns:
        Spark DataFrame with ``pagerank_origin`` and ``pagerank_dest`` added
        and ORIGIN, DEST and utc_scheduled_departure removed.
    """
    # Vertices: every distinct airport code appearing as ORIGIN or DEST
    airport_ids = explode(array("ORIGIN", "DEST")).alias("id")
    vertices = df.select(airport_ids).dropDuplicates()

    # Edges: one per flight, identified by origin, destination and departure time
    edge_id = F.concat(col("ORIGIN"), lit("_"), col("DEST"), lit("_"), col("utc_scheduled_departure"))
    outcome_as_int = df["OUTCOME"].cast(IntegerType())
    edges = df.withColumn("id", edge_id)\
              .withColumn("outcomes", outcome_as_int)\
              .selectExpr("id","ORIGIN as src","DEST as dst","DISTANCE","outcomes")

    # Build the graph and rank its vertices
    flight_graph = GraphFrame(vertices, edges)
    airport_ranks = flight_graph.pageRank(resetProbability=0.15, maxIter=num_iter).vertices

    # Attach the score of the origin airport ...
    with_origin_rank = df.join(airport_ranks, df.ORIGIN == airport_ranks.id)\
                         .select(df["*"], airport_ranks["pagerank"].alias("pagerank_origin"))
    # ... and then the score of the destination airport
    with_both_ranks = with_origin_rank.join(airport_ranks, with_origin_rank.DEST == airport_ranks.id)\
                                      .select(with_origin_rank["*"], airport_ranks["pagerank"].alias("pagerank_dest"))

    # The helper columns are not returned
    return with_both_ranks.drop("ORIGIN","DEST","utc_scheduled_departure")

In [0]:
def upsample(df):
    """Upsample a dataframe's minority class to create an approximately balanced dataset.

    Rows with ``OUTCOME == 1`` are treated as the minority class and rows with
    ``OUTCOME == 0`` as the majority class. The minority rows are sampled with
    replacement so that their expected number matches the majority count.

    Args:
        df: Spark DataFrame with an ``OUTCOME`` column.

    Returns:
        The (re)sampled minority rows unioned with the majority rows. When
        there are no minority rows, or the minority class is already at least
        as large as the majority class, the two subsets are returned
        unsampled.
    """
    # Get the dataframes corresponding to the majority and minority classes
    major_obs = df.filter(col('OUTCOME') == 0)
    minor_obs = df.filter(col('OUTCOME') == 1)

    minor_count = minor_obs.count()
    major_count = major_obs.count()

    # Nothing to replicate (empty minority) or nothing to gain (already balanced)
    if minor_count == 0 or major_count <= minor_count:
        return minor_obs.union(major_obs)

    # With replacement, `fraction` is the expected number of copies per row.
    # It therefore has to be the majority/minority ratio (> 1). Using the
    # minority share of the data (< 1) shrinks the minority class instead.
    growth = major_count / float(minor_count)
    oversampled_df = minor_obs.sample(withReplacement = True, fraction = growth, seed = 42).union(major_obs)

    return oversampled_df

def downsample(df):
    """Downsample a dataframe's majority class to create an approximately balanced dataset.

    Rows with ``OUTCOME == 0`` are treated as the majority class and rows with
    ``OUTCOME == 1`` as the minority class. The majority rows are sampled
    without replacement so that their expected number matches the minority
    count.

    Args:
        df: Spark DataFrame with an ``OUTCOME`` column.

    Returns:
        The reduced majority rows unioned with the minority rows. When there
        are no minority rows, or the majority class is not larger than the
        minority class, the two subsets are returned unsampled.
    """
    # Get the dataframes corresponding to the majority and minority classes
    major_obs = df.filter(col('OUTCOME') == 0)
    minor_obs = df.filter(col('OUTCOME') == 1)

    major_count = major_obs.count()
    minor_count = minor_obs.count()

    # Guard the cases where no sampling fraction can be derived: an empty
    # minority class, or a majority that is not actually larger.
    if minor_count == 0 or major_count <= minor_count:
        return major_obs.unionAll(minor_obs)

    # Exact keep-fraction. Truncating the class ratio to an integer first
    # would leave up to ~2x imbalance (e.g. a ratio of 1.9 would not be
    # reduced at all).
    keep_fraction = minor_count / float(major_count)
    reduced_majority = major_obs.sample(False, keep_fraction, seed=42)

    # Create a new dataset, with fewer majority class observations
    reduced_df = reduced_majority.unionAll(minor_obs)

    return reduced_df

def smote_rdd(df):
    """Implement SMOTE using scikit-learn nearest neighbours, RDD operations, and downsampling as necessary.

    For every minority row (``label == 1``) the k nearest minority rows are
    looked up and one synthetic row is created per neighbour, placed at a
    random position on the segment between the row and that neighbour. If the
    majority class (``label == 0``) is still at least twice as large
    afterwards, it is downsampled.

    Args:
        df: Spark DataFrame with a ``features`` vector column and a ``label``
            column.

    Returns:
        A DataFrame with columns ``features`` and ``label``, or the string
        'There were no minority class observations in the sample' when the
        input has no minority rows.

    NOTE(review): relies on ``neighbors`` (scikit-learn) and ``sc`` (the
    SparkContext) being available as globals; neither is imported in the
    notebook's visible import cell - confirm they are in scope.
    """
    # Function-scope imports: neither name appears in the visible import cell
    import random
    from pyspark.sql import Row

    # Filter the dataframe for minority and majority class observations
    major_obs = df.filter(col('label') == 0)
    minor_obs = df.filter(col('label') == 1)

    major_counts = major_obs.count()
    minor_counts = minor_obs.count()

    # Without minority rows there is nothing to interpolate. Check this before
    # the first division, otherwise the ZeroDivisionError escapes unhandled.
    if minor_counts == 0:
        return 'There were no minority class observations in the sample'

    major_df = major_obs.select('features', 'label')
    minor_df = minor_obs.select('features', 'label')

    # There cannot be more nearest neighbors than there are minority examples
    k = min(int(major_counts / minor_counts), minor_counts)
    if k < 1:
        # The majority class is not larger than the minority class, so no
        # synthetic rows are needed (and NearestNeighbors rejects k == 0).
        return major_df.unionAll(minor_df)

    # Pull the minority feature vectors to the driver as a numpy array
    feature_val = minor_obs.select('features').rdd.map(lambda x: x[0]).collect()
    feature = np.asarray(feature_val)

    # Indices of the k nearest minority rows for every minority row.
    # NOTE(review): the query points are the fitted points themselves, so each
    # row is normally returned as its own nearest neighbour and one of its k
    # synthetic rows is then a plain copy - confirm this is intended.
    nearest_nbrs = neighbors.NearestNeighbors(n_neighbors=k).fit(feature)
    row_nbrs = nearest_nbrs.kneighbors(feature)[1]

    # SMOTE interpolation: x + u * (neighbour - x) with u in [0, 1). The
    # difference must point towards the neighbour; with the sign flipped the
    # synthetic point is extrapolated away from the minority cluster.
    newRows = []
    for idx in range(len(feature_val)):
        for i in row_nbrs[idx]:
            newRows.append(feature_val[idx] + ((feature_val[i] - feature_val[idx]) * random.random()))

    # Turn the synthetic rows into a DataFrame and add them to the real minority rows
    new_data = sc.parallelize(newRows).map(lambda x: Row(features = x, label = 1)).toDF()
    new_data_minor = minor_df.unionAll(new_data)

    # Size of the enlarged minority class, known without another Spark job
    minor_counts = minor_counts + len(newRows)

    # Ratio of major class to synthetic + original minor class observations
    ratio = int(major_counts / minor_counts)

    # If the ratio is approximately 1, the classes are balanced enough already
    if ratio <= 1:
        return major_df.unionAll(new_data_minor)

    # Otherwise downsample the majority class (seeded like the other samplers)
    reduced_majority = major_df.sample(False, 1/ratio, seed=42)
    return reduced_majority.unionAll(new_data_minor)

In [0]:
def calculate_metrics(df):
    """Calculate f1-score, recall, accuracy and precision for binary predictions.

    Args:
        df: Spark DataFrame with ``label`` and ``prediction`` columns whose
            values cast to the integers 0 and 1.

    Returns:
        Tuple ``(f1_score, recall, accuracy, precision)``. A metric whose
        denominator is zero is reported as 0.0.
    """
    # Cast both columns to int and count every (label, prediction) pair in a
    # single Spark job.
    pair_rows = df.select(df['label'].cast('int').alias('label'),
                          df['prediction'].cast('int').alias('prediction'))\
                  .groupBy('label', 'prediction')\
                  .count()\
                  .collect()

    # A pair that never occurs has no row in the result, so look counts up with
    # a default of 0 instead of indexing into a possibly empty collect().
    pair_counts = {(row[0], row[1]): row[2] for row in pair_rows}
    tn = pair_counts.get((0, 0), 0)
    tp = pair_counts.get((1, 1), 0)
    fn = pair_counts.get((1, 0), 0)
    fp = pair_counts.get((0, 1), 0)

    total = tp + tn + fp + fn

    # Every metric falls back to 0.0 when its denominator is zero
    accuracy = float(tp + tn) / float(total) if total > 0 else 0.0
    recall = tp / float(tp + fn) if (tp + fn) > 0 else 0.0
    precision = float(tp) / float(tp + fp) if (tp + fp) > 0 else 0.0
    # F1 written as tp / (tp + (fp + fn) / 2)
    f1_score = tp / float(tp + (0.5 * (fp + fn))) if (tp + fp + fn) > 0 else 0.0

    return f1_score, recall, accuracy, precision

In [0]:
class RecallEvaluator(Evaluator):
    """A custom evaluator that scores predictions by recall of the positive class (label 1)."""

    def __init__(self, predictionCol="prediction", labelCol="label"):
        """Store the names of the prediction and label columns to compare.

        Args:
            predictionCol: name of the column holding the predicted class.
            labelCol: name of the column holding the true class.
        """
        # Run the base-class initialiser so the state inherited from pyspark's
        # Evaluator/Params is set up before the evaluator is used.
        super().__init__()
        self.predictionCol = predictionCol
        self.labelCol = labelCol

    def _evaluate(self, dataset):
        """
        Finds the recall score on the dataset

        Both columns are cast to integers before comparison. Returns
        tp / (tp + fn), or 0.0 when the dataset has no rows with label 1.
        """
        label = F.col(self.labelCol).cast(IntegerType())
        prediction = F.col(self.predictionCol).cast(IntegerType())

        # Recall only needs the actual positives. False positives are not part
        # of the formula, so no Spark job is spent counting them.
        actual_positives = dataset.filter(label == 1)
        tp = actual_positives.filter(prediction == 1).count()
        fn = actual_positives.filter(prediction == 0).count()

        if (tp + fn) == 0:
            return 0.0
        return float(tp)/float(tp+fn)

    def isLargerBetter(self):
        """Higher recall is better, so model selection should maximise this metric."""
        return True

In [0]:
def check_outcome(dep_del15, cancelled, diverted):
    """Collapse the three disruption flags into one binary outcome.

    Returns 1 when at least one of ``dep_del15`` (departure delayed 15+
    minutes), ``cancelled`` or ``diverted`` equals 1, otherwise 0.
    """
    flags = (dep_del15, cancelled, diverted)
    return 1 if any(flag == 1 for flag in flags) else 0