##Objective

This notebook is used to load data from parquet files in the shared mounted storage, clean up the data and ensure the columns are such that would result in a more seemless join, then create the necessary tables join the dastasets. The result will be a fully joined dataset that is checkpointed back as a parquet file on Azure Blob storage.

##Import Packages

In [0]:
from pyspark.sql import functions as f
from pyspark.sql.functions import col,isnan,when,count,lit, to_date,lpad,date_format,rpad,regexp_replace,concat,to_utc_timestamp,to_timestamp, countDistinct
from pyspark.sql.types import IntegerType,BooleanType,DateType,StringType,TimestampType
from pyspark.sql import DataFrameNaFunctions
import pandas as pd
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
from pytz import timezone
import datetime
from pyspark.sql.functions import percent_rank
from pyspark.sql import Window

In [0]:
#Show files in w261 folder
display(dbutils.fs.ls("/mnt/mids-w261/datasets_final_project/"))

path,name,size
dbfs:/mnt/mids-w261/datasets_final_project/airlines/,airlines/,0
dbfs:/mnt/mids-w261/datasets_final_project/airlines_data/,airlines_data/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data/,parquet_airlines_data/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data_3m/,parquet_airlines_data_3m/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data_6m/,parquet_airlines_data_6m/,0
dbfs:/mnt/mids-w261/datasets_final_project/stations_data/,stations_data/,0
dbfs:/mnt/mids-w261/datasets_final_project/weather_data/,weather_data/,0
dbfs:/mnt/mids-w261/datasets_final_project/weather_data_6_hr/,weather_data_6_hr/,0
dbfs:/mnt/mids-w261/datasets_final_project/weather_data_single/,weather_data_single/,0


##Cloud Storage initialization
---
>START>*Copy the following code blocks below into each notebook to access the blob storage*

In [0]:

blob_container = "tm30container" # The name of your container created in https://portal.azure.com
storage_account = "w261tm30" # The name of your Storage account created in https://portal.azure.com
secret_scope = "w261tm30" # The name of the scope created in your local computer using the Databricks CLI
secret_key = "tm30key" # The name of the secret key created in your local computer using the Databricks CLI 
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
mount_path = "/mnt/mids-w261"

In [0]:
#Show files within blob storage and choose the delta lake folder of interest
display(dbutils.fs.ls(f"{blob_url}"))

# Configure Path
DATA_PATH = f"{blob_url}/2022-03-24_data_chkpt_PQ_full"

path,name,size
wasbs://tm30container@w261tm30.blob.core.windows.net/2022-03-18_data_chkpt/,2022-03-18_data_chkpt/,0
wasbs://tm30container@w261tm30.blob.core.windows.net/2022-03-19_data_chkpt_6m/,2022-03-19_data_chkpt_6m/,0
wasbs://tm30container@w261tm30.blob.core.windows.net/2022-03-23_data_chkpt_DL_6m/,2022-03-23_data_chkpt_DL_6m/,0
wasbs://tm30container@w261tm30.blob.core.windows.net/2022-03-23_data_chkpt_PQ_6m/,2022-03-23_data_chkpt_PQ_6m/,0
wasbs://tm30container@w261tm30.blob.core.windows.net/2022-03-24_data_chkpt_PQ_6m/,2022-03-24_data_chkpt_PQ_6m/,0
wasbs://tm30container@w261tm30.blob.core.windows.net/2022-03-24_data_chkpt_PQ_full/,2022-03-24_data_chkpt_PQ_full/,0
wasbs://tm30container@w261tm30.blob.core.windows.net/2022-03-27_data_chkpt_PQ_full/,2022-03-27_data_chkpt_PQ_full/,0
wasbs://tm30container@w261tm30.blob.core.windows.net/2022-04-07_data_chkpt_PQ_6m_test/,2022-04-07_data_chkpt_PQ_6m_test/,0
wasbs://tm30container@w261tm30.blob.core.windows.net/data_final/,data_final/,0
wasbs://tm30container@w261tm30.blob.core.windows.net/graph_test/,graph_test/,0


##Define Functions

In [0]:
def data_pull(df, time_window = 'full', date_col='FLIGHT_UTC_DATE'):
  """Pull processed dataset"""
  
#   df = spark.read.format("delta").load(DELTALAKE_DATA_PATH)
  
  if time_window == '2019':
    df = df.filter(f.year(col(date_col)) == 2019)
  elif time_window == '2018':
    df = df.filter(f.year(col(date_col)) == 2018)
  elif time_window == '2017':
    df = df.filter(f.year(col(date_col)) == 2017)
  elif time_window == '2016':
    df = df.filter(f.year(col(date_col)) == 2016) 
  
  #The commands below are for 2015 data
  elif time_window == '6m':
    df = df.filter(col(date_col) < "2015-07-01T00:00:00.000")  
  elif time_window == '3m':
    df = df.filter(col(date_col) < "2015-04-01T00:00:00.000")
  
  #comment this out if it takes too long
  print(f'{df.count():,} total records imported for the {time_window} dataset')
  return df

In [0]:
def FlightPreprocessing(df_airlines):
    '''Perform EDA and Preprocessing on flight data
    '''
    df2 = df_airlines.select([(f.count(f.when(f.isnan(c) | f.col(c).isNull(), c))/f.count(f.lit(1))).alias(c)
                        for c in df_airlines.columns]).collect()

    #identify columns with more than 50% nulls
    remove_columns = [c for c in df_airlines.columns if df2[0].__getitem__(c) > .5]

    #remove all columns that have 50% or more nulls and or cannot be acquired 2 hours before departure
    remove_unncessary_columns = ['DEP_DELAY', 'DEP_DELAY_GROUP', 'TAXI_OUT', 'WHEELS_OFF', 'FLIGHTS', 'OP_CARRIER', 'OP_CARRIER_AIRLINE_ID', 'CANCELLED', 'CRS_ARR_TIME', 'ARR_TIME', 'ARR_DELAY', 'TAXI_IN', 'ARR_DELAY_GROUP', 'ORIGIN_AIRPORT_SEQ_ID', 'DIVERTED', 'AIR_TIME', 'DISTANCE_GROUP', 'DISTANCE', 'DEST_AIRPORT_SEQ_ID',  'ORIGIN_CITY_MARKET_ID', 'FLIGHTS', 'ARR_DEL15', 'ARR_DEL_NEW','QUARTER','DIV_AIRPORT_LANDINGS', 'WHEELS_ON', 'ACTUAL_ELAPSED_TIME', 'YEAR', 'DAY_OF_MONTH', 'DEST_CITY_MARKET_ID', 'ORIGIN_AIRPORT_ID', 'ORIGIN_STATE_FIPS','ORIGIN_WAC', 'DEST_AIRPORT_ID', 'ORIGIN_AIRPORT_ID', 'ORIGIN_STATE_FIPS', 'DES_WAC', 'DEST_STATE_FIPS', 'DEST_STATE_NM','ORIGIN_CITY_NAME', 'ORIGIN_STATE_ABR', 'ORIGIN_STATE_NM','DEST_CITY_NAME', 'DEST_STATE_ABR', 'DEST_STATE_NM','DEST_WAC', 'DEP_TIME']
    final_df_airlines = df_airlines.drop(*remove_columns).drop(*remove_unncessary_columns)

    #Create Time of Day Field
    final_df_airlines = final_df_airlines.withColumn('TIME_OF_DAY', f.when(final_df_airlines.CRS_DEP_TIME.between(500,1159), 'Morning')\
                                                                    .when(final_df_airlines.CRS_DEP_TIME.between(1200,1659), 'Afternoon')\
                                                                    .when(final_df_airlines.CRS_DEP_TIME.between(1700,2259), 'Evening')\
                                                                    .otherwise('Night'))


    #Convert Integers Into Strings and Format Date
    final_df_airlines = final_df_airlines.withColumn("OP_CARRIER_FL_NUM",f.col("OP_CARRIER_FL_NUM").cast(StringType())) \
                                         .withColumn("CRS_DEP_TIME",f.col("CRS_DEP_TIME").cast(StringType())) \
                                         .withColumn("FL_DATE", f.date_format("FL_DATE", "yyyy-dd-MM"))

    #Pad missing 0 and and convert into timestamp, create unique id to drop duplicates, convert columns from strings to integers
    final_df_airlines = final_df_airlines.withColumn('CRS_DEP_TIME', f.lpad(final_df_airlines.CRS_DEP_TIME,4, '0')) \
                                         .withColumn('DATE_TIME', f.concat(f.col('FL_DATE'),f.lit(" "),f.col('CRS_DEP_TIME'))) \
                                         .withColumn("DATE_TIME", f.to_timestamp("DATE_TIME", "yyyy-dd-MM HHmm")) \
                                         .withColumn('UNIQUE_ID', f.concat(f.col('OP_UNIQUE_CARRIER'),f.lit("-"),f.col('OP_CARRIER_FL_NUM'),f.lit("-"),f.col('DATE_TIME'))) \
                                         .dropDuplicates((['UNIQUE_ID'])) \
                                         .withColumn('DEP_DEL15',f.col('DEP_DEL15').cast(IntegerType())) \
                                         .withColumn('DEP_DELAY_NEW',f.col('DEP_DELAY_NEW').cast(IntegerType())) \
                                         .withColumn('CRS_ELAPSED_TIME',f.col('CRS_ELAPSED_TIME').cast(IntegerType())) \
                                         .withColumn('ARR_DELAY_NEW',f.col('ARR_DELAY_NEW').cast(IntegerType())) \
                                         .withColumn('FLIGHT_ROUTE', f.concat(f.col('ORIGIN'),f.lit("-"),f.col('DEST')))

    #Convert day of week from number to day
    final_df_airlines = final_df_airlines.withColumn("DAY_OF_WEEK", f.when(final_df_airlines.DAY_OF_WEEK == "1","MONDAY") \
                                                                  .when(final_df_airlines.DAY_OF_WEEK == "2","TUESDAY") \
                                                                  .when(final_df_airlines.DAY_OF_WEEK == "3","WEDNESDAY") \
                                                                  .when(final_df_airlines.DAY_OF_WEEK == "4","THURSDAY") \
                                                                  .when(final_df_airlines.DAY_OF_WEEK == "5","FRIDAY") \
                                                                  .when(final_df_airlines.DAY_OF_WEEK == "6","SATURDAY") \
                                                                  .when(final_df_airlines.DAY_OF_WEEK == "7","SUNDAY"))

    #convert integers to strings
    final_df_airlines = final_df_airlines.withColumn("MONTH", f.when(final_df_airlines.MONTH == "1","JAN") \
                                                              .when(final_df_airlines.MONTH == "2","FEB") \
                                                              .when(final_df_airlines.MONTH == "3","MAR") \
                                                              .when(final_df_airlines.MONTH == "4","APR") \
                                                              .when(final_df_airlines.MONTH == "5","MAY") \
                                                              .when(final_df_airlines.MONTH == "6","JUNE") \
                                                              .when(final_df_airlines.MONTH == "7","JULY") \
                                                              .when(final_df_airlines.MONTH == "8","AUG") \
                                                              .when(final_df_airlines.MONTH == "9","SEPT") \
                                                              .when(final_df_airlines.MONTH == "10","OCT") \
                                                              .when(final_df_airlines.MONTH == "11","NOV") \
                                                              .when(final_df_airlines.MONTH == "12","DEC"))

    #add airport timezone and convert to UTC time to join weather data
    aptz = spark.table("aptz_csv")
    final_df_airlines = final_df_airlines.join(aptz, final_df_airlines.ORIGIN == aptz.AIRPORT, 'left').select("*")
    final_df_airlines = final_df_airlines.select('*', f.to_utc_timestamp(final_df_airlines.DATE_TIME, final_df_airlines.TIMEZONE).alias('UTC_TIMESTAMP'))

    #remove all rows that don't have a target value
    final_drop = ['FL_DATE','OP_CARRIER_FL_NUM', 'CRS_DEP_TIME', 'AIRPORT']
    final_df_airlines = final_df_airlines.drop(*final_drop).dropna()
    return final_df_airlines

In [0]:
def WeatherPreprocessing(df_weather):

  red_cols = [
  'STATION',
  'DATE',
  'SOURCE',
  'LATITUDE',
  'LONGITUDE',
  'ELEVATION',
  'NAME',
  'REPORT_TYPE',
  'CALL_SIGN',
  'WND',
  'CIG',
  'VIS',
  'TMP',
  'DEW',
  'SLP',
  'GA1', # SKY COVER
  'AA1', # RAIN
  'AJ1', # SNOW
  'KA1', # EXTREME TEMP
  'AT1',
  'AX1'
  ]

  qual_sus = ['2','6']
  qual_err = ['3','7']

  filtered_weather = df_weather.select([when(col(c)=="", None).otherwise(col(c)).alias(c) for c in df_weather.columns])
  filtered_weather = filtered_weather.withColumn("wnd_dir_angle", when(f.split(col('WND'), ',').getItem(0) == "999", "").otherwise(f.split(col('WND'), ',').getItem(0))) \
                                  .withColumn("wnd_dir_qual", f.split(col('WND'), ',').getItem(1)) \
                                  .withColumn("wnd_type",  when(f.split(col('WND'), ',').getItem(2) == "9", "").otherwise(f.split(col('WND'), ',').getItem(2))) \
                                  .withColumn("wnd_spd_rate", when(f.split(col('WND'), ',').getItem(3) == "9999", 0).otherwise(f.split(col('WND'), ',').getItem(3).cast(IntegerType()))) \
                                  .withColumn("wnd_spd_qual", f.split(col('WND'), ',').getItem(4)) \
                                  .withColumn("wnd_ex", when(col("WND") == "", 0).otherwise(1)) \
                                  .withColumn("wnd_dir_is_qual", when(f.split(col('WND'), ',').getItem(1).isin(qual_err+qual_sus), 0).otherwise(1)) \
                                  .withColumn("wnd_spd_is_qual", when(f.split(col('WND'), ',').getItem(4).isin(qual_err+qual_sus), 0).otherwise(1)) \
                                  .withColumn("cig_ceil_ht", when(f.split(col('CIG'), ',').getItem(0) == "9999", 0).otherwise(f.split(col('CIG'), ',').getItem(0).cast(IntegerType()))) \
                                  .withColumn("cig_ceil_qual", f.split(col('CIG'), ',').getItem(1)) \
                                  .withColumn("cig_ceil_det", when(f.split(col('CIG'), ',').getItem(2) == "9", "").otherwise(f.split(col('CIG'), ',').getItem(2))) \
                                  .withColumn("cig_cavok", when(f.split(col('CIG'), ',').getItem(3) == "9", "").otherwise(f.split(col('CIG'), ',').getItem(3))) \
                                  .withColumn("cig_ex", when(col("CIG") == "", 0).otherwise(1)) \
                                  .withColumn("cig_cavok_bool", when(f.split(col('CIG'), ',').getItem(3) == "9", "").when(f.split(col('CIG'), ',').getItem(1) == 'N', 0).otherwise(1)) \
                                  .withColumn("cig_ceil_is_qual", when(f.split(col('CIG'), ',').getItem(1).isin(qual_err+qual_sus), 0).otherwise(1)) \
                                  .withColumn("vis_dist", when(f.split(col('VIS'), ',').getItem(0) == "999999", 0).otherwise(f.split(col('VIS'), ',').getItem(0).cast(IntegerType()))) \
                                  .withColumn("vis_dist_qual", f.split(col('VIS'), ',').getItem(1)) \
                                  .withColumn("vis_dist_var", when(f.split(col('VIS'), ',').getItem(2) == "9", "").otherwise(f.split(col('VIS'), ',').getItem(2))) \
                                  .withColumn("vis_dist_qual_var", f.split(col('VIS'), ',').getItem(3)) \
                                  .withColumn("vis_ex", when(col("VIS") == "", 0).otherwise(1)) \
                                  .withColumn("vis_dist_var_bool", when(f.split(col('VIS'), ',').getItem(1) == 'N', 0).otherwise(1)) \
                                  .withColumn("vis_dist_is_qual", when(f.split(col('VIS'), ',').getItem(1).isin(qual_err+qual_sus), 0).otherwise(1)) \
                                  .withColumn("vis_dist_is_qual_var", when(f.split(col('VIS'), ',').getItem(3).isin(qual_err+qual_sus), 0).otherwise(1)) \
                                  .withColumn("tmp_air", when(f.split(col('TMP'), ',').getItem(0) == "+9999",0).otherwise(f.split(col('TMP'), ',').getItem(0).cast(IntegerType()))) \
                                  .withColumn("tmp_air_qual", f.split(col('TMP'), ',').getItem(1)) \
                                  .withColumn("tmp_ex", when(col("TMP") == "", 0).otherwise(1)) \
                                  .withColumn("tmp_air_is_qual", when(f.split(col('TMP'), ',').getItem(1).isin(qual_err+qual_sus), 0).otherwise(1)) \
                                  .withColumn("dew_pnt_tmp", when(f.split(col('DEW'), ',').getItem(0) == "+9999",0).otherwise(f.split(col('DEW'), ',').getItem(0).cast(IntegerType()))) \
                                  .withColumn("dew_pnt_qual", f.split(col('DEW'), ',').getItem(1)) \
                                  .withColumn("dew_ex", when(col("DEW") == "", 0).otherwise(1)) \
                                  .withColumn("dew_pnt_is_qual", when(f.split(col('DEW'), ',').getItem(1).isin(qual_err+qual_sus), 0).otherwise(1)) \
                                  .withColumn("slp_prs", when(f.split(col('SLP'), ',').getItem(0) == "99999",0).otherwise(f.split(col('SLP'), ',').getItem(0).cast(IntegerType()))) \
                                  .withColumn("slp_prs_qual", f.split(col('SLP'), ',').getItem(1)) \
                                  .withColumn("slp_ex", when(col("SLP") == "", 0).otherwise(1)) \
                                  .withColumn("slp_prs_is_qual", when(f.split(col('SLP'), ',').getItem(1).isin(qual_err+qual_sus), 0).otherwise(1)) \
                                  .withColumn("aa1_prd_quant_hr", when(f.split(col('AA1'), ',').getItem(0) == "99",0).otherwise(f.split(col('AA1'), ',').getItem(0).cast(IntegerType()))) \
                                  .withColumn("aa1_dp", when(f.split(col('AA1'), ',').getItem(1) == "9999",0).otherwise(f.split(col('AA1'), ',').getItem(1).cast(IntegerType()))) \
                                  .withColumn("aa1_cond", when(f.split(col('AA1'), ',').getItem(2) == "9", "").otherwise(f.split(col('AA1'), ',').getItem(2))) \
                                  .withColumn("aa1_qual", f.split(col('AA1'), ',').getItem(3)) \
                                  .withColumn("aa1_ex", when(col("AA1") == "", 0).otherwise(1)) \
                                  .withColumn("aa1_is_qual", when(f.split(col('AA1'), ',').getItem(3).isin(qual_err+qual_sus), 0).otherwise(1)) \
                                  .withColumn("aj1_dim", when(f.split(col('AJ1'), ',').getItem(0) == "9999",0).otherwise(f.split(col('AJ1'), ',').getItem(0).cast(IntegerType()))) \
                                  .withColumn("aj1_cond", when(f.split(col('AJ1'), ',').getItem(1) == "9", "").otherwise(f.split(col('AJ1'), ',').getItem(1))) \
                                  .withColumn("aj1_qual", f.split(col('AJ1'), ',').getItem(2)) \
                                  .withColumn("aj1_eq_wtr_dp", when(f.split(col('AJ1'), ',').getItem(3) == "999999",0).otherwise(f.split(col('AJ1'), ',').getItem(3).cast(IntegerType()))) \
                                  .withColumn("aj1_eq_wtr_cond", when(f.split(col('AJ1'), ',').getItem(4) == "9", "").otherwise(f.split(col('AJ1'), ',').getItem(4))) \
                                  .withColumn("aj1_eq_wtr_cond_qual", f.split(col('AJ1'), ',').getItem(5)) \
                                  .withColumn("aj1_ex", when(col("AJ1") == "", 0).otherwise(1)) \
                                  .withColumn("aj1_is_qual", when(f.split(col('AJ1'), ',').getItem(2).isin(qual_err+qual_sus), 0).otherwise(1)) \
                                  .withColumn("aj1_eq_wtr_cond_is_qual", when(f.split(col('AJ1'), ',').getItem(5).isin(qual_err+qual_sus), 0).otherwise(1)) \
                                  .withColumn("ga1_cov", when(f.split(col('GA1'), ',').getItem(0) == "99", "").otherwise(f.split(col('GA1'), ',').getItem(0))) \
                                  .withColumn("ga1_cov_qual", f.split(col('GA1'), ',').getItem(1)) \
                                  .withColumn("ga1_bs_ht", when(f.split(col('GA1'), ',').getItem(2) == "+9999",0).otherwise(f.split(col('GA1'), ',').getItem(2).cast(IntegerType()))) \
                                  .withColumn("ga1_bs_ht_qual", f.split(col('GA1'), ',').getItem(3)) \
                                  .withColumn("ga1_cld", when(f.split(col('GA1'), ',').getItem(4) == "99", "").otherwise(f.split(col('GA1'), ',').getItem(4))) \
                                  .withColumn("ga1_cld_qual", f.split(col('GA1'), ',').getItem(5)) \
                                  .withColumn("ga1_ex", when(col("GA1") == "", 0).otherwise(1)) \
                                  .withColumn("ga1_cov_is_qual", when(f.split(col('GA1'), ',').getItem(1).isin(qual_err+qual_sus), 0).otherwise(1)) \
                                  .withColumn("ga1_bs_ht_is_qual", when(f.split(col('GA1'), ',').getItem(3).isin(qual_err+qual_sus), 0).otherwise(1)) \
                                  .withColumn("ga1_cld_qual", when(f.split(col('GA1'), ',').getItem(5).isin(qual_err+qual_sus), 0).otherwise(1)) \
                                  .withColumn("ka1_prd_quant", when(f.split(col('KA1'), ',').getItem(0) == "999",0).otherwise(f.split(col('KA1'), ',').getItem(0).cast(IntegerType()))) \
                                  .withColumn("ka1_code", when(f.split(col('KA1'), ',').getItem(1) == "9", "").otherwise(f.split(col('KA1'), ',').getItem(1))) \
                                  .withColumn("ka1_temp", when(f.split(col('KA1'), ',').getItem(2) == "+9999",0).otherwise(f.split(col('KA1'), ',').getItem(2).cast(IntegerType()))) \
                                  .withColumn("ka1_temp_qual", f.split(col('KA1'), ',').getItem(3)) \
                                  .withColumn("ka1_ex", when(col("KA1") == "", 0).otherwise(1)) \
                                  .withColumn("ka1_temp_is_qual", when(f.split(col('KA1'), ',').getItem(3).isin(qual_err+qual_sus), 0).otherwise(1)) \
                                  .withColumn("at1_src_elem", f.split(col('AT1'), ',').getItem(0)) \
                                  .withColumn("at1_wthr", f.split(col('AT1'), ',').getItem(1)) \
                                  .withColumn("at1_wthr_abrv", f.split(col('AT1'), ',').getItem(2)) \
                                  .withColumn("at1_qual", f.split(col('AT1'), ',').getItem(3)) \
                                  .withColumn("at1_ex", when(col("AT1") == "", 0).otherwise(1)) \
                                  .withColumn("at1_is_qual", when(f.split(col('AT1'), ',').getItem(3).isin(qual_err+qual_sus), 0).otherwise(1)) \
                                  .withColumn("ax1_atm", f.split(col('AX1'), ',').getItem(0)) \
                                  .withColumn("ax1_qual", f.split(col('AX1'), ',').getItem(1)) \
                                  .withColumn("ax1_prd_quant", when(f.split(col('AX1'), ',').getItem(2) == "99",0).otherwise(f.split(col('AX1'), ',').getItem(2).cast(IntegerType()))) \
                                  .withColumn("ax1_prd_qual", f.split(col('AX1'), ',').getItem(3)) \
                                  .withColumn("ax1_ex", when(col("AX1") == "", 0).otherwise(1)) \
                                  .withColumn("ax1_is_qual", when(f.split(col('AX1'), ',').getItem(1).isin(qual_err+qual_sus), 0).otherwise(1)) \
                                  .withColumn("ax1_prd_is_qual", when(f.split(col('AT1'), ',').getItem(3).isin(qual_err+qual_sus), 0).otherwise(1)) 
  
  
  return filtered_weather

In [0]:
def data_load():
  # Load airport timezone table
  aptz = spark.table("aptz_csv")
  
  # Load US only timezone from <https://en.wikipedia.org/wiki/List_of_tz_database_time_zones>
  ustz = spark.table("ustimezones_csv").select("TIMEZONE")
  
  # Load all flight data
  df_airlines = spark.read.parquet("/mnt/mids-w261/datasets_final_project/parquet_airlines_data/*")
  print('df_airlines has {} records and {} columns'.format(df_airlines.count(), len(df_airlines.columns)))
  
  final_df_airlines = FlightPreprocessing(df_airlines)
  print('final_df_airlines has {} records and {} columns'.format(final_df_airlines.count(), len(final_df_airlines.columns)))
  
  # Load all Weather Data
  df_weather = spark.read.parquet("/mnt/mids-w261/datasets_final_project/weather_data/*")
  print('df_weather has {} records and {} columns'.format(df_weather.count(), len(df_weather.columns)))
  
  filtered_weather = WeatherPreprocessing(df_weather)
  print('filtered_weather has {} records and {} columns'.format(filtered_weather.count(), len(filtered_weather.columns)))
  
  # Load all Station Data
  df_stations = spark.read.parquet("/mnt/mids-w261/datasets_final_project/stations_data/*")
  print('df_stations has {} records and {} columns'.format(df_stations.count(), len(df_stations.columns)))
  
  # Create a joined table for US timezones and airports
  airport_tz_join = aptz.join(ustz, aptz.TIMEZONE == ustz.TIMEZONE).drop(aptz.TIMEZONE)
  
  #Filter out any international flights for the final flights dataset
  final_df_airlines_us = final_df_airlines.join(airport_tz_join, (final_df_airlines.ORIGIN == airport_tz_join.AIRPORT), 'inner').drop(airport_tz_join.AIRPORT)
  final_df_airlines_us = final_df_airlines_us.join(airport_tz_join, (final_df_airlines_us.DEST == airport_tz_join.AIRPORT), 'inner').drop(airport_tz_join.AIRPORT)
  
  #Create station key table -- Filter station and remove country code from the FAA code
  filt_station = df_stations.filter(df_stations.station_id == df_stations.neighbor_id).filter(df_stations.neighbor_call.substr(1,1) == 'K')
  key_station = filt_station.select('station_id', filt_station.neighbor_name.alias('airport_name'), filt_station.neighbor_call.substr(2,4).alias('FAA_Code'))
  print('key_station has {} records and {} columns'.format(key_station.count(), len(key_station.columns)))
  
  
  return final_df_airlines_us, filtered_weather, key_station

In [0]:
def full_join(path):
  
  #Pull all filtered data
  final_df_airlines_us, filtered_weather, key_station = data_load()
  
  #Create station keys for joining
  station_org = key_station.select(*(col(x).alias('org_' + x) for x in key_station.columns))
  
  #Destination not used, but could be for future use
  station_des = key_station.select(*(col(x).alias('des_' + x) for x in key_station.columns))
  
  #Define columns selected for the final joined dataset
  final_columns = ['UNIQUE_ID','UTC_TIMESTAMP','DATE','TIME_OF_DAY','STATION','NAME','MONTH','DAY_OF_WEEK','OP_UNIQUE_CARRIER','TAIL_NUM','ORIGIN','DEST','DEP_DEL15',
                   'DEP_DELAY_NEW','ARR_DELAY_NEW','CRS_ELAPSED_TIME','SOURCE','LATITUDE','LONGITUDE','ELEVATION','CALL_SIGN','wnd_dir_angle', 'wnd_dir_qual', 'wnd_type',
                   'wnd_spd_rate', 'wnd_spd_qual', 'wnd_ex', 'wnd_dir_is_qual','wnd_spd_is_qual', 'cig_ceil_ht','cig_ceil_qual', 'cig_ceil_det', 'cig_cavok', 'cig_ex',
                   'cig_cavok_bool', 'cig_ceil_is_qual', 'vis_dist', 'vis_dist_qual', 'vis_dist_var', 'vis_dist_qual_var', 'vis_ex', 'vis_dist_var_bool','vis_dist_is_qual',
                   'vis_dist_is_qual_var', 'tmp_air', 'tmp_air_qual', 'tmp_ex', 'tmp_air_is_qual', 'dew_pnt_tmp', 'dew_pnt_qual','dew_ex', 'dew_pnt_is_qual', 'slp_prs',
                   'slp_prs_qual','slp_ex','slp_prs_is_qual', 'aa1_prd_quant_hr', 'aa1_dp', 'aa1_cond', 'aa1_qual', 'aa1_ex', 'aa1_is_qual', 'aj1_dim', 'aj1_cond',
                   'aj1_qual', 'aj1_eq_wtr_dp', 'aj1_eq_wtr_cond', 'aj1_eq_wtr_cond_qual', 'aj1_ex','aj1_is_qual', 'aj1_eq_wtr_cond_is_qual', 'ga1_cov', 'ga1_cov_qual',
                   'ga1_bs_ht', 'ga1_bs_ht_qual', 'ga1_cld', 'ga1_cld_qual', 'ga1_ex', 'ga1_cov_is_qual', 'ga1_bs_ht_is_qual', 'ka1_prd_quant', 'ka1_code','ka1_temp',
                   'ka1_temp_qual', 'ka1_ex', 'ka1_temp_is_qual', 'at1_src_elem','at1_wthr', 'at1_wthr_abrv', 'at1_qual', 'at1_ex', 'at1_is_qual', 'ax1_atm', 'ax1_qual',
                   'ax1_prd_quant', 'ax1_prd_qual', 'ax1_ex','ax1_is_qual', 'ax1_prd_is_qual']
  
  #Full data set join for airports and flights
  join_full_org = final_df_airlines_us.join(station_org, final_df_airlines_us.ORIGIN == station_org.org_FAA_Code, 'inner')
  join_full_airports = join_full_org.join(station_des, join_full_org.DEST == station_des.des_FAA_Code, 'inner')
  
  join_full = join_full_airports.join(filtered_weather, (join_full_airports.org_station_id == filtered_weather.STATION) & \
                              ((join_full_airports.UTC_TIMESTAMP.cast("long") - filtered_weather.DATE.cast("long"))/3600 <= 3.0) & \
                              ((join_full_airports.UTC_TIMESTAMP.cast("long") - filtered_weather.DATE.cast("long"))/3600 > 2.0), 'inner').select(*final_columns)

  join_full = join_full.withColumnRenamed(existing = 'UTC_TIMESTAMP', new = 'FLIGHT_UTC_DATE')
  join_full = join_full.withColumnRenamed(existing = 'DATE', new = 'WEATHER_UTC_DATE')
  
  
  #Remove if exists
  dbutils.fs.rm(path, recurse=True)
  
  #Parquet Write
  join_full.write.parquet(path)

## Join - Proof of Concept
**Only on 2 days of data**

In [0]:
weather_2d = filtered_weather.filter(col('DATE') < "2015-01-03T00:00:00.000")
display(weather_2d)

STATION,DATE,SOURCE,LATITUDE,LONGITUDE,ELEVATION,NAME,REPORT_TYPE,CALL_SIGN,QUALITY_CONTROL,WND,CIG,VIS,TMP,DEW,SLP,GA1,GE1,GF1,MA1,REM,GD1
3809099999,2015-01-01T00:50:00.000+0000,4,50.086092,-5.255711,81.38,"CULDROSE, UK",FM-15,99999,V020,"210,1,N,0077,1","00183,1,C,N",008000199,+01101,+01001,999999,"02,1,+00122,1,99,9","9,AGL ,+99999,+99999",99999021999001221999999,102901999999,MET079METAR EGDR 010050Z 21015KT 8000 -DZ FEW004 SCT006 OVC010 11/10 Q1029 YLO1=,
3809099999,2015-01-01T01:50:00.000+0000,4,50.086092,-5.255711,81.38,"CULDROSE, UK",FM-15,99999,V020,"200,1,N,0082,1","00244,1,9,N",008000199,+01201,+01001,999999,"04,1,+00183,1,99,9","9,AGL ,+99999,+99999",99999041999001831999999,102901999999,MET086METAR EGDR 010150Z 20016G28KT 8000 HZ SCT006 BKN008 OVC010 12/10 Q1029 REDZ YLO1=,
3809099999,2015-01-01T02:50:00.000+0000,4,50.086092,-5.255711,81.38,"CULDROSE, UK",FM-15,99999,V020,"210,1,N,0093,1","00122,1,9,N",006000199,+01101,+01101,999999,"02,1,+00061,1,99,9","9,AGL ,+99999,+99999",99999021999000611999999,102901999999,MET079METAR EGDR 010250Z 21018KT 6000 -DZ FEW002 BKN004 OVC007 11/11 Q1029 YLO2=,
3809099999,2015-01-01T03:50:00.000+0000,4,50.086092,-5.255711,81.38,"CULDROSE, UK",FM-15,99999,V020,"200,1,N,0082,1","00122,1,9,N",006000199,+01101,+01101,999999,"02,1,+00061,1,99,9","9,AGL ,+99999,+99999",99999021999000611999999,102801999999,MET082METAR EGDR 010350Z 20016G26KT 6000 -DZ FEW002 BKN004 OVC006 11/11 Q1028 YLO2=,
3809099999,2015-01-01T04:50:00.000+0000,4,50.086092,-5.255711,81.38,"CULDROSE, UK",FM-15,99999,V020,"200,1,N,0082,1","00122,1,9,N",002500199,+01101,+01101,999999,"04,1,+00061,1,99,9","9,AGL ,+99999,+99999",99999041999000611999999,102801999999,MET076METAR EGDR 010450Z 20016G27KT 2500 -RADZ SCT002 OVC004 11/11 Q1028 AMB=,
3809099999,2015-01-01T05:50:00.000+0000,4,50.086092,-5.255711,81.38,"CULDROSE, UK",FM-15,99999,V020,"200,1,N,0093,1","00061,1,C,N",003000199,+01201,+01101,999999,"02,1,+00030,1,99,9","9,AGL ,+99999,+99999",99999021999000301999999,102701999999,MET085METAR EGDR 010550Z 20018G29KT 3000 BR FEW001 SCT002 BKN004 12/11 Q1027 RERA AMB=,
3809099999,2015-01-01T06:50:00.000+0000,4,50.086092,-5.255711,81.38,"CULDROSE, UK",FM-15,99999,V020,"210,1,N,0088,1","00152,1,9,N",003000199,+01201,+01101,999999,"04,1,+00061,1,99,9","9,AGL ,+99999,+99999",99999041999000611999999,102701999999,MET080METAR EGDR 010650Z 21017G28KT 3000 BR SCT002 BKN005 OVC010 12/11 Q1027 AMB=,
3809099999,2015-01-01T07:50:00.000+0000,4,50.086092,-5.255711,81.38,"CULDROSE, UK",FM-15,99999,V020,"210,1,N,0082,1","00091,1,C,N",004500199,+01201,+01101,999999,"02,1,+00061,1,99,9","9,AGL ,+99999,+99999",99999021999000611999999,102801999999,MET081METAR EGDR 010750Z 21016G26KT 4500 BR FEW002 SCT003 OVC007 12/11 Q1028 YLO2=,
3809099999,2015-01-01T08:50:00.000+0000,4,50.086092,-5.255711,81.38,"CULDROSE, UK",FM-15,99999,V020,"200,1,N,0077,1","00122,1,9,N",006000199,+01201,+01101,999999,"02,1,+00061,1,99,9","9,AGL ,+99999,+99999",99999021999000611999999,102801999999,MET113METAR EGDR 010850Z 20015G26KT 6000 HZ FEW002 BKN004 OVC008 12/11 Q1028 YLO2 TEMPO 1500 BR SCT002 OVC006 AMB=,
3809099999,2015-01-01T09:50:00.000+0000,4,50.086092,-5.255711,81.38,"CULDROSE, UK",FM-15,99999,V020,"210,1,N,0093,1","00213,1,9,N",007000199,+01201,+01101,999999,"04,1,+00122,1,99,9","9,AGL ,+99999,+99999",99999041999001221999999,102801999999,MET113METAR EGDR 010950Z 21018G28KT 7000 BR SCT004 BKN007 OVC010 12/11 Q1028 YLO2 TEMPO 1500 BR SCT002 OVC006 AMB=,


In [0]:
flights_2d = final_df_airlines.filter(col('UTC_TIMESTAMP') < "2015-01-03T00:00:00.000")
display(flights_2d)

MONTH,DAY_OF_WEEK,TAIL_NUM,ORIGIN,DEST,DEP_DEL15,ARR_DELAY_NEW,CRS_ELAPSED_TIME,DATE_TIME,UNIQUE_ID,UTC_TIMESTAMP
JAN,THURSDAY,N3HJAA,PHX,EWR,0.0,0.0,275.0,2015-01-01T13:25:00.000+0000,AA-1280-2015-01-01 13:25:00,2015-01-01T20:25:00.000+0000
JAN,FRIDAY,N3ABAA,PHX,EWR,0.0,0.0,275.0,2015-01-02T13:25:00.000+0000,AA-1280-2015-01-02 13:25:00,2015-01-02T20:25:00.000+0000
JAN,THURSDAY,N3DWAA,EWR,PHX,0.0,27.0,330.0,2015-01-01T10:29:00.000+0000,AA-1281-2015-01-01 10:29:00,2015-01-01T15:29:00.000+0000
JAN,FRIDAY,N3FWAA,EWR,PHX,0.0,31.0,330.0,2015-01-02T10:29:00.000+0000,AA-1281-2015-01-02 10:29:00,2015-01-02T15:29:00.000+0000
JAN,THURSDAY,N3CRAA,BNA,MIA,0.0,0.0,140.0,2015-01-01T06:45:00.000+0000,AA-1283-2015-01-01 06:45:00,2015-01-01T12:45:00.000+0000
JAN,FRIDAY,N3CHAA,BNA,MIA,0.0,11.0,140.0,2015-01-02T06:45:00.000+0000,AA-1283-2015-01-02 06:45:00,2015-01-02T12:45:00.000+0000
JAN,THURSDAY,N4YSAA,DFW,MCI,1.0,33.0,90.0,2015-01-01T07:30:00.000+0000,AA-1284-2015-01-01 07:30:00,2015-01-01T13:30:00.000+0000
JAN,FRIDAY,N4XYAA,DFW,MCI,0.0,0.0,90.0,2015-01-02T07:30:00.000+0000,AA-1284-2015-01-02 07:30:00,2015-01-02T13:30:00.000+0000
JAN,THURSDAY,N4YSAA,MCI,DFW,1.0,32.0,100.0,2015-01-01T09:40:00.000+0000,AA-1284-2015-01-01 09:40:00,2015-01-01T15:40:00.000+0000
JAN,FRIDAY,N4XYAA,MCI,DFW,0.0,13.0,100.0,2015-01-02T09:40:00.000+0000,AA-1284-2015-01-02 09:40:00,2015-01-02T15:40:00.000+0000


###Join Stations and Flight data

In [0]:
#Join station and flight 

join_org = flights_2d.join(station_org, flights_2d.ORIGIN == station_org.org_FAA_Code, 'inner')
join_airports = join_org.join(station_des, join_org.DEST == station_des.des_FAA_Code, 'inner')
display(join_airports)

MONTH,DAY_OF_WEEK,TAIL_NUM,ORIGIN,DEST,DEP_DEL15,ARR_DELAY_NEW,CRS_ELAPSED_TIME,DATE_TIME,UNIQUE_ID,UTC_TIMESTAMP,org_station_id,org_airport_name,org_FAA_Code,des_station_id,des_airport_name,des_FAA_Code
JAN,FRIDAY,N453SW,INL,BRD,0.0,4.0,58.0,2015-01-02T06:00:00.000+0000,OO-7423-2015-01-02 06:00:00,2015-01-02T12:00:00.000+0000,72747014918,FALLS INTERNATIONAL AIRPORT,INL,72655594938,BRAINERD LAKES RGNL ARPT,BRD
JAN,THURSDAY,N427SW,INL,BRD,0.0,0.0,58.0,2015-01-01T06:00:00.000+0000,OO-7423-2015-01-01 06:00:00,2015-01-01T12:00:00.000+0000,72747014918,FALLS INTERNATIONAL AIRPORT,INL,72655594938,BRAINERD LAKES RGNL ARPT,BRD
JAN,THURSDAY,N453SW,INL,MSP,0.0,0.0,78.0,2015-01-01T12:56:00.000+0000,OO-7426-2015-01-01 12:56:00,2015-01-01T18:56:00.000+0000,72747014918,FALLS INTERNATIONAL AIRPORT,INL,72658014922,MINNEAPOLIS-ST PAUL INTERNATI,MSP
JAN,FRIDAY,N430SW,INL,MSP,0.0,6.0,78.0,2015-01-02T12:56:00.000+0000,OO-7426-2015-01-02 12:56:00,2015-01-02T18:56:00.000+0000,72747014918,FALLS INTERNATIONAL AIRPORT,INL,72658014922,MINNEAPOLIS-ST PAUL INTERNATI,MSP
JAN,THURSDAY,N236JB,MSY,JFK,0.0,0.0,168.0,2015-01-01T06:00:00.000+0000,B6-776-2015-01-01 06:00:00,2015-01-01T12:00:00.000+0000,72231012916,LOUIS ARMSTRONG NEW ORLEANS I,MSY,74486094789,JOHN F KENNEDY INTERNATIONAL,JFK
JAN,FRIDAY,N317JB,MSY,JFK,0.0,0.0,168.0,2015-01-02T06:00:00.000+0000,B6-776-2015-01-02 06:00:00,2015-01-02T12:00:00.000+0000,72231012916,LOUIS ARMSTRONG NEW ORLEANS I,MSY,74486094789,JOHN F KENNEDY INTERNATIONAL,JFK
JAN,THURSDAY,N14173,MSY,IAH,0.0,0.0,79.0,2015-01-01T12:12:00.000+0000,EV-4217-2015-01-01 12:12:00,2015-01-01T18:12:00.000+0000,72231012916,LOUIS ARMSTRONG NEW ORLEANS I,MSY,72243012960,G BUSH INTERCONTINENTAL AP/HO,IAH
JAN,FRIDAY,N34110,MSY,ORD,0.0,0.0,152.0,2015-01-02T09:47:00.000+0000,EV-4284-2015-01-02 09:47:00,2015-01-02T15:47:00.000+0000,72231012916,LOUIS ARMSTRONG NEW ORLEANS I,MSY,72530094846,CHICAGO O'HARE INTERNATIONAL,ORD
JAN,THURSDAY,N931FR,MSY,DEN,1.0,31.0,173.0,2015-01-01T18:38:00.000+0000,F9-705-2015-01-01 18:38:00,2015-01-02T00:38:00.000+0000,72231012916,LOUIS ARMSTRONG NEW ORLEANS I,MSY,72565003017,DENVER INTERNATIONAL AIRPORT,DEN
JAN,THURSDAY,N948FR,MSY,DEN,0.0,3.0,175.0,2015-01-01T12:45:00.000+0000,F9-699-2015-01-01 12:45:00,2015-01-01T18:45:00.000+0000,72231012916,LOUIS ARMSTRONG NEW ORLEANS I,MSY,72565003017,DENVER INTERNATIONAL AIRPORT,DEN


###Full join and simple EDA

In [0]:
#Join weather and flight data -- NOTE: Only for Origin airport at the moment!
join_all = join_airports.join(weather_2d, (join_airports.org_station_id == weather_2d.STATION) & \
                              ((join_airports.UTC_TIMESTAMP.cast("long") - weather_2d.DATE.cast("long"))/3600 <= 3.0) & \
                              ((join_airports.UTC_TIMESTAMP.cast("long") - weather_2d.DATE.cast("long"))/3600 > 2.0), 'inner').select(*final_columns)

join_all = join_all.withColumnRenamed(existing = 'UTC_TIMESTAMP', new = 'FLIGHT_UTC_DATE')
join_all = join_all.withColumnRenamed(existing = 'DATE', new = 'WEATHER_UTC_DATE')
display(join_all)

UNIQUE_ID,FLIGHT_UTC_DATE,WEATHER_UTC_DATE,STATION,NAME,MONTH,DAY_OF_WEEK,TAIL_NUM,ORIGIN,DEST,DEP_DEL15,ARR_DELAY_NEW,CRS_ELAPSED_TIME,SOURCE,LATITUDE,LONGITUDE,ELEVATION,REPORT_TYPE,CALL_SIGN,QUALITY_CONTROL,WND,CIG,VIS,TMP,DEW,SLP,GA1,GE1,GF1,MA1,REM,GD1
EV-5527-2015-01-02 12:00:00,2015-01-02T17:00:00.000+0000,2015-01-02T11:15:00.000+0000,72213653883,"BRUNSWICK GLYNCO JETPORT AIRPORT, GA US",JAN,FRIDAY,N853AS,BQK,ATL,0.0,12.0,71.0,7,31.25889,-81.46611,7.9,FM-15,KBQK,V020,"999,9,C,0000,5","00457,5,M,N","016093,5,N,5",+01205,+01105,999999,"08,5,+00457,5,99,9","9,AGL ,+99999,+99999",99999999999004571999999,102515102415,MET07601/02/15 06:15:02 METAR KBQK 021115Z 00000KT 10SM OVC015 12/11 A3027 RMK AO1,"4,99,1,+00457,5,9"
EV-5527-2015-01-02 12:00:00,2015-01-02T17:00:00.000+0000,2015-01-02T11:35:00.000+0000,72213653883,"BRUNSWICK GLYNCO JETPORT AIRPORT, GA US",JAN,FRIDAY,N853AS,BQK,ATL,0.0,12.0,71.0,7,31.25889,-81.46611,7.9,FM-15,KBQK,V020,"999,9,C,0000,5","00457,5,M,N","016093,5,N,5",+01205,+01105,999999,"08,5,+00457,5,99,9","9,AGL ,+99999,+99999",99999999999004571999999,102545102445,MET07601/02/15 06:35:02 METAR KBQK 021135Z 00000KT 10SM OVC015 12/11 A3028 RMK AO1,"4,99,1,+00457,5,9"
EV-5527-2015-01-02 12:00:00,2015-01-02T17:00:00.000+0000,2015-01-02T11:55:00.000+0000,72213653883,"BRUNSWICK GLYNCO JETPORT AIRPORT, GA US",JAN,FRIDAY,N853AS,BQK,ATL,0.0,12.0,71.0,7,31.25889,-81.46611,7.9,FM-15,KBQK,V020,"999,9,C,0000,5","00457,5,M,N","016093,5,N,5","+0120,C","+0110,C",999999,"08,5,+00457,5,99,9","9,AGL ,+99999,+99999",99999999999004571999999,102575102485,MET07601/02/15 06:55:02 METAR KBQK 021155Z 00000KT 10SM OVC015 12/11 A3029 RMK AO1,"4,99,1,+00457,5,9"
EV-5527-2015-01-02 12:00:00,2015-01-02T17:00:00.000+0000,2015-01-02T12:15:00.000+0000,72213653883,"BRUNSWICK GLYNCO JETPORT AIRPORT, GA US",JAN,FRIDAY,N853AS,BQK,ATL,0.0,12.0,71.0,7,31.25889,-81.46611,7.9,FM-15,KBQK,V020,"030,5,N,0015,5","00457,5,M,N","016093,5,N,5",+01205,+01105,999999,"08,5,+00457,5,99,9","9,AGL ,+99999,+99999",99999999999004571999999,102575102485,MET07601/02/15 07:15:02 METAR KBQK 021215Z 03003KT 10SM OVC015 12/11 A3029 RMK AO1,"4,99,1,+00457,5,9"
EV-5527-2015-01-02 12:00:00,2015-01-02T17:00:00.000+0000,2015-01-02T12:35:00.000+0000,72213653883,"BRUNSWICK GLYNCO JETPORT AIRPORT, GA US",JAN,FRIDAY,N853AS,BQK,ATL,0.0,12.0,71.0,7,31.25889,-81.46611,7.9,FM-15,KBQK,V020,"999,9,C,0000,5","00457,5,M,N","016093,5,N,5",+01205,+01105,999999,"08,5,+00457,5,99,9","9,AGL ,+99999,+99999",99999999999004571999999,102575102485,MET07601/02/15 07:35:02 METAR KBQK 021235Z 00000KT 10SM OVC015 12/11 A3029 RMK AO1,"4,99,1,+00457,5,9"
EV-5527-2015-01-02 12:00:00,2015-01-02T17:00:00.000+0000,2015-01-02T12:55:00.000+0000,72213653883,"BRUNSWICK GLYNCO JETPORT AIRPORT, GA US",JAN,FRIDAY,N853AS,BQK,ATL,0.0,12.0,71.0,7,31.25889,-81.46611,7.9,FM-15,KBQK,V020,"999,9,C,0000,5","00457,5,M,N","016093,5,N,5","+0130,C","+0110,C",999999,"08,5,+00457,5,99,9","9,AGL ,+99999,+99999",99999999999004571999999,102645102545,MET07601/02/15 07:55:02 METAR KBQK 021255Z 00000KT 10SM OVC015 13/11 A3031 RMK AO1,"4,99,1,+00457,5,9"
EV-5527-2015-01-02 12:00:00,2015-01-02T17:00:00.000+0000,2015-01-02T13:15:00.000+0000,72213653883,"BRUNSWICK GLYNCO JETPORT AIRPORT, GA US",JAN,FRIDAY,N853AS,BQK,ATL,0.0,12.0,71.0,7,31.25889,-81.46611,7.9,FM-15,KBQK,V020,"999,9,C,0000,5","00457,5,M,N","016093,5,N,5",+01305,+01105,999999,"08,5,+00457,5,99,9","9,AGL ,+99999,+99999",99999999999004571999999,102645102545,MET07601/02/15 08:15:02 METAR KBQK 021315Z 00000KT 10SM OVC015 13/11 A3031 RMK AO1,"4,99,1,+00457,5,9"
EV-5527-2015-01-02 12:00:00,2015-01-02T17:00:00.000+0000,2015-01-02T13:35:00.000+0000,72213653883,"BRUNSWICK GLYNCO JETPORT AIRPORT, GA US",JAN,FRIDAY,N853AS,BQK,ATL,0.0,12.0,71.0,7,31.25889,-81.46611,7.9,FM-15,KBQK,V020,"999,9,C,0000,5","00518,5,M,N","016093,5,N,5",+01305,+01105,999999,"08,5,+00518,5,99,9","9,AGL ,+99999,+99999",99999999999005181999999,102685102585,MET08801/02/15 08:35:02 METAR KBQK 021335Z 00000KT 10SM OVC017 13/11 A3032 RMK AO1 LTG DSNT SE,"4,99,1,+00518,5,9"
EV-5527-2015-01-02 12:00:00,2015-01-02T17:00:00.000+0000,2015-01-02T13:55:00.000+0000,72213653883,"BRUNSWICK GLYNCO JETPORT AIRPORT, GA US",JAN,FRIDAY,N853AS,BQK,ATL,0.0,12.0,71.0,7,31.25889,-81.46611,7.9,FM-15,KBQK,V020,"999,9,C,0000,5","00518,5,M,N","016093,5,N,5","+0130,C","+0110,C",999999,"07,5,+00518,5,99,9","9,AGL ,+99999,+99999",99999999999005181999999,102685102585,MET08301/02/15 08:55:02 METAR KBQK 021355Z 00000KT 10SM BKN017 OVC030 13/11 A3032 RMK AO1,"3,99,1,+00518,5,9"
EV-5527-2015-01-02 12:00:00,2015-01-02T17:00:00.000+0000,2015-01-02T14:15:00.000+0000,72213653883,"BRUNSWICK GLYNCO JETPORT AIRPORT, GA US",JAN,FRIDAY,N853AS,BQK,ATL,0.0,12.0,71.0,7,31.25889,-81.46611,7.9,FM-15,KBQK,V020,"040,5,N,0021,5","00579,5,M,N","016093,5,N,5",+01405,+01105,999999,"07,5,+00579,5,99,9","9,AGL ,+99999,+99999",99999999999005791999999,102685102585,MET08301/02/15 09:15:02 METAR KBQK 021415Z 04004KT 10SM BKN019 OVC030 14/11 A3032 RMK AO1,"3,99,1,+00579,5,9"


In [0]:
#EDA on joined data to make sure the data was joined correctly. Checking on a couple of flights:

display(join_all.filter(col('UNIQUE_ID') == 'AA-1080-2015-01-02 11:45:00'))

UNIQUE_ID,FLIGHT_UTC_DATE,WEATHER_UTC_DATE,STATION,NAME,MONTH,DAY_OF_WEEK,TAIL_NUM,ORIGIN,DEST,DEP_DEL15,ARR_DELAY_NEW,CRS_ELAPSED_TIME,SOURCE,LATITUDE,LONGITUDE,ELEVATION,REPORT_TYPE,CALL_SIGN,QUALITY_CONTROL,WND,CIG,VIS,TMP,DEW,SLP,GA1,GE1,GF1,MA1,REM,GD1
AA-1080-2015-01-02 11:45:00,2015-01-02T17:45:00.000+0000,2015-01-02T11:51:00.000+0000,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",JAN,FRIDAY,N002AA,ORD,EGE,0.0,0.0,170.0,7,41.995,-87.9336,201.8,FM-15,KORD,V030,"999,9,C,0000,5","22000,5,9,N","016093,5,N,5",-725,-945,102435,"02,5,+07620,5,99,9","9,AGL ,+99999,+99999",02995999999076201999999,102345099885,MET11901/02/15 05:51:02 METAR KORD 021151Z 00000KT 10SM FEW250 M07/M09 A3022 RMK AO2 SLP243 T10721094 11044 21078 53017 (SMN),"1,99,1,+07620,5,9"
AA-1080-2015-01-02 11:45:00,2015-01-02T17:45:00.000+0000,2015-01-02T12:51:00.000+0000,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",JAN,FRIDAY,N002AA,ORD,EGE,0.0,0.0,170.0,7,41.995,-87.9336,201.8,FM-15,KORD,V030,"999,9,C,0000,5","22000,5,9,N","016093,5,N,5",-785,-1005,102465,"02,5,+03353,5,99,9","9,AGL ,+99999,+99999",04995999999033531999999,102375099915,MET10801/02/15 06:51:02 METAR KORD 021251Z 00000KT 10SM FEW110 SCT250 M08/M10 A3023 RMK AO2 SLP246 T10781100 (EPA),"1,99,1,+03353,5,9"
AA-1080-2015-01-02 11:45:00,2015-01-02T17:45:00.000+0000,2015-01-02T13:51:00.000+0000,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",JAN,FRIDAY,N002AA,ORD,EGE,0.0,0.0,170.0,7,41.995,-87.9336,201.8,FM-15,KORD,V030,"999,9,C,0000,5","22000,5,9,N","016093,5,N,5",-785,-1065,102555,"02,5,+00762,5,99,9","9,AGL ,+99999,+99999",02995999999007621999999,102475100015,MET10801/02/15 07:51:02 METAR KORD 021351Z 00000KT 10SM FEW025 FEW250 M08/M11 A3026 RMK AO2 SLP255 T10781106 (EPA),"1,99,1,+00762,5,9"
AA-1080-2015-01-02 11:45:00,2015-01-02T17:45:00.000+0000,2015-01-02T14:51:00.000+0000,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",JAN,FRIDAY,N002AA,ORD,EGE,0.0,0.0,170.0,7,41.995,-87.9336,201.8,FM-15,KORD,V030,"300,5,N,0015,5","22000,5,9,N","016093,5,N,5",-445,-785,102685,"02,5,+07620,5,99,9","9,AGL ,+99999,+99999",02995999999076201999999,102575100115,MET10701/02/15 08:51:02 METAR KORD 021451Z 30003KT 10SM FEW250 M04/M08 A3029 RMK AO2 SLP268 T10441078 53025 (EPA),"1,99,1,+07620,5,9"
AA-1080-2015-01-02 11:45:00,2015-01-02T17:45:00.000+0000,2015-01-02T15:51:00.000+0000,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",JAN,FRIDAY,N002AA,ORD,EGE,0.0,0.0,170.0,7,41.995,-87.9336,201.8,FM-15,KORD,V030,"999,9,C,0000,5","22000,5,9,N","016093,5,N,5",-285,-725,102725,"04,5,+07620,5,99,9","9,AGL ,+99999,+99999",04995999999076201999999,102645100185,MET10101/02/15 09:51:02 METAR KORD 021551Z 00000KT 10SM SCT250 M03/M07 A3031 RMK AO2 SLP272 T10281072 (EPA),"2,99,1,+07620,5,9"
AA-1080-2015-01-02 11:45:00,2015-01-02T17:45:00.000+0000,2015-01-02T16:51:00.000+0000,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",JAN,FRIDAY,N002AA,ORD,EGE,0.0,0.0,170.0,7,41.995,-87.9336,201.8,FM-15,KORD,V030,"999,9,C,0000,5","07620,5,M,N","016093,5,N,5",-175,-725,102685,"07,5,+07620,5,99,9","9,AGL ,+99999,+99999",99999999999076201999999,102615100145,MET10101/02/15 10:51:02 METAR KORD 021651Z 00000KT 10SM BKN250 M02/M07 A3030 RMK AO2 SLP268 T10171072 (EPA),"3,99,1,+07620,5,9"
AA-1080-2015-01-02 11:45:00,2015-01-02T17:45:00.000+0000,2015-01-02T11:51:00.000+0000,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",JAN,FRIDAY,N002AA,ORD,EGE,0.0,0.0,170.0,7,41.995,-87.9336,201.8,FM-15,KORD,V030,"999,9,C,0000,5","22000,5,9,N","016093,5,N,5",-725,-945,102435,"02,5,+07620,5,99,9","9,AGL ,+99999,+99999",02995999999076201999999,102345099885,MET11901/02/15 05:51:02 METAR KORD 021151Z 00000KT 10SM FEW250 M07/M09 A3022 RMK AO2 SLP243 T10721094 11044 21078 53017 (SMN),"1,99,1,+07620,5,9"
AA-1080-2015-01-02 11:45:00,2015-01-02T17:45:00.000+0000,2015-01-02T12:51:00.000+0000,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",JAN,FRIDAY,N002AA,ORD,EGE,0.0,0.0,170.0,7,41.995,-87.9336,201.8,FM-15,KORD,V030,"999,9,C,0000,5","22000,5,9,N","016093,5,N,5",-785,-1005,102465,"02,5,+03353,5,99,9","9,AGL ,+99999,+99999",04995999999033531999999,102375099915,MET10801/02/15 06:51:02 METAR KORD 021251Z 00000KT 10SM FEW110 SCT250 M08/M10 A3023 RMK AO2 SLP246 T10781100 (EPA),"1,99,1,+03353,5,9"
AA-1080-2015-01-02 11:45:00,2015-01-02T17:45:00.000+0000,2015-01-02T13:51:00.000+0000,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",JAN,FRIDAY,N002AA,ORD,EGE,0.0,0.0,170.0,7,41.995,-87.9336,201.8,FM-15,KORD,V030,"999,9,C,0000,5","22000,5,9,N","016093,5,N,5",-785,-1065,102555,"02,5,+00762,5,99,9","9,AGL ,+99999,+99999",02995999999007621999999,102475100015,MET10801/02/15 07:51:02 METAR KORD 021351Z 00000KT 10SM FEW025 FEW250 M08/M11 A3026 RMK AO2 SLP255 T10781106 (EPA),"1,99,1,+00762,5,9"
AA-1080-2015-01-02 11:45:00,2015-01-02T17:45:00.000+0000,2015-01-02T14:51:00.000+0000,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",JAN,FRIDAY,N002AA,ORD,EGE,0.0,0.0,170.0,7,41.995,-87.9336,201.8,FM-15,KORD,V030,"300,5,N,0015,5","22000,5,9,N","016093,5,N,5",-445,-785,102685,"02,5,+07620,5,99,9","9,AGL ,+99999,+99999",02995999999076201999999,102575100115,MET10701/02/15 08:51:02 METAR KORD 021451Z 30003KT 10SM FEW250 M04/M08 A3029 RMK AO2 SLP268 T10441078 53025 (EPA),"1,99,1,+07620,5,9"


#Full Dataset Join

###Rationale for 6 hours

> *Strategic traffic flow managers must plan hours in advance to influence long-haul flights. If the time needed for pre-departure planning and filing of amended flight plans is added to the airborne time intervals, predictions of convective weather impacts on airspace capacity are needed 4-8 hours in advance to influence long-haul flights and 2-6 hours in advance to influence shorter flights.*

**- FAA:  <https://www.faa.gov/nextgen/programs/weather/faq/>**

6 hours was chosen because that's the earliest the FAA will make a decision for pre-departure planning

In [0]:
data_load()

In [0]:
#To ensure checkpoints are saved by the date and not easily overwritten:
now = str(datetime.date.today())

PARQUET_DATA_PATH_TODAY = f"{blob_url}/{now}_data_chkpt_PQ_6m_test"

full_join(PARQUET_DATA_PATH_TODAY)

In [0]:
# Test to see if the file is written correctly
join_eda = spark.read.parquet(PARQUET_DATA_PATH_TODAY)
display(join_eda)

UNIQUE_ID,FLIGHT_UTC_DATE,WEATHER_UTC_DATE,TIME_OF_DAY,STATION,NAME,MONTH,DAY_OF_WEEK,OP_UNIQUE_CARRIER,TAIL_NUM,ORIGIN,DEST,DEP_DEL15,DEP_DELAY_NEW,ARR_DELAY_NEW,CRS_ELAPSED_TIME,SOURCE,LATITUDE,LONGITUDE,ELEVATION,CALL_SIGN,wnd_dir_angle,wnd_dir_qual,wnd_type,wnd_spd_rate,wnd_spd_qual,wnd_ex,wnd_dir_is_qual,wnd_spd_is_qual,cig_ceil_ht,cig_ceil_qual,cig_ceil_det,cig_cavok,cig_ex,cig_cavok_bool,cig_ceil_is_qual,vis_dist,vis_dist_qual,vis_dist_var,vis_dist_qual_var,vis_ex,vis_dist_var_bool,vis_dist_is_qual,vis_dist_is_qual_var,tmp_air,tmp_air_qual,tmp_ex,tmp_air_is_qual,dew_pnt_tmp,dew_pnt_qual,dew_ex,dew_pnt_is_qual,slp_prs,slp_prs_qual,slp_ex,slp_prs_is_qual,aa1_prd_quant_hr,aa1_dp,aa1_cond,aa1_qual,aa1_ex,aa1_is_qual,aj1_dim,aj1_cond,aj1_qual,aj1_eq_wtr_dp,aj1_eq_wtr_cond,aj1_eq_wtr_cond_qual,aj1_ex,aj1_is_qual,aj1_eq_wtr_cond_is_qual,ga1_cov,ga1_cov_qual,ga1_bs_ht,ga1_bs_ht_qual,ga1_cld,ga1_cld_qual,ga1_ex,ga1_cov_is_qual,ga1_bs_ht_is_qual,ka1_prd_quant,ka1_code,ka1_temp,ka1_temp_qual,ka1_ex,ka1_temp_is_qual,at1_src_elem,at1_wthr,at1_wthr_abrv,at1_qual,at1_ex,at1_is_qual,ax1_atm,ax1_qual,ax1_prd_quant,ax1_prd_qual,ax1_ex,ax1_is_qual,ax1_prd_is_qual
AA-1047-2015-01-21 20:17:00,2015-01-22T01:17:00.000+0000,2015-01-21T22:52:00.000+0000,Evening,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",JAN,WEDNESDAY,AA,N559AA,ATL,DFW,0,0,0,148,7,33.6301,-84.4418,307.8,KATL,320.0,5,N,46,5,1,1,1,7620,5,M,N,1,1,1,16093,5,N,5,1,1,1,1,144,5,1,1,-28,5,1,1,10197,5,1,1,1.0,0.0,,5.0,1,1,,,,,,,1,1,1,2.0,5,6096.0,5,,1,1,1,1,,,,,1,1,,,,,1,1,,,,,1,1,1
AA-1087-2015-06-08 17:50:00,2015-06-08T21:50:00.000+0000,2015-06-08T18:52:00.000+0000,Evening,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",JUNE,MONDAY,AA,N028AA,ATL,DFW,0,0,4,143,7,33.6301,-84.4418,307.8,KATL,220.0,5,N,67,5,1,1,1,3048,5,M,N,1,1,1,16093,5,N,5,1,1,1,1,300,5,1,1,189,5,1,1,10150,5,1,1,1.0,0.0,,5.0,1,1,,,,,,,1,1,1,2.0,5,1372.0,5,,1,1,1,1,,,,,1,1,,,,,1,1,,,,,1,1,1
AA-1087-2015-06-12 17:50:00,2015-06-12T21:50:00.000+0000,2015-06-12T18:52:00.000+0000,Evening,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",JUNE,FRIDAY,AA,N018AA,ATL,DFW,0,11,8,143,7,33.6301,-84.4418,307.8,KATL,,9,V,21,5,1,1,1,2438,5,M,N,1,1,1,16093,5,N,5,1,1,1,1,272,5,1,1,217,5,1,1,10185,5,1,1,1.0,0.0,2.0,5.0,1,1,,,,,,,1,1,1,2.0,5,488.0,5,,1,1,1,1,,,,,1,1,,,,,1,1,,,,,1,1,1
AA-1123-2015-06-30 16:10:00,2015-06-30T20:10:00.000+0000,2015-06-30T17:52:00.000+0000,Afternoon,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",JUNE,TUESDAY,AA,N008AA,ATL,DFW,0,0,0,148,7,33.6301,-84.4418,307.8,KATL,270.0,5,N,57,5,1,1,1,7620,5,M,N,1,1,1,16093,5,N,5,1,1,1,1,278,5,1,1,200,5,1,1,10148,5,1,1,1.0,0.0,,5.0,1,1,,,,,,,1,1,1,2.0,5,914.0,5,,1,1,1,1,60.0,M,283.0,1.0,1,1,,,,,1,1,,,,,1,1,1
AA-1123-2015-06-30 16:10:00,2015-06-30T20:10:00.000+0000,2015-06-30T18:00:00.000+0000,Afternoon,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",JUNE,TUESDAY,AA,N008AA,ATL,DFW,0,0,0,148,4,33.6301,-84.4418,307.8,99999,270.0,1,N,57,1,1,1,1,99999,9,,N,1,1,1,16000,1,,9,1,1,1,1,278,1,1,1,200,1,1,1,10148,1,1,1,6.0,20.0,3.0,1.0,1,1,,,,,,,1,1,1,,9,800.0,1,,1,1,1,1,120.0,M,283.0,1.0,1,1,,,,,1,1,,,,,1,1,1
AA-1162-2015-01-07 09:47:00,2015-01-07T14:47:00.000+0000,2015-01-07T11:52:00.000+0000,Morning,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",JAN,WEDNESDAY,AA,N002AA,ATL,MIA,0,0,7,119,7,33.6301,-84.4418,307.8,KATL,310.0,5,N,57,5,1,1,1,22000,5,,N,1,1,1,16093,5,N,5,1,1,1,1,0,5,1,1,-50,5,1,1,10243,5,1,1,1.0,0.0,,5.0,1,1,,,,,,,1,1,1,2.0,5,4572.0,5,,1,1,1,1,60.0,M,39.0,1.0,1,1,,,,,1,1,,,,,1,1,1
AA-1162-2015-01-07 09:47:00,2015-01-07T14:47:00.000+0000,2015-01-07T12:00:00.000+0000,Morning,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",JAN,WEDNESDAY,AA,N002AA,ATL,MIA,0,0,7,119,4,33.6301,-84.4418,307.8,99999,310.0,1,N,57,1,1,1,1,99999,9,,N,1,1,1,16000,1,,9,1,1,1,1,0,1,1,1,-50,1,1,1,10243,1,1,1,,,,,1,1,,,,,,,1,1,1,,,,,,1,1,1,1,240.0,M,122.0,1.0,1,1,,,,,1,1,,,,,1,1,1
AA-1162-2015-02-01 09:47:00,2015-02-01T14:47:00.000+0000,2015-02-01T11:52:00.000+0000,Morning,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",FEB,SUNDAY,AA,N020AA,ATL,MIA,0,0,0,119,7,33.6301,-84.4418,307.8,KATL,120.0,5,N,21,5,1,1,1,5486,5,M,N,1,1,1,16093,5,N,5,1,1,1,1,50,5,1,1,-22,5,1,1,10229,5,1,1,1.0,0.0,,5.0,1,1,,,,,,,1,1,1,2.0,5,4572.0,5,,1,1,1,1,60.0,M,78.0,1.0,1,1,,,,,1,1,,,,,1,1,1
AA-1162-2015-02-01 09:47:00,2015-02-01T14:47:00.000+0000,2015-02-01T12:00:00.000+0000,Morning,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",FEB,SUNDAY,AA,N020AA,ATL,MIA,0,0,0,119,4,33.6301,-84.4418,307.8,99999,120.0,1,N,21,1,1,1,1,99999,9,,N,1,1,1,16000,1,,9,1,1,1,1,50,1,1,1,-22,1,1,1,10229,1,1,1,,,,,1,1,,,,,,,1,1,1,,,,,,1,1,1,1,240.0,M,133.0,1.0,1,1,,,,,1,1,,,,,1,1,1
AA-1249-2015-04-14 18:40:00,2015-04-14T22:40:00.000+0000,2015-04-14T19:52:00.000+0000,Evening,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",APR,TUESDAY,AA,N018AA,ATL,LAX,1,35,11,296,6,33.6301,-84.4418,307.8,KATL,240.0,5,N,46,5,1,1,1,7620,5,M,N,1,1,1,16093,5,N,5,1,1,1,1,272,5,1,1,172,5,1,1,10160,5,1,1,1.0,0.0,,5.0,1,1,,,,,,,1,1,1,2.0,5,1372.0,5,,1,1,1,1,,,,,1,1,,,,,1,1,,,,,1,1,1


#Join EDA

In [0]:
# Configure Path
DATA_PATH = f"{blob_url}/2022-03-27_data_chkpt_PQ_full"

In [0]:
join_eda = spark.read.parquet(DATA_PATH)
display(join_eda)

UNIQUE_ID,FLIGHT_UTC_DATE,WEATHER_UTC_DATE,TIME_OF_DAY,STATION,NAME,MONTH,DAY_OF_WEEK,OP_UNIQUE_CARRIER,TAIL_NUM,ORIGIN,DEST,DEP_DEL15,DEP_DELAY_NEW,ARR_DELAY_NEW,CRS_ELAPSED_TIME,SOURCE,LATITUDE,LONGITUDE,ELEVATION,CALL_SIGN,wnd_dir_angle,wnd_dir_qual,wnd_type,wnd_spd_rate,wnd_spd_qual,wnd_ex,wnd_dir_is_qual,wnd_spd_is_qual,cig_ceil_ht,cig_ceil_qual,cig_ceil_det,cig_cavok,cig_ex,cig_cavok_bool,cig_ceil_is_qual,vis_dist,vis_dist_qual,vis_dist_var,vis_dist_qual_var,vis_ex,vis_dist_var_bool,vis_dist_is_qual,vis_dist_is_qual_var,tmp_air,tmp_air_qual,tmp_ex,tmp_air_is_qual,dew_pnt_tmp,dew_pnt_qual,dew_ex,dew_pnt_is_qual,slp_prs,slp_prs_qual,slp_ex,slp_prs_is_qual,aa1_prd_quant_hr,aa1_dp,aa1_cond,aa1_qual,aa1_ex,aa1_is_qual,aj1_dim,aj1_cond,aj1_qual,aj1_eq_wtr_dp,aj1_eq_wtr_cond,aj1_eq_wtr_cond_qual,aj1_ex,aj1_is_qual,aj1_eq_wtr_cond_is_qual,ga1_cov,ga1_cov_qual,ga1_bs_ht,ga1_bs_ht_qual,ga1_cld,ga1_cld_qual,ga1_ex,ga1_cov_is_qual,ga1_bs_ht_is_qual,ka1_prd_quant,ka1_code,ka1_temp,ka1_temp_qual,ka1_ex,ka1_temp_is_qual,at1_src_elem,at1_wthr,at1_wthr_abrv,at1_qual,at1_ex,at1_is_qual,ax1_atm,ax1_qual,ax1_prd_quant,ax1_prd_qual,ax1_ex,ax1_is_qual,ax1_prd_is_qual
9E-3280-2018-06-29 15:22:00,2018-06-29T19:22:00.000+0000,2018-06-29T16:52:00.000+0000,Afternoon,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",JUNE,FRIDAY,9E,N8980A,ATL,GTR,0,0,0,78,7,33.6301,-84.4418,307.8,KATL,20.0,5,N,31,5,1,1,1,9144,5,M,N,1,1,1,16093,5,N,5,1,1,1,1,311,5,1,1,217,5,1,1,10169,5,1,1,1.0,0.0,,5.0,1,1,,,,,,,1,1,1,2.0,5,1219.0,5,,1,1,1,1,,,,,1,1,,,,,1,1,,,,,1,1,1
9E-3280-2018-07-09 15:20:00,2018-07-09T19:20:00.000+0000,2018-07-09T16:52:00.000+0000,Afternoon,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",JULY,MONDAY,9E,N8683B,ATL,GTR,0,0,1,74,7,33.6301,-84.4418,307.8,KATL,310.0,5,N,15,5,1,1,1,22000,5,,N,1,1,1,16093,5,N,5,1,1,1,1,300,5,1,1,189,5,1,1,10244,5,1,1,1.0,0.0,,5.0,1,1,,,,,,,1,1,1,4.0,5,1372.0,5,,1,1,1,1,,,,,1,1,,,,,1,1,,,,,1,1,1
9E-3280-2018-10-19 19:57:00,2018-10-19T23:57:00.000+0000,2018-10-19T21:52:00.000+0000,Evening,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",OCT,FRIDAY,9E,N833AY,ATL,EWN,0,0,0,88,7,33.6301,-84.4418,307.8,KATL,160.0,5,N,26,5,1,1,1,457,5,M,N,1,1,1,16093,5,N,5,1,1,1,1,178,5,1,1,122,5,1,1,10219,5,1,1,1.0,0.0,,5.0,1,1,,,,,,,1,1,1,8.0,5,457.0,5,,1,1,1,1,,,,,1,1,,,,,1,1,,,,,1,1,1
9E-3281-2018-02-15 13:40:00,2018-02-15T18:40:00.000+0000,2018-02-15T15:50:00.000+0000,Afternoon,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",FEB,THURSDAY,9E,N819AY,ATL,TRI,0,14,0,70,6,33.6301,-84.4418,307.8,KATL,250.0,5,N,46,5,1,1,1,762,5,M,N,1,1,1,16093,5,N,5,1,1,1,1,190,5,1,1,170,5,1,1,0,9,1,1,,,,,1,1,,,,,,,1,1,1,2.0,5,274.0,5,,1,1,1,1,,,,,1,1,,,,,1,1,,,,,1,1,1
9E-3281-2018-02-15 13:40:00,2018-02-15T18:40:00.000+0000,2018-02-15T15:52:00.000+0000,Afternoon,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",FEB,THURSDAY,9E,N819AY,ATL,TRI,0,14,0,70,7,33.6301,-84.4418,307.8,KATL,240.0,5,N,51,5,1,1,1,762,5,M,N,1,1,1,16093,5,N,5,1,1,1,1,189,5,1,1,167,5,1,1,10247,5,1,1,1.0,0.0,,5.0,1,1,,,,,,,1,1,1,2.0,5,274.0,5,,1,1,1,1,,,,,1,1,,,,,1,1,,,,,1,1,1
9E-3281-2018-02-15 13:40:00,2018-02-15T18:40:00.000+0000,2018-02-15T16:09:00.000+0000,Afternoon,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",FEB,THURSDAY,9E,N819AY,ATL,TRI,0,14,0,70,7,33.6301,-84.4418,307.8,KATL,250.0,5,N,46,5,1,1,1,396,5,M,N,1,1,1,16093,5,N,5,1,1,1,1,183,5,1,1,167,5,1,1,0,9,1,1,,,,,1,1,,,,,,,1,1,1,7.0,5,396.0,5,,1,1,1,1,,,,,1,1,,,,,1,1,,,,,1,1,1
9E-3281-2018-08-05 10:30:00,2018-08-05T14:30:00.000+0000,2018-08-05T11:52:00.000+0000,Morning,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",AUG,SUNDAY,9E,N8986B,ATL,GTR,0,0,0,70,7,33.6301,-84.4418,307.8,KATL,,9,C,0,5,1,1,1,22000,5,,N,1,1,1,16093,5,N,5,1,1,1,1,250,5,1,1,222,5,1,1,10234,5,1,1,1.0,0.0,,5.0,1,1,,,,,,,1,1,1,2.0,5,305.0,5,,1,1,1,1,60.0,M,267.0,1.0,1,1,,,,,1,1,,,,,1,1,1
9E-3281-2018-08-05 10:30:00,2018-08-05T14:30:00.000+0000,2018-08-05T12:00:00.000+0000,Morning,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",AUG,SUNDAY,9E,N8986B,ATL,GTR,0,0,0,70,4,33.6301,-84.4418,307.8,99999,,9,C,0,1,1,1,1,22000,1,,N,1,1,1,16000,1,,9,1,1,1,1,250,1,1,1,222,1,1,1,10234,1,1,1,,,,,1,1,,,,,,,1,1,1,,9,450.0,1,,1,1,1,1,240.0,M,317.0,1.0,1,1,,,,,1,1,,,,,1,1,1
9E-3282-2019-01-16 10:27:00,2019-01-16T15:27:00.000+0000,2019-01-16T12:49:00.000+0000,Morning,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",JAN,WEDNESDAY,9E,N8896A,ATL,CSG,0,0,0,53,7,33.6301,-84.4418,307.8,KATL,320.0,5,N,46,5,1,1,1,152,5,M,N,1,1,1,6437,5,N,5,1,1,1,1,-10,5,1,1,-30,5,1,1,0,9,1,1,,,,,1,1,,,,,,,1,1,1,7.0,5,152.0,5,,1,1,1,1,,,,,1,1,,,,,1,1,,,,,1,1,1
9E-3282-2019-01-16 10:27:00,2019-01-16T15:27:00.000+0000,2019-01-16T12:52:00.000+0000,Morning,72219013874,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",JAN,WEDNESDAY,9E,N8896A,ATL,CSG,0,0,0,53,7,33.6301,-84.4418,307.8,KATL,330.0,5,N,41,5,1,1,1,152,5,M,N,1,1,1,6437,5,N,5,1,1,1,1,-6,5,1,1,-28,5,1,1,10255,5,1,1,1.0,0.0,,5.0,1,1,,,,,,,1,1,1,7.0,5,152.0,5,,1,1,1,1,,,,,1,1,,,,,1,1,,,,,1,1,1


In [0]:
total_records = join_eda.count()

#6month data - 429866 rows and 99 columns
(total_records, len(join_eda.columns))

In [0]:
#Continuous columns
cont_cols = [ 'wnd_spd_rate', 'cig_ceil_ht', 'vis_dist', 'tmp_air','dew_pnt_tmp','slp_prs', 'aa1_prd_quant_hr', 'aa1_dp', 'ga1_bs_ht','ELEVATION']

#Describing the continuous columns
display(join_eda.select(*cont_cols).describe())

summary,wnd_spd_rate,cig_ceil_ht,vis_dist,tmp_air,dew_pnt_tmp,slp_prs,aa1_prd_quant_hr,aa1_dp,ga1_bs_ht,ELEVATION
count,42419692.0,42419692.0,42419692.0,42419692.0,42419692.0,42419692.0,33310213.0,33310213.0,39865795.0,42419692.0
mean,38.918413103989536,20836.67388563312,14213.592326200764,161.97343674725408,87.42001933913146,8492.063888016914,1.2833162910126092,3.29737819448948,12367.066427848737,262.62869476643914
stddev,24.780926286346173,29580.369385876897,4156.039703717037,104.52439371764612,102.52383800030844,3770.53950285352,1.9632172871710103,20.79641967899329,30416.332852038417,408.7637538887137
min,0.0,0.0,0.0,-428.0,-394.0,0.0,0.0,0.0,0.0,0.3
max,530.0,99999.0,160000.0,500.0,340.0,10597.0,24.0,2575.0,99999.0,2353.1


In [0]:
#Missing data in the each column (in percent of total records)
display(join_eda.agg(*[(f.count(f.when(f.isnull(c), c))/total_records).alias(c) for c in join_eda.columns]))

UNIQUE_ID,FLIGHT_UTC_DATE,WEATHER_UTC_DATE,TIME_OF_DAY,STATION,NAME,MONTH,DAY_OF_WEEK,OP_UNIQUE_CARRIER,TAIL_NUM,ORIGIN,DEST,DEP_DEL15,DEP_DELAY_NEW,ARR_DELAY_NEW,CRS_ELAPSED_TIME,SOURCE,LATITUDE,LONGITUDE,ELEVATION,CALL_SIGN,wnd_dir_angle,wnd_dir_qual,wnd_type,wnd_spd_rate,wnd_spd_qual,wnd_ex,wnd_dir_is_qual,wnd_spd_is_qual,cig_ceil_ht,cig_ceil_qual,cig_ceil_det,cig_cavok,cig_ex,cig_cavok_bool,cig_ceil_is_qual,vis_dist,vis_dist_qual,vis_dist_var,vis_dist_qual_var,vis_ex,vis_dist_var_bool,vis_dist_is_qual,vis_dist_is_qual_var,tmp_air,tmp_air_qual,tmp_ex,tmp_air_is_qual,dew_pnt_tmp,dew_pnt_qual,dew_ex,dew_pnt_is_qual,slp_prs,slp_prs_qual,slp_ex,slp_prs_is_qual,aa1_prd_quant_hr,aa1_dp,aa1_cond,aa1_qual,aa1_ex,aa1_is_qual,aj1_dim,aj1_cond,aj1_qual,aj1_eq_wtr_dp,aj1_eq_wtr_cond,aj1_eq_wtr_cond_qual,aj1_ex,aj1_is_qual,aj1_eq_wtr_cond_is_qual,ga1_cov,ga1_cov_qual,ga1_bs_ht,ga1_bs_ht_qual,ga1_cld,ga1_cld_qual,ga1_ex,ga1_cov_is_qual,ga1_bs_ht_is_qual,ka1_prd_quant,ka1_code,ka1_temp,ka1_temp_qual,ka1_ex,ka1_temp_is_qual,at1_src_elem,at1_wthr,at1_wthr_abrv,at1_qual,at1_ex,at1_is_qual,ax1_atm,ax1_qual,ax1_prd_quant,ax1_prd_qual,ax1_ex,ax1_is_qual,ax1_prd_is_qual
0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.2147464672775087,0.2147464672775087,0.2147464672775087,0.2147464672775087,0.0,0.0,0.9931412278995332,0.9931412278995332,0.9931412278995332,0.9931412278995332,0.9931412278995332,0.9931412278995332,0.0,0.0,0.0,0.0602054583517485,0.0602054583517485,0.0602054583517485,0.0602054583517485,0.0602054583517485,0.0,0.0,0.0,0.0,0.7988633203654567,0.7988633203654567,0.7988633203654567,0.7988633203654567,0.0,0.0,0.9999835453779344,0.9999835453779344,0.9999835453779344,0.9999835453779344,0.0,0.0,0.9999929278128658,0.9999929278128658,0.9999929278128658,0.9999929278128658,0.0,0.0,0.0


In [0]:
# value counts of column
display(join_eda.groupBy('NAME').count())

NAME,count
"APPLETON OUTAGAMIE CO AIRPORT, WI US",28912
"BALTIMORE WASHINGTON INTERNATIONAL AIRPORT, MD US",672295
"WASHINGTON REAGAN NATIONAL AIRPORT, VA US",791960
"DAL FTW WSCMO AIRPORT, TX US",1823895
"CLEVELAND HOPKINS INTERNATIONAL AIRPORT, OH US",329250
"SAN FRANCISCO INTERNATIONAL AIRPORT, CA US",1037876
"MOBILE DOWNTOWN AIRPORT, AL US",187
"BELLEVILLE SCOTT AFB, IL US",3127
"PHOENIX AIRPORT, AZ US",960299
"BOSTON, MA US",992079


In [0]:
# value counts of column
display(join_eda.select(col('TIME_OF_DAY'), col('DEP_DEL15')).groupBy('TIME_OF_DAY', 'DEP_DEL15').count())

TIME_OF_DAY,DEP_DEL15,count
Morning,0,15656465
Evening,0,8251124
Afternoon,0,9916724
Night,0,294150
Afternoon,1,2826007
Morning,1,2123622
Evening,1,3270531
Night,1,81069


In [0]:
# value counts of column
display(join_eda.select('ORIGIN','at1_src_elem','at1_wthr','at1_wthr_abrv','at1_qual','at1_ex','at1_is_qual').describe())

summary,ORIGIN,at1_src_elem,at1_wthr,at1_wthr_abrv,at1_qual,at1_ex,at1_is_qual
count,42419692,698,698.0,698,698.0,42419692.0,42419692.0
mean,,,16.008595988538683,,4.977077363896848,1.0,1.0
stddev,,,3.5873899233396087,,0.3021521033781236,0.0,0.0
min,ABE,AU,1.0,BR,1.0,1.0,1.0
max,YNG,AW,22.0,UP,5.0,1.0,1.0


In [0]:
# value counts of column
display(join_eda.select('ORIGIN','at1_src_elem','at1_wthr','at1_wthr_abrv','at1_qual','at1_ex','at1_is_qual').isnan().count())

#Delta Lake Write

In [0]:
#To ensure checkpoints are saved by the date and not easily overwritten:

now = str(datetime.date.today())

DELTALAKE_DATA_PATH_TODAY = f"{blob_url}/{now}_data_chkpt_DL_6m"
DELTALAKE_DATA_PATH_TODAY

In [0]:
# # Define the input and output formats and paths and the table name (partitioned by date).
write_format = 'delta'
partition_by = 'ORIGIN'
save_path = DELTALAKE_DATA_PATH_TODAY

# Remove table if it exists
dbutils.fs.rm(DELTALAKE_DATA_PATH_TODAY, recurse=True)

# # Write the data to its target.
join_full.write \
  .partitionBy(partition_by) \
  .format(write_format) \
  .save(save_path)

##Best format for reading Delta Lake

In [0]:
# Define the input and output formats and paths and the table name.
read_format = 'delta'
load_path = DELTALAKE_DATA_PATH_TODAY
save_path = '/tmp/delta/join'
# table_name = 'flights.weather300m'

# Load the data from its source.
join_eda = spark \
  .read \
  .format(read_format) \
  .load(load_path)

# Create the table.
# spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path + "'")

# Review data
display(join_eda)

UNIQUE_ID,FLIGHT_UTC_DATE,WEATHER_UTC_DATE,TIME_OF_DAY,STATION,NAME,MONTH,DAY_OF_WEEK,OP_UNIQUE_CARRIER,TAIL_NUM,ORIGIN,DEST,DEP_DEL15,DEP_DELAY_NEW,ARR_DELAY_NEW,CRS_ELAPSED_TIME,SOURCE,LATITUDE,LONGITUDE,ELEVATION,REPORT_TYPE,CALL_SIGN,WND,CIG,VIS,TMP,DEW,SLP,GA1,AA1,AJ1,wnd_dir_angle,wnd_dir_qual,wnd_type,wnd_spd_rate,wnd_spd_qual,cig_ceil_ht,cig_ceil_qual,cig_ceil_det,cig_cavok,vis_dist,vis_dist_qual,vis_dist_var,vis_dist_qual_var,tmp_air,tmp_air_qual,dew_pnt_tmp,dew_pnt_qual,slp_prs,slp_prs_qual,aa1_prd_quant_hr,aa1_dp,aa1_cond,aa1_qual,aj1_dim,aj1_cond,aj1_qual,aj1_eq_wtr_dp,aj1_eq_wtr_cond,aj1_eq_wtr_cond_qual,ga1_cov,ga1_cov_qual,ga1_bs_ht,ga1_bs_ht_qual,ga1_cld,ga1_cld_qual
AA-1008-2015-01-11 18:45:00,2015-01-12T00:45:00.000+0000,2015-01-11T21:51:00.000+0000,Evening,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",JAN,SUNDAY,AA,N3GCAA,ORD,RSW,1,31,46,166,7,41.995,-87.9336,201.8,FM-15,KORD,"210,5,N,0046,5","01829,5,M,N","016093,5,N,5",-65,-675,102415,"07,5,+01829,5,99,9",01000095,,210,5,N,46,5,1829,5,M,N,16093,5,N,5,-6,5,-67,5,10241,5,1.0,0.0,9.0,5,,,,,,,7.0,5.0,1829.0,5.0,99.0,9.0
AA-1008-2015-01-11 18:45:00,2015-01-12T00:45:00.000+0000,2015-01-11T22:38:00.000+0000,Evening,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",JAN,SUNDAY,AA,N3GCAA,ORD,RSW,1,31,46,166,7,41.995,-87.9336,201.8,FM-16,KORD,"180,5,N,0077,5","01676,5,M,N","002012,5,N,5",-65,-615,999999,"08,5,+01676,5,99,9",01000021,,180,5,N,77,5,1676,5,M,N,2012,5,N,5,-6,5,-61,5,99999,9,1.0,0.0,2.0,1,,,,,,,8.0,5.0,1676.0,5.0,99.0,9.0
AA-1008-2015-01-30 18:45:00,2015-01-31T00:45:00.000+0000,2015-01-30T21:51:00.000+0000,Evening,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",JAN,FRIDAY,AA,N3DSAA,ORD,RSW,0,0,0,166,7,41.995,-87.9336,201.8,FM-15,KORD,"280,5,N,0015,5","22000,5,9,N","016093,5,N,5",-225,-1285,103165,"02,5,+01067,5,99,9",01000095,,280,5,N,15,5,22000,5,9,N,16093,5,N,5,-22,5,-128,5,10316,5,1.0,0.0,9.0,5,,,,,,,2.0,5.0,1067.0,5.0,99.0,9.0
AA-1008-2015-02-12 18:45:00,2015-02-13T00:45:00.000+0000,2015-02-12T21:51:00.000+0000,Evening,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",FEB,THURSDAY,AA,N3JXAA,ORD,RSW,0,0,0,166,7,41.995,-87.9336,201.8,FM-15,KORD,"310,5,N,0057,5","22000,5,9,N","016093,5,N,5",-895,-1835,103185,"02,5,+01067,5,99,9",,,310,5,N,57,5,22000,5,9,N,16093,5,N,5,-89,5,-183,5,10318,5,,,,,,,,,,,2.0,5.0,1067.0,5.0,99.0,9.0
AA-1008-2015-02-16 18:45:00,2015-02-17T00:45:00.000+0000,2015-02-16T21:51:00.000+0000,Evening,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",FEB,MONDAY,AA,N3HGAA,ORD,RSW,0,10,23,166,7,41.995,-87.9336,201.8,FM-15,KORD,"150,5,N,0021,5","07620,5,M,N","016093,5,N,5",-785,-1945,101845,"04,5,+06401,5,99,9",01000095,,150,5,N,21,5,7620,5,M,N,16093,5,N,5,-78,5,-194,5,10184,5,1.0,0.0,9.0,5,,,,,,,4.0,5.0,6401.0,5.0,99.0,9.0
AA-1008-2015-02-24 18:45:00,2015-02-25T00:45:00.000+0000,2015-02-24T21:51:00.000+0000,Evening,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",FEB,TUESDAY,AA,N3ATAA,ORD,RSW,1,73,61,166,7,41.995,-87.9336,201.8,FM-15,KORD,"250,5,N,0082,5","22000,5,9,N","016093,5,N,5",-225,-785,100645,"02,5,+00792,5,99,9",01000095,,250,5,N,82,5,22000,5,9,N,16093,5,N,5,-22,5,-78,5,10064,5,1.0,0.0,9.0,5,,,,,,,2.0,5.0,792.0,5.0,99.0,9.0
AA-1008-2015-03-06 18:45:00,2015-03-07T00:45:00.000+0000,2015-03-06T21:51:00.000+0000,Evening,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",MAR,FRIDAY,AA,N3GGAA,ORD,RSW,1,29,22,166,7,41.995,-87.9336,201.8,FM-15,KORD,"210,5,N,0088,5","22000,5,9,N","016093,5,N,5",-335,-1005,102385,"02,5,+07620,5,99,9",01000095,,210,5,N,88,5,22000,5,9,N,16093,5,N,5,-33,5,-100,5,10238,5,1.0,0.0,9.0,5,,,,,,,2.0,5.0,7620.0,5.0,99.0,9.0
AA-1008-2015-03-15 18:45:00,2015-03-15T23:45:00.000+0000,2015-03-15T20:51:00.000+0000,Evening,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",MAR,SUNDAY,AA,N3LBAA,ORD,RSW,1,17,1,166,7,41.995,-87.9336,201.8,FM-15,KORD,"200,5,N,0082,5","07620,5,M,N","016093,5,N,5",1565,335,101885,"04,5,+06096,5,99,9",01000095,,200,5,N,82,5,7620,5,M,N,16093,5,N,5,156,5,33,5,10188,5,1.0,0.0,9.0,5,,,,,,,4.0,5.0,6096.0,5.0,99.0,9.0
AA-1009-2015-01-10 20:25:00,2015-01-11T02:25:00.000+0000,2015-01-10T23:51:00.000+0000,Evening,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",JAN,SATURDAY,AA,N3JCAA,ORD,LAX,0,0,0,272,7,41.995,-87.9336,201.8,FM-15,KORD,"180,5,N,0077,5","01981,5,M,N","016093,5,N,5",-725,-1285,102565,"08,5,+01981,5,99,9",01000095,109500102099.0,180,5,N,77,5,1981,5,M,N,16093,5,N,5,-72,5,-128,5,10256,5,1.0,0.0,9.0,5,10.0,9.0,5.0,1020.0,9.0,9.0,8.0,5.0,1981.0,5.0,99.0,9.0
AA-1009-2015-01-28 20:25:00,2015-01-29T02:25:00.000+0000,2015-01-28T23:51:00.000+0000,Evening,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",JAN,WEDNESDAY,AA,N3DXAA,ORD,LAX,0,0,0,272,7,41.995,-87.9336,201.8,FM-15,KORD,"150,5,N,0072,5","07010,5,M,N","016093,5,N,5",-65,-615,101655,"02,5,+05182,5,99,9",01000095,39500025099.0,150,5,N,72,5,7010,5,M,N,16093,5,N,5,-6,5,-61,5,10165,5,1.0,0.0,9.0,5,3.0,9.0,5.0,250.0,9.0,9.0,2.0,5.0,5182.0,5.0,99.0,9.0


In [0]:
join_eda.count()

In [0]:
display(join_eda.groupBy("UNIQUE_ID").count())

UNIQUE_ID,count
AA-1027-2015-06-18 09:45:00,3
AA-1081-2015-05-25 09:10:00,1
AA-111-2015-02-16 17:20:00,1
AA-1116-2015-01-25 16:40:00,3
AA-114-2015-04-20 07:35:00,3
AA-1141-2015-04-06 10:40:00,1
AA-1151-2015-06-07 17:15:00,2
AA-1238-2015-02-10 05:45:00,1
AA-1239-2015-05-22 10:40:00,1
AA-130-2015-05-12 12:40:00,1


##Delta Lake Optimization

In [0]:
display(spark.sql("DROP TABLE  IF EXISTS join_eda"))
 
display(spark.sql("CREATE TABLE join_eda USING DELTA LOCATION '" + DELTALAKE_DATA_PATH + "'"))
                  
display(spark.sql("OPTIMIZE join_eda ZORDER BY (ORIGIN)"))

display(spark.sql("OPTIMIZE delta.DELTALAKE_DATA_PATH ZORDER BY (ORIGIN)"))


path,metrics
wasbs://tm30container@w261tm30.blob.core.windows.net/2022-03-18_data_chkpt,"List(51, 57, List(246526469, 438668980, 3.381357205686275E8, 51, 17244921749), List(199613072, 290089734, 2.4248920322807017E8, 57, 13821884584), 0, List(minCubeSize(107374182400), List(0, 0), List(57, 13821884584), 0, List(57, 13821884584), 1, null), 1, 57, 0, false)"


In [0]:
display(join_eda.tail(10))

UNIQUE_ID,FLIGHT_UTC_DATE,WEATHER_UTC_DATE,STATION,NAME,MONTH,DAY_OF_WEEK,TAIL_NUM,ORIGIN,DEST,DEP_DEL15,ARR_DELAY_NEW,CRS_ELAPSED_TIME,SOURCE,LATITUDE,LONGITUDE,ELEVATION,REPORT_TYPE,CALL_SIGN,QUALITY_CONTROL,WND,CIG,VIS,TMP,DEW,SLP,GA1,GE1,GF1,MA1,REM,GD1
AS-678-2019-12-31 21:50:00,2020-01-01T05:50:00.000+0000,2019-12-31T23:53:00.000+0000,72793024233,"SEATTLE TACOMA INTERNATIONAL AIRPORT, WA US",DEC,TUESDAY,N586AS,SEA,GEG,1.0,31.0,65.0,7,47.4444,-122.3138,112.8,FM-15,KSEA,V020,"190,5,N,0082,5","00396,5,M,N","014484,5,N,5",1005,785,100815,"02,5,+00244,5,99,9","9,MSL ,+99999,+99999",99999999999002441999999,100755099185,MET14712/31/19 15:53:02 METAR KSEA 312353Z 19016G23KT 9SM -RA FEW008 OVC013 10/08 A2975 RMK AO2 RAB05 SLP081 P0001 60012 T01000078 10100 20094 56032 (RS),"1,99,1,+00244,5,9"
AS-946-2019-12-31 21:50:00,2020-01-01T05:50:00.000+0000,2019-12-31T23:53:00.000+0000,72793024233,"SEATTLE TACOMA INTERNATIONAL AIRPORT, WA US",DEC,TUESDAY,N281AK,SEA,PDX,0.0,0.0,57.0,7,47.4444,-122.3138,112.8,FM-15,KSEA,V020,"190,5,N,0082,5","00396,5,M,N","014484,5,N,5",1005,785,100815,"02,5,+00244,5,99,9","9,MSL ,+99999,+99999",99999999999002441999999,100755099185,MET14712/31/19 15:53:02 METAR KSEA 312353Z 19016G23KT 9SM -RA FEW008 OVC013 10/08 A2975 RMK AO2 RAB05 SLP081 P0001 60012 T01000078 10100 20094 56032 (RS),"1,99,1,+00244,5,9"
AS-1248-2019-12-31 21:50:00,2020-01-01T05:50:00.000+0000,2019-12-31T23:53:00.000+0000,72793024233,"SEATTLE TACOMA INTERNATIONAL AIRPORT, WA US",DEC,TUESDAY,N524VA,SEA,LAX,1.0,3.0,168.0,7,47.4444,-122.3138,112.8,FM-15,KSEA,V020,"190,5,N,0082,5","00396,5,M,N","014484,5,N,5",1005,785,100815,"02,5,+00244,5,99,9","9,MSL ,+99999,+99999",99999999999002441999999,100755099185,MET14712/31/19 15:53:02 METAR KSEA 312353Z 19016G23KT 9SM -RA FEW008 OVC013 10/08 A2975 RMK AO2 RAB05 SLP081 P0001 60012 T01000078 10100 20094 56032 (RS),"1,99,1,+00244,5,9"
AS-678-2019-12-31 21:50:00,2020-01-01T05:50:00.000+0000,2019-12-31T23:53:00.000+0000,72793024233,"SEATTLE TACOMA INTERNATIONAL AIRPORT, WA US",DEC,TUESDAY,N586AS,SEA,GEG,1.0,31.0,65.0,7,47.4444,-122.3138,112.8,FM-15,KSEA,V020,"190,5,N,0082,5","00396,5,M,N","014484,5,N,5",1005,785,100815,"02,5,+00244,5,99,9","9,MSL ,+99999,+99999",99999999999002441999999,100755099185,MET14712/31/19 15:53:02 METAR KSEA 312353Z 19016G23KT 9SM -RA FEW008 OVC013 10/08 A2975 RMK AO2 RAB05 SLP081 P0001 60012 T01000078 10100 20094 56032 (RS),"1,99,1,+00244,5,9"
AS-946-2019-12-31 21:50:00,2020-01-01T05:50:00.000+0000,2019-12-31T23:53:00.000+0000,72793024233,"SEATTLE TACOMA INTERNATIONAL AIRPORT, WA US",DEC,TUESDAY,N281AK,SEA,PDX,0.0,0.0,57.0,7,47.4444,-122.3138,112.8,FM-15,KSEA,V020,"190,5,N,0082,5","00396,5,M,N","014484,5,N,5",1005,785,100815,"02,5,+00244,5,99,9","9,MSL ,+99999,+99999",99999999999002441999999,100755099185,MET14712/31/19 15:53:02 METAR KSEA 312353Z 19016G23KT 9SM -RA FEW008 OVC013 10/08 A2975 RMK AO2 RAB05 SLP081 P0001 60012 T01000078 10100 20094 56032 (RS),"1,99,1,+00244,5,9"
AS-1248-2019-12-31 21:50:00,2020-01-01T05:50:00.000+0000,2019-12-31T23:53:00.000+0000,72793024233,"SEATTLE TACOMA INTERNATIONAL AIRPORT, WA US",DEC,TUESDAY,N524VA,SEA,LAX,1.0,3.0,168.0,7,47.4444,-122.3138,112.8,FM-15,KSEA,V020,"190,5,N,0082,5","00396,5,M,N","014484,5,N,5",1005,785,100815,"02,5,+00244,5,99,9","9,MSL ,+99999,+99999",99999999999002441999999,100755099185,MET14712/31/19 15:53:02 METAR KSEA 312353Z 19016G23KT 9SM -RA FEW008 OVC013 10/08 A2975 RMK AO2 RAB05 SLP081 P0001 60012 T01000078 10100 20094 56032 (RS),"1,99,1,+00244,5,9"
NK-920-2019-12-31 21:50:00,2020-01-01T05:50:00.000+0000,2019-12-31T23:56:00.000+0000,72386023169,"MCCARRAN INTERNATIONAL AIRPORT, NV US",DEC,TUESDAY,N923NK,LAS,ATL,1.0,20.0,230.0,7,36.0719,-115.1634,664.5,FM-15,KLAS,V020,"060,5,N,0041,5","22000,5,9,N","016093,5,N,5",1225,-505,101975,"02,5,+04572,5,99,9","9,AGL ,+99999,+99999",02995999999045721999999,102005094225,MET12612/31/19 15:56:02 METAR KLAS 312356Z 06008KT 10SM FEW150 FEW250 12/M05 A3012 RMK AO2 SLP197 T01221050 10133 20100 55002 $ (GW),"1,99,1,+04572,5,9"
NK-640-2019-12-31 21:50:00,2020-01-01T05:50:00.000+0000,2019-12-31T23:56:00.000+0000,72386023169,"MCCARRAN INTERNATIONAL AIRPORT, NV US",DEC,TUESDAY,N523NK,LAS,BOS,0.0,25.0,289.0,7,36.0719,-115.1634,664.5,FM-15,KLAS,V020,"060,5,N,0041,5","22000,5,9,N","016093,5,N,5",1225,-505,101975,"02,5,+04572,5,99,9","9,AGL ,+99999,+99999",02995999999045721999999,102005094225,MET12612/31/19 15:56:02 METAR KLAS 312356Z 06008KT 10SM FEW150 FEW250 12/M05 A3012 RMK AO2 SLP197 T01221050 10133 20100 55002 $ (GW),"1,99,1,+04572,5,9"
NK-920-2019-12-31 21:50:00,2020-01-01T05:50:00.000+0000,2019-12-31T23:56:00.000+0000,72386023169,"MCCARRAN INTERNATIONAL AIRPORT, NV US",DEC,TUESDAY,N923NK,LAS,ATL,1.0,20.0,230.0,7,36.0719,-115.1634,664.5,FM-15,KLAS,V020,"060,5,N,0041,5","22000,5,9,N","016093,5,N,5",1225,-505,101975,"02,5,+04572,5,99,9","9,AGL ,+99999,+99999",02995999999045721999999,102005094225,MET12612/31/19 15:56:02 METAR KLAS 312356Z 06008KT 10SM FEW150 FEW250 12/M05 A3012 RMK AO2 SLP197 T01221050 10133 20100 55002 $ (GW),"1,99,1,+04572,5,9"
NK-640-2019-12-31 21:50:00,2020-01-01T05:50:00.000+0000,2019-12-31T23:56:00.000+0000,72386023169,"MCCARRAN INTERNATIONAL AIRPORT, NV US",DEC,TUESDAY,N523NK,LAS,BOS,0.0,25.0,289.0,7,36.0719,-115.1634,664.5,FM-15,KLAS,V020,"060,5,N,0041,5","22000,5,9,N","016093,5,N,5",1225,-505,101975,"02,5,+04572,5,99,9","9,AGL ,+99999,+99999",02995999999045721999999,102005094225,MET12612/31/19 15:56:02 METAR KLAS 312356Z 06008KT 10SM FEW150 FEW250 12/M05 A3012 RMK AO2 SLP197 T01221050 10133 20100 55002 $ (GW),"1,99,1,+04572,5,9"


In [0]:
join_eda.printSchema()

In [0]:
spark.sql("DROP TABLE IF EXISTS join_eda")
spark.sql("CREATE TABLE join_eda USING DELTA LOCATION '" + DELTALAKE_DATA_PATH + "'")

In [0]:
%sql
DESCRIBE HISTORY join_eda

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
1,2022-03-17T23:16:09.000+0000,7726907417543951,mattpribadi@berkeley.edu,OPTIMIZE,"Map(predicate -> [], zOrderBy -> [""FLIGHT_UTC_DATE""], batchId -> 0, auto -> false)",,List(1038316885895058),0304-231751-s0j8rnoe,0.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 159, numRemovedBytes -> 15249739827, p25FileSize -> 231923216, minFileSize -> 190020687, numAddedFiles -> 56, maxFileSize -> 288737064, p75FileSize -> 264056489, p50FileSize -> 252231740, numAddedBytes -> 13952206835)",
0,2022-03-17T15:55:20.000+0000,2739743832376146,kmuzila@berkeley.edu,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(1038316885895058),0304-231751-s0j8rnoe,,WriteSerializable,False,"Map(numFiles -> 159, numOutputBytes -> 15249739827, numOutputRows -> 437868330)",


In [0]:
display(join_eda.head(10))

UNIQUE_ID,FLIGHT_UTC_DATE,WEATHER_UTC_DATE,STATION,NAME,MONTH,DAY_OF_WEEK,TAIL_NUM,ORIGIN,DEST,DEP_DEL15,ARR_DELAY_NEW,CRS_ELAPSED_TIME,SOURCE,LATITUDE,LONGITUDE,ELEVATION,REPORT_TYPE,CALL_SIGN,QUALITY_CONTROL,WND,CIG,VIS,TMP,DEW,SLP,GA1,GE1,GF1,MA1,REM,GD1
AA-913-2015-02-01 06:00:00,2015-02-01T11:00:00.000+0000,2015-02-01T05:52:00.000+0000,72405013743,"WASHINGTON REAGAN NATIONAL AIRPORT, VA US",FEB,SUNDAY,N3DXAA,DCA,MIA,0.0,0.0,163.0,7,38.8472,-77.03454,3.0,FM-15,KDCA,V030,"999,9,C,0000,5","07315,5,M,N","016093,5,N,5",-115,-835,102555,"07,5,+07315,5,99,9","9,AGL ,+99999,+99999",99999999999073151999999,102575102335,MET11902/01/15 00:52:02 METAR KDCA 010552Z 00000KT 10SM BKN240 M01/M08 A3029 RMK AO2 SLP255 T10111083 10006 21022 50001 (BGS),"3,99,1,+07315,5,9"
AA-913-2015-02-01 06:00:00,2015-02-01T11:00:00.000+0000,2015-02-01T06:52:00.000+0000,72405013743,"WASHINGTON REAGAN NATIONAL AIRPORT, VA US",FEB,SUNDAY,N3DXAA,DCA,MIA,0.0,0.0,163.0,7,38.8472,-77.03454,3.0,FM-15,KDCA,V030,"999,9,C,0000,5","07315,5,M,N","016093,5,N,5",-225,-835,102565,"07,5,+07315,5,99,9","9,AGL ,+99999,+99999",99999999999073151999999,102575102335,MET10102/01/15 01:52:02 METAR KDCA 010652Z 00000KT 10SM BKN240 M02/M08 A3029 RMK AO2 SLP256 T10221083 (BGS),"3,99,1,+07315,5,9"
AA-913-2015-02-01 06:00:00,2015-02-01T11:00:00.000+0000,2015-02-01T07:52:00.000+0000,72405013743,"WASHINGTON REAGAN NATIONAL AIRPORT, VA US",FEB,SUNDAY,N3DXAA,DCA,MIA,0.0,0.0,163.0,7,38.8472,-77.03454,3.0,FM-15,KDCA,V030,"180,5,N,0021,5","04572,5,M,N","016093,5,N,5",-285,-725,102565,"08,5,+04572,5,99,9","9,AGL ,+99999,+99999",99999999999045721999999,102575102335,MET10302/01/15 02:52:02 METAR KDCA 010752Z 18004KT 10SM OVC150 M03/M07 A3029 RMK AO2 SLP256 T10281072 $ (BGS),"4,99,1,+04572,5,9"
AA-913-2015-02-01 06:00:00,2015-02-01T11:00:00.000+0000,2015-02-01T08:52:00.000+0000,72405013743,"WASHINGTON REAGAN NATIONAL AIRPORT, VA US",FEB,SUNDAY,N3DXAA,DCA,MIA,0.0,0.0,163.0,7,38.8472,-77.03454,3.0,FM-15,KDCA,V030,"210,5,N,0015,5","04877,5,M,N","016093,5,N,5",-285,-785,102555,"08,5,+04877,5,99,9","9,AGL ,+99999,+99999",99999999999048771999999,102575102335,MET10902/01/15 03:52:02 METAR KDCA 010852Z 21003KT 10SM OVC160 M03/M08 A3029 RMK AO2 SLP255 T10281078 50001 $ (BGS),"4,99,1,+04877,5,9"
AA-913-2015-02-01 06:00:00,2015-02-01T11:00:00.000+0000,2015-02-01T09:52:00.000+0000,72405013743,"WASHINGTON REAGAN NATIONAL AIRPORT, VA US",FEB,SUNDAY,N3DXAA,DCA,MIA,0.0,0.0,163.0,7,38.8472,-77.03454,3.0,FM-15,KDCA,V030,"190,5,N,0026,5","03048,5,M,N","016093,5,N,5",-175,-675,102515,"02,5,+01524,5,99,9","9,AGL ,+99999,+99999",99999999999015241999999,102545102305,MET11002/01/15 04:52:02 METAR KDCA 010952Z 19005KT 10SM FEW050 OVC100 M02/M07 A3028 RMK AO2 SLP251 T10171067 $ (BGS),"1,99,1,+01524,5,9"
AA-913-2015-02-01 06:00:00,2015-02-01T11:00:00.000+0000,2015-02-01T10:52:00.000+0000,72405013743,"WASHINGTON REAGAN NATIONAL AIRPORT, VA US",FEB,SUNDAY,N3DXAA,DCA,MIA,0.0,0.0,163.0,7,38.8472,-77.03454,3.0,FM-15,KDCA,V030,"220,5,N,0015,5","02438,5,M,N","016093,5,N,5",-175,-725,102555,"08,5,+02438,5,99,9","9,AGL ,+99999,+99999",99999999999024381999999,102575102335,MET10302/01/15 05:52:02 METAR KDCA 011052Z 22003KT 10SM OVC080 M02/M07 A3029 RMK AO2 SLP255 T10171072 $ (BGS),"4,99,1,+02438,5,9"
AA-913-2015-02-03 06:00:00,2015-02-03T11:00:00.000+0000,2015-02-03T05:52:00.000+0000,72405013743,"WASHINGTON REAGAN NATIONAL AIRPORT, VA US",FEB,TUESDAY,N3HRAA,DCA,MIA,0.0,0.0,163.0,7,38.8472,-77.03454,3.0,FM-15,KDCA,V030,"320,5,N,0072,5","22000,5,9,N","016093,5,N,5",-285,-1445,102115,"00,5,+99999,9,99,9",,00991999999999999999999,102135101895,MET11902/03/15 00:52:02 METAR KDCA 030552Z 32014G24KT 10SM CLR M03/M14 A3016 RMK AO2 SLP211 T10281144 11011 21028 51026 (JDH),"0,99,1,+99999,9,9"
AA-913-2015-02-03 06:00:00,2015-02-03T11:00:00.000+0000,2015-02-03T06:52:00.000+0000,72405013743,"WASHINGTON REAGAN NATIONAL AIRPORT, VA US",FEB,TUESDAY,N3HRAA,DCA,MIA,0.0,0.0,163.0,7,38.8472,-77.03454,3.0,FM-15,KDCA,V030,"320,5,N,0041,5","22000,5,9,N","016093,5,N,5",-395,-1395,102175,"00,5,+99999,9,99,9",,00991999999999999999999,102205101965,MET09802/03/15 01:52:02 METAR KDCA 030652Z 32008KT 10SM CLR M04/M14 A3018 RMK AO2 SLP217 T10391139 (JDH),"0,99,1,+99999,9,9"
AA-913-2015-02-03 06:00:00,2015-02-03T11:00:00.000+0000,2015-02-03T07:52:00.000+0000,72405013743,"WASHINGTON REAGAN NATIONAL AIRPORT, VA US",FEB,TUESDAY,N3HRAA,DCA,MIA,0.0,0.0,163.0,7,38.8472,-77.03454,3.0,FM-15,KDCA,V030,"330,5,N,0067,5","22000,5,9,N","016093,5,N,5",-395,-1395,102255,"00,5,+99999,9,99,9",,00991999999999999999999,102275102035,MET10102/03/15 02:52:02 METAR KDCA 030752Z 33013G18KT 10SM CLR M04/M14 A3020 RMK AO2 SLP225 T10391139 (JDH),"0,99,1,+99999,9,9"
AA-913-2015-02-03 06:00:00,2015-02-03T11:00:00.000+0000,2015-02-03T08:52:00.000+0000,72405013743,"WASHINGTON REAGAN NATIONAL AIRPORT, VA US",FEB,TUESDAY,N3HRAA,DCA,MIA,0.0,0.0,163.0,7,38.8472,-77.03454,3.0,FM-15,KDCA,V030,"320,5,N,0046,5","22000,5,9,N","016093,5,N,5",-395,-1445,102305,"00,5,+99999,9,99,9",,00991999999999999999999,102305102065,MET10702/03/15 03:52:02 METAR KDCA 030852Z 32009G17KT 10SM CLR M04/M14 A3021 RMK AO2 SLP230 T10391144 51019 (JDH),"0,99,1,+99999,9,9"


#Data Dictionaries
 - https://www.ncei.noaa.gov/data/global-hourly/doc/isd-format-document.pdf

##Resources
 | Description | Link |
 | --- | ---|
 | Microsoft Delta Information | https://docs.microsoft.com/en-us/azure/databricks/delta/optimizations/delta-cache |
 | Delta Tables | https://hevodata.com/learn/databricks-delta-tables/| 
 | Delta Lake Quickstart | https://docs.databricks.com/delta/quick-start.html |
 | Weather affecting travel article | https://www.transportation.gov/sites/dot.gov/files/docs/kulesa_Weather_Aviation.pdf |
 | FAA Inclement Weather Guide on travel | https://www.faa.gov/newsroom/inclement-weather-0?newsId=23074 |
 | EDA Guidelines | https://medium.com/swlh/guide-to-exploratory-data-analysis-for-data-science-294baff8b741|
 | Datetime Functions | https://sparkbyexamples.com/pyspark/pyspark-timestamp-difference-seconds-minutes-hours/ |
 | High Performance Queries | https://docs.databricks.com/_static/notebooks/delta/optimize-python.html |