In [0]:
## Phases Run:
#### Setup Environment

In [0]:
import time
import random
import numpy as np
import pandas as pd
import airportsdata
from itertools import chain
from datetime import datetime 
import matplotlib.pyplot as plt
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Row, Column
from pyspark.ml.feature import Imputer
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.linalg import SparseVector, DenseVector
from pyspark.sql.types import BooleanType, StringType, IntegerType, DoubleType, LongType
from pyspark.ml.feature import OneHotEncoder, StandardScaler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# custom configuration
# NOTE(review): `spark` is not defined anywhere in this notebook; presumably it is the
# SparkSession injected by the notebook runtime — confirm before running elsewhere.
sc = spark.sparkContext

In [0]:
# PIPELINE DEVELOPMENT
# If you would like to update any part of the pipeline individually, calling the matching get_ function with write=True is the recommended method.
# Airlines - get_airlines(env='dev', write=True)
# Stations - get_stations(airports_list, env='dev', write=True)
# Individual stations - get_stations_agg(df_stations_post, env='dev', write=True)
# Airline & station join - get_flights(df_airlines_post, df_stations_agg, env='dev', write=True)
# Weather - get_weather(stations_list, flightMonths_list, flightYears_list, env='dev', write=True)
# For any stage after the airline and station join, see the process_pipeline function for how to rerun it.

In [0]:
# Azure Blob Storage target (container and storage account are created in https://portal.azure.com)
blob_container = "w261"
storage_account = "w261finalprojectv2"

# Databricks secret scope / key names (created on your local computer using the Databricks CLI)
secret_scope = "w261-scope"
secret_key = "w261-key-2"

# Locations derived from the settings above
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
mount_path = "/mnt/mids-w261"

In [0]:
# Hand Spark the storage account key, read from the Databricks secret scope,
# so that paths under `blob_url` can be read and written.
spark.conf.set(
  "fs.azure.account.key.{}.blob.core.windows.net".format(storage_account),
  dbutils.secrets.get(scope=secret_scope, key=secret_key),
)

In [0]:
# Configurations
# Row cap per output file; passed to the parquet writers as the "maxRecordsPerFile" option.
max_write_rows = 200_000

In [0]:
#### Airlines Data

In [0]:
# *Spencer's Notes*
# * Output Columns
#   * DIV *: As best as I can tell, we have a series of equivalent data for diverted flights
#   * DEL *: Really, *DEL* (DEP_DELAY, ARR_DELAY, etc.)
#   * TAXI_OUT: Time elapsed between departed from the airport gate and wheels off the ground
#   * WHEELS_OFF: Take-off time from departure airport
#   * WHEELS_ON: Landing time at arrival airport (Add to `delay_cols`?)
#   * TAXI_IN: Time between landing and arrival at airport gate (Add to `delay_cols`?)
#   * CRS_ARR_TIME: Computer Reservation System 
#     * CRS INFO: https://www.bts.gov/topics/airlines-and-airports/number-14-time-reporting
#     * CRS_ARR_TIME = scheduled arrival time, CRS_ELAPSED_TIME = scheduled elapsed time,etc. 
#     * Some of this info might be useful. For example, flights expected to be shorter or longer, as specified by CRS_ELAPSED_TIME, might be delayed with different probabilities. 
#   * CANCELLATION_CODE: A = Carrier, B = Weather, C= National Air System, D = Security
#     * [Source](https://www.transtats.bts.gov/FieldInfo.asp?Svryq_Qr5p=f2rpvsvr5%FDgur%FDern510%FDS14%FDPn0pryyn6v10&Svryq_gB2r=Pun4&Y11x72_gnoyr=Y_PNaPRYYNgVba&gnoyr_VQ=FGJ&flf_gnoyr_anzr=g_bagVZR_eRcbegVaT&fB5_Svryq_anzr=PNaPRYYNgVba_PbQR)
#     * Knowing if a prior flight from an airport is cancelled might be informative
#   * TOTAL_ADD_GTIME: Total Ground Time Away from Gate for Gate Return or Cancelled Flight
#   * LONGEST_ADD_GTIME: Longest Time Away from Gate for Gate Return or Cancelled Flight
# * With a lot of these variables, I believe some clever (yet hefty) feature engineering could create some interesting variables. However, such feature engineering does not obviously (at least to me) pass a cost-benefit analysis, so we can evaluate whether we need more informative variables later on. 
# * What percentage of flights are diverted? How do we categorize these?


In [0]:
# airline data dictionary - https://www.transtats.bts.gov/Fields.asp?gnoyr_VQ=FGJ

# OUTPUT COLUMNS - related to delays or come after the fact for a flight
# # 'DIV*', 'DEL*', 'TAXI_OUT', 'WHEELS_OFF', 'WHEELS_ON', 'TAXI_IN', 'CRS_ARR_TIME', 'ARR_TIME', 'ARR_TIME_BLK', 'CANCELLATION_CODE', 'DIVERTED', 'CRS_ELAPSED_TIME', 'ACTUAL_ELAPSED_TIME', 'AIR_TIME', 'TOTAL_ADD_GTIME', 'LONGEST_ADD_GTIME', 'DIV_ACTUAL_ELAPSED_TIME', 'DIV_REACHED_DEST'

# DUPLICATIVE
# OP_CARRIER_AIRLINE_ID - OP_UNIQUE_CARRIER & OP_CARRIER 
# ORIGIN_AIRPORT_ID - ORIGIN,  ORIGIN_CITY_NAME, ORIGIN_CITY_MARKET_ID,  ORIGIN_STATE_ABR, ORIGIN_STATE_NM, ORIGIN_AIRPORT_SEQ_ID
# DEST_AIRPORT_SEQ_ID - DEST, DEST_CITY_NAME, DEST_CITY_MARKET_ID, DEST_STATE_ABR, DEST_STATE_NM, DEST_AIRPORT_ID, 
# 'DEP_TIME' - 'DEP_TIME_BLK', 'CRS_DEP_TIME',
# ORIGIN - kept for primary join and will be dropped after

# OUTPUT 
# 'DEP_DEL15' & 'CANCELLED'

In [0]:
def write_process_airlines(env='dev'):
  """
  This function ingests raw airlines data and writes processed data to storage

  Steps:
    1. Read the raw airlines parquet files and drop exact duplicate rows.
       Any env other than 'prod' keeps only MONTH == 1 flights out of OAK, ATL and SJU.
    2. Keep the columns in `airline_cols`, write them to processed-{env}/df_airlines_deduped
       (partitioned by YEAR) and read them back.
    3. Add DEP_HOUR_OF (local departure hour), TIMEZONE (looked up from ORIGIN),
       DEP_HOUR_OF_FORMATTED, FL_DATETIME, FL_DATETIME_UTC and DEP_HOUR_OF_UTC.
    4. Write the result to processed-{env}/df_airlines_post, partitioned by YEAR and ORIGIN.

  TAIL_NUM is passed through unchanged; no indexed TAIL_NUM column is produced.

  input: env - 'prod' for the full data set, anything else for the dev sample
  output: None
  """
  checkpoint = time.time()

  df_airlines = spark.read.parquet("/mnt/mids-w261/datasets_final_project/parquet_airlines_data/*")
  if env != 'prod':
    # non-prod runs work on a small sample so the pipeline can be iterated on quickly
    df_airlines = df_airlines.filter((col('MONTH') == 1) & (col('ORIGIN').isin(['OAK', 'ATL', 'SJU']) ))
  df_airlines = df_airlines.dropDuplicates()

  # Columns deliberately NOT selected below:
  #   * cancellation columns: CANCELLED, CANCELLATION_CODE
  #   * delay columns that are only known after the fact and may cause data leakage:
  #     DEP_DELAY, ARR_DELAY, ARR_DELAY_NEW, ARR_DEL15, ARR_DELAY_GROUP, CARRIER_DELAY,
  #     WEATHER_DELAY, NAS_DELAY, SECURITY_DELAY, LATE_AIRCRAFT_DELAY, FIRST_DEP_TIME,
  #     DIV_ARR_DELAY, DEP_DELAY_GROUP
  airline_cols = ['YEAR', 'QUARTER', 'MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'FL_DATE', 'OP_CARRIER_AIRLINE_ID', 'TAIL_NUM', 'OP_CARRIER_FL_NUM', 'ORIGIN_AIRPORT_ID', 'ORIGIN_AIRPORT_SEQ_ID', 'ORIGIN', 'ORIGIN_WAC', 'DEST_AIRPORT_ID', 'DEST_AIRPORT_SEQ_ID', 'DEST_STATE_FIPS', 'DEST_WAC', 'DEP_TIME', 'DEP_DEL15', 'DEP_DELAY_NEW', 'FLIGHTS', 'DISTANCE', 'DISTANCE_GROUP', 'DIV_AIRPORT_LANDINGS']
  df_airlines = df_airlines.select(*airline_cols).cache()

  # materialize the cache once so the write below does not recompute the dedup
  df_airlines.count()

  df_airlines.write.partitionBy('YEAR') \
                    .option("maxRecordsPerFile", max_write_rows) \
                    .mode("overwrite") \
                    .parquet(f"{blob_url}/processed-{env}/df_airlines_deduped")

  df_airlines = spark.read.parquet(f"{blob_url}/processed-{env}/df_airlines_deduped").cache()

  # time reported for departure and arrivals are in local time
  # flights - FL_DATE YYYY-MM-DD, DEP_TIME 850, 1505
  # The hour is DEP_TIME integer-divided by 100. Stripping the last two characters of the
  # string form breaks for times before 01:00 (5 -> hour 5, 45 -> null), so do it numerically.
  # NOTE(review): a DEP_TIME of 2400, if present, still yields hour 24, which the 'H'
  # pattern used for FL_DATETIME_UTC below presumably will not parse — confirm how to treat it.
  dep_time_hour = floor(col("DEP_TIME") / 100).cast(IntegerType())
  df_airlines_post = df_airlines.withColumn('DEP_HOUR_OF', dep_time_hour).cache()

  # convert local timezones to UTC - weather data is reported in UTC
  airports = airportsdata.load('IATA')
  timezones_dict = {airport_code: airports[airport_code]["tz"] for airport_code in airports}
  # create_map takes a flat key, value, key, value, ... sequence of literals
  mapping_expr = create_map([lit(x) for x in chain(*timezones_dict.items())])
  df_airlines_post = df_airlines_post.withColumn("TIMEZONE", mapping_expr[col("ORIGIN")])

  ## Create a datetime column (to accommodate for day changes)
  df_airlines_post = df_airlines_post.withColumn("DEP_HOUR_OF_FORMATTED", concat_ws(":", col("DEP_HOUR_OF"), lit("00"), lit("00")))
  df_airlines_post = df_airlines_post.withColumn("FL_DATETIME", concat_ws(" ", col("FL_DATE"), col("DEP_HOUR_OF_FORMATTED")))

  ## Create a new UTC version of the hour column
  df_airlines_post = df_airlines_post.withColumn("FL_DATETIME_UTC", to_utc_timestamp(to_timestamp(col("FL_DATETIME"), "yyyy-MM-dd H:mm:ss"), col("TIMEZONE")))

  ## Truncate to just the UTC hour
  df_airlines_post = df_airlines_post.withColumn("DEP_HOUR_OF_UTC", hour(col("FL_DATETIME_UTC")))

  stage_time = time.time() - checkpoint
  print(f"{stage_time} sec - airlines data processed")
  checkpoint = time.time()
  df_airlines_post.count()

  # write the processed airlines data to disk
  df_airlines_post.write.partitionBy('YEAR', 'ORIGIN') \
                        .option("maxRecordsPerFile", max_write_rows) \
                        .mode("overwrite").parquet(f"{blob_url}/processed-{env}/df_airlines_post")

  stage_time = time.time() - checkpoint
  print(f"{stage_time} sec - airlines data written")

In [0]:
def read_airlines(env='dev'):
  """
  Load the processed airlines data (df_airlines_post) from storage.
  input: env - dev/prod, selects the processed-{env} folder
  output: cached dataframe
  """
  airlines_path = f"{blob_url}/processed-{env}/df_airlines_post"
  return spark.read.parquet(airlines_path).cache()

In [0]:
def get_airlines(env='dev', write=False):
  """
  Return the processed airlines data, optionally rebuilding it first.
  input:
  env - dev/prod
  write - when True, reprocess the raw data and overwrite what is in storage before reading
  output: dataframe
  """
  if write == True:
    # rerun the processing step so storage holds a fresh copy
    write_process_airlines(env=env)
  airlines_df = read_airlines(env=env).cache()
  row_total = airlines_df.count()
  print(f"Your new df_airlines has {row_total:,} rows.")
  return airlines_df

In [0]:
def get_airports_list(df_airlines_post, env='dev'):
  """
  Build the list of station call signs for the origin airports present in the airlines data.
  input:
  df_airlines_post - processed airlines dataframe (only its ORIGIN column is read)
  env - dev/prod (not used here)
  output: list of the distinct ORIGIN codes, each prefixed with "K"
  """
  origin_codes = (
    df_airlines_post.select("ORIGIN")
                    .distinct()
                    .rdd
                    .flatMap(lambda row: row)
                    .collect()
  )
  # the stations data is filtered on these "K"-prefixed call signs.
  # NOTE(review): an airport whose station call sign does not start with "K" will never
  # match this list — confirm whether that is acceptable for every ORIGIN in the data.
  return ["K" + code for code in origin_codes]

In [0]:
#### Stations Data

In [0]:
def write_process_stations(airports_list, env='dev'):
  """
  Ingest the raw stations data for the given airports and write the processed result to storage.
  input:
  airports_list - call signs to keep (matched against NEIGHBOR_CALL)
  env - dev/prod; only selects the output folder, the same source is read for both
  output: None
  """
  raw_stations = spark.read.parquet("/mnt/mids-w261/datasets_final_project/stations_data/*")
  df_stations = raw_stations.filter(col('NEIGHBOR_CALL').isin(airports_list)).cache()

  # duplicative location identifiers are not needed downstream
  df_stations = df_stations.drop('usaf', 'wban')

  # neighbor call signs carry one extra leading character (the "K"); strip it
  stripped_call_sign = regexp_replace('neighbor_call', "^.{1}", '')
  df_stations_post = (
    df_stations.withColumn('neighbor_call', stripped_call_sign)
               .drop('neighbor_name', 'neighbor_state')
               .cache()
  )

  df_stations_post.count()
  # write the processed stations data to disk
  stations_path = f"{blob_url}/processed-{env}/df_stations_post"
  df_stations_post.write.mode("overwrite").parquet(stations_path)

In [0]:
def read_stations(env='dev'):
  """
  Load the processed stations data (df_stations_post) from storage.
  input: env - dev/prod, selects the processed-{env} folder
  output: cached dataframe
  """
  stations_path = f"{blob_url}/processed-{env}/df_stations_post"
  return spark.read.parquet(stations_path).cache()

In [0]:
def get_stations(airports_list, env='dev', write=False):
  """
  Return the processed stations data, optionally rebuilding it first.
  input:
  airports_list - call signs to keep when the data is rebuilt
  env - dev/prod
  write - when True, reprocess the raw data and overwrite what is in storage before reading
  output: dataframe
  """
  if write == True:
    # rerun the processing step so storage holds a fresh copy
    write_process_stations(airports_list, env=env)
  stations_df = read_stations(env=env).cache()
  row_total = stations_df.count()
  print(f"Your new df_stations has {row_total:,} rows.")
  return stations_df

In [0]:
def write_agg_stations(df_stations_post, env='dev'):
  """
  There are multiple station rows for each airport; keeping all of them would multiply every flight.
  This function keeps, for each neighbor_call, the single row with the smallest
  distance_to_neighbor and writes the result to storage.
  input:
  df_stations_post - processed stations data
  env - dev/prod
  output: None
  """
  # rank the rows of each airport from nearest to farthest
  nearest_first = Window.partitionBy("neighbor_call").orderBy(col("distance_to_neighbor").asc())
  ranked = df_stations_post.withColumn("row", row_number().over(nearest_first))
  df_stations_agg = ranked.filter(col("row") == 1).drop("row").cache()

  df_stations_agg.count()
  agg_path = f"{blob_url}/processed-{env}/df_stations_agg"
  df_stations_agg.write.mode("overwrite").parquet(agg_path)

In [0]:
def read_stations_agg(env='dev'):
  """
  Load the aggregated stations data (df_stations_agg) from storage.
  input: env - dev/prod, selects the processed-{env} folder
  output: cached dataframe
  """
  agg_path = f"{blob_url}/processed-{env}/df_stations_agg"
  return spark.read.parquet(agg_path).cache()

In [0]:
def get_stations_agg(df_stations_post, env='dev', write=False):
  """
  Return the aggregated stations data, optionally rebuilding it first.
  input:
  df_stations_post - processed dataframe of stations data (used only when write is True)
  env - dev/prod
  write - when True, recompute the aggregation and overwrite what is in storage before reading
  output: dataframe
  """
  if write == True:
    # rerun the aggregation so storage holds a fresh copy
    write_agg_stations(df_stations_post, env=env)
  stations_agg_df = read_stations_agg(env=env).cache()
  row_total = stations_agg_df.count()
  print(f"Your new df_stations_agg has {row_total:,} rows.")
  return stations_agg_df

In [0]:
def write_joined_flights(df_airlines_post, df_stations_agg, env='dev'):
  """
  Left-join the airlines data to the aggregated stations data and overwrite the copy in storage.
  input:
  df_airlines_post - processed airlines data
  df_stations_agg - aggregated stations data
  env - dev/prod
  output: None
  """
  # a flight matches the station row whose NEIGHBOR_CALL equals its ORIGIN;
  # the left join keeps flights that have no matching station
  origin_matches_station = df_airlines_post["ORIGIN"] == df_stations_agg["NEIGHBOR_CALL"]
  df_flights = df_airlines_post.join(df_stations_agg, origin_matches_station, "left").cache()
  df_flights.count()

  flights_path = f"{blob_url}/processed-{env}/df_flights"
  df_flights.write.mode("overwrite") \
                  .partitionBy('YEAR') \
                  .option("maxRecordsPerFile", max_write_rows) \
                  .parquet(flights_path)

In [0]:
def read_flights(env='dev'):
  """
  Load the joined airline and station data (df_flights) from storage.
  input: env - dev/prod, selects the processed-{env} folder
  output: cached dataframe
  """
  flights_path = f"{blob_url}/processed-{env}/df_flights"
  return spark.read.parquet(flights_path).cache()

In [0]:
def get_flights(df_airlines_post, df_stations_agg, env='dev', write=False):
  """
  Return the joined airline and station data, optionally rebuilding the join first.
  input:
  df_airlines_post - processed airlines data (used only when write is True)
  df_stations_agg - aggregated stations data (used only when write is True)
  env - dev/prod
  write - when True, rerun the join and overwrite what is in storage before reading
  output: dataframe
  """
  if write == True:
    # rerun the join so storage holds a fresh copy
    write_joined_flights(df_airlines_post, df_stations_agg, env=env)
  flights_df = read_flights(env=env).cache()
  row_total = flights_df.count()
  print(f"Your new df_flights has {row_total:,} rows.")
  return flights_df

In [0]:
def _distinct_column_values(df, column):
  """Return the distinct values of `column` in `df` as a Python list."""
  return df.select(column).distinct().rdd.flatMap(lambda row: row).collect()


def get_weather_filters(df_flights):
  """
  this takes flights data and gets the distinct stations, months, and years of the data
  input: dataframe with NEIGHBOR_ID, YEAR and MONTH columns
  output: list of lists - [stations_list, flightMonths_list, flightYears_list]
          (stations_list never contains None)
  """
  # get list of weather stations (neighbor_id) in the data.
  # Flights without a matched station carry a null NEIGHBOR_ID; drop it explicitly
  # instead of relying on list.remove inside a bare try/except, which would also
  # hide any unrelated error.
  stations_list = [station for station in _distinct_column_values(df_flights, "NEIGHBOR_ID") if station is not None]
  flightYears_list = _distinct_column_values(df_flights, "YEAR")
  flightMonths_list = _distinct_column_values(df_flights, "MONTH")
  return [stations_list, flightMonths_list, flightYears_list]

In [0]:
#### Weather Data

In [0]:
def write_process_weather(stations_list, env='dev'):
  """
  This function ingests raw weather data and writes processed data to storage

  Steps:
    1. Read the raw weather parquet files, keeping only the stations in `stations_list`
       and dropping rows whose WND field is entirely the "missing" code.
    2. Split the packed WND / CIG / VIS / TMP / DEW / SLP fields into
       WND_ANGLE, WND_DIR, WND_SPEED, CIG_DIM, VIS_DIST, TMP_AIR, DEW_POINT, SLP_DAY.
    3. Replace each field's missing-value code with null (exact match only).
    4. Derive DATE_OBS, TIME_OBS, HOUR_OBS, YEAR_OBS, MONTH_OBS.
    5. Forward-fill nulls within each station (oldest gaps are back-filled with the
       station's first known value).
    6. Add FLIGHT_TIME_OBS / FLIGHT_HOUR_OBS, drop unused columns and write to
       processed-{env}/df_weather_post partitioned by YEAR_OBS.

  The cleaned measurement columns are written as strings (null when missing).

  input:
  stations_list - list of station ids to keep
  env - dev/prod; only selects the output folder, the same source is read for both
  output: None
  """
  # In some cases, no observations are made and they are recorded across all weather columns as a full set of 9's
  has_observation = col('WND') != '999,9,9,9999,9'

  df_weather = spark.read.parquet("/mnt/mids-w261/datasets_final_project/weather_data/*") \
                         .filter( (col('station').isin(stations_list)) & has_observation )

  # https://sparkbyexamples.com/pyspark/pyspark-split-dataframe-column-into-multiple-columns/
  # weather data dictionary - https://www.ncei.noaa.gov/data/global-hourly/doc/isd-format-document.pdf
  split_wnd = split(col('WND'), ',') # Wind direction and speed, 999. if type_code=v, 999 is variable
  split_cig = split(col('CIG'), ',')
  split_vis = split(col('VIS'), ',') # Visibility
  split_tmp = split(col('TMP'), ',') # Air temperature
  split_dew = split(col('DEW'), ',') # Dewpoint
  split_slp = split(col('SLP'), ',') # Sea level atmospheric pressure
  split_date_t = split(col('DATE'), 'T')  # splits date from time, 'DATE' 2015-01-01T00:16:00.000+0000

  # Temperatures are signed ("+0050" / "-0050"). Strip only a leading "+" so that the
  # sign of negative readings survives the integer cast.
  plus_expr = '^\\+'
  tmp_air = regexp_replace(split_tmp.getItem(0), plus_expr, '').cast('integer')
  dew_point = regexp_replace(split_dew.getItem(0), plus_expr, '').cast('integer')

  df_weather = df_weather.withColumn('STATION', trim(col('STATION'))) \
                         .withColumn('DATE_SPLIT', split_date_t.getItem(0)) \
                         .withColumn('WND_ANGLE', split_wnd.getItem(0).cast('integer')) \
                         .withColumn('WND_DIR', split_wnd.getItem(2)) \
                         .withColumn('WND_SPEED', split_wnd.getItem(3).cast('integer')) \
                         .withColumn('CIG_DIM', split_cig.getItem(0).cast('integer')) \
                         .withColumn('VIS_DIST', split_vis.getItem(0).cast('integer')) \
                         .withColumn('TMP_AIR', tmp_air) \
                         .withColumn('DEW_POINT', dew_point) \
                         .withColumn('SLP_DAY', split_slp.getItem(0).cast('integer'))

  split_date_space = split(col('DATE_SPLIT'), ' ') # split of date and time
  df_weather = df_weather.withColumn('DATE_OBS', split_date_space[0]) \
                         .withColumn('TIME_OBS', split_date_space[1])

  # replace all missing values with null. dont want them to skew an average.
  # Each field has its own all-9s missing code (see the ISD format document above); the
  # comparison is an exact match so that valid readings which merely contain a run of 9s
  # (e.g. a visibility of 9999) are kept and longer codes are not left as stray digits.
  missing_codes = {
    'WND_ANGLE': 999,
    'WND_SPEED': 9999,
    'CIG_DIM': 99999,
    'VIS_DIST': 999999,
    'TMP_AIR': 9999,
    'DEW_POINT': 9999,
    'SLP_DAY': 99999,
  }
  for name, missing_code in missing_codes.items():
    # cast to string so the stored column types stay what downstream readers already get
    df_weather = df_weather.withColumn(name, when(col(name) == missing_code, None).otherwise(col(name)).cast('string'))

  # replace empty with null values
  df_weather = df_weather.withColumn("AW1", when(col("AW1")=="" ,None).otherwise(col("AW1")))

  dateformat_time_obs = date_format('TIME_OBS', 'HH:mm') # Format time to match airline time format
  df_weather = df_weather.withColumn('TIME_OBS', dateformat_time_obs) \
                         .withColumn('HOUR_OBS', hour(col('TIME_OBS'))) \
                         .withColumn('YEAR_OBS', year(col('DATE_OBS'))) \
                         .withColumn('MONTH_OBS', month(col('DATE_OBS')))

  # forward fill data - we know that data is only being reported when there is a change.
  # The window is partitioned by STATION so a gap is only ever filled from the same
  # station's earlier observations (and so Spark does not pull all rows into one partition).
  cols = ['DATE_SPLIT', 'WND_ANGLE', 'WND_SPEED', 'CIG_DIM', 'VIS_DIST', 'WND_DIR', 'TMP_AIR', 'DEW_POINT', 'SLP_DAY', 'AW1']
  w_forward = Window.partitionBy('STATION').orderBy('DATE_SPLIT')
  w_station = w_forward.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
  untouched = [ c for c in df_weather.columns if c not in cols ]
  # last non-null value so far; if there is none yet, fall back to the station's first non-null value
  filled = [ coalesce(last(c,True).over(w_forward), first(c,True).over(w_station)).alias(c) for c in cols ]
  df_weather = df_weather.select(untouched + filled)

  # time is being reported in UTC
  # want time to be reporting for 2 hours before the flight time
  weather_obs_time = 120 # number of minutes for observation before the flight
  df_weather = df_weather.withColumn('FLIGHT_TIME_OBS', col('TIME_OBS') + expr(f'INTERVAL {weather_obs_time} MINUTES'))
  dateformat_dep_time = date_format('FLIGHT_TIME_OBS', 'HH:mm')

  # NOTE(review): FLIGHT_HOUR_OBS is taken from TIME_OBS, not from the shifted
  # FLIGHT_TIME_OBS, so the 2-hour offset does not reach the hour used for joining.
  # Left unchanged because shifting it would also require rolling DATE_OBS over
  # midnight — confirm the intended behavior.
  df_weather_post = df_weather.withColumn( 'FLIGHT_TIME_OBS', dateformat_dep_time) \
                         .withColumn('FLIGHT_HOUR_OBS', hour(col('TIME_OBS'))) \
                         .drop('NAME', 'REPORT_TYPE', 'QUALITY_CONTROL', 'REM', 'EQD').cache()

  # materialize the cache once so the write below reuses it
  df_weather_post.count()
  df_weather_post.write.partitionBy('YEAR_OBS') \
                       .option("maxRecordsPerFile", max_write_rows) \
                       .mode("overwrite").parquet(f"{blob_url}/processed-{env}/df_weather_post")


In [0]:
def read_weather(env='dev'):
  """
  Load the processed weather data (df_weather_post) from storage.
  input: env - dev/prod, selects the processed-{env} folder
  output: cached dataframe
  """
  weather_path = f"{blob_url}/processed-{env}/df_weather_post"
  return spark.read.parquet(weather_path).cache()

In [0]:
def write_filter_weather(flightMonths_list, flightYears_list, df_weather_post, env='dev'):
  """
  Keep only the weather rows whose month and year appear in the flights data and write them to storage.
  This mostly matters for dev processing, where specific months are chosen.
  input:
  flightMonths_list - months present in the flights data
  flightYears_list - years present in the flights data
  df_weather_post - processed weather data
  env - dev/prod
  output: None
  """
  in_flight_months = col('MONTH_OBS').isin(flightMonths_list)
  in_flight_years = col('YEAR_OBS').isin(flightYears_list)
  weather_subset = df_weather_post.filter(in_flight_months & in_flight_years).cache()
  weather_subset.count()
  weather_subset.write.option("maxRecordsPerFile", max_write_rows) \
                      .mode("overwrite") \
                      .parquet(f"{blob_url}/processed-{env}/df_weather_post_filtered")

In [0]:
def read_weather_filtered(env='dev'):
  """
  Load the month/year-filtered weather data (df_weather_post_filtered) from storage.
  input: env - dev/prod, selects the processed-{env} folder
  output: cached dataframe
  """
  filtered_path = f"{blob_url}/processed-{env}/df_weather_post_filtered"
  return spark.read.parquet(filtered_path).cache()

In [0]:
def get_weather(stations_list, flightMonths_list, flightYears_list, env='dev', write=False):
  """
  Return the filtered weather data, optionally rerunning the processing and filtering first.
  input:
  stations_list - station ids to keep when the raw weather is reprocessed
  flightMonths_list - months to keep when the processed weather is filtered
  flightYears_list - years to keep when the processed weather is filtered
  env - dev/prod
  write - when True, reprocess and refilter, overwriting what is in storage before reading
  output: dataframe
  """
  if write == True:
    # stage 1: raw weather -> df_weather_post
    write_process_weather(stations_list, env=env)
    # stage 2: df_weather_post -> df_weather_post_filtered
    processed_weather = read_weather(env=env).cache()
    write_filter_weather(flightMonths_list, flightYears_list, processed_weather, env=env)
  filtered_weather = read_weather_filtered(env=env).cache()
  row_total = filtered_weather.count()
  print(f"Your new df_weather has {row_total:,} rows.")
  return filtered_weather

In [0]:
def write_weather_agg(df_weather_post, env='dev'):
  """
  Average the weather measurements per station, date and flight hour, then write the result to storage.
  The input dataframe is unpersisted as part of this call.
  input:
  df_weather_post - processed (and filtered) weather data
  env - dev/prod
  output: None
  """
  # Want a single aggregated weather observation for each time period
  group_keys = ['STATION', 'DATE_OBS', 'MONTH_OBS', 'YEAR_OBS', 'FLIGHT_HOUR_OBS']
  measure_cols = ['WND_ANGLE', 'WND_SPEED', 'CIG_DIM', 'VIS_DIST', 'TMP_AIR', 'DEW_POINT', 'SLP_DAY']
  # each measurement keeps its own name after averaging
  hourly_means = [avg(name).alias(name) for name in measure_cols]

  df_weather_agg = df_weather_post.groupBy(*group_keys).agg(*hourly_means).cache()
  df_weather_post.unpersist()
  df_weather_agg.count()

  agg_path = f"{blob_url}/processed-{env}/df_weather_agg"
  df_weather_agg.write.mode("overwrite") \
                      .option("maxRecordsPerFile", max_write_rows) \
                      .parquet(agg_path)

In [0]:
def read_weather_agg(env='dev'):
  """
  Load the aggregated weather data (df_weather_agg) from storage.
  input: env - dev/prod, selects the processed-{env} folder
  output: cached dataframe
  """
  agg_path = f"{blob_url}/processed-{env}/df_weather_agg"
  return spark.read.parquet(agg_path).cache()

In [0]:
def get_weather_agg(df_weather_post, env='dev', write=False):
  """
  Return the aggregated weather data, optionally recomputing the aggregation first.
  input:
  df_weather_post - processed weather data (used only when write is True)
  env - dev/prod
  write - when True, recompute the aggregation and overwrite what is in storage before reading
  output: dataframe
  """
  if write == True:
    # rerun the aggregation so storage holds a fresh copy
    write_weather_agg(df_weather_post, env=env)
  weather_agg_df = read_weather_agg(env=env).cache()
  row_total = weather_agg_df.count()
  print(f"Your new df_weather_agg has {row_total:,} rows.")
  return weather_agg_df

In [0]:
def write_joined_weather_stations(df_weather_agg, df_stations_agg, env='dev'):
  """
  Inner-join the aggregated weather data to the aggregated stations data and overwrite the copy in storage.
  input:
  df_weather_agg - aggregated weather data
  df_stations_agg - aggregated stations data
  env - dev/prod
  output: None
  """
  # station columns that would be considered duplicates in the final join with flights
  stations_drop_list = ['NEIGHBOR_ID','distance_to_neighbor', 'lat', 'lon', 'neighbor_call', 'neighbor_lat', 'neighbor_lon', 'neighbor_name', 'neighbor_state', 'station_id', 'usaf', 'wban']

  # keep only weather rows whose STATION is one of the selected stations
  station_matches = df_weather_agg["STATION"] == df_stations_agg["NEIGHBOR_ID"]
  joined = df_weather_agg.join(df_stations_agg, station_matches, "inner")
  df_weather_stations = joined.drop(*stations_drop_list)
  df_weather_stations.count()

  out_path = f"{blob_url}/processed-{env}/df_weather_stations"
  df_weather_stations.write.partitionBy('YEAR_OBS') \
                           .option("maxRecordsPerFile", max_write_rows) \
                           .mode("overwrite") \
                           .parquet(out_path)

In [0]:
def read_joined_weather_stations(env='dev'):
  """
  Load the joined weather and stations data (df_weather_stations) from storage.
  input: env - dev/prod, selects the processed-{env} folder
  output: cached dataframe
  """
  joined_path = f"{blob_url}/processed-{env}/df_weather_stations"
  return spark.read.parquet(joined_path).cache()

In [0]:
def get_joined_weather_stations(df_weather_agg, df_stations_agg, env='dev', write=False):
  """
  Return the joined weather and stations data, optionally rerunning the join first.
  input:
  df_weather_agg - aggregated weather data (used only when write is True)
  df_stations_agg - aggregated stations data (used only when write is True)
  env - dev/prod
  write - when True, rerun the join and overwrite what is in storage before reading
  output: dataframe
  """
  if write == True:
    # rerun the join so storage holds a fresh copy
    write_joined_weather_stations(df_weather_agg, df_stations_agg, env=env)
  weather_stations_df = read_joined_weather_stations(env=env).cache()
  # run an action so the cache is populated before the dataframe is handed back
  weather_stations_df.count()
  return weather_stations_df

In [0]:
def write_joined_flights_weather(df_flights, df_weather_stations, env='dev'):
  """
  This function joins flights and weather data and overwrites what is in storage

  Each flight is left-joined to the weather row of its station for the UTC date and
  UTC hour of departure; flights without matching weather are kept with null weather
  columns. An `id` column is added after the join.

  input:
  df_flights - joined airlines and stations data (must carry NEIGHBOR_ID,
               FL_DATETIME_UTC and DEP_HOUR_OF_UTC)
  df_weather_stations - aggregated weather data per station (STATION, DATE_OBS, FLIGHT_HOUR_OBS)
  env - dev/prod
  output: None
  """
  # we want to combine on location (station) and datetime (hour)
  # flights - FL_DATETIME_UTC (date part), DEP_HOUR_OF_UTC
  # weather - DATE_OBS FLIGHT_HOUR_OBS
  # we will also drop string columns only used for the join
  drop_cols = ['FL_DATE', 'ORIGIN', 'DATE_OBS', 'TAIL_NUM', 'STATION_ID', 'STATION', 'neighbor_id','neighbor_call', 'DEP_HOUR_OF_FORMATTED',  'FL_DATETIME', 'FL_DATETIME_UTC', 'TIMEZONE']

  # The hour key is in UTC, so the date key must be too. FL_DATE is the local calendar
  # date; for a departure whose UTC conversion crosses midnight it is a different day
  # from the one the weather row is filed under, so use the date of FL_DATETIME_UTC.
  same_station = df_flights["NEIGHBOR_ID"] ==  df_weather_stations["STATION"]
  same_utc_date = to_date(df_flights["FL_DATETIME_UTC"]) == to_date(df_weather_stations["DATE_OBS"])
  same_utc_hour = df_flights["DEP_HOUR_OF_UTC"] ==  df_weather_stations["FLIGHT_HOUR_OBS"]

  df_flights_weather = df_flights.join(df_weather_stations, same_station & same_utc_date & same_utc_hour, "left").drop(*drop_cols).cache()
  df_flights_weather = df_flights_weather.withColumn("id", monotonically_increasing_id())
  df_flights_weather.count()
  df_flights_weather.write.option("maxRecordsPerFile", max_write_rows) \
                          .mode("overwrite").parquet(f"{blob_url}/processed-{env}/df_flights_weather")

In [0]:
def read_joined_flights_weather(env='dev'):
  """
  This function reads the joined flights and weather data from storage
  input:
  env - dev/prod, selects the processed-{env} folder
  output: cached dataframe
  """
  return spark.read.parquet(f"{blob_url}/processed-{env}/df_flights_weather").cache()

In [0]:
def get_joined_flights_weather(df_flights, df_weather_stations, env='dev', write=False):
  """
  Optionally refresh the stored flights/weather join, then load it.

  When write is True, write_joined_flights_weather is called with the two
  inputs first. In every case the stored copy is then read back, cached,
  counted (the row count is printed) and returned.
  input:
  df_flights - processed airlines data (only used when write is True)
  df_weather_stations - weather data joined with stations data (only used when write is True)
  env - dev/prod
  write - True to refresh storage before reading
  output: dataframe
  """
  if write == True:
    write_joined_flights_weather(df_flights, df_weather_stations, env=env)
  flights_weather = read_joined_flights_weather(env=env).cache()
  row_count = flights_weather.count()
  print(f"Your new df_flights_weather has {row_count:,} rows.")
  return flights_weather

In [0]:
#### Process Pipeline

In [0]:
def process_pipeline(env='dev', write=False, verbose=False):
  """
  This function allows the full pipeline to be run
  to overwrite what is in storage
  then reads what is in storage

  With write == True every get_* stage is called in order with write enabled
  (airlines, airports list, stations, aggregated stations, flights, weather,
  aggregated weather, weather/stations join) before the flights/weather join.
  Otherwise only the flights/weather join is requested, with no inputs.
  input:
  env - dev/prod
  write - True/False
  verbose - True to print the elapsed seconds of each stage (default False)
  output: dataframe
  """
  STAGE = env
  # One condition decides both "build the inputs" and "release the inputs".
  # Using `write == True` for one and plain truthiness for the other would let
  # a truthy-but-not-True flag reach None.unpersist() in the cleanup below.
  rebuild = (write == True)

  def _report(started, message):
    # Print how long a stage took, only when timings were requested.
    if verbose:
      print(f"{time.time() - started} sec - {message}")

  if rebuild:
    checkpoint = time.time()
    airlines_data = get_airlines(env=STAGE, write=write)
    _report(checkpoint, "airlines processed")
    checkpoint = time.time()
    a_list = get_airports_list(airlines_data, env=STAGE)
    _report(checkpoint, "retrieved distinct airline list")
    checkpoint = time.time()
    stations_data = get_stations(a_list, env=STAGE, write=write)
    _report(checkpoint, "stations for select airlines processed")
    checkpoint = time.time()
    agg_stations_data = get_stations_agg(stations_data, env=STAGE, write=write)
    _report(checkpoint, "stations data minimized")
    checkpoint = time.time()
    flights_data = get_flights(airlines_data, agg_stations_data, env=STAGE, write=write)
    _report(checkpoint, "airlines and station data joined")
    # inputs of the flights join are no longer needed
    airlines_data.unpersist()
    stations_data.unpersist()
    checkpoint = time.time()
    w_stations, w_months, w_years = get_weather_filters(flights_data)
    weather_data = get_weather(w_stations, w_months, w_years, env=STAGE, write=write)
    _report(checkpoint, "weather data processed")
    checkpoint = time.time()
    agg_weather_data = get_weather_agg(weather_data, env=STAGE, write=write)
    _report(checkpoint, "weather data aggregated")
    weather_data.unpersist()
    checkpoint = time.time()
    weather_stations_data = get_joined_weather_stations(agg_weather_data, agg_stations_data, env=STAGE, write=write)
    _report(checkpoint, "weather and stations data joined")
    agg_weather_data.unpersist()
    agg_stations_data.unpersist()
  else:
    # read-only run: the final join is loaded from storage and needs no inputs
    flights_data = None
    weather_stations_data = None
  checkpoint = time.time()
  flights_weather_data = get_joined_flights_weather(flights_data, weather_stations_data, env=STAGE, write=write)
  _report(checkpoint, "join time of weather and flights data")
  if rebuild:
    flights_data.unpersist()
    weather_stations_data.unpersist()
  flights_weather_data.count()
  return flights_weather_data
  

In [0]:
#### Data Transform and Split

In [0]:
def get_feature_cols(dataframe, env='dev'):
  """
  Returns the names of every column except the two label columns
  (DEP_DEL15 and DEP_DELAY_NEW).
  input:
  dataframe - data whose columns are inspected
  env - dev/prod (not used by this function)
  output: list of column names
  """
  targets = ('DEP_DEL15', 'DEP_DELAY_NEW')
  return dataframe.drop(*targets).columns

In [0]:
def get_vectorassembed(dataframe, env='dev'):
  """
  This function adds a single "features" vector column built from the
  columns returned by get_feature_cols. Rows the assembler treats as invalid
  are skipped (handleInvalid="skip").
  input:
  dataframe - data to assemble
  env - dev/prod, forwarded to get_feature_cols
  output: cached dataframe with the additional "features" column
  """
  input_columns = get_feature_cols(dataframe, env=env)
  assembler = VectorAssembler(inputCols=input_columns, outputCol="features", handleInvalid="skip")
  return assembler.transform(dataframe).cache()

In [0]:
def poormans_split(env='dev'):
  """
  This function reads the stored flights/weather join, handles vector
  assembly with feature columns and then does a silly split by years
  into 3 groups: 2015-2016, 2017-2018 and 2019.
  input: env - dev/prod
  output: 3 cached dataframes (2015-2016, 2017-2018, 2019)
  """
  # logistic reg. takes one column value as input, so the columns are
  # translated into a single vector column by the assembler
  # invalids are skipped there as logistic reg. will not take NaN
  # https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html#pyspark.ml.feature.VectorAssembler.handleInvalid
  source_df = read_joined_flights_weather(env=env).cache()
  assembled = get_vectorassembed(source_df, env=env)
  source_df.unpersist()

  year_filters = (
      col('YEAR').isin([2015,2016]),
      col('YEAR').isin([2017,2018]),
      col('YEAR') == 2019,
  )
  train_log, validation_log, test_log = [assembled.filter(cond).cache() for cond in year_filters]

  # counting materializes the three caches before the shared parent is released
  train_count, validation_count, test_count = [part.count() for part in (train_log, validation_log, test_log)]
  assembled.unpersist()
  print(f"Your testing datasets have {train_count:,}, {validation_count:,}, and {test_count:,} rows.")
  return train_log, validation_log, test_log

In [0]:
def model_performance_charts(log_Model):
  """
  This function plots diagnostics of a fitted model: the sorted beta
  coefficients, the training ROC curve (the training area under ROC is
  also printed) and the training precision-recall curve.
  input:
  log_Model - fitted model exposing .coefficients and a .summary with
              roc, areaUnderROC and pr
  output: None
  """
  beta = np.sort(log_Model.coefficients)
  plt.plot(beta)
  plt.ylabel('Beta Coefficients')
  plt.show()
  trainingSummary = log_Model.summary
  roc = trainingSummary.roc.toPandas()
  # FPR is drawn on the x axis and TPR on the y axis; label them accordingly
  plt.plot(roc['FPR'],roc['TPR'])
  plt.xlabel('False Positive Rate')
  plt.ylabel('True Positive Rate')
  plt.title('ROC Curve')
  plt.show()
  print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))
  pr = trainingSummary.pr.toPandas()
  plt.plot(pr['recall'],pr['precision'])
  plt.xlabel('Recall')
  plt.ylabel('Precision')
  plt.title('Precision-Recall Curve')
  plt.show()

In [0]:
def model_perf_summary(log_Model):
  """
  This function prints the training metrics of a fitted model: accuracy,
  weighted FPR, weighted TPR, weighted F-measure, weighted precision,
  weighted recall and area under ROC.
  input:
  log_Model - fitted model exposing a .summary with those metrics
  output: None
  """
  # model evaluation https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression
  trainingSummary = log_Model.summary
  metrics = (
      trainingSummary.accuracy,
      trainingSummary.weightedFalsePositiveRate,
      trainingSummary.weightedTruePositiveRate,
      trainingSummary.weightedFMeasure(),
      trainingSummary.weightedPrecision,
      trainingSummary.weightedRecall,
      # previously computed and then dropped; report it with the rest
      trainingSummary.areaUnderROC,
  )
  print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s\nAreaUnderROC: %s"
      % metrics)

In [0]:
def write_split_data(dataframe, env='dev', features='dev'):
  """
  This defines the splits of the data by year, oversamples the delayed
  flights of the training years and writes every piece to storage.

  Rows of 2019 form the hold-out test set; every other year found in the data
  is used for training. The training rows are separated into delayed
  (DEP_DEL15 == 1) and on-time (DEP_DEL15 == 0) flights, and the delayed
  flights are sampled with replacement using the on-time/delayed ratio as
  fraction before being appended to the on-time flights.
  Written under processed-{env}, each suffixed with -{model}_features:
  trainDF, testDF, minor_df, major_df and df_augmentedTrain.
  input:
  dataframe - data holding the YEAR and DEP_DEL15 columns
  env - dev/prod
  features - 'prod' selects the "pca" suffix, any other value is used as the suffix
  output: None
  """
  model = "pca" if features == 'prod' else features
  # hold-out test set
  hold_out_variable = 'YEAR'
  test_years = [2019]

  years_list = dataframe.select(hold_out_variable).distinct().rdd.flatMap(lambda x: x).collect()
  train_years = list(set(years_list) - set(test_years))

  def _path(name):
    # storage location of one of the datasets written by this function
    return f"{blob_url}/processed-{env}/{name}-{model}_features"

  # train and test set
  train_split = dataframe.filter(col(hold_out_variable).isin(train_years)).cache()
  test_split = dataframe.filter(col(hold_out_variable).isin(test_years)).cache()

  # time to beat - 1.07 min
  train_split.count()
  train_split.write.mode("overwrite").parquet(_path("trainDF"))
  test_split.count()
  test_split.write.mode("overwrite").parquet(_path("testDF"))
  # both splits are in storage now; release the cached copies
  train_split.unpersist()
  test_split.unpersist()

  # continue from the stored training set
  trainDF = spark.read.parquet(_path("trainDF"))

  # split the data given labels; each filter is consumed once by its write,
  # so there is nothing to gain from caching it
  trainDF.filter(col('DEP_DEL15')==1).write.mode("overwrite").parquet(_path("minor_df"))
  trainDF.filter(col('DEP_DEL15')==0).write.mode("overwrite").parquet(_path("major_df"))

  minor_df = spark.read.parquet(_path("minor_df")).cache()
  major_df = spark.read.parquet(_path("major_df")).cache()

  n_ontime = major_df.count()
  n_delays = minor_df.count()

  ratio = n_ontime/n_delays
  print('The ratio of on-time to delayed flights is of {:0.1f}:1'.format(ratio))

  # time to beat - 2.85 min

  # oversample the delayed flights
  oversample_df = minor_df.sample(withReplacement=True, fraction=ratio, seed=321)
  df_augmentedTrain = major_df.unionAll(oversample_df).cache()

  # materialize the union before its two cached inputs are released
  df_augmentedTrain.count()
  major_df.unpersist()
  minor_df.unpersist()

  df_augmentedTrain.write.mode("overwrite").parquet(_path("df_augmentedTrain"))
  df_augmentedTrain.unpersist()

In [0]:
def read_split_data(env='dev', features='dev'):
  """
  This function reads the oversampled training set (df_augmentedTrain) and
  the hold-out test set (testDF) from storage
  input:
  env - dev/prod
  features - 'prod' selects the "pca" suffix, any other value is used as the suffix
  output: cached train dataframe, cached test dataframe
  """
  suffix = "pca" if features == 'prod' else features
  base = f"{blob_url}/processed-{env}"
  trainDF = spark.read.parquet(f"{base}/df_augmentedTrain-{suffix}_features").cache()
  testDF = spark.read.parquet(f"{base}/testDF-{suffix}_features").cache()
  return trainDF, testDF

In [0]:
def get_split_data(dataframe, env='dev', features='dev', write=False):
  """
  Optionally refresh the stored train/test split, then load it.

  When write is True, write_split_data is called with the dataframe first.
  In every case the stored train and test sets are then read back, their
  row counts are printed and both are returned.
  input:
  dataframe - data to split (only used when write is True)
  env - dev/prod
  features - feature set name forwarded to the write/read functions
  write - True to refresh storage before reading
  output: train dataframe, test dataframe
  """
  if write == True:
    write_split_data(dataframe, env=env, features=features)
  df_train, df_test = read_split_data(env=env, features=features)
  train_rows = df_train.count()
  print(f"Your new df_train has {train_rows:,} rows.")
  test_rows = df_test.count()
  print(f"Your new df_test has {test_rows:,} rows.")
  return df_train, df_test

In [0]:
def count_folds(dataframe, env='dev'):
  """
  returns the total number of folds in the dataset

  The folds are the distinct YEAR values left after vector assembly; the
  year -> fold number mapping is printed and (number of years - 1) is returned.
  input:
  dataframe - data holding a YEAR column
  env - dev/prod, forwarded to get_vectorassembed. It replaces the name STAGE,
        which this function read without ever defining it.
  output: number of folds
  """
  dataframe_vec = get_vectorassembed(dataframe, env=env).cache()
  fold_variable = 'YEAR'
  fold_list = dataframe_vec.select(fold_variable).distinct().toPandas()[fold_variable]
  # the assembled copy was only needed to list the years; release its cache
  dataframe_vec.unpersist()
  first_fold = fold_list.min()
  mapping = {x: x - first_fold for x in fold_list}
  print(f'{fold_variable.capitalize()}, fold_number mapping: {mapping}')

  # define number of Folds as nYEARS - 1
  nFolds = len(fold_list) - 1
  return nFolds

In [0]:
def write_cvdata(train_data, test_data, env='dev', features='dev'):
  """
  This functions determines the number of folds for each year and creates a dataset with a new column showing which set it is a part of

  The earliest YEAR of the training data becomes fold 0, the next fold 1 and
  so on; the fold number is stored in a 'foldCol' column of the training set.
  Rows with a null DEP_DEL15 are removed from both sets. The results are
  written under processed-{env} as foldedTrainDF and foldedTestDF, each
  suffixed with -{model}_features.
  NOTE(review): label_cat and label_cont are not defined in this function;
  they are read from the enclosing notebook scope - confirm they are set
  before calling.
  input:
  train_data - training data
  test_data - test data
  env - dev/prod
  features - 'prod' uses the existing 'pcaFeatures' column and the "pca" suffix;
             any other value vector-assembles the data into 'features' and is
             used as the suffix
  output: None
  """

  STAGE = env
  # extract number of distinct YEARS in the training data and create a map
  fold_variable = 'YEAR'

  if features == 'prod':
    feature_col = 'pcaFeatures'
    train_df_vec = train_data
    test_df_vec = test_data
    model = "pca"
  else:
    feature_col = 'features'
    # perform vector assembly
    # transform split data to an assembly vector - perform after the split as it might write new rows to oversample
    train_df_vec = get_vectorassembed(train_data, env=STAGE).cache()
    test_df_vec = get_vectorassembed(test_data, env=STAGE).cache()
    model = features

  fold_list = train_df_vec.select(fold_variable).distinct().toPandas()[fold_variable]
  first_fold = fold_list.min()
  mapping = {x: x - first_fold for x in fold_list}
  print(f'{fold_variable.capitalize()}, fold_number mapping: {mapping}')

  # define number of Folds as nYEARS - 1
  nFolds = len(fold_list) - 1
  print(f'Total number of folds: {nFolds}')
  # https://stackoverflow.com/questions/42980704/pyspark-create-new-column-with-mapping-from-a-dict
  mapping_expr = create_map([lit(str(x)) for x in chain(*mapping.items())])
  fold_number = mapping_expr[col(fold_variable)].cast('integer')

  # cache only the final frames; caching the intermediate withColumn result
  # as well would hold two copies of the same rows
  foldedTrainDF = (train_df_vec.withColumn('foldCol', fold_number)
                   .filter(col('DEP_DEL15').isNotNull())
                   .select(label_cat, label_cont, feature_col, 'foldCol')
                   .cache())
  # the test set never keeps foldCol, so it is not computed for it
  foldedTestDF = (test_df_vec.filter(col('DEP_DEL15').isNotNull())
                  .select(label_cat, label_cont, feature_col)
                  .cache())

  train_df_vec.unpersist()
  if features != 'prod':
    # the assembled test frame was cached by this function; release it too
    test_df_vec.unpersist()

  foldedTrainDF.count()
  foldedTrainDF.write.mode("overwrite").parquet(f"{blob_url}/processed-{STAGE}/foldedTrainDF-{model}_features")
  foldedTestDF.count()
  foldedTestDF.write.mode("overwrite").parquet(f"{blob_url}/processed-{STAGE}/foldedTestDF-{model}_features")
  # both sets are in storage now; do not leave them cached on the cluster
  foldedTrainDF.unpersist()
  foldedTestDF.unpersist()

In [0]:
def read_cvdata(env='dev', features='dev'):
  """
  This function reads the folded train and test sets from storage
  input:
  env - dev/prod, selects the processed-{env} folder
  features - 'prod' selects the "pca" suffix, any other value is used as the suffix
  output: cached folded train dataframe, cached folded test dataframe
  """
  model = "pca" if features == 'prod' else features
  # build the path from the env argument, not from a notebook-level STAGE
  # variable, so the caller's choice of environment is honoured
  base = f"{blob_url}/processed-{env}"
  foldedTrainDF = spark.read.parquet(f"{base}/foldedTrainDF-{model}_features").cache()
  foldedTestDF = spark.read.parquet(f"{base}/foldedTestDF-{model}_features").cache()
  return foldedTrainDF, foldedTestDF

In [0]:
def get_cvdata(train_data, test_data, env='dev', features='dev', write=False):
  """
  Optionally refresh the stored folded train/test sets, then load them.

  When write is True, write_cvdata is called with the two inputs first.
  In every case the stored sets are then read back, their row counts are
  printed and both are returned.
  input:
  train_data - training data (only used when write is True)
  test_data - test data (only used when write is True)
  env - dev/prod
  features - feature set name forwarded to the write/read functions
  write - True to refresh storage before reading
  output: train dataframe, test dataframe
  """
  if write == True:
    write_cvdata(train_data, test_data, env=env, features=features)
  train_df, test_df = read_cvdata(env=env, features=features)
  train_rows = train_df.count()
  print(f"Your new training set has {train_rows:,} rows.")
  test_rows = test_df.count()
  print(f"Your new test set has {test_rows:,} rows.")
  return train_df, test_df

In [0]:
class TimeSeriesCrossValidator(CrossValidator):
    '''
    CrossValidator whose folds respect time order: split i trains on every row
    with a fold number <= i and validates on the rows of fold i + 1.
    The user provides `foldCol` holding fold numbers assigned in time ascending
    order (e.g. 2015 is assigned as fold 0, 2016 as fold 1, and so on).
    When no `foldCol` is set, a random k-fold split is produced instead.
    '''
    def _kFold(self, dataset):
        nFolds = self.getOrDefault(self.numFolds)
        foldCol = self.getOrDefault(self.foldCol)

        if not foldCol:
            # Random k-fold: cut a uniform random column into nFolds equal bands;
            # each band is the validation set once, the rest is the training set.
            seed = self.getOrDefault(self.seed)
            width = 1.0 / nFolds
            randCol = self.uid + "_rand"
            with_rand = dataset.select("*", rand(seed).alias(randCol))
            random_splits = []
            for k in range(nFolds):
                lower = k * width
                upper = (k + 1) * width
                in_band = (with_rand[randCol] >= lower) & (with_rand[randCol] < upper)
                random_splits.append((with_rand.filter(~in_band), with_rand.filter(in_band)))
            return random_splits

        # User-specified fold numbers: valid values are 0..nFolds inclusive.
        def checker(foldNum):
            out_of_range = foldNum < 0 or foldNum > nFolds
            if out_of_range:
                raise ValueError(
                    "Fold number must be in range [0, %s], but got %s." % (nFolds, foldNum)
                )
            return True

        def _has_no_rows(part):
            # True when the dataframe has no partitions or yields no row
            return part.rdd.getNumPartitions() == 0 or len(part.take(1)) == 0

        fold_is_valid = UserDefinedFunction(checker, BooleanType())
        rolling_splits = []
        for i in range(nFolds):
            # Training set always in the past
            past = fold_is_valid(dataset[foldCol]) & (col(foldCol) <= lit(i))
            # Validation set always in the future
            upcoming = fold_is_valid(dataset[foldCol]) & (col(foldCol) == lit(i+1))
            training = dataset.filter(past)
            validation = dataset.filter(upcoming)
            if _has_no_rows(training):
                raise ValueError("The training data at fold %s is empty." % i)
            if _has_no_rows(validation):
                raise ValueError("The validation data at fold %s is empty." % i)
            rolling_splits.append((training, validation))

        return rolling_splits

In [0]:
def apply_pca(env='dev'):
  """
  This function fits a 100-component PCA on the "scaledFeatures" column and
  writes the projected data to {blob_url}/feature-engineering/pca-data.

  The PCA model is fitted on rows with YEAR < 2019 only and then applied to
  all rows; DEP_DEL15, DEP_DELAY_NEW, YEAR and pcaFeatures are kept.
  NOTE(review): transformed_data is not a parameter; it is read from the
  enclosing notebook scope - confirm it is defined before calling.
  input:
  env - dev/prod (not used: the output path does not depend on it)
  """
  # imported here because the notebook's import cell does not bring PCA in
  from pyspark.ml.feature import PCA

  # Apply PCA
  pca = PCA(k=100, inputCol="scaledFeatures")
  pca.setOutputCol("pcaFeatures")
  model_pca = pca.fit(transformed_data.filter('YEAR < 2019'))
  # transform with the model fitted above (the original referenced an undefined name)
  pca_data = model_pca.transform(transformed_data).select(*['DEP_DEL15', 'DEP_DELAY_NEW', "YEAR", "pcaFeatures"])
  pca_data.write.mode('overwrite').parquet(f"{blob_url}/feature-engineering/pca-data")
  