# Data Engineering Process

Overall data engineering process for extracting relevant features for machine learning. This includes both variables specific to the flights dataset, and weather variables.

![data_engineering](https://user-images.githubusercontent.com/14703336/101406370-5b74b080-38a7-11eb-8789-e06ecec8294c.png)

In [0]:
# Prepare libraries
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType, FloatType, DoubleType, IntegerType, NullType, ShortType, DateType, BooleanType, BinaryType, ArrayType
from pyspark.sql import SQLContext
import math
import numpy as np
from pyspark.sql.functions import expr,lit,current_timestamp, to_date

sqlContext = SQLContext(sc)

# 1. Preparing flights data

## Step 1.1: Load data into table
### Tables: us_delay_flights_tbl / stations_tbl / weather_tbl / airport

First, we load the provided data into tables:
 - flights
 - weather
 - stations 
 
In addition, we load a table for additional airport data, including geographic location (latitude, longitude), IATA code, and the time zone of the airport. We will use this information in merging the weather table with the flights table.

In [0]:
spark.sql("drop table if exists us_delay_flights_tbl")
spark.sql("drop table if exists weather_tbl")
spark.sql("drop table if exists stations_tbl")
spark.sql("drop table if exists airport_tbl")


airlines = spark.read.option("header", "true").parquet(f"dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data/201*.parquet")
airlines.write.saveAsTable("us_delay_flights_tbl")


weather = spark.read.option("header", "true")\
                    .parquet(f"dbfs:/mnt/mids-w261/datasets_final_project/weather_data/*.parquet")
stations = spark.read.option("header", "true").csv("dbfs:/mnt/mids-w261/datasets_final_project/stations/stations.csv.gz")
# 28 minutes
weather.write.saveAsTable("weather_tbl")
stations.write.saveAsTable("stations_tbl")

# Download airport geo data from openflights: https://openflights.org/data.html
import urllib.request
urllib.request.urlretrieve("https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports-extended.dat","/tmp/airports.dat")
dbutils.fs.mv("file:/tmp/airports.dat", "dbfs:/data/airports.dat")

#Create schema for the airport table
airport_schema = StructType([\
    StructField("airport_id", IntegerType(), False),\
    StructField("name", StringType(), False),\
    StructField("city", StringType(), True),\
    StructField("country", StringType(), True),\
    StructField("IATA", StringType(), False),\
    StructField("ICAO", StringType(), True),\
    StructField("latitude", DoubleType(), True),\
    StructField("longitude", DoubleType(), True),\
    StructField("altitude", DoubleType(), True),\
    StructField("timzone", StringType(), True),\
    StructField("DST", StringType(), True),\
    StructField("tztimezone", StringType(), True),\
    StructField("type", StringType(), True),\
    StructField("source", StringType(), True)])

airportsDF = spark.read.format("csv")\
          .option("header", "false")\
          .schema(airport_schema)\
          .load("/data/airports.dat")

airportsDF.write.saveAsTable("airport_tbl")  # airlines geo


## Step 1.2: Adding PageRank feature to airports 
### Table: airport_pagerank

As a potential feature for our model, we calculate the PageRank score for each airport. We think that delays should be related to the centrality of the airport. More central airports likely have more flights going in and out, which can either mean more delays on general, or better regulation and manpower (leading to less delays). If an airport has a flight to another airport, that constitutes a directed link between the two airports. The "graphframes" library takes as input a set of vertices (airports), and links (origin and destination for each row) in order to calculate the PageRank score for each airport.

In [0]:
#1.28 minutes
# Get PageRank for each airport
spark.sql("drop table if exists airport_pagerank")
from graphframes import *
vertices = spark.sql("select ORIGIN as id from airport_used_tbl")
edges = spark.sql("select ORIGIN as src, DEST as dst, DEP_DELAY from us_delay_flights_tbl")
g = GraphFrame(vertices, edges)
ranks = g.pageRank(resetProbability=0.15, maxIter=5)
ranks.vertices.orderBy(ranks.vertices.pagerank.desc()).write.saveAsTable("airport_pagerank")

## Step 1.3: Add flight delay ratio feature 
### Tables: airport_used_tbl, airport_delay_analysis

The first feature we have is the flight delay ratio based on the origin of the flight. We categorize a flight into delayed (by at least 15 minutes), non-delayed, and missing (likely a cancellation).

In [0]:
spark.sql("drop table if exists airport_used_tbl")
airports_usedDF = spark.sql("with airports as (select  ORIGIN from us_delay_flights_tbl union all select DEST from us_delay_flights_tbl) select distinct ORIGIN from airports ")
airports_usedDF.write.saveAsTable('airport_used_tbl')  # airlines

spark.sql("drop table if exists airport_delay_analysis ")
airport_delay_analysisDF = spark.sql("select distinct origin \
,sum(dep_del15) as num_delay \
,sum(1-dep_del15) as num_non_delay \
,sum(if(dep_del15 is null, 1, 0)) as num_missing \
,count(*) as total_flights \
,sum(dep_del15) / count(*) as ratio \
,sum(arr_del15) / count(*) as arr_ratio \
from us_delay_flights_tbl \
group by origin \
order by ratio")
airport_delay_analysisDF.write.saveAsTable("airport_delay_analysis")

## Step 1.4: Parse and clean flights data
### Table: flight_adjusted

While we aim to prepare as many variables as possible, we decided that the DIV variables have too many NULL values and cannot be used in modeling. These variables represent "Diverted Airport" variables, which are airports capable of handling events such an emergency landing. While potentially useful since emergencies will clearly be prioritized over regular takeoff, these events are rare enough that we often do not see a meaningful value for these variables. As a result, they will not be processed and dropped from the table.

Next, we also calculate the departure time - 2 hrs, representing the ideal time when we would like to merge corresponding weather data. This information is stored at the next merge step.

In [0]:
#create time table flight_adjusted
spark.sql('drop table if exists flight_adjusted')
dbutils.fs.rm('/delta/flight_adjusted',True)

#drop irrelevant tables
airlines\
.drop("DIV_AIRPORT_LANDINGS","DIV_REACHED_DEST","DIV_ACTUAL_ELAPSED_TIME","DIV_ARR_DELAY","DIV_DISTANCE","DIV1_AIRPORT")\
.drop("DIV1_AIRPORT_ID","DIV1_AIRPORT_SEQ_ID","DIV1_WHEELS_ON","DIV1_TOTAL_GTIME","DIV1_LONGEST_GTIME")\
.drop("DIV1_WHEELS_OFF","DIV1_TAIL_NUM","DIV2_AIRPORT","DIV2_AIRPORT_ID","DIV2_AIRPORT_SEQ_ID","DIV2_WHEELS_ON","DIV2_TOTAL_GTIME","DIV2_LONGEST_GTIME")\
.drop("DIV2_WHEELS_OFF","DIV2_TAIL_NUM","DIV3_AIRPORT","DIV3_AIRPORT_ID","DIV3_AIRPORT_SEQ_ID","DIV3_WHEELS_ON","DIV3_TOTAL_GTIME","DIV3_LONGEST_GTIME")\
.drop("DIV3_WHEELS_OFF","DIV3_TAIL_NUM","DIV4_AIRPORT","DIV4_AIRPORT_ID","DIV4_AIRPORT_SEQ_ID","DIV4_WHEELS_ON","DIV4_TOTAL_GTIME","DIV4_LONGEST_GTIME")\
.drop("DIV4_WHEELS_OFF","DIV4_TAIL_NUM","DIV5_AIRPORT","DIV5_AIRPORT_ID","DIV5_AIRPORT_SEQ_ID","DIV5_WHEELS_ON","DIV5_TOTAL_GTIME","DIV5_LONGEST_GTIME")\
.drop("DIV5_WHEELS_OFF","DIV5_TAIL_NUM")\
.withColumn('departure_time',expr("cast(concat(FL_DATE,' ', left(CRS_DEP_TIME,  length(CRS_DEP_TIME)-2), ':',right(CRS_DEP_TIME,2)) as timestamp)"))\
.withColumn('report_time',expr("cast(concat(FL_DATE,' ', left(CRS_DEP_TIME,  length(CRS_DEP_TIME)-2), ':',right(CRS_DEP_TIME,2)) as timestamp)- INTERVAL 2 hours"))\
.withColumn('report_time_utc',expr("timestamp 'today'"))\
.withColumn('arr_time_dest',expr("cast(concat(FL_DATE,' ', left(CRS_ARR_TIME,  length(CRS_ARR_TIME)-2), ':',right(CRS_ARR_TIME,2)) as timestamp)"))\
.write.format("delta").mode("overwrite").partitionBy("ORIGIN").save("/delta/flight_adjusted/")

spark.sql("CREATE TABLE flight_adjusted USING DELTA LOCATION '/delta/flight_adjusted/'")

Here we merge the flights table with the airport table. In most cases, the ORIGIN from the flights table matches the IATA code from the airports table. There are two exceptions, which is airport marked XWA, which corresponds to IATA code ISN (ISN was decommissioned and renamed XWA). Another airport is TKI, which corresponds to KTKI.

In [0]:
%sql
-- This convers local time to UTC 
MERGE INTO flight_adjusted as f 
USING airport_tbl AS a 
ON f.ORIGIN = a.IATA 
WHEN MATCHED THEN UPDATE set f.report_time_utc = cast(int(f.report_time) - a.timzone * 60 * 60 as timestamp);

MERGE INTO flight_adjusted as f 
USING airport_tbl AS a 
ON f.ORIGIN = 'XWA' and a.IATA ='ISN'
WHEN MATCHED THEN UPDATE set f.report_time_utc = cast(int(f.report_time) - a.timzone * 60 * 60 as timestamp);

MERGE INTO flight_adjusted as f 
USING airport_tbl AS a 
ON f.ORIGIN = 'TKI' and a.ICAO ='KTKI'
WHEN MATCHED THEN UPDATE set f.report_time_utc = cast(int(f.report_time) - a.timzone * 60 * 60 as timestamp);

## Step 1.5: Adding additional feature -- dep_delay_last_hour
### Tables: flights_delay, flights_add_features, flights_pagerank

In this section, we calculate for previous hour's delay ratio for each flight, and the ratio of last hour's delay compared to regional average departure delay.

In [0]:
# 2.14 minutes
spark.sql("drop table if exists flights_delay")
flightDF = spark.sql("select *\
, ifnull(sum(DEP_DEL15) over (partition by ORIGIN order by cast(departure_time as int) range between 10800 preceding and 7200 preceding)/ count(1) over \
(partition by ORIGIN order by cast(departure_time as int) range between 10800 preceding and 7200 preceding),0) as dep_delay_last_hour\
, ifnull(sum(ARR_DEL15) over (partition by DEST order by cast(departure_time as int) range between 10800 preceding and 7200 preceding)/ count(1) over \
                     (partition by DEST order by cast(departure_time as int) range between 10800 preceding and 7200 preceding),0) as arr_delay_last_hour \
from flight_adjusted order by FL_DATE, CRS_DEP_TIME")
flightDF.write.saveAsTable("flights_delay")

#44 seconds
spark.sql("drop table if exists flights_add_features")
flight2DF = spark.sql("select f.* , ifnull(dep_delay_last_hour/ao.ratio,0) as dep_delay_ratio, arr_delay_last_hour/ad.arr_ratio as arr_delay_ratio \
from flights_delay f join airport_delay_analysis ao on f.ORIGIN = ao.ORIGIN \
join airport_delay_analysis ad on f.DEST = ad.ORIGIN ")
flight2DF.write.saveAsTable("flights_add_features")

#51 seconds
spark.sql("drop table if exists flights_pagerank")
dbutils.fs.rm("dbfs:/user/hive/warehouse/flights_pagerank", True)
flight3DF = spark.sql("select f.* , ao.pagerank, ad.pagerank as pagerank_dest \
from flights_add_features f join airport_pagelink ao on f.ORIGIN = ao.id \
join airport_pagelink ad on f.DEST = ad.id ")
flight3DF.write.saveAsTable("flights_pagerank")

# 2. Preparing weather data
Here, we write a function that calculates the Haversine distance between airports, which is the closest distance between two points on a spheres. First, we calculate a used_stationsDF, which represents the weather stations. Then, for each airport, we calculate the distance to all weather stations, and keep track of the ones within a prespecified minimum distance. This way, we can find the closest weather station within a certain distance for each airport. This is registered as a separate table.

In [0]:
used_stationsDF = spark.sql("select distinct concat(usaf, wban) as station, lat, lon from stations_tbl s where  begin < '20200101' and end >= '20150101' and lat is not null").collect()

# Calculate Haversine distance between airport and startions
def get_all_stations_within(latitude, longitude, distance):
  def Haversine(lat1,lon1,lat2,lon2) :
    '''Function for calculating Haversine distance between two coordinates'''
    R = 6371.0088
    lat1 = math.radians(float(lat1))
    lon1 = math.radians(float(lon1))
    lat2 = math.radians(float(lat2))
    lon2 = math.radians(float(lon2))

    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2) **2
    c = 2 * np.arctan2(a**0.5, (1-a)**0.5)
    d = R * c
    return round(float(d),4)


  airport_station_list = []
  for s in used_stationsDF:
    #print(latitude,longitude,s.lat,s.lon, end="")
    h = Haversine(latitude, longitude, s.lat, s.lon)
    if h<distance:
      airport_station_list.append([s.station, h])
    #print(" ---- ", h)
    
  return airport_station_list
spark.udf.register("list_AllStationWithinUDF", get_all_stations_within, 
                       ArrayType(StructType([StructField("station", StringType()),
                                  StructField("distance", FloatType())])))

## 2.1 Find stations for each airport
### Table: airport_station_maps

Here, we create the weather stations table in order to find the best weather data point for each flight. In order to determine the best distance, we tried looking at distances around 10KM, 25KM or within 85KM. For most airports, we can find the necessary data by using a 25KM radius to search for stations. For BQN and PSE, we use 85KM radius as these are the closest stations available for these airports. This way, we were able to get weather data joined for all flights.

In [0]:
%sql
-- Create table mapping table between airport and station for a radius of 100KM
drop table if exists airport_station_maps;
create table airport_station_maps (IATA STRING, station STRING, distance FLOAT);

with cte as (select a.IATA,explode(list_AllStationWithinUDF(a.latitude,a.longitude,100)) as stations
  from airport_used_tbl au join airport_tbl a on a.IATA = au.ORIGIN 
  union all
  select au.ORIGIN, explode(list_AllStationWithinUDF(a.latitude,a.longitude,100))
  from airport_used_tbl au, airport_tbl a 
  where a.IATA = 'ISN' and au.ORIGIN = 'XWA' 
  union all 
  select au.ORIGIN, explode(list_AllStationWithinUDF(a.latitude,a.longitude,100))
  from airport_used_tbl au, airport_tbl a 
  where a.ICAO = 'KTKI' and au.ORIGIN = 'TKI'
)
insert into airport_station_maps
select IATA, stations.station, stations.distance
from cte;

In [0]:
spark.sql('drop table if exists all_weather')
dbutils.fs.rm('/delta/all_weather',True)

# Pick weather data from closest station within 25KM, or 85KM (for BQN and PSE)
spark.sql("select asm.IATA as ORIGIN, w.station, date, to_date(date) as date_key, elevation, WND, CIG, VIS, TMP, DEW, SLP \
      FROM weather_tbl w join airport_station_maps asm on w.station = asm.station \
      where (asm.distance < 25 or (asm.IATA in ('BQN', 'PSE') and asm.distance<85)) \
      and date between '2014-12-30' and '2020-01-01'")\
    .write.partitionBy("ORIGIN").format("delta").save("/delta/all_weather")
spark.sql("CREATE TABLE all_weather USING DELTA LOCATION '/delta/all_weather'")

The weather data after joining with the stations table is very unclean. For example, some variables are labeled with a string of "9" when they are missing. We do not want to consider such rows when joining into the flights table.

In [0]:
%sql
--5.35 minutes
-- Merge flight time into weather table
delete from all_weather
where WND='999,9,9,9999,9' and CIG ='99999,9,9,9' and VIS ='999999,9,9,9' and TMP='+9999,9' and DEW='+9999,9' and SLP='99999,9';

insert into all_weather 
select distinct f.ORIGIN, null,  f.report_time_utc , null, null, null, null, null, null, null, null
from flights_pagerank AS f left join all_weather AS w ON f.ORIGIN = w.ORIGIN AND f.report_time_utc=w.date
where w.ORIGIN IS NULL
UNION ALL
select distinct f.DEST, null,  f.report_time_utc , null, null, null, null, null, null, null, null
from flights_pagerank AS f left join all_weather AS w ON f.DEST = w.ORIGIN AND f.report_time_utc=w.date
where w.ORIGIN IS NULL;

-- For PPG airport only - it's missing 2015 weather data
-- We will use 2016 weather data insert back to 2015 and forward fill
insert into all_weather 
select ORIGIN, null, date - INTERVAL 1 year, null, elevation, WND, CIG, VIS, tmp, DEW, SLP
from all_weather
where ORIGIN = 'PPG'
and year(date)=2016;

## 2.2 Parse weather data
### Table: parsed_weather_data

The weather data is also encoded in a messy manner. For example, visibility has 3 fields representing distance, a classification of the distance as a code, and a quality of data code.  Here, we parse weather data into associated fields. Specific format for each field is extracted from the NOAA website:

https://www.ncei.noaa.gov/data/global-hourly/doc/isd-format-document.pdf

In [0]:
# 16.75 minutes
spark.sql("drop table if exists parsed_weather_data")
dbutils.fs.rm("/delta/parsed_weather_data",True)

spark.sql("Select ORIGIN, \
  date, \
  int(left(WND, 3)) as wnd_direction, \
  substring(WND, 5, 1) as wnd_dir_quality, \
  substring(WND, 7,1) as wnd_type, \
  int(substring(WND, 9,4))/10 as wnd_speed, \
  substring(WND, 14,1) as wnd_speed_quality, \
  int(left(CIG, 5)) as cig_height, \
  substring(CIG, 7,1) as cig_height_quality, \
  substring(CIG, 9,1) as cig_code, \
  right(CIG,1) as cig_cavok_code, \
  int(left(VIS,6)) as vis_distance, \
  substring(VIS, 8, 1) as vis_distance_quality, \
  substring(VIS, 10,1) as vis_var_code, \
  substring(VIS, 12,1) as vis_var_quality, \
  int(left(TMP, 5)) /10.0 as tmp, \
  substring(TMP,7,1) as tmp_quality, \
  int(left(DEW, 5)) /10.0 as dew, \
  substring(DEW, 7, 1) as dew_quality, \
  int(left(SLP, 5)) /10.0 as slp, \
  substring(SLP, 7, 1) as slp_quality, \
  elevation \
from all_weather")\
.write.format("delta").mode("overwrite").partitionBy("ORIGIN").save("/delta/parsed_weather_data")

spark.sql("CREATE TABLE parsed_weather_data USING DELTA LOCATION '/delta/parsed_weather_data'")

In [0]:
 %sql
-- 18.99 minutes
-- Clean up missing values
-- wind
update parsed_weather_data
set wnd_direction = Null
where wnd_dir_quality = 3 or wnd_dir_quality=7 or wnd_direction > 360;

-- wind type code
--update parsed_weather_data
--set wnd_type = Null
--where wnd_type = 9;

-- WIND-OBSERVATION speed rate
-- The rate of horizontal travel of air past a fixed point.
-- MIN: 0000 MAX: 0900 UNITS: meters per second
update parsed_weather_data
set wnd_speed = Null
where wnd_speed_quality =3 or wnd_speed_quality=7 or wnd_speed > 999;

-- SKY-CONDTION-OBSERVATION ceiling quality code
-- The code that denotes a quality status of a reported ceiling height dimension.
-- DOM: A specific domain comprised of the characters in the ASCII character set
update parsed_weather_data
set cig_height = Null
where cig_height_quality=3 or cig_height_quality=7 or cig_height > 22000;

-- SKY-CONDITION-OBSERVATION ceiling determination code
-- The code that denotes the method used to determine the ceiling.
-- DOM: A specific domain comprised of the characters in the ASCII character set
--update parsed_weather_data
--set cig_code = Null
--where cig_code = 9;

-- SKY-CONDITION-OBSERVATION CAVOK code
-- The code that represents whether the 'Ceiling and Visibility Okay' (CAVOK) condition has been reported.
-- DOM: A specific domain comprised of the characters in the ASCII character set.
--update parsed_weather_data
--set cig_cavok_code = Null
--where cig_cavok_code = 9;

-- VISIBILITY-OBSERVATION distance dimension
-- The horizontal distance at which an object can be seen and identified.
update parsed_weather_data
set vis_distance = Null
where vis_distance_quality=3 or vis_distance_quality=7  or vis_distance > 160000;

-- VISIBILITY-OBSERVATION variability code
-- The code that denotes whether or not the reported visibility is variable.
-- DOM: A specific domain comprised of the characters in the ASCII character set.
update parsed_weather_data
set vis_var_code = '9'
where vis_var_quality=3 or vis_var_quality=7;

-- temperature 999.9
update parsed_weather_data
set tmp = Null
where TMP_quality='3' or TMP_quality='7' or tmp > 62;

--AIR-TEMPERATURE-OBSERVATION dew point temperature
-- The temperature to which a given parcel of air must be cooled at constant pressure and water vapor
-- content in order for saturation to occur.
-- MIN: -0982 MAX: +0368 UNITS: Degrees Celsius
update parsed_weather_data
set DEW = Null
where DEW_quality='3' or DEW_quality='7' or dew>37;

--ATMOSPHERIC-PRESSURE-OBSERVATION sea level pressure
-- The air pressure relative to Mean Sea Level (MSL).
-- MIN: 08600 MAX: 10900 UNITS: Hectopascals
update parsed_weather_data
set SLP = Null
where SLP_quality='3' or SLP_quality='7' or slp>1100;

## 2.3 Use Forward fill to fill the gaps in all_weather table

In order to get weather data for each flight, we have to have a weather data point available to merge. To get this, we first use forward fill in order to fill in gaps in the weather table. The idea is that if a weather data point is missing, the best available data point is the most recent available, which is achieved with a foward fill.

In [0]:
# 8.34 minutes
# Calculate weather per minute - take average when there are multiple records at same time
spark.sql("drop table if exists weather_merged")
dbutils.fs.rm("/w261/weather_merged", True)

weather_merged = spark.sql("select distinct origin, date \
,(select avg(wnd_direction) from parsed_weather_data w2 where w2.ORIGIN=w.ORIGIN and w2.date = w.date and w2.wnd_dir_quality <> 3 and w2.wnd_dir_quality <> 7 and w2.wnd_direction is not null) as wnd_direction \
,(select min(wnd_type) from parsed_weather_data w2 where w2.ORIGIN=w.ORIGIN and w2.date = w.date and w2.wnd_type is not null)  as wnd_type \
,(select avg(wnd_speed) from parsed_weather_data w2 where w2.ORIGIN=w.ORIGIN and w2.date = w.date and w2.wnd_speed_quality <> 3 and w2.wnd_speed_quality <> 7 and w2.wnd_speed is not null ) as wnd_speed \
,(select avg(cig_height) from parsed_weather_data w2 where w2.ORIGIN=w.ORIGIN and w2.date = w.date and w2.cig_height_quality <> 3 and w2.cig_height_quality <> 7 and w2.cig_height is not null ) as cig_height \
,(select min(cig_code) from parsed_weather_data w2 where w2.ORIGIN=w.ORIGIN and w2.date = w.date and w2.cig_code<>'9' and w2.cig_code is not null) as cig_code \
,(select min(cig_cavok_code) from parsed_weather_data w2 where w2.ORIGIN=w.ORIGIN and w2.date = w.date and w2.cig_cavok_code<>'9' and w2.cig_cavok_code is not null) as cig_cavok_code \
,(select avg(vis_distance) from parsed_weather_data w2 where w2.ORIGIN=w.ORIGIN and w2.date = w.date and w2.vis_distance_quality<>3 and w2.vis_distance_quality<>7 and w2.vis_distance is not null) as vis_distance \
,(select min(vis_var_code) from parsed_weather_data w2 where w2.ORIGIN=w.ORIGIN and w2.date = w.date and w2.vis_var_quality<>3 and w2.vis_var_quality<>7 and w2.vis_var_code is not null) as vis_var_code \
,(select avg(tmp) from parsed_weather_data w2 where w2.ORIGIN=w.ORIGIN and w2.date = w.date and w2.TMP_quality<>'3' and w2.TMP_quality<>'7' and w2.tmp is not null) as tmp \
,(select avg(dew) from parsed_weather_data w2 where w2.ORIGIN=w.ORIGIN and w2.date = w.date and w2.dew_quality<>'3' and w2.DEW_quality<>'7' and w2.dew is not null) as dew \
,(select avg(slp) from parsed_weather_data w2 where w2.ORIGIN=w.ORIGIN and w2.date = w.date and w2.slp_quality<>'9' and w2.slp is not null) as slp \
,(select avg(elevation) from parsed_weather_data w2 where w2.ORIGIN=w.ORIGIN and w2.date = w.date) as elevation \
from parsed_weather_data w")
#weather_merged.write.mode("overwrite").parquet("/w261/weather_merged")
weather_merged.write.saveAsTable("weather_merged")

In [0]:
# 3.51 minutes
import sys
from pyspark.sql.window import Window
from pyspark.sql.functions import last, first

spark.sql('drop table if exists weather_ffilled')
dbutils.fs.rm('/delta/weather_ffilled',True)

weather_df = spark.sql("select * from weather_merged")
window = Window.partitionBy('ORIGIN')\
               .orderBy('date')\
               .rowsBetween(-sys.maxsize,0)


# define the forward-filled column
wnd_dir_column = last(weather_df['wnd_direction'], ignorenulls=True).over(window)
#wnd_dir_quality_column = last(weather_df['wnd_dir_quality'], ignorenulls=True).over(window)
wnd_type_column = last(weather_df['wnd_type'], ignorenulls=True).over(window)
wnd_speed_column = last(weather_df['wnd_speed'], ignorenulls=True).over(window)
#wnd_speed_quality_column = last(weather_df['wnd_speed_quality'], ignorenulls=True).over(window)
cig_height_column = last(weather_df['cig_height'], ignorenulls=True).over(window)
#cig_height_quality_column = last(weather_df['cig_height_quality'], ignorenulls=True).over(window)
cig_code_column = last(weather_df['cig_code'], ignorenulls=True).over(window)
cig_cavok_code_column = last(weather_df['cig_cavok_code'], ignorenulls=True).over(window)
vis_distance_column = last(weather_df['vis_distance'], ignorenulls=True).over(window)
#vis_distance_quality_column = last(weather_df['vis_distance_quality'], ignorenulls=True).over(window)
vis_var_column = last(weather_df['vis_var_code'], ignorenulls=True).over(window)
#vis_var_quality_column = last(weather_df['vis_var_quality'], ignorenulls=True).over(window)
tmp_column = last(weather_df['tmp'], ignorenulls=True).over(window)
#tmp_quality_column = last(weather_df['tmp_quality'], ignorenulls=True).over(window)
dew_column = last(weather_df['DEW'], ignorenulls=True).over(window)
#dew_quality_column = last(weather_df['dew_quality'], ignorenulls=True).over(window)
slp_column = last(weather_df['SLP'], ignorenulls=True).over(window)
#slp_quality_column = last(weather_df['slp_quality'], ignorenulls=True).over(window)
ele_column = last(weather_df['elevation'], ignorenulls=True).over(window)

# do the fill

weather_df_filled = weather_df\
  .withColumn('wnd_direction', wnd_dir_column)\
  .withColumn('wnd_type', wnd_type_column)\
  .withColumn('wnd_speed', wnd_speed_column)\
  .withColumn('cig_height', cig_height_column)\
  .withColumn('cig_code', cig_code_column)\
  .withColumn('cig_cavok_code', cig_cavok_code_column)\
  .withColumn('vis_distance', vis_distance_column)\
  .withColumn('vis_var_code', vis_var_column)\
  .withColumn('tmp', tmp_column)\
  .withColumn('dew', dew_column)\
  .withColumn('SLP', slp_column)\
  .withColumn('elevation', ele_column).cache()

weather_df_filled.select('ORIGIN', 'date', 'wnd_direction', 'wnd_type','wnd_speed', 'cig_height','cig_code','cig_cavok_code','vis_distance','vis_var_code', 'tmp', 'DEW', 'SLP', 'elevation')\
  .dropDuplicates()\
  .write.format("delta").mode("overwrite").partitionBy("ORIGIN").save("/delta/weather_ffilled/")

#weather_df_filled.write.saveAsTable("weather_ffilled")

spark.sql("CREATE TABLE weather_ffilled USING DELTA LOCATION '/delta/weather_ffilled'")

In [0]:
%sql
--Handle still missing values
--For catogical 
update weather_ffilled
set wnd_type = 9
where wnd_type is null;

update weather_ffilled
set cig_code = 9
where cig_code is null;

update weather_ffilled
set cig_cavok_code = 9 
where cig_cavok_code is null;

update weather_ffilled
set vis_var_code=9
where vis_var_code is null;

--missing one tmp for GUM at 2015-01-01
--update weather_ffilled
--set wnd_direction = 60, wnd_speed = 6.7, tmp=29, dew=23,slp=1010, elevation = 77.4, cig_height=22000
--where origin='GUM' and date < '2015-01-01'

In [0]:
spark.sql('drop table if exists weather_cleaned')
dbutils.fs.rm('/w261/weather_cleaned',True)

weatherDF = spark.sql("select ORIGIN, date, \
ifnull(wnd_direction, avg(wnd_direction) over (partition by ORIGIN order by w.date asc rows between 96 preceding AND 96 following)) as wnd_direction, \
wnd_type, \
ifnull(wnd_speed, avg(wnd_speed) over (partition by ORIGIN order by w.date asc rows between 5 preceding AND 5 following)) as wnd_speed, \
ifnull(cig_height, avg(cig_height) over (partition by ORIGIN order by w.date asc rows between 12 preceding AND 12 following)) as cig_height, \
cig_code, \
cig_cavok_code, \
ifnull(vis_distance, avg(vis_distance) over (partition by ORIGIN order by w.date asc rows between 48 preceding AND 48 following)) as vis_distance, \
vis_var_code, \
ifnull(tmp, avg(tmp) over (partition by ORIGIN order by w.date asc rows between 5 preceding AND 5 following)) as tmp, \
ifnull(dew, avg(dew) over (partition by ORIGIN order by w.date asc rows between 12 preceding AND 12 following)) as dew, \
ifnull(dew, avg(elevation) over (partition by ORIGIN order by w.date asc rows between 12 preceding AND 12 following)) as elevation \
from weather_ffilled w")

weatherDF.write.mode("overwrite").parquet("/w261/weather_cleaned")
weatherDF.write.saveAsTable("weather_cleaned")

## 3. Final Step: Merge Data into one dataset
### Table: flights_all


The final step for generating the flights_all table is to fill any remaining missing values (such as at the beginning of the dataset) using average neighboring values (we also tried Backward Fill, but this attempt failed). We also merge the weather dataset and flight dataset. The principal of the final dataset is to create one united dateset with as many original features as possible, which allows others to create more features during model exploring and training step.

In [0]:
# 2.52
spark.sql("drop table if exists flights_all")
dbutils.fs.rm("/w261/flights_all", True)

# Merge into one dataset
# 6 minutes

all_dataDF = spark.sql("select f.* \
                    , w.wnd_direction, w.wnd_speed, w.cig_height, w.cig_code, w.cig_cavok_code, w.vis_distance, w.vis_var_code, w.tmp, w.dew, w.elevation \
                    , wd.wnd_direction as dest_wnd_direction, wd.wnd_speed as dest_wnd_speed, wd.cig_height as dest_cig_height, wd.cig_code as dest_cig_code \
                    , wd.cig_cavok_code as dest_cig_cavok_code, wd.vis_distance as dest_vis_distance, wd.vis_var_code as dest_vis_var_code \
                    , wd.tmp as dest_tmp, wd.dew as dest_dew, wd.elevation as dest_elevation \
                    from flights_pagerank f join weather_cleaned w on f.ORIGIN=w.ORIGIN and f.report_time_utc = w.date \
                    join weather_cleaned wd on f.DEST=wd.ORIGIN and f.report_time_utc = wd.date")

all_dataDF.dropDuplicates().write.mode("overwrite").parquet("/w261/flights_all")

all_dataDF.write.saveAsTable("flights_all")

#### Flights_all_v2
Update: 2020-11-30 : Adding TAIL_NUM group

We know that different TAIL_NUMs, which represent different flights and their associated paths, have different average values. As a result, it is reasonable to have a feature that classifies flight TAIL_NUM based on their group.

In [0]:
tail_group = spark.sql("with delayCTE as (select tail_num, count(*) as total_flights, avg(ifnull(dep_delay,0)) as avg_dep_delay from flights_all \
where tail_num is not null group by tail_num) \
select tail_num, total_flights, avg_dep_delay, case  when avg_dep_delay < 15 then 1 when avg_dep_delay < 30 then 2 when avg_dep_delay < 45 then 3 else 4 end as tail_delay_group \
from delayCTE \
order by avg_dep_delay").write.saveAsTable("tail_num_group")


#### Flights_all_v3

Adding DEP_DEL15_PREV (one of our most important variables)

We believe an important feature is whether or not the previous flight of a plane was delayed in terms of its departure. We only consider this indicator variable as 1 if the plane's previous delay IF the previous flight on the same plane left at least 2 hours before current departure time (since we want to predict 2 hours ahead of time, for shorter flights, this information will not exist). We get this feature by performing a self-join of the flights_all table based on the TAIL_NUM. In order to get the most recent flight at least 2hrs before with the same TAIL_NUM, we generate a row number partitioned by the current flight considered, and ordered by the departure time of the previous flight with the same TAIL_NUM, and then take the first row. We also restrict origin of the previous flight to be the destination of current flight, which optimizes this query and prevents a full join on the TAIL_NUM.

In [0]:
all_dataDF = all_dataDF.withColumn("mono_index", f.monotonically_increasing_id())

flights_v3 = spark.sql('''SELECT * FROM (SELECT e1.mono_index, e1.departure_time , e1.TAIL_NUM, e1.TAIL_NUM TAIL_NUM_PREV, e1.ORIGIN_CITY_NAME, e1.DEST_CITY_NAME, e2.ORIGIN_CITY_NAME ORIGIN_CITY_NAME_PREV, e2.DEST_CITY_NAME DEST_CITY_NAME_PREV, e2.DEP_DEL15 AS DEP_DEL15_PREV, e2.departure_time AS departure_time_prev, ROW_NUMBER() OVER (PARTITION BY e1.mono_index ORDER BY e2.departure_time DESC) AS rn 
FROM flights_all e1, flights_all e2
WHERE e1.TAIL_NUM = e2.TAIL_NUM 
AND e1.departure_time > e2.departure_time + INTERVAL 2 HOURS
AND e1.ORIGIN_AIRPORT_ID = e2.DEST_AIRPORT_ID) WHERE rn = 1''')

#### Flights_all_v4

We generate a variable OD_group based on the average delay grouped by both ORIGIN and DEST (flights of a particular path). Here, 0 represents less than 5 flights are delayed in 5 years, 1 means the average delay (per flight) is less than 5 minutes, 2 means the average delay is less 10 minutes, and 3 means the average delay is less 15 minutes, and 4 otherwise.

In [0]:
tail_group = spark.sql("with delayCTE as (select ORIGIN, DEST, count(*) as total_flights, avg(ifnull(dep_delay,0)) as avg_dep_delay from flights_all_3 \
group by ORIGIN, DEST having total_flights >= 5) \
select ORIGIN, DEST, total_flights, avg_dep_delay, int(case  when avg_dep_delay < 5 then 1 when avg_dep_delay < 10 then 2 when avg_dep_delay < 15 then 3 else 4 end) as OD_GROUP \
from delayCTE \
order by avg_dep_delay").write.saveAsTable("OD_group")

In [0]:
spark.sql("drop table if exists flights_all_v4")
spark.sql("select f.*, \
          ifnull(o.OD_GROUP, 0) as OD_GROUP \
          from flights_all_3 f left join tail_num_group t on f.TAIL_NUM = t.TAIL_NUM \
          left join OD_group o on f.ORIGIN=o.ORIGIN and f.DEST = o.DEST").write.saveAsTable("flights_all_v4")

#### Final flight table with all features

For the remaining features, we first split the table in train and test sets (90/10 split). This is because these variables will be calculated based on aggregate statistics, and we do not want test data information to leak into the training.

In [0]:
%sql
refresh table flights_all_v4

In [0]:
#Generate view of the data table we have so far
data = spark.sql("SELECT * FROM flights_all_v4 WHERE DEP_DELAY IS NOT NULL and OP_CARRIER NOT IN ('US', 'VX') and ORIGIN NOT IN ('CEC', 'CLD', 'ILG', 'DIK', 'EFD', 'ENV', 'TKI', 'UST', 'FNL', 'YNG', 'IFP', 'FLO') and DEST NOT IN ('CEC', 'CLD', 'ILG', 'DIK', 'EFD', 'ENV', 'TKI', 'UST', 'FNL', 'YNG', 'IFP', 'FLO') ORDER BY FL_DATE, departure_time")
data.registerTempTable('flights_all')
df_total = data.withColumn("mono_index", f.monotonically_increasing_id())


In [0]:
#train test split based on time. First 90% is train
count = df_total.count()
train = df_total.limit(int(0.9*count))
test = df_total.orderBy(f.desc("mono_index")).drop("mono_index").limit(count-int(0.9*count))
train.registerTempTable('flights_train')

The inspiration behind further feature engineering is that there are several variables with a very large number of categories. This is demonstrated below.

In [0]:
#List of variables we have

int_categorical = ['DAY_OF_WEEK', 'DEP_DEL15_PREV', 'MONTH', 'QUARTER', 'DAY_OF_MONTH', 'OP_CARRIER_AIRLINE_ID','OP_CARRIER_FL_NUM', 'CRS_DEP_TIME','DISTANCE_GROUP']#, 'MONTH', 'QUARTER', 'DAY_OF_MONTH']#["OP_CARRIER_AIRLINE_ID", 'ORIGIN_AIRPORT_ID', 'DEST_AIRPORT_ID']

str_features = ['DEST_STATE_ABR', 'wnd_direction', 'dest_wnd_direction','OP_UNIQUE_CARRIER','OP_CARRIER', 'ORIGIN', 'ORIGIN_STATE_ABR', 'DEST', 'cig_code','cig_cavok_code','dest_cig_code','dest_cig_cavok_code','dest_vis_var_code'] #eliminated tail num, 'ORIGIN', 'DEST', 'vis_var_code', 'dest_vis_var_code'

categorical_features = int_categorical + str_features
num_features = ["CRS_DEP_TIME", "DISTANCE", 'vis_distance', 'tmp', 'dew', 'elevation', 'dest_wnd_speed', 'pagerank', 'pagerank_dest', 'wnd_speed', 'cig_height', 'dest_vis_distance', 'dest_tmp', 'dest_dew', 'dest_elevation', 'dest_cig_height','wnd_direction'] 

fe = []
for v in categorical_features:
  fe.append((v, df_total.select(v).distinct().count()))
fe.sort(key = lambda x: -x[1])
fe = [i for i in fe if i[1] > 31]

In [0]:
#categorical variables with more 31 categories (allowing the number of days to be a separate variables)
fe

We see that for example, for the carrier group, there are 7175 distinct flight numbers. Even though the flight number likely plays an important role in determining delays, it is unlikely our model can capture the effect of each number. Instead, one way to bin different carriers into the same bin is to determine the average flight delay of the flight number based on the training set. For each variable with more than 31 categories (so that we don't need to bin days of the month), we first generate a percent rank for the mean response based on each group of the variable. For example, with OP_CARRIER_FL_NUM, for each distinct number, we calculate the mean fraction of flights delayed. We then calculate a percent rank for this mean response, and we then bin this percent rank into 10 groups (10 indicators for our models to use). This way, we greatly reduce the number of indicators we need, with each indicator representing a different mean level of delays, giving our model a much better chance at making use of these 10 groups (rather than a giant number of groups, which could lead to overfitting and too much flexibility).

In [0]:
pct_table_dep_time = spark.sql('''SELECT CRS_DEP_TIME,
CASE
    WHEN pct_rank < 0.10 THEN 0
    WHEN pct_rank < 0.20 THEN 1
    WHEN pct_rank < 0.30 THEN 2
    WHEN pct_rank < 0.40 THEN 3
    WHEN pct_rank < 0.50 THEN 4
    WHEN pct_rank < 0.60 THEN 5
    WHEN pct_rank < 0.70 THEN 6
    WHEN pct_rank < 0.80 THEN 7
    WHEN pct_rank < 0.90 THEN 8
    ELSE 9
END AS CRS_DEP_TIME_bucket
FROM 
(SELECT CRS_DEP_TIME, PERCENT_RANK() OVER (ORDER BY avg_del) AS pct_rank FROM
(SELECT CRS_DEP_TIME, AVG(DEP_DEL15) avg_del FROM flights_train
GROUP BY CRS_DEP_TIME) t1)''')

pct_table_carrier = spark.sql('''SELECT OP_CARRIER_FL_NUM,
CASE
    WHEN pct_rank < 0.10 THEN 0
    WHEN pct_rank < 0.20 THEN 1
    WHEN pct_rank < 0.30 THEN 2
    WHEN pct_rank < 0.40 THEN 3
    WHEN pct_rank < 0.50 THEN 4
    WHEN pct_rank < 0.60 THEN 5
    WHEN pct_rank < 0.70 THEN 6
    WHEN pct_rank < 0.80 THEN 7
    WHEN pct_rank < 0.90 THEN 8
    ELSE 9
END AS OP_CARRIER_FL_NUM_bucket
FROM 
(SELECT OP_CARRIER_FL_NUM, PERCENT_RANK() OVER (ORDER BY avg_del) AS pct_rank FROM
(SELECT OP_CARRIER_FL_NUM, AVG(DEP_DEL15) avg_del FROM flights_train
GROUP BY OP_CARRIER_FL_NUM) t1)''')

pct_table_origin = spark.sql('''SELECT ORIGIN,
CASE
    WHEN pct_rank < 0.10 THEN 0
    WHEN pct_rank < 0.20 THEN 1
    WHEN pct_rank < 0.30 THEN 2
    WHEN pct_rank < 0.40 THEN 3
    WHEN pct_rank < 0.50 THEN 4
    WHEN pct_rank < 0.60 THEN 5
    WHEN pct_rank < 0.70 THEN 6
    WHEN pct_rank < 0.80 THEN 7
    WHEN pct_rank < 0.90 THEN 8
    ELSE 9
END AS ORIGIN_bucket
FROM 
(SELECT ORIGIN, PERCENT_RANK() OVER (ORDER BY avg_del) AS pct_rank FROM
(SELECT ORIGIN, AVG(DEP_DEL15) avg_del FROM flights_train
GROUP BY ORIGIN) t1)''')

pct_table_dest = spark.sql('''SELECT DEST,
CASE
    WHEN pct_rank < 0.10 THEN 0
    WHEN pct_rank < 0.20 THEN 1
    WHEN pct_rank < 0.30 THEN 2
    WHEN pct_rank < 0.40 THEN 3
    WHEN pct_rank < 0.50 THEN 4
    WHEN pct_rank < 0.60 THEN 5
    WHEN pct_rank < 0.70 THEN 6
    WHEN pct_rank < 0.80 THEN 7
    WHEN pct_rank < 0.90 THEN 8
    ELSE 9
END AS DEST_bucket
FROM 
(SELECT DEST, PERCENT_RANK() OVER (ORDER BY avg_del) AS pct_rank FROM
(SELECT DEST, AVG(DEP_DEL15) avg_del FROM flights_train
GROUP BY DEST) t1)''')

pct_table_origin_abr = spark.sql('''SELECT ORIGIN_STATE_ABR,
CASE
    WHEN pct_rank < 0.10 THEN 0
    WHEN pct_rank < 0.20 THEN 1
    WHEN pct_rank < 0.30 THEN 2
    WHEN pct_rank < 0.40 THEN 3
    WHEN pct_rank < 0.50 THEN 4
    WHEN pct_rank < 0.60 THEN 5
    WHEN pct_rank < 0.70 THEN 6
    WHEN pct_rank < 0.80 THEN 7
    WHEN pct_rank < 0.90 THEN 8
    ELSE 9
END AS ORIGIN_STATE_ABR_bucket
FROM 
(SELECT ORIGIN_STATE_ABR, PERCENT_RANK() OVER (ORDER BY avg_del) AS pct_rank FROM
(SELECT ORIGIN_STATE_ABR, AVG(DEP_DEL15) avg_del FROM flights_train
GROUP BY ORIGIN_STATE_ABR) t1)''')

pct_table_dest_abr = spark.sql('''SELECT DEST_STATE_ABR,
CASE
    WHEN pct_rank < 0.10 THEN 0
    WHEN pct_rank < 0.20 THEN 1
    WHEN pct_rank < 0.30 THEN 2
    WHEN pct_rank < 0.40 THEN 3
    WHEN pct_rank < 0.50 THEN 4
    WHEN pct_rank < 0.60 THEN 5
    WHEN pct_rank < 0.70 THEN 6
    WHEN pct_rank < 0.80 THEN 7
    WHEN pct_rank < 0.90 THEN 8
    ELSE 9
END AS DEST_STATE_ABR_bucket
FROM 
(SELECT DEST_STATE_ABR, PERCENT_RANK() OVER (ORDER BY avg_del) AS pct_rank FROM
(SELECT DEST_STATE_ABR, AVG(DEP_DEL15) avg_del FROM flights_train
GROUP BY DEST_STATE_ABR) t1)''')

In [0]:
#Join to our flights table (join to the entire table, even though the bins are calculated only using the training set)
df_full = df_total.join(pct_table_dep_time, 'CRS_DEP_TIME')\
        .join(pct_table_carrier, 'OP_CARRIER_FL_NUM')\
        .join(pct_table_origin, 'ORIGIN')\
        .join(pct_table_dest, 'DEST')\
        .join(pct_table_origin_abr, 'ORIGIN_STATE_ABR')\
        .join(pct_table_dest_abr, 'DEST_STATE_ABR')

In [0]:
df_full.orderBy('FL_DATE', 'CRS_DEP_TIME').drop('mono_index').write.saveAsTable("flights_all_v5")

In [0]:
df_full.count()

In [0]:
display(df_full)

DEST_STATE_ABR,ORIGIN_STATE_ABR,DEST,ORIGIN,OP_CARRIER_FL_NUM,CRS_DEP_TIME,YEAR,QUARTER,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,OP_UNIQUE_CARRIER,OP_CARRIER_AIRLINE_ID,OP_CARRIER,TAIL_NUM,ORIGIN_AIRPORT_ID,ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,ORIGIN_CITY_NAME,ORIGIN_STATE_FIPS,ORIGIN_STATE_NM,ORIGIN_WAC,DEST_AIRPORT_ID,DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,DEST_CITY_NAME,DEST_STATE_FIPS,DEST_STATE_NM,DEST_WAC,DEP_TIME,DEP_DELAY,DEP_DELAY_NEW,DEP_DEL15,DEP_DELAY_GROUP,DEP_TIME_BLK,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,ARR_DELAY_NEW,ARR_DEL15,ARR_DELAY_GROUP,ARR_TIME_BLK,CANCELLED,CANCELLATION_CODE,DIVERTED,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,FLIGHTS,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,FIRST_DEP_TIME,TOTAL_ADD_GTIME,LONGEST_ADD_GTIME,departure_time,report_time,report_time_utc,arr_time_dest,dep_delay_last_hour,arr_delay_last_hour,dep_delay_ratio,arr_delay_ratio,pagerank,pagerank_dest,wnd_direction,wnd_speed,cig_height,cig_code,cig_cavok_code,vis_distance,vis_var_code,tmp,dew,elevation,dest_wnd_direction,dest_wnd_speed,dest_cig_height,dest_cig_code,dest_cig_cavok_code,dest_vis_distance,dest_vis_var_code,dest_tmp,dest_dew,dest_elevation,tail_delay_group,mono_index,DEP_DEL15_PREV,OD_GROUP,CRS_DEP_TIME_bucket,OP_CARRIER_FL_NUM_bucket,ORIGIN_bucket,DEST_bucket,ORIGIN_STATE_ABR_bucket,DEST_STATE_ABR_bucket
TX,AL,IAH,HSV,4569,1238,2015,1,1,1,4,2015-01-01,EV,20366,EV,N13538,12217,1221702,30255,"Huntsville, AL",1,Alabama,51,12266,1226603,31453,"Houston, TX",48,Texas,74,1228,-10.0,0.0,0.0,-1,1200-1259,9.0,1237,1425,13.0,1440,1438,-2.0,0.0,0.0,-1,1400-1459,0.0,,0.0,122.0,130.0,108.0,1.0,595.0,3,0.0,0.0,0.0,0.0,0.0,0,0.0,0.0,2015-01-01T12:38:00.000+0000,2015-01-01T10:38:00.000+0000,2015-01-01T16:38:00.000+0000,2015-01-01T14:40:00.000+0000,0.0,0.3888888888888889,0.0,2.1534692531198263,0.4155520746083281,8.3416648140738,180.0,1.5,2134.0,M,N,16093.0,N,5.0,-2.0,-2.0,50.0,4.1,457.5,M,N,15288.5,N,6.0,4.0,4.0,1,5022,0.0,2,3,0,2,2,3,3
SC,VA,CAE,IAD,5731,1238,2015,1,1,1,4,2015-01-01,EV,20366,EV,N832AS,12264,1226402,30852,"Washington, DC",51,Virginia,38,10868,1086803,30868,"Columbia, SC",45,South Carolina,37,1228,-10.0,0.0,0.0,-1,1200-1259,17.0,1245,1350,4.0,1409,1354,-15.0,0.0,0.0,-1,1400-1459,0.0,,0.0,91.0,86.0,65.0,1.0,401.0,2,0.0,0.0,0.0,0.0,0.0,0,0.0,0.0,2015-01-01T12:38:00.000+0000,2015-01-01T10:38:00.000+0000,2015-01-01T15:38:00.000+0000,2015-01-01T14:09:00.000+0000,0.4,0.0,2.5100993124522537,0.0,2.554812130596599,0.3855823218207974,190.0,1.5,22000.0,9,N,16093.0,N,1.0,-6.0,-6.0,200.0,0.0,22000.0,W,N,6437.0,N,3.3,1.7,1.7,1,5023,0.0,4,3,7,5,6,5,3
MS,TX,JAN,IAH,4698,1342,2015,1,1,1,4,2015-01-01,EV,20366,EV,N11548,12266,1226603,31453,"Houston, TX",48,Texas,74,12448,1244805,32448,"Jackson/Vicksburg, MS",28,Mississippi,53,1345,3.0,3.0,0.0,0,1300-1359,11.0,1356,1450,7.0,1458,1457,-1.0,0.0,0.0,-1,1400-1459,0.0,,0.0,76.0,72.0,54.0,1.0,351.0,2,0.0,0.0,0.0,0.0,0.0,0,0.0,0.0,2015-01-01T13:42:00.000+0000,2015-01-01T11:42:00.000+0000,2015-01-01T17:42:00.000+0000,2015-01-01T14:58:00.000+0000,0.16,0.5,0.925406880999362,3.1211470203624136,8.3416648140738,0.4389091461186215,50.0,4.6,366.0,M,N,16093.0,N,6.0,4.0,4.0,40.0,2.1,3048.0,M,N,16093.0,N,7.2,1.7,1.7,1,5929,0.0,1,4,0,7,3,8,2
MD,TX,BWI,DFW,202,1342,2015,1,1,1,4,2015-01-01,NK,20416,NK,N618NK,11298,1129803,30194,"Dallas/Fort Worth, TX",48,Texas,74,10821,1082103,30852,"Baltimore, MD",24,Maryland,35,1337,-5.0,0.0,0.0,-1,1300-1359,11.0,1348,1700,5.0,1724,1705,-19.0,0.0,0.0,-2,1700-1759,0.0,,0.0,162.0,148.0,132.0,1.0,1217.0,5,0.0,0.0,0.0,0.0,0.0,0,0.0,0.0,2015-01-01T13:42:00.000+0000,2015-01-01T11:42:00.000+0000,2015-01-01T17:42:00.000+0000,2015-01-01T17:24:00.000+0000,0.1224489795918367,0.0666666666666666,0.6001480356048641,0.3610090804312935,15.031239575590972,4.467910853672466,10.0,1.5,167.5,M,N,4828.0,N,1.15,-0.3,-0.3,240.0,3.6,22000.0,9,N,16093.0,9,5.0,-9.0,-9.0,1,5930,0.0,3,4,5,8,5,8,4
NJ,NC,EWR,RDU,4224,1342,2015,1,1,1,4,2015-01-01,EV,20366,EV,N11113,14492,1449202,34492,"Raleigh/Durham, NC",37,North Carolina,36,11618,1161802,31703,"Newark, NJ",34,New Jersey,21,1342,0.0,0.0,0.0,0,1300-1359,18.0,1400,1510,8.0,1526,1518,-8.0,0.0,0.0,-1,1500-1559,0.0,,0.0,104.0,96.0,70.0,1.0,416.0,2,0.0,0.0,0.0,0.0,0.0,0,0.0,0.0,2015-01-01T13:42:00.000+0000,2015-01-01T11:42:00.000+0000,2015-01-01T16:42:00.000+0000,2015-01-01T15:26:00.000+0000,0.0,0.0,0.0,0.0,2.0696208873013333,5.65463663591611,230.0,1.5,22000.0,9,N,16093.0,N,5.6,-6.1,-6.1,270.0,5.7,22000.0,M,N,16093.0,9,3.3,-11.9,-11.9,1,5931,0.0,4,4,8,7,9,7,9
MA,FL,BOS,FLL,2070,1342,2015,1,1,1,4,2015-01-01,B6,20409,B6,N715JB,11697,1169703,32467,"Fort Lauderdale, FL",12,Florida,33,10721,1072102,30721,"Boston, MA",25,Massachusetts,13,1402,20.0,20.0,1.0,1,1300-1359,16.0,1418,1702,6.0,1650,1708,18.0,18.0,1.0,1,1600-1659,0.0,,0.0,188.0,186.0,164.0,1.0,1237.0,5,7.0,0.0,0.0,0.0,11.0,0,0.0,0.0,2015-01-01T13:42:00.000+0000,2015-01-01T11:42:00.000+0000,2015-01-01T16:42:00.000+0000,2015-01-01T16:50:00.000+0000,0.0555555555555555,0.1818181818181818,0.2571764082884593,0.8860177662398709,4.272578368362653,5.725204318007497,85.0,0.92,2109.2,M,N,16093.0,N,25.22,20.12,20.12,230.0,7.699999999999999,22000.0,M,N,16093.0,9,-1.95,-15.3,-15.3,2,5932,1.0,4,4,7,8,9,8,9
FL,NJ,TPA,ACY,341,1342,2015,1,1,1,4,2015-01-01,NK,20416,NK,N617NK,10158,1015804,30158,"Atlantic City, NJ",34,New Jersey,21,15304,1530402,33195,"Tampa, FL",12,Florida,33,1332,-10.0,0.0,0.0,-1,1300-1359,8.0,1340,1604,5.0,1617,1609,-8.0,0.0,0.0,-1,1600-1659,0.0,,0.0,155.0,157.0,144.0,1.0,913.0,4,0.0,0.0,0.0,0.0,0.0,0,0.0,0.0,2015-01-01T13:42:00.000+0000,2015-01-01T11:42:00.000+0000,2015-01-01T16:42:00.000+0000,2015-01-01T16:17:00.000+0000,0.0,0.0909090909090909,0.0,0.528257321426202,0.3016272089564747,3.2221247579335213,250.0,4.6,22000.0,9,N,16093.0,N,3.9,-11.1,-11.1,45.0,2.85,335.0,M,N,16093.0,N,19.5,15.5,15.5,1,5933,0.0,1,4,6,4,6,9,7
IL,GA,ORD,ATL,1612,1342,2015,1,1,1,4,2015-01-01,DL,19790,DL,N909DA,10397,1039705,30397,"Atlanta, GA",13,Georgia,34,13930,1393003,30977,"Chicago, IL",17,Illinois,41,1339,-3.0,0.0,0.0,-1,1300-1359,17.0,1356,1431,9.0,1450,1440,-10.0,0.0,0.0,-1,1400-1459,0.0,,0.0,128.0,121.0,95.0,1.0,606.0,3,0.0,0.0,0.0,0.0,0.0,0,0.0,0.0,2015-01-01T13:42:00.000+0000,2015-01-01T11:42:00.000+0000,2015-01-01T16:42:00.000+0000,2015-01-01T14:50:00.000+0000,0.0943396226415094,0.14,0.5512540560267385,0.6115872492330685,20.47173330246676,17.370717321781793,200.0,1.5,22000.0,M,N,16093.0,N,6.7,-1.7,-1.7,230.0,9.3,7620.0,M,N,16093.0,N,-5.6,-13.3,-13.3,1,5934,0.0,3,4,7,7,5,6,4
MN,CO,MSP,DEN,6112,1342,2015,1,1,1,4,2015-01-01,EV,20366,EV,N16147,11292,1129202,30325,"Denver, CO",8,Colorado,82,13487,1348702,31650,"Minneapolis, MN",27,Minnesota,63,1343,1.0,1.0,0.0,0,1300-1359,18.0,1401,1633,11.0,1646,1644,-2.0,0.0,0.0,-1,1600-1659,0.0,,0.0,124.0,121.0,92.0,1.0,680.0,3,0.0,0.0,0.0,0.0,0.0,0,0.0,0.0,2015-01-01T13:42:00.000+0000,2015-01-01T11:42:00.000+0000,2015-01-01T18:42:00.000+0000,2015-01-01T16:46:00.000+0000,0.5172413793103449,0.0625,2.599371850505236,0.4002139045910787,13.664515890907916,8.76777110509038,300.0,5.7,22000.0,M,N,16093.0,9,-4.0,-12.0,-12.0,280.0,4.1,22000.0,M,N,16093.0,N,-2.2,-7.2,-7.2,1,5935,0.0,2,4,6,8,2,8,0
AR,GA,LIT,ATL,1861,1342,2015,1,1,1,4,2015-01-01,DL,19790,DL,N925AT,10397,1039705,30397,"Atlanta, GA",13,Georgia,34,12992,1299204,32600,"Little Rock, AR",5,Arkansas,71,1339,-3.0,0.0,0.0,-1,1300-1359,12.0,1351,1400,5.0,1424,1405,-19.0,0.0,0.0,-2,1400-1459,0.0,,0.0,102.0,86.0,69.0,1.0,453.0,2,0.0,0.0,0.0,0.0,0.0,0,0.0,0.0,2015-01-01T13:42:00.000+0000,2015-01-01T11:42:00.000+0000,2015-01-01T16:42:00.000+0000,2015-01-01T14:24:00.000+0000,0.0943396226415094,0.0,0.5512540560267385,0.0,20.47173330246676,0.6254026187064541,200.0,1.5,22000.0,M,N,16093.0,N,6.7,-1.7,-1.7,160.0,2.6,1311.0,M,N,16093.0,9,3.0,-3.0,-3.0,1,5936,0.0,2,4,2,7,5,6,5


###Summary of all variables

Below is a summary of all of the variables we consider in our models. Description for engineered variables are given.

| Variable      | Description (if needed)|
| ----------- | ----------- |
| DAY_OF_WEEK      |       |
| DEP_DEL15_PREV   | Whether previous flight on the same plane was delayed|
| MONTH      |       |
| QUARTER   | |
| DAY_OF_WEEK      |       |
| OD_GROUP   | Average delay group for particular flight path |
| OP_CARRIER_FL_NUM_bucket      | bucket approach (previous section) for flight number|
| CRS_DEP_TIME_bucket   | bucket approach (previous section) for departure time|
| ORIGIN_bucket      | bucket approach (previous section) for origin      |
| DEST_bucket   | bucket approach (previous section) for destination|
| ORIGIN_STATE_ABR_bucket      | bucket approach (previous section) for origin state      |
| DEST_STATE_ABR_bucket   | bucket approach (previous section) for destination state|
| DISTANCE      | distance to destination       |
| vis_distance   | first field of visibility representing distance|
| dest_vis_distance      |  |
| tmp      | temperature of origin    |
| dest_tmp   ||
| dew   | dew point of origin|
| dest_dew      |   |
| elevation      | elevation of origin |
| dest_elevation   | |
| wnd_speed      | wind speed of origin   |
| dest_wnd_speed   | |
| pagerank      | pagerank score of origin airport |
| pagerank_dest   | |
| cig_height   | cloud height ceiling of origin|
| dest_cig_height      |   |
| cig_code      | cloud height ceiling of origin classification by NOAA      |
| dest_cig_code      |   |
| cig_cavok_code   | |
| dest_cig_cavok_code   | |
| vis_var_code      | visibility of origin classification by NOAA   |
| dest_vis_var_code   | |