In [None]:
from pyspark.sql import types, functions as F
import pandas as pd
import numpy as np

# Read in the full data

In [None]:
# Columns kept from each source dataset; everything else is dropped at read time.
weather_col = [
    'STATION', 'SOURCE', 'DATE', 'LATITUDE', 'LONGITUDE', 'ELEVATION',
    'NAME', 'REPORT_TYPE', 'CALL_SIGN', 'QUALITY_CONTROL',
    'WND', 'CIG', 'VIS', 'TMP', 'DEW', 'SLP',
    'GA1', 'GF1', 'MA1', 'REM',
    'AA1', 'AA2', 'AJ1', 'AL1', 'AN1', 'AO1', 'AU1', 'AT1',
]
airline_col = [
    # calendar / schedule
    "YEAR", "QUARTER", "MONTH", "DAY_OF_MONTH", "DAY_OF_WEEK",
    "FL_DATE", "DEP_TIME", "DEP_TIME_BLK", "CRS_DEP_TIME", "CRS_ARR_TIME",
    "CRS_ELAPSED_TIME", "ARR_TIME", "ARR_TIME_BLK", "ACTUAL_ELAPSED_TIME",
    # origin airport
    "ORIGIN", "ORIGIN_CITY_NAME", "ORIGIN_STATE_ABR", "ORIGIN_STATE_FIPS",
    "ORIGIN_STATE_NM", "ORIGIN_WAC",
    # destination airport
    "DEST", "DEST_CITY_NAME", "DEST_STATE_ABR", "DEST_STATE_FIPS",
    "DEST_STATE_NM", "DEST_WAC",
    # carrier, distance, outcome and delay breakdown
    "OP_UNIQUE_CARRIER", "FLIGHTS", "DISTANCE", "DISTANCE_GROUP", "DIVERTED",
    "CANCELLED", "CANCELLATION_CODE", "CARRIER_DELAY", "DEP_DELAY", "DEP_DELAY_NEW",
    "DEP_DELAY_GROUP", "DEP_DEL15", "ARR_DELAY", "ARR_DELAY_NEW", "ARR_DELAY_GROUP",
    "ARR_DEL15", "WEATHER_DELAY", "NAS_DELAY", "SECURITY_DELAY", "LATE_AIRCRAFT_DELAY",
]
# The stations dataset (dbfs:/mnt/mids-w261/datasets_final_project/stations_data/*) is not used here.

# Source locations.
AIRLINES_PATH = "/mnt/mids-w261/datasets_final_project/parquet_airlines_data/*"
WEATHER_PATH = 'dbfs:/mnt/mids-w261/datasets_final_project/weather_data/*'
OPENFLIGHTS_URL = "https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat"
# airports.dat has no header row, so column names are supplied explicitly.
OPENFLIGHTS_COLUMNS = ['id', 'name', 'city', 'country', 'iata', 'icao', 'lat', 'lng', 'altitude',
                       'timezone', 'dst', 'tz_db_time_zone', 'type', 'source']

# Full datasets (record counts as observed when this notebook was written).
df_airlines = spark.read.parquet(AIRLINES_PATH).select(airline_col)  # 63,493,682 records
df_weather = spark.read.parquet(WEATHER_PATH).select(weather_col)    # 630,904,436 records

# Airport reference data (coordinates + tz database zone); 7,698 records, 4 more are added later.
open_flights_raw_pd = pd.read_csv(OPENFLIGHTS_URL, names=OPENFLIGHTS_COLUMNS)
open_flights = spark.createDataFrame(open_flights_raw_pd)

# Clean each dataset before the massive join

open_flights - filter this dataset to keep only the airports that appear (as origin or destination) in df_airlines

In [None]:
# Keep only the columns needed downstream: airport codes, coordinates and tz database zone.
open_flights = open_flights.select('iata', 'icao', 'lat', 'lng', 'tz_db_time_zone')
print(open_flights.count())

# Add 4 airports that appear in the flight data but are absent from openflights.
open_flights2 = spark.createDataFrame(pd.DataFrame({
    'iata': ['XWA', 'EAR', 'TKI', 'IFP'],
    'icao': ['KXWA', 'KEAR', 'KTKI', 'KIFP'],
    'lat': [48.2578135, 40.7274925, 33.1775399, 35.16558],
    'lng': [-103.7418471, -99.0122646, -96.5926444, -114.557093],
    'tz_db_time_zone': ['America/Chicago', 'America/Chicago', 'America/Chicago', 'America/Phoenix']
}))

# unionByName matches columns by name rather than position, so a reordering of either
# side cannot silently shift values into the wrong column.
open_flights = open_flights.unionByName(open_flights2)

# Keep only the airports that appear as an origin or a destination in df_airlines.
# Aliasing both columns to the same name lets one Spark job produce the distinct set.
# It also avoids depending on how Spark cases the selected column name in the collected rows.
airports_in_flights = (
    df_airlines.select(F.col('ORIGIN').alias('airport'))
    .union(df_airlines.select(F.col('DEST').alias('airport')))
    .where(F.col('airport').isNotNull())
    .distinct()
)
all_airports = {row['airport'] for row in airports_in_flights.collect()}
# sorted() gives isin a plain, deterministic list.
open_flights = open_flights.filter(F.col('iata').isin(sorted(all_airports)))

open_flights.cache()

Create a master mapping called 'airport_weather_mapping'

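This takes the 372 distinct airports we need from open_flights and the 45,638 distinct weather stations, pairs every airport with every station, and keeps the station nearest to each airport by lat/lng (the distance metric is computed in the SQL below). Note that plain Euclidean distance on raw degrees is only an approximation: a degree of longitude covers only cos(latitude) times the ground distance of a degree of latitude, so at high latitudes (e.g. Alaska) it can choose a different station than the great-circle (haversine) distance would.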

In [None]:
open_flights.createOrReplaceTempView("open_flights")
# 45,638 distinct weather stations
df_weather.select('latitude', 'longitude', 'station').distinct().createOrReplaceTempView('df_weather')

# Pair every airport with every weather station and keep exactly one nearest station per airport.
# Design notes:
#   * Ranking uses the haversine great-circle distance in km (earth radius 6371 km).
#     Euclidean distance on raw degrees over-weights longitude away from the equator and
#     can pick the wrong station at high latitudes. distance_euclidean is still emitted
#     for reference but is not used for ranking.
#   * row_number (not dense_rank): tied stations, e.g. two station ids at identical
#     coordinates, would otherwise all get rank 1. That duplicates the airport in
#     airport_weather_mapping and, in turn, every flight joined to it. Ties are broken by
#     station id so the choice is deterministic.
#   * NULLS LAST: Spark sorts NULL first in ascending order, so a station with missing
#     coordinates (NULL distance) would otherwise be ranked closest.
open_flights_closest_weather = spark.sql('''

with

--step1: every airport x station combination with its distance
tbl1 as (
    select
        a.iata
        ,a.icao
        ,a.lat lat_of
        ,a.lng lng_of
        ,b.latitude lat_w
        ,b.longitude lng_w
        ,b.station
        ,a.tz_db_time_zone
        ,power(power(a.lat - b.latitude,2) + power(a.lng - b.longitude,2),.5) as distance_euclidean
        ,2 * 6371 * asin(least(1, sqrt(
            power(sin(radians(b.latitude - a.lat) / 2), 2)
            + cos(radians(a.lat)) * cos(radians(b.latitude))
              * power(sin(radians(b.longitude - a.lng) / 2), 2)
        ))) as distance_km
    from open_flights a
    cross join df_weather b
)

--step2: the memory intensive sort ranking stations by distance for each airport
,tbl2 as (
    select
        tbl1.*
        ,row_number() over (
            partition by tbl1.iata
            order by tbl1.distance_km asc nulls last, tbl1.station
        ) as n
    from tbl1
)

--step3: keep only the single closest weather station per airport
select *
from tbl2
where n = 1
''')

# see a view of the whole thing:
display(open_flights_closest_weather)

# select a subset of columns to use as our "master mapping" (one row per airport)
airport_weather_mapping = open_flights_closest_weather.select(['iata', 'icao', 'station', 'tz_db_time_zone'])
airport_weather_mapping.cache()

iata,icao,lat_of,lng_of,lat_w,lng_w,station,tz_db_time_zone,distance_euclidean,n
ATY,KATY,44.91400146,-97.15470123,44.9047,-97.1494,72654614946,America/Chicago,0.0107060822733886,1
BGM,KBGM,42.20869827,-75.97979736,42.2068,-75.98,72515004725,America/New_York,0.0019090552539127,1
BTM,KBTM,45.95479965209961,-112.49700164794922,45.9647,-112.5006,72774024135,America/Denver,0.0105339938309352,1
BUR,KBUR,34.20069885253906,-118.35900115966795,34.20056,-118.3575,72288023152,America/Los_Angeles,0.001507567702059,1
CID,KCID,41.88470077514648,-91.71080017089844,41.8833,-91.7166,72545014990,America/Chicago,0.0059665893623007,1
CIU,KCIU,46.25080108642578,-84.47239685058594,46.25,-84.46667,72734404869,America/New_York,0.005782608156818,1
COD,KCOD,44.520198822,-109.024002075,44.51667,-109.01667,72670024045,America/Denver,0.008137070020174,1
DCA,KDCA,38.8521,-77.037697,38.8472,-77.03454,72405013743,America/New_York,0.0058289492191914,1
DLG,PADL,59.04470062,-158.5050049,59.05,-158.5167,70321025513,America/Anchorage,0.0128397349035834,1
DRT,KDRT,29.3742008209,-100.927001953,29.3784,-100.927,72261022010,America/Chicago,0.0041991795541599,1


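df_weather - keep only the weather stations that appear in airport_weather_mapping, then make two copies of the result: one for the origin airport and one for the destination airport. Each copy's columns are suffixed `_origin` / `_dest` so both can be joined onto the same flight row.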

In [None]:
# Stations that some airport maps to; weather from any other station is never joined.
mapped_stations_pd = airport_weather_mapping.select('station').toPandas()
station_l = list(mapped_stations_pd['station'])
old_col_nms = df_weather.columns

# Every weather column gets an _origin / _dest suffix so both copies can sit side by side
# on a single flight row without name clashes.
new_col_nms_origin = [f'{name}_origin' for name in old_col_nms]
new_col_nms_dest = [f'{name}_dest' for name in old_col_nms]

# toDF renames positionally; the filtered frame keeps df_weather's columns (old_col_nms) in order.
df_weather_origin = df_weather.filter(F.col('station').isin(station_l)).toDF(*new_col_nms_origin)
df_weather_dest = df_weather.filter(F.col('station').isin(station_l)).toDF(*new_col_nms_dest)

df_weather_origin.cache()
df_weather_dest.cache()

df_airlines - convert each flight's scheduled departure time to UTC to align with df_weather, and attach the weather station IDs for the origin and destination airports

In [None]:
# Local scheduled departure timestamp: FL_DATE + CRS_DEP_TIME (HHmm, left-padded to 4 digits).
df_airlines = df_airlines.withColumn('datetime_dep',
    F.unix_timestamp(F.concat(
        F.col('fl_date'),
        F.lit(' '),
        F.lpad(F.col('crs_dep_time'), 4, '0')
        ), 'yyyy-MM-dd HHmm').cast(types.TimestampType())
    )

# Attach the ORIGIN airport's tz database zone, convert to UTC, and compute 2 hours prior.
# The join condition must reference the frame actually being joined (origin_tz), not open_flights.
origin_tz = open_flights.select(['iata', 'tz_db_time_zone']).distinct()
df_airlines = (
    df_airlines
    .join(origin_tz, on=[df_airlines.ORIGIN == origin_tz.iata], how='left')
    .withColumn("utc_dep", F.to_utc_timestamp(F.col("datetime_dep"), F.col("tz_db_time_zone")))
    .withColumn("utc_dep_2hrs_prior", F.col('utc_dep') - F.expr('INTERVAL 2 HOURS'))
)

# Exactly one weather station per airport. min() guards against ties in the mapping,
# which would otherwise duplicate flight rows in the left joins below.
station_by_airport = (
    airport_weather_mapping
    .groupBy('iata')
    .agg(F.min('station').alias('station'))
)

df_airlines = (
    df_airlines
    # Origin station: keyed on the origin `iata` column attached by the tz join above.
    .join(station_by_airport.withColumnRenamed('station', 'weather_station_orig'), 'iata', how='left')
    # Destination station: must be keyed on DEST. Joining on `iata` (the origin code)
    # would just copy the origin station into weather_station_dest.
    .join(
        station_by_airport.select(
            F.col('iata').alias('dest_iata'),
            F.col('station').alias('weather_station_dest'),
        ),
        on=F.col('DEST') == F.col('dest_iata'),
        how='left',
    )
    .drop('dest_iata')
)

df_airlines.cache()

# THE FINAL JOIN

In [None]:
# Expose the DataFrames to Spark SQL under the names the query below refers to.
for _view_name, _frame in [
    ("df_airlines", df_airlines),
    ("df_weather_origin", df_weather_origin),
    ("df_weather_dest", df_weather_dest),
    ("airport_weather_mapping", airport_weather_mapping),
]:
    _frame.createOrReplaceTempView(_view_name)

# As-of join. For each flight, pick at both its origin and its destination station the
# weather observation whose validity interval [DATE, next DATE) contains the time 2 hours
# before scheduled departure (utc_dep_2hrs_prior is used for both sides). The latest
# observation per station is open-ended (next DATE is NULL).
# NOTE(review): if a station has several records with the same DATE, which one is matched
# is not deterministic (the lead() ordering has no tie-breaker).
final_join_sql = '''
with weather_orig as (
    select *
        ,lead(DATE_origin) over (partition by STATION_origin order by DATE_origin) as date_origin_next
    from df_weather_origin
)

,weather_dest as (
    select *
        ,lead(DATE_dest) over (partition by STATION_dest order by DATE_dest) as date_dest_next
    from df_weather_dest
)

select 
    air.*
    ,weather_orig.*
    ,weather_dest.*
  
from df_airlines air

left join weather_orig 
    on air.weather_station_orig = weather_orig.STATION_origin 
    and air.utc_dep_2hrs_prior >= weather_orig.DATE_origin 
    and (air.utc_dep_2hrs_prior < weather_orig.date_origin_next or weather_orig.date_origin_next is null)
  
left join weather_dest 
    on air.weather_station_dest = weather_dest.STATION_dest 
    and air.utc_dep_2hrs_prior >= weather_dest.DATE_dest and 
    (air.utc_dep_2hrs_prior < weather_dest.date_dest_next or weather_dest.date_dest_next is null)
'''
df_full = spark.sql(final_join_sql)

In [None]:
# Persist the joined dataset to the team's Azure Blob Storage container.
# The container and storage account were created in https://portal.azure.com; the secret
# scope and key *names* (not the key itself) were created with the Databricks CLI.
blob_container = "main-storage"
storage_account = "team05w261"
secret_scope = "team05"
secret_key = "team05-key"
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
mount_path = "/mnt/mids-w261"

# Register the storage-account access key globally on the Spark session, fetched from the
# Databricks secret scope, so wasbs:// paths under blob_url are accessible.
account_key_conf = f"fs.azure.account.key.{storage_account}.blob.core.windows.net"
account_key = dbutils.secrets.get(scope=secret_scope, key=secret_key)
spark.conf.set(account_key_conf, account_key)

# No save mode is set, so Spark's default applies: the write fails if the path already
# exists rather than overwriting a previous attempt.
output_path = f"{blob_url}/full_data_attempt1"
df_full.write.parquet(output_path)

In [None]:
# Preview the joined result.
# NOTE(review): df_full is not cached in this notebook, so this re-executes the whole join
# plan. Consider previewing the parquet written above instead, e.g.
# display(spark.read.parquet(f"{blob_url}/full_data_attempt1")).
display(df_full)