<a href="https://colab.research.google.com/github/kchen79427/Capstone_Bruestle_Boudia_Chen/blob/dave_branch/Weather_Boudia_LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark



In [1]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, Activation, Dropout, Input, Reshape, Lambda, RepeatVector
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import udf, expr, concat_ws
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import TimestampType
import json
from datetime import datetime

In [2]:
spark =  SparkSession.builder.master("local[*]") \
  .appName('Takeoff') \
  .getOrCreate()

In [3]:
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", True)
spark.conf.set("spark.sql.shuffle.partitions",500)

# Helper Functions

In [4]:
# load json object for referencing filepaths
files = json.load(open('drive/Shareddrives/STUDENT-Capstone SS23/files.json','r'))

In [5]:
def createFlightDT(fdate, ftime):
  # Creates a date-time timestamp
  lst = [x for x in str(ftime)]
  if len(lst) == 3:
    lst.insert(0,'0')
  lst.insert(2,':') # insert a column to delineate hour from min
  fdt = fdate + ' ' + "".join(lst) + ':00'
  return fdt

# Load Weather Training Data

In [6]:
# Load Weather data from parquet into spark DataFrame
weather = spark.read.parquet(files['Weather']['DFW_train'])

In [None]:
weather = weather.withColumn('WeatherDtTm1', to_timestamp('WeatherDtTm'))

In [51]:
# If getting DFW Weather Specifically
weather.createOrReplaceTempView('weather')

In [25]:
weather.columns

['Origin',
 'FlightDate',
 'CRSDepTime',
 'OriginDtTm',
 'WeatherDtTm',
 'w_dir_angle',
 'w_type',
 'w_speed_rate',
 'sky_c_hgt',
 'sky_c_det',
 'sky_c_cavok',
 'vis_dist',
 'vis_var',
 'tmp_air',
 'tmp_dew',
 'sea_lvl_p',
 'liq_precip_qty',
 'liq_precip_dim',
 'liq_precip_cond',
 'sky_cov',
 'sky_cov_base_hgt',
 'sky_cov_cld',
 'sky_sum_cov',
 'sky_sum_hgt',
 'sky_obs_tot_cov',
 'sky_low_cld_base_hgt',
 'at_pres_altimeter_rate',
 'at_pres_stn_rate',
 '__index_level_0__']

In [24]:
w = spark.sql("Select * FROM All_Weather WHERE DATE(WeatherDtTm) = '2014-08-21'")

In [27]:
w.show(10)

+------------+-------------------+-----------+------+------------+---------+---------+-----------+--------+-------+-------+-------+---------+--------------+--------------+---------------+-------+----------------+-----------+-----------+-----------+---------------+--------------------+----------------------+----------------+-----------------+
|airport_code|        WeatherDtTm|w_dir_angle|w_type|w_speed_rate|sky_c_hgt|sky_c_det|sky_c_cavok|vis_dist|vis_var|tmp_air|tmp_dew|sea_lvl_p|liq_precip_qty|liq_precip_dim|liq_precip_cond|sky_cov|sky_cov_base_hgt|sky_cov_cld|sky_sum_cov|sky_sum_hgt|sky_obs_tot_cov|sky_low_cld_base_hgt|at_pres_altimeter_rate|at_pres_stn_rate|__index_level_0__|
+------------+-------------------+-----------+------+------------+---------+---------+-----------+--------+-------+-------+-------+---------+--------------+--------------+---------------+-------+----------------+-----------+-----------+-----------+---------------+--------------------+----------------------+----

# Load Raw Flight Data

In [8]:
spdf = spark.read.parquet(files['BTS']['DFW_train'])

In [9]:
# Create a Flight Date/Time Timestamp to be able to compare to the Weather Dataset
func = udf(lambda d,t: createFlightDT(d,t))
dfw = spdf.withColumn('Flight_DateTime', to_timestamp(func(spdf.FlightDate, spdf.CRSDepTime)))

In [10]:
# Create a unique Flight ID for each flight for easy reference
dfw = dfw.withColumn('Flight_Num', concat_ws('_',spdf.DOT_ID_Reporting_Airline,
                                           spdf.Flight_Number_Reporting_Airline,
                                           spdf.FlightDate))

In [11]:
dfw = dfw.withColumn('weather_wdw_start', col('Flight_DateTime') - expr('INTERVAL 45 MINUTES'))

In [12]:
dfw.createOrReplaceTempView('Ontime')

In [14]:
dfw_t = spark.sql("""SELECT * FROM Ontime""")
dfw_t.write.parquet('drive/Shareddrives/STUDENT-Capstone SS23/DFW_training_window',mode='overwrite')

In [21]:
dfw_t.createOrReplaceTempView('FData')

# Get Aircraft Data

In [15]:
aircraft = spark.read.csv(files['Aircraft'],header=True)

In [46]:
aircraft.show(1)

+---+-------+------+-------+--------------------+----------+----------+-----------+----------+-----------+
|_c0|tail_no|   mfr|  model|                type|passengers|wght_class|engine_type|no_engines|tail_prefix|
+---+-------+------+-------+--------------------+----------+----------+-----------+----------+-----------+
|  0| N301DQ|BOEING|737-732|Fixed Wing (Multi...|     149.0|   CLASS 3|   Turbofan|       2.0|          N|
+---+-------+------+-------+--------------------+----------+----------+-----------+----------+-----------+
only showing top 1 row



In [16]:
aircraft.createOrReplaceTempView('Aircraft')

# Create Input Data (Flight + Aircraft Join)

In [22]:
final_flight = spark.sql("""

                         SELECT
                          f.*
                         FROM
                          FData f INNER JOIN
                          Aircraft a
                          ON
                          f.Tail_Number = a.tail_no
                        WHERE
                          a.passengers >= 100
                         """
)

In [25]:
# Execute Query and write to parquet file
final_flight.write.parquet('drive/Shareddrives/STUDENT-Capstone SS23/Final_Flight', mode='overwrite')

In [28]:
# Load the file back in
flight = spark.read.parquet('/content/drive/Shareddrives/STUDENT-Capstone SS23/Final_Flight/part-00000-4b1bea59-21e0-4a18-b0a6-c0f437da7b97-c000.snappy.parquet')

In [31]:
flight.createOrReplaceTempView('Flights')

# Combine Flight and Weather Data

In [70]:
data = spark.sql("""
            Select
              f.Flight_Num,
              f.Flight_DateTime,
              w.WeatherDtTm,
              f.WeatherDelay,
              w.w_dir_angle,
              w.w_type,
              w.w_speed_rate,
              w.sky_c_hgt,
              w.sky_c_det,
              w.sky_c_cavok,
              w.vis_dist,
              w.vis_var,
              w.tmp_air,
              w.tmp_dew,
              w.sea_lvl_p,
              w.liq_precip_qty,
              w.liq_precip_dim,
              w.liq_precip_cond,
              w.sky_cov,
              w.sky_cov_base_hgt,
              w.sky_cov_cld,
              w.sky_sum_cov,
              w.sky_sum_hgt,
              w.sky_obs_tot_cov,
              w.sky_low_cld_base_hgt,
              w.at_pres_altimeter_rate,
              w.at_pres_stn_rate
            FROM
              Flights f,
              weather w
            WHERE
              w.WeatherDtTm1 Between f.weather_wdw_start AND  f.Flight_DateTime AND
              f.DepTime <> ''
              """
)



In [None]:
data.write.parquet('/content/drive/Shareddrives/STUDENT-Capstone SS23/Model_Data', mode='overwrite')

In [58]:
df = data.toPandas()

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


In [69]:
df.sort_values('WeatherDtTm').head(100)

Unnamed: 0,Flight_Num,Flight_DateTime,WeatherDtTm,WeatherDelay,w_dir_angle,w_type,w_speed_rate,sky_c_hgt,sky_c_det,sky_c_cavok,...,liq_precip_cond,sky_cov,sky_cov_base_hgt,sky_cov_cld,sky_sum_cov,sky_sum_hgt,sky_obs_tot_cov,sky_low_cld_base_hgt,at_pres_altimeter_rate,at_pres_stn_rate
0,19805_1322_2010-01-01,2010-01-01 06:40:00,2010-01-01 05:59:00,0.0,999,9,9999,99999,9,9,...,9,99,99999,99,9,99999,99,99999,99999,99999
1,19805_1322_2010-01-01,2010-01-01 06:40:00,2010-01-01 06:00:00,0.0,340,N,62,22000,9,N,...,2,00,99999,99,9,99999,00,99999,99999,10094
2,19805_1776_2010-01-01,2010-01-01 06:45:00,2010-01-01 06:00:00,,340,N,62,22000,9,N,...,2,00,99999,99,9,99999,00,99999,99999,10094
3,19805_704_2010-01-01,2010-01-01 07:15:00,2010-01-01 06:53:00,,340,N,57,22000,9,N,...,9,99,99999,99,9,99999,00,99999,10305,10085
4,19805_1143_2010-01-01,2010-01-01 08:30:00,2010-01-01 07:53:00,0.0,340,N,36,22000,9,N,...,9,99,99999,99,9,99999,00,99999,10308,10088
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
87,19805_629_2010-01-01,2010-01-01 18:30:00,2010-01-01 17:53:00,,999,V,15,22000,9,N,...,9,99,99999,99,9,99999,00,99999,10315,10095
86,19805_440_2010-01-01,2010-01-01 18:00:00,2010-01-01 17:53:00,,999,V,15,22000,9,N,...,9,99,99999,99,9,99999,00,99999,10315,10095
91,19805_1286_2010-01-01,2010-01-01 18:30:00,2010-01-01 17:53:00,,999,V,15,22000,9,N,...,9,99,99999,99,9,99999,00,99999,10315,10095
107,19805_2368_2010-01-01,2010-01-01 18:10:00,2010-01-01 18:00:00,,999,9,9999,22000,9,N,...,9,00,99999,99,9,99999,00,99999,99999,10104


Below cell is what I'm using to Create the Rank of timestamps once the weather data has been incorporated.

In [42]:
ts = spark.sql("""

          SELECT
            *,
            Dense_Rank() OVER(PARTITION BY Flight_Num ORDER BY WeatherDtTm ASC,__index_level_0__) Time_Sequence
          FROM
            All_Data
          """)

AnalysisException: ignored

In [192]:
dw_g = dw.groupby('Flight_Num')[['T_x']].count()
flights = dw_g[dw_g['T_x'] > 20].index

In [200]:
lst = []
grp = dw[dw['Flight_Num'].isin(flights)].groupby('Flight_Num')
for k,f in grp:
  lst.append(f.head(20))
final = pd.concat(lst)

In [206]:
len(final.Flight_Num.unique()), final.shape

(220, (4400, 7))

In [210]:
nf = final.values.reshape(220,20,7)

In [215]:
nf[0][1]

array(['19805_1004_2014-08-17', Timestamp('2014-08-17 18:53:00'), 1.0,
       2.0, 57, '+0244', 2], dtype=object)

# Combine Weather Data with Flight

Let's first get a date or two that had severe weather delays.

In [13]:
rslts = spark.sql('Select * FROM Weather Limit 1')

In [30]:
rslts.show()

+------+----------+----------+-------------------+-------------------+-----------+----------+------+------------+------------+---------+----------+---------+-----------+--------+-------------+-------+------------+-------+------------+-------+------------+---------+--------------+--------------+--------------+---------------+---------------+-------+------------+----------------+-----------------+-----------+----------------+-----------+-----------+----------------+---------------+--------------------+--------------------+-------------------------+----------------------+----------------+-----------------+
|Origin|FlightDate|CRSDepTime|         OriginDtTm|        WeatherDtTm|w_dir_angle|w_dir_qlty|w_type|w_speed_rate|w_speed_qlty|sky_c_hgt|sky_c_qlty|sky_c_det|sky_c_cavok|vis_dist|vis_dist_qlty|vis_var|vis_var_qlty|tmp_air|tmp_air_qlty|tmp_dew|tmp_dew_qlty|sea_lvl_p|sea_lvl_p_qlty|liq_precip_qty|liq_precip_dim|liq_precip_cond|liq_precip_qlty|sky_cov|sky_cov_qlty|sky_cov_base_hgt|sky_cov_b

# 5 Flights with 5 Time Measurements

In [None]:
#

In [58]:
w = spark.sql("""
              Select
                *
              FROM
                Weather
              WHERE
                FlightDate = '2014-08-17'

              """)

In [19]:
df = w.toPandas()

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


In [20]:
df.head(10)

Unnamed: 0,Origin,FlightDate,CRSDepTime,OriginDtTm,WeatherDtTm,w_dir_angle,w_dir_qlty,w_type,w_speed_rate,w_speed_qlty,...,sky_sum_cov,sky_sum_hgt,sky_sum_hgt_qlty,sky_obs_tot_cov,sky_obs_qlty_tot_cov,sky_low_cld_base_hgt,sky_low_cld_base_hgt_qlty,at_pres_altimeter_rate,at_pres_stn_rate,__index_level_0__
0,DFW,2014-08-17,1000,2014-08-17 10:00:00,2014-08-17 09:00:00,100,1,N,26,1,...,9,,99999,6,1,2250,1,99999,9921,246570
1,DFW,2014-08-17,1000,2014-08-17 10:00:00,2014-08-17 09:00:00,100,1,N,26,1,...,9,,99999,6,1,2250,1,99999,9921,246571
2,DFW,2014-08-17,1000,2014-08-17 10:00:00,2014-08-17 09:00:00,100,1,N,26,1,...,9,,99999,6,1,2250,1,99999,9921,246572
3,DFW,2014-08-17,1000,2014-08-17 10:00:00,2014-08-17 09:00:00,100,1,N,26,1,...,9,,99999,6,1,2250,1,99999,9921,246573
4,DFW,2014-08-17,1005,2014-08-17 10:05:00,2014-08-17 09:00:00,100,1,N,26,1,...,9,,99999,6,1,2250,1,99999,9921,246574
5,DFW,2014-08-17,1005,2014-08-17 10:05:00,2014-08-17 09:00:00,100,1,N,26,1,...,9,,99999,6,1,2250,1,99999,9921,246575
6,DFW,2014-08-17,1005,2014-08-17 10:05:00,2014-08-17 09:00:00,100,1,N,26,1,...,9,,99999,6,1,2250,1,99999,9921,246576
7,DFW,2014-08-17,1010,2014-08-17 10:10:00,2014-08-17 09:00:00,100,1,N,26,1,...,9,,99999,6,1,2250,1,99999,9921,246577
8,DFW,2014-08-17,1010,2014-08-17 10:10:00,2014-08-17 09:00:00,100,1,N,26,1,...,9,,99999,6,1,2250,1,99999,9921,246578
9,DFW,2014-08-17,1010,2014-08-17 10:10:00,2014-08-17 09:00:00,100,1,N,26,1,...,9,,99999,6,1,2250,1,99999,9921,246579


In [167]:
flights = spark.sql("""
                      Select
                        *
                      FROM
                        Ontime
                      WHERE
                        Origin='DFW' and
                        FlightDate = '2014-08-17'

                    """)

#   Flight_DateTime,

#   Flight_Number_Reporting_Airline,
#   WeatherDelay,
#   DepDel15,
#   FlightDate
# weather_wdw_start,

In [69]:
flights.show(2)

+-------------+----+---------+---------------+------------------------+-------------+----------+---------------+-------------+--------+-------+---------------+------------------+-----------------+------+----+--------+-------------------------------+---------------+------------+------------------+------------+-----+----------------+----------+---------------+-------+--------+-------+------------------+-------------+----------+---------+------------+--------+--------------+---------+----------------+------+--------------------+--------+----------+-----------------+---------+--------+--------+-------+-------+----------+-----------+----------+---------------+-----------------+
|DestStateFips|Dest|Cancelled|LongestAddGTime|DOT_ID_Reporting_Airline|TotalAddGTime|DepTimeBlk|OriginStateFips|SecurityDelay|ArrDel15|TaxiOut|ArrDelayMinutes|ArrivalDelayGroups|ActualElapsedTime|TaxiIn|Year|NASDelay|Flight_Number_Reporting_Airline|DepDelayMinutes|CarrierDelay|OriginAirportSeqID|FirstDepTime|Month|De

In [24]:
df.head(20)

Unnamed: 0,Flight_Number_Reporting_Airline,WeatherDelay,DepDel15,FlightDate
0,71,146.0,1.0,2014-08-17
1,72,,0.0,2014-08-17
2,73,58.0,1.0,2014-08-17
3,79,31.0,1.0,2014-08-17
4,81,101.0,1.0,2014-08-17
5,88,49.0,1.0,2014-08-17
6,90,120.0,1.0,2014-08-17
7,96,0.0,1.0,2014-08-17
8,138,11.0,1.0,2014-08-17
9,140,,,2014-08-17


In [16]:
train = spark.read.parquet(files['BTS']['DFW_train'])

In [37]:
test = spark.read.parquet(files['BTS']['DFW_test'])

In [23]:
train.show(1)

+-------------+----+---------+---------------+------------------------+-------------+----------+---------------+-------------+--------+-------+---------------+------------------+-----------------+------+----+--------+-------------------------------+---------------+------------+------------------+------------+-----+----------------+----------+---------------+-------+--------+-------+------------------+-------------+----------+---------+------------+--------+--------------+---------+----------------+------+--------------------+--------+----------+-----------------+---------+--------+--------+-------+-------+----------+-----------+
|DestStateFips|Dest|Cancelled|LongestAddGTime|DOT_ID_Reporting_Airline|TotalAddGTime|DepTimeBlk|OriginStateFips|SecurityDelay|ArrDel15|TaxiOut|ArrDelayMinutes|ArrivalDelayGroups|ActualElapsedTime|TaxiIn|Year|NASDelay|Flight_Number_Reporting_Airline|DepDelayMinutes|CarrierDelay|OriginAirportSeqID|FirstDepTime|Month|DestAirportSeqID|CRSDepTime|OriginAirportID|Dep

In [31]:
pd.read_csv(files['Carriers'])

Unnamed: 0,Code,Description
0,02Q,Titan Airways
1,04Q,Tradewind Aviation
2,05Q,"Comlux Aviation, AG"
3,06Q,Master Top Linhas Aereas Ltd.
4,07Q,Flair Airlines Ltd.
...,...,...
1716,ZW,Air Wisconsin Airlines Corp
1717,ZX,Air Georgian
1718,ZX (1),Airbc Ltd.
1719,ZY,Atlantic Gulf Airlines


In [28]:
md = spark.read.parquet('/content/drive/Shareddrives/STUDENT-Capstone SS23/Model_Data/DFW_All_Flight_Measurements/part-00000-76e08f2d-0866-4947-b3ca-489c635d74c8-c000.snappy.parquet')

In [29]:
md.show(5)

+-------------------+-------------------+-------------------+------------+-----------+------+------------+---------+---------+-----------+--------+-------+-------+-------+---------+--------------+--------------+---------------+-------+----------------+-----------+-----------+-----------+---------------+--------------------+----------------------+----------------+
|         Flight_Num|    Flight_DateTime|        WeatherDtTm|WeatherDelay|w_dir_angle|w_type|w_speed_rate|sky_c_hgt|sky_c_det|sky_c_cavok|vis_dist|vis_var|tmp_air|tmp_dew|sea_lvl_p|liq_precip_qty|liq_precip_dim|liq_precip_cond|sky_cov|sky_cov_base_hgt|sky_cov_cld|sky_sum_cov|sky_sum_hgt|sky_obs_tot_cov|sky_low_cld_base_hgt|at_pres_altimeter_rate|at_pres_stn_rate|
+-------------------+-------------------+-------------------+------------+-----------+------+------------+---------+---------+-----------+--------+-------+-------+-------+---------+--------------+--------------+---------------+-------+----------------+-----------+----