In [0]:
# load in raw flights and weather data

from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, NullType, ShortType, DateType, BooleanType, BinaryType
from pyspark.sql import SQLContext

import numpy as np
import datetime
from pyspark.sql.types import TimestampType
import us
import pytz

# Legacy SQL entry point built on the notebook-provided SparkContext `sc`.
sqlContext = SQLContext(sc)

# Airline parquet files whose names start with "201".
# NOTE(review): "header" is a CSV reader option; presumably it has no effect
# on the Parquet reader — confirm before removing it.
airlinesPath = "dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data/201*.parquet"
airlines = (
    spark.read
         .option("header", "true")
         .parquet(airlinesPath)
)

# Every parquet file in the weather data directory.
weatherPath = "dbfs:/mnt/mids-w261/datasets_final_project/weather_data/*.parquet"
weather = (
    spark.read
         .option("header", "true")
         .parquet(weatherPath)
)

In [0]:
# get only flights departing from ORD and ATL in Q1 of 2015

# Origin airport codes to keep (compared against the ORIGIN column below).
filterAirports = ['ORD', 'ATL']
# Flight year to keep (compared against the YEAR column below).
filterYear = 2015
# Quarter of the year to keep (compared against the QUARTER column below).
filterQuarter = 1

def convert_localToUTC(yearStr, monthStr, dayOfMonthStr, timeInt, USStateAbbrev):
  """Convert a local wall-clock date and HHMM time in a US state to UTC.

  Args:
    yearStr, monthStr, dayOfMonthStr: calendar date parts; each is passed
      through int().
    timeInt: local time encoded as an HHMM number (5 -> 00:05, 905 -> 09:05,
      1618 -> 16:18). Anything int() accepts is allowed.
    USStateAbbrev: state identifier understood by us.states.lookup
      (e.g. 'IL').

  Returns:
    A timezone-aware datetime expressed in UTC, or None when any argument
    is None.

  Raises:
    ValueError: if timeInt is outside 0..2400 or its minutes part is >= 60.
  """
  # Spark hands SQL NULLs to Python UDFs as None; propagate them as NULL
  # instead of failing inside int()/str().
  if any(arg is None for arg in (yearStr, monthStr, dayOfMonthStr, timeInt, USStateAbbrev)):
    return None

  year, month, dayOfMonth = (int(yearStr), int(monthStr), int(dayOfMonthStr))

  # Split HHMM arithmetically so that values shorter than three digits
  # (e.g. 5 or 45, i.e. shortly after midnight) are parsed as well.
  hhmm = int(timeInt)
  hour, minutes = divmod(hhmm, 100)
  if not (0 <= hhmm <= 2400 and minutes < 60):
    raise ValueError('invalid HHMM time: %r' % (timeInt,))

  # Adding a timedelta (rather than passing hour/minute to the constructor)
  # lets 2400 roll over to 00:00 of the following day.
  # NOTE(review): 2400 is interpreted as midnight at the END of the given
  # date — confirm against the flight data dictionary.
  localDatetime = datetime.datetime(year, month, dayOfMonth) \
                  + datetime.timedelta(hours=hour, minutes=minutes)

  # Only the first time zone listed for the state is used.
  localTimezoneStr = us.states.lookup(USStateAbbrev).time_zones[0]
  pytzObj = pytz.timezone(localTimezoneStr)

  return pytzObj.localize(localDatetime).astimezone(pytz.utc)

def floorAndSubtract_hours(inputDatetime, hoursToSubtract):
  """Truncate a datetime to the start of its hour, then go back N hours.

  Args:
    inputDatetime: datetime.datetime to truncate (tzinfo, if any, is kept).
    hoursToSubtract: number of hours to subtract after truncation.

  Returns:
    The truncated-and-shifted datetime, or None if either argument is None
    (Spark passes SQL NULL to Python UDFs as None).
  """
  if inputDatetime is None or hoursToSubtract is None:
    return None

  # Zero seconds and microseconds as well as minutes so the result lies
  # exactly on an hour boundary and can be equality-matched against other
  # hour-aligned timestamps.
  flooredToHour = inputDatetime.replace(minute=0, second=0, microsecond=0)

  return flooredToHour - datetime.timedelta(hours=hoursToSubtract)

# Wrap the Python helpers as Spark UDFs that produce timestamp columns.
udf_convert_localToUTC = f.udf(convert_localToUTC, TimestampType())
udf_subtract_hours = f.udf(floorAndSubtract_hours, TimestampType())

# Keep only flights from the configured origin airports, year and quarter.
# isin() accepts the whole list, so filterAirports may hold any number of codes.
airlines_phase1_filter = f.col('ORIGIN').isin(filterAirports) \
                         & (f.col('YEAR')==filterYear) \
                         & (f.col('QUARTER')==filterQuarter)

# Scheduled arrival in UTC = scheduled departure in UTC + scheduled elapsed
# minutes. (This column used to be an exact copy of the departure expression.)
# Going through epoch seconds avoids having to guess the arrival calendar date
# or the destination time zone.
crsArrivalUtc = (f.col('CRS_DEP_DATETIME_UTC').cast('long')
                 + f.col('CRS_ELAPSED_TIME') * 60).cast('timestamp')

airlines_phase1 = airlines.filter(airlines_phase1_filter) \
                          .withColumn('CRS_DEP_DATETIME_UTC', udf_convert_localToUTC('YEAR', 'MONTH', 'DAY_OF_MONTH', 'CRS_DEP_TIME', 'ORIGIN_STATE_ABR')) \
                          .withColumn('CRS_ARR_DATETIME_UTC', crsArrivalUtc) \
                          .withColumn('datetime_2hoursBeforeDEP_floored', udf_subtract_hours('CRS_DEP_DATETIME_UTC', f.lit(2))) \
                          .withColumn('datetime_3hoursBeforeDEP_floored', udf_subtract_hours('CRS_DEP_DATETIME_UTC', f.lit(3)))

In [0]:
# get all weather data for only the stations specified in the 'weatherStationNames_phase1' variable below

# Exact NAME values of the weather stations to keep.
weatherStationNames_phase1 = [
    'CHICAGO OHARE INTERNATIONAL AIRPORT, IL US',
    'ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US',
]

# Weather columns carried into the join; the last two are derived columns
# added to the weather frame before this list is selected.
colsToJoin = [
    'STATION', 'DATE', 'SOURCE', 'LATITUDE', 'LONGITUDE', 'ELEVATION', 'NAME', 'REPORT_TYPE',
    'CALL_SIGN', 'QUALITY_CONTROL', 'WND', 'CIG', 'VIS', 'TMP', 'DEW', 'SLP',
    'AA1', 'AA2', 'AJ1', 'GD1', 'HL1',
    'USState_forJoin', 'weather_datetime_roundedToHour',
]

def get_USState_forJoin(stationName):
  """Extract the two-letter state code from a station name.

  Station names in this notebook look like
  'CHICAGO OHARE INTERNATIONAL AIRPORT, IL US'; the code is read from the
  first two characters after the LAST ', ' so that names containing more
  than one comma still resolve to their trailing region part.

  Args:
    stationName: station NAME string, or None.

  Returns:
    The first two characters following the last ', ', or None when the name
    is None/empty or contains no ', ' separator.
  """
  if not stationName:
    return None

  _, separator, regionPart = stationName.rpartition(', ')
  if not separator:
    return None

  return regionPart[0:2]

def round_toHour(inputDatetime):
  """Round a datetime to the nearest whole hour, decided by its minute field.

  Minutes 0-29 round down to the start of the hour; minutes 30-59 round up to
  the start of the next hour. Seconds and microseconds are always cleared so
  the result lies exactly on an hour boundary.

  Args:
    inputDatetime: datetime.datetime to round, or None.

  Returns:
    The rounded datetime, or None if inputDatetime is None (Spark passes SQL
    NULL to Python UDFs as None).
  """
  if inputDatetime is None:
    return None

  startOfHour = inputDatetime.replace(minute=0, second=0, microsecond=0)

  if inputDatetime.minute >= 30:
    return startOfHour + datetime.timedelta(hours=1)

  return startOfHour

# Spark UDF wrappers around the two helper functions defined in this cell.
udf_getState = f.udf(get_USState_forJoin, StringType())
udf_round_toHour = f.udf(round_toHour, TimestampType())

# Restrict the weather data to the chosen stations, derive the two join keys
# (state code and hour-rounded observation time), then keep only colsToJoin.
weather_phase1 = (
    weather
      .filter(weather.NAME.isin(weatherStationNames_phase1))
      .withColumn('USState_forJoin', udf_getState('NAME'))
      .withColumn('weather_datetime_roundedToHour', udf_round_toHour('DATE'))
      .select(colsToJoin)
)

In [0]:
# join the flight data with weather data
from pyspark.sql.types import FloatType

# Columns kept in the final joined frame: flight fields, raw weather fields,
# then the parsed weather sub-fields created below.
colsToUse = ['YEAR', 'MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'TAIL_NUM', 'ORIGIN', 'DEST'
             , 'CRS_DEP_DATETIME_UTC', 'TAXI_OUT', 'WHEELS_OFF', 'WHEELS_ON', 'TAXI_IN', 'CRS_ARR_TIME'
             , 'CRS_ELAPSED_TIME', 'DISTANCE'
             , 'NAME', 'STATION', 'SOURCE', 'REPORT_TYPE', 'QUALITY_CONTROL', 'WND'
             , 'WND_angle', 'WND_angleQuality', 'WND_type', 'WND_speed', 'WND_speedQuality'
             , 'CIG_height', 'CIG_heightQuality', 'CIG_type', 'CIG_CAVOK'
             , 'VIS_distance', 'VIS_quality'
             , 'TMP_airTemp', 'TMP_quality'
             , 'DEW_dewPointTemp', 'DEW_quality'
             , 'SLP_airPressure', 'SLP_quality'
             , 'PRECIP_duration', 'PRECIP_amount', 'PRECIP_conditionCode', 'PRECIP_quality'
             , 'SNOW_depth', 'SNOW_conditionCode', 'SNOW_quality', 'SNOW_liquidDepth', 'SNOW_liquidCondition', 'SNOW_liquidQuality'
             , 'DATE', 'weather_datetime_roundedToHour'
            ]

# Sub-fields to extract from each comma-delimited weather column, listed in
# positional order: source column -> [(output column, Spark type), ...].
# NOTE(review): the PRECIP_* fields are read from AA2 although AA1 is also
# carried over from the weather selection and is otherwise unused — confirm
# that AA2 is the intended source.
weatherFieldSpecs = {
    'WND': [('WND_angle', FloatType()), ('WND_angleQuality', IntegerType()), ('WND_type', StringType()),
            ('WND_speed', FloatType()), ('WND_speedQuality', IntegerType())],
    'CIG': [('CIG_height', FloatType()), ('CIG_heightQuality', IntegerType()), ('CIG_type', StringType()),
            ('CIG_CAVOK', StringType())],
    'VIS': [('VIS_distance', FloatType()), ('VIS_quality', IntegerType())],
    'TMP': [('TMP_airTemp', FloatType()), ('TMP_quality', StringType())],
    'DEW': [('DEW_dewPointTemp', FloatType()), ('DEW_quality', StringType())],
    'SLP': [('SLP_airPressure', FloatType()), ('SLP_quality', IntegerType())],
    'AA2': [('PRECIP_duration', IntegerType()), ('PRECIP_amount', FloatType()),
            ('PRECIP_conditionCode', StringType()), ('PRECIP_quality', StringType())],
    'AJ1': [('SNOW_depth', FloatType()), ('SNOW_conditionCode', StringType()), ('SNOW_quality', StringType()),
            ('SNOW_liquidDepth', FloatType()), ('SNOW_liquidCondition', StringType()),
            ('SNOW_liquidQuality', StringType())],
}

# De-duplicate BEFORE joining, on each side separately:
#  * flights: drop only rows that are identical in every column;
#  * weather: keep one report per (state, rounded hour) so each flight matches
#    at most one weather row. Which report survives is arbitrary.
# De-duplicating the joined result on (ORIGIN, CRS_DEP_DATETIME_UTC) instead
# would also discard distinct flights that merely share an origin and a
# scheduled departure minute.
flights_forJoin = airlines_phase1.dropDuplicates()
weather_forJoin = weather_phase1.dropDuplicates(subset=['USState_forJoin', 'weather_datetime_roundedToHour'])

# A flight is matched to the weather report whose rounded hour equals the
# floored hour two hours before scheduled departure, in the origin's state.
joinConds = [(flights_forJoin.datetime_2hoursBeforeDEP_floored == weather_forJoin.weather_datetime_roundedToHour)
             , flights_forJoin.ORIGIN_STATE_ABR==weather_forJoin.USState_forJoin]

joinedDf = flights_forJoin.join(weather_forJoin, joinConds, 'inner')

# Split each raw weather column once and cast every positional piece to its type.
for sourceCol, fieldSpecs in weatherFieldSpecs.items():
  splitValues = f.split(f.col(sourceCol), ',')
  for position, (fieldName, fieldType) in enumerate(fieldSpecs):
    joinedDf = joinedDf.withColumn(fieldName, splitValues.getItem(position).cast(fieldType))

joinedDf = joinedDf.select(colsToUse)

display(joinedDf.sample(False, .001))

YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,TAIL_NUM,ORIGIN,DEST,CRS_DEP_DATETIME_UTC,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,CRS_ELAPSED_TIME,DISTANCE,NAME,STATION,SOURCE,REPORT_TYPE,QUALITY_CONTROL,WND,WND_angle,WND_angleQuality,WND_type,WND_speed,WND_speedQuality,CIG_height,CIG_heightQuality,CIG_type,CIG_CAVOK,VIS_distance,VIS_quality,TMP_airTemp,TMP_quality,DEW_dewPointTemp,DEW_quality,SLP_airPressure,SLP_quality,PRECIP_duration,PRECIP_amount,PRECIP_conditionCode,PRECIP_quality,SNOW_depth,SNOW_conditionCode,SNOW_quality,SNOW_liquidDepth,SNOW_liquidCondition,SNOW_liquidQuality,DATE,weather_datetime_roundedToHour
2015,1,16,5,N381DN,ATL,SLC,2015-01-16T19:00:00.000+0000,13.0,1411.0,1602.0,4.0,1618,258.0,1590.0,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",72219013874,7,FM-15,V030,"310,5,N,0041,5",310.0,5,N,41.0,5,22000.0,5,9,N,16093.0,5.0,72.0,5,-11.0,5,10239.0,5,,,,,,,,,,,2015-01-16T16:52:00.000+0000,2015-01-16T17:00:00.000+0000
2015,2,10,2,N623NK,ATL,IAH,2015-02-10T20:46:00.000+0000,10.0,1608.0,1646.0,7.0,1706,140.0,689.0,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",72219013874,7,FM-15,V030,"320,5,V,0041,5",320.0,5,V,41.0,5,22000.0,5,9,N,16093.0,5.0,100.0,5,22.0,5,10140.0,5,,,,,,,,,,,2015-02-10T17:52:00.000+0000,2015-02-10T18:00:00.000+0000
2015,2,11,3,N852AS,ATL,BQK,2015-02-11T16:08:00.000+0000,25.0,1123.0,1207.0,7.0,1219,71.0,238.0,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",72219013874,7,FM-15,V030,"090,5,N,0036,5",90.0,5,N,36.0,5,22000.0,5,9,N,16093.0,5.0,50.0,5,0.0,5,10188.0,5,,,,,,,,,,,2015-02-11T13:52:00.000+0000,2015-02-11T14:00:00.000+0000
2015,2,14,6,N944DN,ATL,DAB,2015-02-14T21:26:00.000+0000,16.0,1640.0,1728.0,4.0,1745,79.0,366.0,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",72219013874,7,FM-15,V030,"250,5,N,0072,5",250.0,5,N,72.0,5,22000.0,5,9,N,16093.0,5.0,139.0,5,-50.0,5,10161.0,5,,,,,,,,,,,2015-02-14T18:52:00.000+0000,2015-02-14T19:00:00.000+0000
2015,2,18,3,N664DN,ATL,BOS,2015-02-18T17:51:00.000+0000,10.0,1300.0,1450.0,11.0,1524,153.0,946.0,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",72219013874,7,FM-15,V030,"260,5,N,0072,5",260.0,5,N,72.0,5,1372.0,5,M,N,16093.0,5.0,-6.0,5,-67.0,5,10179.0,5,,,,,,,,,,,2015-02-18T14:52:00.000+0000,2015-02-18T15:00:00.000+0000
2015,3,29,7,N929DL,ATL,BWI,2015-03-29T21:30:00.000+0000,19.0,1749.0,1913.0,5.0,1926,116.0,577.0,"ATLANTA HARTSFIELD INTERNATIONAL AIRPORT, GA US",72219013874,7,FM-15,V030,"200,5,N,0046,5",200.0,5,N,46.0,5,22000.0,5,9,N,16093.0,5.0,139.0,5,-89.0,5,10273.0,5,,,,,,,,,,,2015-03-29T18:52:00.000+0000,2015-03-29T19:00:00.000+0000
2015,2,5,4,N12221,ORD,EWR,2015-02-05T13:54:00.000+0000,16.0,814.0,1044.0,10.0,1102,128.0,719.0,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",72530094846,7,FM-15,V030,"310,5,N,0026,5",310.0,5,N,26.0,5,22000.0,5,9,N,16093.0,5.0,-183.0,5,-217.0,5,10317.0,5,,,,,,,,,,,2015-02-05T10:51:00.000+0000,2015-02-05T11:00:00.000+0000
2015,3,1,7,N498UA,ORD,TPA,2015-03-01T15:28:00.000+0000,13.0,1023.0,1326.0,3.0,1309,161.0,1012.0,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",72530094846,7,FM-16,V030,"999,9,C,0000,5",999.0,9,C,0.0,5,152.0,5,M,N,4023.0,5.0,-90.0,5,-100.0,5,99999.0,9,,,,,,,,,,,2015-03-01T12:49:00.000+0000,2015-03-01T13:00:00.000+0000
2015,3,17,2,N510NK,ORD,BWI,2015-03-18T02:04:00.000+0000,14.0,2133.0,5.0,6.0,2352,108.0,622.0,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",72530094846,7,FM-15,V030,"080,5,N,0031,5",80.0,5,N,31.0,5,7620.0,5,M,N,16093.0,5.0,33.0,5,-111.0,5,10270.0,5,,,,,,,,,,,2015-03-17T23:51:00.000+0000,2015-03-18T00:00:00.000+0000
2015,3,31,2,N634MQ,ORD,CVG,2015-03-31T17:59:00.000+0000,19.0,1315.0,1503.0,6.0,1515,76.0,265.0,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",72530094846,7,FM-16,V030,"340,5,N,0051,5",340.0,5,N,51.0,5,457.0,5,M,N,16093.0,5.0,67.0,5,28.0,5,99999.0,9,,,,,,,,,,,2015-03-31T14:31:00.000+0000,2015-03-31T15:00:00.000+0000


In [0]:
# see how much of the flight data is left after joining
# (count() is a Spark action, so this line executes the joinedDf pipeline)
joinedDf.count()

In [0]:
# check that joined times of flight and weather data are correct
# Print a ~0.1% sample (without replacement) of the flight-time and
# weather-time columns side by side for a visual sanity check.
(
    joinedDf
      .select(['ORIGIN', 'YEAR', 'MONTH', 'DAY_OF_MONTH', 'CRS_DEP_DATETIME_UTC', 'NAME', 'DATE', 'weather_datetime_roundedToHour'])
      .sample(withReplacement=False, fraction=.001)
      .show()
)