## End to End Pipeline for Machine Learning

ML pipeline that predicts flight delays 2 hours prior to scheduled departure using a Gradient Boosted Trees classifier. The pipeline transforms the data and outputs a prediction.

Input:
- Airline passenger flights data path
- Weather data path
- Airport coordinates data path

Output:
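- Evaluation metrics (ROC/PR AUC, error rates, precision, recall, F1) and the confusion matrix on the test data for the Gradient Boosted Trees model, plus a DataFrame of test-set predictions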

### Import packages

In [4]:
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, NullType, ShortType, DateType, BooleanType, BinaryType
from pyspark.sql import SQLContext
import pyspark.ml.feature as ftr
import pyspark.ml as ml
from pyspark.sql.window import Window
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.classification import GBTClassifier
import numpy as np
import pandas as pd
import sklearn.metrics as metrics
import matplotlib.pyplot as plt
# Legacy SQLContext wrapper around `sc` (presumably the notebook-provided SparkContext).
# NOTE(review): `sqlContext` is not referenced by the functions in this notebook, which
# use the `spark` session directly — likely removable.
sqlContext = SQLContext(sc)

### Airport Data

In [6]:
def airport_extract(airport_path):
  """
  Load the airport CSV file into a Spark DataFrame.

  The file is read as CSV with a header row, "," as separator and schema inference.

  Parameters:
  airport_path: location of the airport CSV file

  Returns:
  df: airport dataframe
  """
  csv_reader = (
    spark.read.format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .option("sep", ",")
  )
  return csv_reader.load(airport_path)

##### Transforming airport Data

Filter down to US airports only

In [9]:
def airport_transform(dataframe):
  """
  Restrict airport data to US airports, keeping one row per airport code.

  The source dataset represents some airports with several points (a polygon).
  A single point per airport is enough for the model, so rows are deduplicated
  on AIRPORT. The code does not control which duplicate row is kept.

  Parameters:
  dataframe: airport data

  Returns:
  dataframe: US airports, one row per AIRPORT
  """
  us_only = dataframe.filter("AIRPORT_COUNTRY_CODE_ISO = 'US'")
  one_point_per_airport = us_only.dropDuplicates(['AIRPORT'])
  return one_point_per_airport

Keep airports that co-exist in the airline dataset

In [11]:
def airport_selection(airport_df, airlines_df):
  """
  Keep only airports that appear as an ORIGIN or a DEST in the airlines data.

  Side effect: registers (or replaces) the Spark temporary views `airlines`
  and `airport`.

  Parameters:
  airport_df: airport data
  airlines_df: airlines data

  Returns:
  df: airport rows whose AIRPORT code is served by at least one flight
  """
  for view_name, frame in (('airlines', airlines_df), ('airport', airport_df)):
    frame.createOrReplaceTempView(view_name)

  served_airports_sql = """
    SELECT * 
    FROM
      airport 
    WHERE 
      AIRPORT IN (
        SELECT DISTINCT ORIGIN FROM airlines
        UNION
        SELECT DISTINCT DEST FROM airlines
        )
    """
  return spark.sql(served_airports_sql)

##### Transforming Airline Data

In [13]:
def airlines_transform(dataframe):
  
  """
  Airlines transformation function
    - Drop cancelled and diverted flights
    - cast FL_DATE to date type
    - create DEP_TIME_HOUR / ARR_TIME_HOUR from the first two characters of
      DEP_TIME_BLK / ARR_TIME_BLK
    - extract the scheduled departure/arrival hours CRS_DEP_TIME_HOUR /
      CRS_ARR_TIME_HOUR from the hhmm-encoded CRS_DEP_TIME / CRS_ARR_TIME
      as floor(time / 100), e.g. 1455 -> 14 and 55 (00:55) -> 0
    - cast DISTANCE_GROUP, YEAR, QUARTER, MONTH, DAY_OF_MONTH, DAY_OF_WEEK,
      DEP_DEL15 and ARR_DEL15 to string type
    - replace OP_CARRIER_FL_NUM with "<OP_CARRIER>_<OP_CARRIER_FL_NUM>"
    - build the scheduled timestamps CRS_DEP_TIMESTAMP / CRS_ARR_TIMESTAMP from
      FL_DATE and the hhmm times (both use FL_DATE; an arrival after midnight
      is not moved to the next day)
    - create PR_ARR_DEL15: ARR_DEL15 of the previous scheduled flight of the
      same aircraft (TAIL_NUM); null for an aircraft's first flight and for
      rows whose TAIL_NUM is null
    - keep only the columns listed in selected_col
    
  Parameters:
  dataframe: airlines data
  
  Returns:
  dataframe: airlines data   
  """
  
  # Selected Columns
  selected_col = [
  "YEAR",
  "QUARTER",
  "MONTH",
  "DAY_OF_MONTH",
  "DAY_OF_WEEK",
  "FL_DATE",
  "OP_UNIQUE_CARRIER",
  "TAIL_NUM",
  "OP_CARRIER_FL_NUM",
  "ORIGIN",
  "ORIGIN_CITY_NAME",
  "ORIGIN_STATE_ABR",
  "DEST",
  "DEST_CITY_NAME",
  "DEST_STATE_ABR",
  "CRS_DEP_TIME",
  "CRS_DEP_TIME_HOUR",
  'CRS_ARR_TIME_HOUR',
  "DEP_TIME_HOUR",
  "DEP_DELAY_NEW",
  "DISTANCE",
  "DISTANCE_GROUP",
  "DEP_DEL15",
  "ORIGIN_AIRPORT_ID",
  "DEST_AIRPORT_ID",
  "CRS_DEP_TIMESTAMP",
  "CRS_ARR_TIMESTAMP",
  "PR_ARR_DEL15"]
  
  # Creating a window partition to extract prior arrival delay for each flight
  windowSpec = Window.partitionBy("TAIL_NUM").orderBy("CRS_DEP_TIMESTAMP")
  
  # Returning the transformed dataset
  return (
    dataframe
    .filter("CANCELLED != 1 AND DIVERTED != 1")
    .withColumn("FL_DATE", f.col("FL_DATE").cast("date"))
    .withColumn("OP_CARRIER_FL_NUM", f.col("OP_CARRIER_FL_NUM").cast("string"))
    .withColumn("DEP_TIME_HOUR", dataframe.DEP_TIME_BLK.substr(1, 2).cast("int"))
    .withColumn("ARR_TIME_HOUR", dataframe.ARR_TIME_BLK.substr(1, 2).cast("int"))
    # Hour of an hhmm time: floor, not round. f.round rounds half-up, so
    # round(1455/100) = 15 and round(2359/100) = 24, which moved every time
    # from hh:50 onwards into the next (or a non-existent) hour.
    .withColumn("CRS_DEP_TIME_HOUR", f.floor(f.col("CRS_DEP_TIME") / 100).cast("int"))
    .withColumn("CRS_ARR_TIME_HOUR", f.floor(f.col("CRS_ARR_TIME") / 100).cast("int"))
    .withColumn("DISTANCE_GROUP", f.col("DISTANCE_GROUP").cast("string"))
    .withColumn("OP_CARRIER_FL_NUM", f.concat(f.col("OP_CARRIER"),f.lit("_"),f.col("OP_CARRIER_FL_NUM")))
    .withColumn("DEP_DEL15", f.col("DEP_DEL15").cast("string"))
    .withColumn("ARR_DEL15", f.col("ARR_DEL15").cast("string"))
    .withColumn("FL_DATE_string", f.col("FL_DATE").cast("string"))
    .withColumn("YEAR", f.col("YEAR").cast("string"))
    .withColumn("QUARTER", f.col("QUARTER").cast("string"))
    .withColumn("MONTH", f.col("MONTH").cast("string"))
    .withColumn("DAY_OF_MONTH", f.col("DAY_OF_MONTH").cast("string"))
    .withColumn("DAY_OF_WEEK", f.col("DAY_OF_WEEK").cast("string"))
    .withColumn("CRS_DEP_TIME_string", f.col("CRS_DEP_TIME").cast("string"))
    .withColumn("CRS_ARR_TIME_string", f.col("CRS_ARR_TIME").cast("string"))
    .withColumn("CRS_DEP_TIME_HOUR_string", f.col("CRS_DEP_TIME_HOUR").cast("string"))
    .withColumn("CRS_ARR_TIME_HOUR_string", f.col("CRS_ARR_TIME_HOUR").cast("string"))
    # Zero-pad hhmm to 4 characters so hours and minutes can be sliced out
    .withColumn("CRS_DEP_TIME_HH", f.lpad("CRS_DEP_TIME_string", 4, '0').substr(1,2))
    .withColumn("CRS_DEP_TIME_MM", f.lpad("CRS_DEP_TIME_string", 4, '0').substr(3,2))
    .withColumn("CRS_ARR_TIME_HH", f.lpad("CRS_ARR_TIME_string", 4, '0').substr(1,2))
    .withColumn("CRS_ARR_TIME_MM", f.lpad("CRS_ARR_TIME_string", 4, '0').substr(3,2))
    # Timestamp created to calculate prior arrival delay of a flight and to join with weather dataset
    .withColumn("CRS_DEP_TIMESTAMP", f.concat(f.col("FL_DATE_string"),f.lit(" "),f.col("CRS_DEP_TIME_HH"), f.lit(":"),f.col("CRS_DEP_TIME_MM")).cast("timestamp"))
    .withColumn("CRS_ARR_TIMESTAMP", f.concat(f.col("FL_DATE_string"),f.lit(" "),f.col("CRS_ARR_TIME_HH"), f.lit(":"),f.col("CRS_ARR_TIME_MM")).cast("timestamp"))
    .withColumn("CRS_ELAPSED_TIME", f.round((f.col("CRS_ELAPSED_TIME")/60)).cast("int"))
    # Variable created to incorporate effects of a prior arrival delay into the current flight's departure delay.
    # Rows with a null TAIL_NUM all share one window partition, so without the guard
    # they would inherit ARR_DEL15 from an unrelated aircraft's flight.
    # NOTE(review): the previous flight's arrival outcome may not be known two hours
    # before this departure — confirm this feature does not leak future information.
    .withColumn(
      "PR_ARR_DEL15",
      f.when(f.col("TAIL_NUM").isNotNull(), f.lag(f.col("ARR_DEL15"), 1).over(windowSpec)).cast("string"))
    .select(selected_col)
    )

In [14]:
def weather_extraction(weather_path):
  """
  Load the weather parquet data from the given location.

  NOTE(review): "header" is a CSV-reader option and is presumably ignored for
  parquet input — confirm and drop it.

  Parameters:
  weather_path: location of the weather parquet file(s)

  Returns:
  weather dataframe
  """
  return spark.read.option("header", "true").parquet(weather_path)

##### Transforming weather data - part 1
- Filter weather dataset to US and Report Type "FM-15"

In [16]:
def weather_transformation_reduction(dataframe, shorlisted_weather_cols):
  """
  Reduce the raw weather data to US FM-15 reports and the shortlisted columns.

  The country is taken as the last two characters of NAME, which is stored in a
  helper COUNTRY column. The final select drops COUNTRY unless it is itself
  shortlisted.

  Parameters:
  dataframe: weather data
  shorlisted_weather_cols: columns shortlisted for model inputs or for feature engineering

  Returns:
  dataframe: filtered weather data restricted to shorlisted_weather_cols
  """
  tagged = dataframe.withColumn("COUNTRY", f.substring(f.col("NAME"), -2, 2))
  us_reports = tagged.filter("COUNTRY = 'US'")
  fm15_reports = us_reports.filter("REPORT_TYPE LIKE '%FM-15%'")
  return fm15_reports.select(shorlisted_weather_cols)

##### Transforming weather data - part 2
- Keep station records that co-exist in the airport dataset

In [18]:
def weather_coordinates_extract(weather_df):
  """
  Return the distinct (STATION, CALL_SIGN, LATITUDE, LONGITUDE) combinations of the weather data.

  Side effect: registers (or replaces) the Spark temporary view `weather`.

  Parameters:
  weather_df: weather data

  Returns:
  weather_coordinates: one row per distinct station/call-sign/coordinate combination
  """
  weather_df.createOrReplaceTempView('weather')
  distinct_stations_sql = """
  SELECT 
    DISTINCT STATION, CALL_SIGN, LATITUDE, LONGITUDE
  FROM 
    weather
  """
  return spark.sql(distinct_stations_sql)

In [19]:
def spatial_join(weather_coordinates_df, airport_df, weather_df):
  """
  Assign every airport its nearest weather station and keep only those stations' weather.

  Both coordinate tables are collected to the driver as pandas DataFrames. For
  each airport, scikit-learn finds the station with the smallest Euclidean
  distance in (LATITUDE, LONGITUDE) degree space.
  NOTE(review): degree-space distance ignores that a degree of longitude shrinks
  with latitude; a great-circle (haversine) distance may be more faithful.

  Side effect: registers (or replaces) the Spark temporary view
  `station_id_weather_filter` holding the chosen station ids.

  Parameters:
  weather_coordinates_df: weather station location coordinates
  airport_df: airport data
  weather_df: weather data

  Returns:
  (weather_df, airport_df):
    weather_df restricted to the chosen stations;
    airport_df with an added weather_station_id column
  """
  stations_pdf = weather_coordinates_df.toPandas()
  airports_pdf = airport_df.toPandas()

  nearest_idx, _ = metrics.pairwise_distances_argmin_min(
    airports_pdf[['LATITUDE', 'LONGITUDE']],
    stations_pdf[['LATITUDE', 'LONGITUDE']],
    metric='euclidean',
  )
  nearest_station_ids = stations_pdf['STATION'].iloc[nearest_idx].tolist()

  spark.createDataFrame(nearest_station_ids, StringType()).createOrReplaceTempView('station_id_weather_filter')

  airports_pdf['weather_station_id'] = nearest_station_ids
  airport_df = spark.createDataFrame(airports_pdf)

  weather_df = weather_df.where(f.col("STATION").isin(set(nearest_station_ids)))
  return (weather_df, airport_df)

##### Transforming weather data - part 3
- Weather feature extraction and transformation

1. First, we reviewed each weather feature and extracted those likely to affect airline delays.
2. The weather dataset encodes missing values as empty strings or sentinel codes (e.g. `9999`) rather than as Null. To handle them during imputation later, we replace them with Null.
3. Several weather fields pack multiple values into one string delimited by ","; we parse those substrings into separate columns.
4. Values whose quality code is 3 or 7 (erroneous) have that quality code reassigned to "999", which marks the value as missing.
5. All missing values are then set to Null, to be handled by imputation later.
6. Finally, we drop unnecessary columns to speed up processing.

In [21]:
def weather_transformation(dataframe):
  """
  Parse the comma-delimited weather sections into single-valued feature columns.

  For each section the code slices out a value and, where present, its quality
  code. It then:
    - rewrites erroneous quality codes ("3" or "7"; only "3" for IA1) to "999";
    - nulls the value when it is empty, equals the section's missing-value
      sentinel, or has quality "999";
    - drops the raw section column and the helper/quality columns.

  Columns added, in this order: WND_SPEED, CIG_CAVOC, VIS_DISTANCE, TMP_TEMP,
  DEW_TEMP, SLP_PRESSURE, PRECIPITATION, SNOW, WEATHER_OBSERVATION,
  CLOUD_COVERAGE, CLOUD_BASE_HEIGHT, GROUND_SURFACE, ALTIMETER_SET.
  Values remain strings; this function does not cast them.

  Parameters:
  dataframe: weather data

  Returns:
  dataframe: weather data
  """

  def null_out_bad_values(df, value_col, quality_col, missing_codes, bad_quality=("3", "7")):
    # Step 1: map erroneous quality codes to the "999" marker (replaces quality_col in place).
    quality = f.col(quality_col)
    is_erroneous = quality == bad_quality[0]
    for code in bad_quality[1:]:
      is_erroneous = is_erroneous | (quality == code)
    df = df.withColumn(quality_col, f.when(is_erroneous, "999").otherwise(quality))
    # Step 2: null the value if empty, a missing-value sentinel, or marked "999".
    value = f.col(value_col)
    is_missing = value == ""
    for code in missing_codes:
      is_missing = is_missing | (value == code)
    is_missing = is_missing | (f.col(quality_col) == "999")
    return df.withColumn(value_col, f.when(is_missing, None).otherwise(value))

  df = dataframe

  # WND (mandatory): last two fields -> WND_SPEED and its quality code
  df = (df.withColumn("WND_temp", f.substring_index("WND", ",", -2))
          .withColumn("WND_SPEED", f.substring_index("WND_temp", ",", 1))
          .withColumn("WND_SPEED_QUALITY", f.substring_index("WND_temp", ",", -1)))
  df = null_out_bad_values(df, "WND_SPEED", "WND_SPEED_QUALITY", ["9999"])
  df = df.drop("WND_temp", "WND", "WND_SPEED_QUALITY")

  # CIG (mandatory): only the last field is kept, as CIG_CAVOC; "" and "9" mean missing
  df = df.withColumn("CIG_CAVOC", f.substring_index("CIG", ",", -1))
  cavoc = f.col("CIG_CAVOC")
  df = df.withColumn("CIG_CAVOC", f.when((cavoc == "") | (cavoc == "9"), None).otherwise(cavoc))
  df = df.drop("CIG")

  # VIS (mandatory): first two fields -> VIS_DISTANCE and its quality code
  df = (df.withColumn("VIS_temp", f.substring_index("VIS", ",", 2))
          .withColumn("VIS_DISTANCE", f.substring_index("VIS_temp", ",", 1))
          .withColumn("VIS_DISTANCE_QUALITY", f.substring_index("VIS_temp", ",", -1)))
  df = null_out_bad_values(df, "VIS_DISTANCE", "VIS_DISTANCE_QUALITY", ["999999"])
  df = df.drop("VIS_temp", "VIS_DISTANCE_QUALITY", "VIS")

  # TMP (mandatory): first field -> TMP_TEMP, last field -> quality code
  df = (df.withColumn("TMP_TEMP", f.substring_index("TMP", ",", 1))
          .withColumn("TMP_TEMP_QUALITY", f.substring_index("TMP", ",", -1)))
  df = null_out_bad_values(df, "TMP_TEMP", "TMP_TEMP_QUALITY", ["+9999"])
  df = df.drop("TMP_TEMP_QUALITY", "TMP")

  # DEW (mandatory): first field -> DEW_TEMP, last field -> quality code
  df = (df.withColumn("DEW_TEMP", f.substring_index("DEW", ",", 1))
          .withColumn("DEW_TEMP_QUALITY", f.substring_index("DEW", ",", -1)))
  df = null_out_bad_values(df, "DEW_TEMP", "DEW_TEMP_QUALITY", ["+9999"])
  df = df.drop("DEW_TEMP_QUALITY", "DEW")

  # SLP (mandatory): first field -> SLP_PRESSURE, last field -> quality code
  df = (df.withColumn("SLP_PRESSURE", f.substring_index("SLP", ",", 1))
          .withColumn("SLP_PRESSURE_QUALITY", f.substring_index("SLP", ",", -1)))
  df = null_out_bad_values(df, "SLP_PRESSURE", "SLP_PRESSURE_QUALITY", ["99999"])
  df = df.drop("SLP_PRESSURE_QUALITY", "SLP")

  # AA1 (additional): of the last three fields, first -> PRECIPITATION, last -> quality code
  df = (df.withColumn("AA1_temp", f.substring_index("AA1", ",", -3))
          .withColumn("PRECIPITATION", f.substring_index("AA1_temp", ",", 1))
          .withColumn("PRECIPITATION_QUALITY", f.substring_index("AA1_temp", ",", -1)))
  df = null_out_bad_values(df, "PRECIPITATION", "PRECIPITATION_QUALITY", ["9999"])
  df = df.drop("AA1_temp", "AA1", "PRECIPITATION_QUALITY")

  # AJ1 (additional): of the first three fields, first -> SNOW, third -> quality code
  df = (df.withColumn("AJ1_temp", f.substring_index("AJ1", ",", 3))
          .withColumn("SNOW", f.substring_index("AJ1_temp", ",", 1))
          .withColumn("SNOW_QUALITY", f.substring_index("AJ1_temp", ",", -1)))
  df = null_out_bad_values(df, "SNOW", "SNOW_QUALITY", ["9999"])
  df = df.drop("AJ1_temp", "AJ1", "SNOW_QUALITY")

  # AT1 (additional): of the last three fields, first -> WEATHER_OBSERVATION, last -> quality code
  df = (df.withColumn("AT1_temp", f.substring_index("AT1", ",", -3))
          .withColumn("WEATHER_OBSERVATION", f.substring_index("AT1_temp", ",", 1))
          .withColumn("WEATHER_OBSERVATION_QUALITY", f.substring_index("AT1_temp", ",", -1)))
  df = null_out_bad_values(df, "WEATHER_OBSERVATION", "WEATHER_OBSERVATION_QUALITY", [])
  df = df.drop("AT1", "AT1_temp", "WEATHER_OBSERVATION_QUALITY")

  # GA1 (additional): fields 1-2 -> CLOUD_COVERAGE + quality, fields 3-4 -> CLOUD_BASE_HEIGHT + quality
  df = (df.withColumn("GA1_temp", f.substring_index("GA1", ",", 4))
          .withColumn("GA1_temp2", f.substring_index("GA1_temp", ",", 2))
          .withColumn("GA1_temp3", f.substring_index("GA1_temp", ",", -2))
          .withColumn("CLOUD_COVERAGE", f.substring_index("GA1_temp2", ",", 1))
          .withColumn("CLOUD_COVERAGE_QUALITY", f.substring_index("GA1_temp2", ",", -1)))
  df = null_out_bad_values(df, "CLOUD_COVERAGE", "CLOUD_COVERAGE_QUALITY", ["99", "9", "10"])
  df = df.drop("GA1", "GA1_temp", "GA1_temp2", "CLOUD_COVERAGE_QUALITY")
  df = (df.withColumn("CLOUD_BASE_HEIGHT", f.substring_index("GA1_temp3", ",", 1))
          .withColumn("CLOUD_BASE_HEIGHT_QUALITY", f.substring_index("GA1_temp3", ",", -1)))
  df = null_out_bad_values(df, "CLOUD_BASE_HEIGHT", "CLOUD_BASE_HEIGHT_QUALITY", ["+99999"])
  df = df.drop("GA1_temp3", "CLOUD_BASE_HEIGHT_QUALITY")

  # IA1 (additional): first field -> GROUND_SURFACE, last field -> quality code (only "3" is erroneous here)
  df = (df.withColumn("GROUND_SURFACE", f.substring_index("IA1", ",", 1))
          .withColumn("GROUND_SURFACE_QUALITY", f.substring_index("IA1", ",", -1)))
  df = null_out_bad_values(df, "GROUND_SURFACE", "GROUND_SURFACE_QUALITY", ["99"], bad_quality=("3",))
  df = df.drop("IA1", "GROUND_SURFACE_QUALITY")

  # MA1 (additional): first two fields -> ALTIMETER_SET and its quality code
  df = (df.withColumn("MA1_temp", f.substring_index("MA1", ",", 2))
          .withColumn("ALTIMETER_SET", f.substring_index("MA1_temp", ",", 1))
          .withColumn("ALTIMETER_SET_QUALITY", f.substring_index("MA1_temp", ",", -1)))
  df = null_out_bad_values(df, "ALTIMETER_SET", "ALTIMETER_SET_QUALITY", ["99999"])
  df = df.drop("MA1", "MA1_temp", "ALTIMETER_SET_QUALITY")

  return df

##### Transforming weather data - part 4
- Convert weather timestamps from UTC to local time to match the airline dataset, which uses local times

In [23]:
# Getting time zone for each weather station
def get_timezone(longitude, latitude):
    """
    Return the time-zone name for a coordinate, or None when it cannot be determined.

    This function is the body of the Python UDF built in create_timestamp, so
    it runs once per weather row. A null LONGITUDE or LATITUDE now yields None
    (a null time zone) instead of passing None into
    TimezoneFinder.timezone_at. That call would presumably raise and fail the
    Spark task.

    Parameters:
    longitude: longitude in degrees, or None
    latitude: latitude in degrees, or None

    Returns:
    The value of TimezoneFinder.timezone_at for the point, or None for missing coordinates.
    """
    if longitude is None or latitude is None:
        return None
    # Imported inside the function, presumably so the package is only needed
    # where the UDF actually executes.
    from timezonefinder import TimezoneFinder
    # NOTE(review): a new TimezoneFinder is built on every call (i.e. per weather
    # row); computing zones once per distinct station would likely be much cheaper.
    tzf = TimezoneFinder()
    return tzf.timezone_at(lng=longitude, lat=latitude)

##### Create timestamp

In [25]:
def create_timestamp(airlines_df, weather_df):
  """
  Add the timestamp columns used to join each flight to a weather observation.

  Weather side:
    - LOCAL_DATE: DATE converted from UTC to the station's zone. The zone comes
      from get_timezone(LONGITUDE, LATITUDE) through a Python UDF. Both helper
      columns are dropped again.
    - DATE_PART / HOUR_PART / MINUTE_PART of that local time; CALL_SIGN trimmed.
    - AL_JOIN_DATE / AL_JOIN_HOUR / AL_JOIN_MINUTE as strings, and
      AL_JOIN_TIMESTAMP rebuilt from them (local time truncated to the minute).
    - ThisTimeStamp: AL_JOIN_TIMESTAMP as unix seconds.
    - NextTimeStamp: the next observation's ThisTimeStamp for the same STATION,
      minus one second, so consecutive [ThisTimeStamp, NextTimeStamp] ranges do
      not overlap. It is null for a station's last observation.
  Airlines side:
    - TWO_HOUR: CRS_DEP_TIMESTAMP minus two hours.
    - AL_ThisTimeStamp: TWO_HOUR as unix seconds.

  Parameters:
  airlines_df: airlines data
  weather_df: weather data

  Returns:
  (airlines_df, weather_df) with the columns above added
  """
  station_zone = f.udf(get_timezone, StringType())

  weather_df = (
    weather_df
    .withColumn("LOCAL", station_zone(f.col("LONGITUDE"), f.col("LATITUDE")))
    .withColumn("LOCAL_DATE", f.from_utc_timestamp(f.col("DATE"), f.col("LOCAL")))
    .withColumn("DATE_PART", f.to_date(f.col("LOCAL_DATE")))
    .withColumn("HOUR_PART", f.hour(f.col("LOCAL_DATE")).cast("int"))
    .withColumn("MINUTE_PART", f.minute(f.col("LOCAL_DATE")).cast("int"))
    .withColumn("CALL_SIGN", f.trim(f.col("CALL_SIGN")))
    .drop("LOCAL", "LOCAL_DATE")
  )

  weather_df = (
    weather_df
    .withColumn("AL_JOIN_DATE", f.col("DATE_PART").cast("string"))
    .withColumn("AL_JOIN_HOUR", f.col("HOUR_PART").cast("string"))
    .withColumn("AL_JOIN_MINUTE", f.col("MINUTE_PART").cast("string"))
    .withColumn(
      "AL_JOIN_TIMESTAMP",
      f.concat(f.col("AL_JOIN_DATE"), f.lit(" "), f.col("AL_JOIN_HOUR"), f.lit(":"), f.col("AL_JOIN_MINUTE")).cast("timestamp"))
  )

  # Consecutive observations of one station define half-open validity windows;
  # NextTimeStamp is pulled back by one second so the windows never overlap.
  per_station = Window.partitionBy("STATION").orderBy("AL_JOIN_TIMESTAMP")
  weather_df = (
    weather_df
    .withColumn("ThisTimeStamp", f.unix_timestamp(f.col("AL_JOIN_TIMESTAMP")))
    .withColumn("NextTimeStamp", f.lead(f.col("ThisTimeStamp"), 1).over(per_station) - 1)
  )

  airlines_df = (
    airlines_df
    .withColumn('TWO_HOUR', f.col("CRS_DEP_TIMESTAMP") + f.expr('INTERVAL -2 HOURS'))
    .withColumn("AL_ThisTimeStamp", f.unix_timestamp(f.col("TWO_HOUR")))
  )
  return (airlines_df, weather_df)

##### Airport to airline join

In [27]:
def airport_airline_join(airlines_df, airport_df):
  """
  Attach origin/destination coordinates and the origin's weather station to each flight.

  Two LEFT JOINs against the airport table (on ORIGIN and on DEST). Flights
  whose airport is not in airport_df keep null coordinates and a null
  weather_station.

  Side effect: registers (or replaces) the Spark temporary views `airlines`
  and `airport`.

  Parameters:
  airlines_df: airlines data
  airport_df: airport data with a weather_station_id column

  Returns:
  airlines_df: airlines data plus ORIGIN_LATITUDE, ORIGIN_LONGITUDE,
  weather_station, DESTINATION_LATITUDE, DESTINATION_LONGITUDE
  """
  for view_name, frame in (('airlines', airlines_df), ('airport', airport_df)):
    frame.createOrReplaceTempView(view_name)

  enrich_sql = """
  SELECT 
    airline.*,
    airport_origin.LATITUDE AS ORIGIN_LATITUDE,
    airport_origin.LONGITUDE AS ORIGIN_LONGITUDE,
    airport_origin.weather_station_id AS weather_station,
    airport_destination.LATITUDE AS DESTINATION_LATITUDE,
    airport_destination.LONGITUDE AS DESTINATION_LONGITUDE
  FROM
    airlines airline
    LEFT JOIN airport airport_origin
      ON airline.ORIGIN = airport_origin.AIRPORT
    LEFT JOIN airport airport_destination
      ON airline.DEST = airport_destination.AIRPORT
  """
  return spark.sql(enrich_sql)

In [28]:
def airlines_weather_leftjoin(airlines_df, weather_df):
  """
  Left-join each flight to the weather observation in effect two hours before departure.

  A flight matches the observation of its origin weather_station whose
  [ThisTimeStamp, NextTimeStamp] window contains AL_ThisTimeStamp. Flights
  without a match keep null weather columns. Both inputs are repartitioned on
  the station key (363 partitions; the reason for that number is not recorded
  here) before being registered as views.

  Side effect: registers (or replaces) the Spark temporary views `airlines`
  and `weather`.

  Parameters:
  airlines_df: airlines data
  weather_df: weather data

  Returns:
  airlines_weather: airlines and weather joined data
  """
  partition_count = 363
  airlines_df.repartition(partition_count, "weather_station").createOrReplaceTempView('airlines')
  weather_df.repartition(partition_count, "STATION").createOrReplaceTempView('weather')

  window_join_sql = """
  SELECT a.*, w.*
  FROM
    airlines a
  LEFT JOIN weather w
  ON
    a.weather_station = w.STATION
    AND a.AL_ThisTimeStamp BETWEEN w.ThisTimeStamp AND w.NextTimeStamp
    """
  return spark.sql(window_join_sql)

##### Create data model

In [30]:
def create_data_model(dataframe):
  
  """
  Prepare the joined data for input into the model for training and testing.
    - keep only the model features and the label DEP_DEL15
    - cast numeric features to int; cast categorical features and the label to string
    - impute nulls in the weather features: mean for ALTIMETER_SET, DEW_TEMP,
      SLP_PRESSURE and TMP_TEMP; approximate median (relative error 0.1) for
      CLOUD_BASE_HEIGHT, CLOUD_COVERAGE, PRECIPITATION, SNOW, VIS_DISTANCE and
      WND_SPEED. A column with no non-null values is left un-imputed.

  The returned frame holds the numeric features first, then the categoricals,
  each group in the order listed below.

  NOTE(review): the imputation statistics are computed over all years,
  including the 2019 test year — confirm this is acceptable.
  
  Parameters:
  dataframe: airlines and weather joined data
  
  Returns:
  dataframe: dataset for model training and testing
  """
  
  selected_columns = ['YEAR',
   'DAY_OF_WEEK',
   'ORIGIN',
   'DEST',
   'CRS_ARR_TIME_HOUR',
   'CRS_DEP_TIME_HOUR',
   'OP_UNIQUE_CARRIER',
   'MONTH',
   'DAY_OF_MONTH',
   'QUARTER',
   'PR_ARR_DEL15',
   'CLOUD_BASE_HEIGHT',
   'ALTIMETER_SET',
   'PRECIPITATION',
   'SNOW',
   'SLP_PRESSURE',
   'TMP_TEMP',
   'DEW_TEMP',
   'DISTANCE',
   'CLOUD_COVERAGE',
   'VIS_DISTANCE',
   'WND_SPEED',
   'DEP_DEL15']
  
  categoricals = [
   'YEAR',
   'DAY_OF_WEEK',
   'ORIGIN',
   'DEST',
   'CRS_ARR_TIME_HOUR',
   'CRS_DEP_TIME_HOUR',
   'OP_UNIQUE_CARRIER',
   'MONTH',
   'DAY_OF_MONTH',
   'QUARTER',
   'PR_ARR_DEL15',
   'DEP_DEL15']

  # Numeric features are the selected columns that are not categorical. They must
  # come from selected_columns, not dataframe.columns: the joined frame also holds
  # e.g. DEP_DELAY_NEW (which directly encodes the departure delay the label is
  # about) and string identifiers, which must not become features.
  # A list comprehension keeps the feature order deterministic.
  numerics = [column for column in selected_columns if column not in categoricals]
  
  # Keep only the model columns, cast to their model types
  dataframe = dataframe.select(
    [f.col(feature).cast("int") for feature in numerics]
    + [f.col(feature).cast("string") for feature in categoricals])
  
  # Imputation
  # Separate lists of columns to impute with the mean and with the median
  cols_mean = ["ALTIMETER_SET", "DEW_TEMP", "SLP_PRESSURE", "TMP_TEMP"]
  cols_median = ["CLOUD_BASE_HEIGHT", "CLOUD_COVERAGE", "PRECIPITATION", "SNOW", "VIS_DISTANCE", "WND_SPEED"]

  fill_values = {}

  # One aggregation pass computes every mean (instead of one Spark job per column).
  # A mean is None when the column has no non-null values; na.fill rejects None.
  mean_row = dataframe.agg(*[f.mean(c).alias(c) for c in cols_mean]).first()
  for c in cols_mean:
    if mean_row[c] is not None:
      fill_values[c] = mean_row[c]

  # approxQuantile returns an empty list for a column without usable values.
  for c in cols_median:
    quantiles = dataframe.approxQuantile(c, [0.5], 0.1)
    if quantiles:
      fill_values[c] = quantiles[0]

  # Single fill for all imputed columns (the fill values are cast to the int column type)
  if fill_values:
    dataframe = dataframe.na.fill(fill_values)
  
  return dataframe

##### Drop Outcome Variables with NULL Values

As we only have approximately 3700 records with null labels, we decided to drop those records at this stage

In [32]:
def drop_null_labels(dataframe):
  """
  Drop the rows whose label DEP_DEL15 is null.

  According to the notebook's notes, only about 3,700 records are affected,
  so they are removed rather than imputed.

  Side effect: registers (or replaces) the Spark temporary view `data_model`.

  Parameters:
  dataframe: dataset for model training and testing

  Returns:
  dataframe: the same dataset without null-label rows
  """
  dataframe.createOrReplaceTempView('data_model')
  labelled_rows_sql = """
  SELECT
    *
  FROM
    data_model
  WHERE
    DEP_DEL15 IS NOT NULL
  """
  return spark.sql(labelled_rows_sql)

##### Split dataset

In [34]:
def split_dataset(dataframe):
  """
  Split the data by YEAR: 2019 is the test set and every other year is training data.

  YEAR is compared as the string '2019'. Rows with a null YEAR satisfy neither
  condition and end up in neither set.

  Parameters:
  dataframe: dataset for model training and testing

  Returns:
  (trainRDD, testRDD): the training and test DataFrames
  """
  train_condition = "YEAR != '2019'"
  test_condition = "YEAR = '2019'"
  trainRDD = dataframe.filter(train_condition)
  testRDD = dataframe.filter(test_condition)
  return trainRDD, testRDD

##### Gradient Boosted Trees

In [36]:
def gradient_boosted_trees(trainRDD, testRDD):
  
  """
  Train a Gradient Boosted Trees classifier on trainRDD, evaluate it on testRDD and print metrics.

  Pipeline stages:
    - StringIndexer over every string feature (unseen/invalid values kept in an extra index)
    - StringIndexer turning the label DEP_DEL15 into "label" (alphabetical order)
    - VectorAssembler over the indexed categoricals and the int/double columns
    - GBTClassifier (logistic loss, 50 iterations, depth 10, 400 bins)

  Printed: area under ROC and PR curves, TPR/TNR/FPR/FNR, precision, recall,
  F1 and the confusion matrix. A ratio whose denominator is zero (e.g. the
  model never predicts a delay) is printed as nan instead of raising
  ZeroDivisionError.
  
  Parameters:
  trainRDD: training data (a Spark DataFrame, despite the name)
  testRDD: test data (a Spark DataFrame)
  
  Returns:
  prediction: DataFrame with the single column "prediction" for testRDD
  """

  def _ratio(numerator, denominator):
    # Safe division: nan when the denominator is zero
    return numerator / denominator if denominator else float("nan")

  label_column = "DEP_DEL15"

  # obtain numeric features 
  numerics_gbt = [feature for (feature, dataType) in trainRDD.dtypes if dataType in ("double", "int") and feature != label_column]

  # obtain categorical features 
  categoricals_gbt = [feature for (feature, dataType) in trainRDD.dtypes if dataType == "string" and feature != label_column]
  
  # Establish stages for our GBT model
  inputCol_gbt = [x + "Index" for x in categoricals_gbt]
  indexers_gbt = StringIndexer(inputCols=categoricals_gbt, outputCols=inputCol_gbt, handleInvalid="keep")
  # Alphabetical ordering pins the label encoding (e.g. "0.0" -> 0, "1.0" -> 1, assuming
  # those are the label strings), so label 1 always means "delayed". The default
  # frequency ordering would flip it if delays were ever the majority class.
  label_indexers_gbt = StringIndexer(inputCol=label_column, outputCol="label", stringOrderType="alphabetAsc")

  featureCols_gbt = inputCol_gbt + numerics_gbt

  # Define vector assemblers
  vector_gbt = VectorAssembler(inputCols=featureCols_gbt, outputCol="features")

  # Define a GBT model.
  gbt = GBTClassifier(featuresCol="features",
                      labelCol="label",
                      lossType = "logistic",
                      maxIter = 50,
                      maxDepth = 10,
                      maxBins = 400)

  # Chain indexers, assembler and GBT in a Pipeline
  stages_gbt = [indexers_gbt, label_indexers_gbt, vector_gbt, gbt]
  pipeline_gbt = Pipeline(stages=stages_gbt)

  # Train the model (no cross-validation is performed here)
  gbt_model = pipeline_gbt.fit(trainRDD)

  test_gbt = gbt_model.transform(testRDD)

  evaluator = BinaryClassificationEvaluator()

  # One aggregation over the test predictions instead of four separate count() jobs.
  # Uses the indexed label, i.e. exactly what the model was trained to predict.
  confusion_counts = {
    (row["label"], row["prediction"]): row["count"]
    for row in test_gbt.groupBy("label", "prediction").count().collect()
  }
  tp = confusion_counts.get((1.0, 1.0), 0)
  tn = confusion_counts.get((0.0, 0.0), 0)
  fp = confusion_counts.get((0.0, 1.0), 0)
  fn = confusion_counts.get((1.0, 0.0), 0)

  print("########### Gradient Boosted Trees ###########\n")
  data = {"Actual: delay": [tp, fn], "Actual: on-time": [fp, tn]}
  confusion_matrix = pd.DataFrame.from_dict(data, orient="index", columns=["Prediction: delay", "Prediction: on-time"])

  print("Test Area Under ROC: ", "{:.2f}".format(evaluator.evaluate(test_gbt, {evaluator.metricName: "areaUnderROC"})))
  print("Test Area Under Precision-Recall Curve: ", "{:.2f}".format(evaluator.evaluate(test_gbt, {evaluator.metricName: "areaUnderPR"})))

  print("True positive rate: {:.2%}".format(_ratio(tp, tp + fn)))
  print("True negative rate: {:.2%}".format(_ratio(tn, tn + fp)))
  print("False positive rate: {:.2%}".format(_ratio(fp, tn + fp)))
  print("False negative rate: {:.2%}".format(_ratio(fn, tp + fn)))
  
  print("---------------------------------------\n")
  precision = _ratio(tp, tp + fp)
  print("Precision: {:.2%}".format(precision))
  recall = _ratio(tp, tp + fn)
  print("Recall: {:.2%}".format(recall))

  f1_score = _ratio(2 * precision * recall, precision + recall)
  print("F1 Score: {:.2%}".format(f1_score))

  print("########### Confusion Matrix ###########\n")
  print(confusion_matrix)
  
  prediction = test_gbt.select("prediction")
  return prediction

## End to End Pipeline

In [38]:
# End to End Pipeline:

def end_to_end_pipeline(airlines_path, weather_path, airport_path):
  """
  Run the whole flow: extract, transform, join, build the model dataset, train and evaluate.

  Steps are executed in a fixed order because several of them register Spark
  temporary views or collect data to the driver. The train and test sets are
  cached before training and not unpersisted here.

  Parameters:
  airlines_path: path to airlines dataset (parquet)
  weather_path: path to weather dataset (parquet)
  airport_path: path to airport dataset (CSV)

  Returns:
  prediction: DataFrame of test-set predictions from gradient_boosted_trees
  """
  # Weather columns needed for the model or for feature engineering
  shorlisted_weather_cols = ["STATION", "DATE", "LATITUDE", 'LONGITUDE', 'NAME', 'REPORT_TYPE', 'CALL_SIGN', 'WND', 'CIG', 'VIS', 'TMP', 'DEW', 'SLP', 'AA1', 'AJ1', 'AT1', 'GA1', 'IA1', 'MA1']

  # --- Extract
  raw_airlines = spark.read.option("header", "true").parquet(airlines_path)
  raw_airports = airport_extract(airport_path)
  raw_weather = weather_extraction(weather_path)

  # --- Airports: US only, one point each, restricted to airports present in the flight data
  airports = airport_selection(airport_transform(raw_airports), raw_airlines)

  # --- Flights
  flights = airlines_transform(raw_airlines)

  # --- Weather: US FM-15 reports, nearest-station assignment, field parsing
  weather = weather_transformation_reduction(raw_weather, shorlisted_weather_cols)
  station_coordinates = weather_coordinates_extract(weather)
  weather, airports = spatial_join(station_coordinates, airports, weather)
  weather = weather_transformation(weather)

  # --- Join keys, then airport and weather joins
  flights, weather = create_timestamp(flights, weather)
  flights = airport_airline_join(flights, airports)
  joined = airlines_weather_leftjoin(flights, weather)

  # --- Model dataset: typed/imputed features, labelled rows only, year-based split
  data_model = drop_null_labels(create_data_model(joined))
  trainRDD, testRDD = split_dataset(data_model)

  # --- Train and evaluate
  trainRDD.cache()
  testRDD.cache()
  return gradient_boosted_trees(trainRDD, testRDD)

In [39]:
# Input locations for one end-to-end run (Databricks FileStore / DBFS paths)
airport_path = "/FileStore/tables/193498910_T_MASTER_CORD.csv"
airlines_path = "dbfs:/mnt/mids-w261/data/datasets_final_project/parquet_airlines_data/*.parquet"
weather_path = "dbfs:/mnt/mids-w261/data/datasets_final_project/weather_data/*.parquet"

# Run the pipeline: prints the evaluation metrics and returns the predictions DataFrame
end_to_end_pipeline(
  airlines_path=airlines_path,
  weather_path=weather_path,
  airport_path=airport_path,
)