In [0]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import seaborn as sn
from pyspark.sql.functions import isnull, when, count
from pyspark.sql.functions import *
from pyspark.sql import SQLContext
from pyspark.sql import functions as f
from pyspark.sql.types import *
from pyspark.sql.window import Window

from pyspark.sql import Window
from pyspark.sql.functions import rank, col, monotonically_increasing_id
import pyspark
import time
from pyspark.ml.feature import Imputer

In [0]:
blob_container  = "team3-4"       # The name of your container created in https://portal.azure.com
storage_account = "daphnelin"  # The name of your Storage account created in https://portal.azure.com
secret_scope    = "team_3-4"           # The name of the scope created in your local computer using the Databricks CLI
secret_key      = "key_3_4"             # The name of the secret key created in your local computer using the Databricks CLI
team_blob_url   = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"  #points to the root of your team storage bucket

# the 261 course blob storage is mounted here.
mids261_mount_path      = "/mnt/mids-w261"

# SAS Token: Grant the team limited access to Azure Storage resources
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

# # see what's in the blob storage root folder 
display(dbutils.fs.ls(f"{team_blob_url}"))

In [0]:
# Load station table
df_stations = spark.read.parquet("dbfs:/mnt/mids-w261/datasets_final_project_2022/stations_data/*")

# Zero distance to neighbor means the station is the same as its neighbor.
df_stations = df_stations.filter(col("distance_to_neighbor") == 0)
print(f"Stations count = {df_stations.count()}")
df_stations.createOrReplaceTempView("stations")


Stations count = 2261


In [0]:
# Load external data to map weather station to IATA codes
adf = pd.read_csv('https://raw.githubusercontent.com/davidmegginson/ourairports-data/main/airports.csv', header=None)

col_names = {
0: 'ID', 1: 'ident', 2: 'Type', 3: 'Name', 4: 'Latitude', 5: 'Longitude', 6: 'Elevation', 
7: 'Continent', 8: 'Country', 9: 'Region', 10: 'County', 11: 'scheduled_service', 12: 'ICAO', 13: 'IATA',
14: 'local_code', 15: 'home_link', 16: 'wiki_link', 17: 'keywords'
}
adf.rename(columns=col_names, inplace=True)
df_airport = spark.createDataFrame(adf)
df_airport.createOrReplaceTempView("airports")

  adf = pd.read_csv('https://raw.githubusercontent.com/davidmegginson/ourairports-data/main/airports.csv', header=None)
  Expected bytes, got a 'int' object
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


In [0]:
# Join station data with external IATA data
query_station_airport = """
SELECT * 
FROM 
(SELECT * FROM stations) AS s 
LEFT JOIN 
(SELECT ICAO, IATA, Country, Elevation FROM airports) AS a
ON s.neighbor_call = a.ICAO
"""
stations_with_iata = spark.sql(query_station_airport)
print(f"Stations joined count = {stations_with_iata.count()}")

Stations joined count = 2261


In [0]:
# Write final stations dataset to parquet
stations_with_iata.write.mode("overwrite").parquet(f"{team_blob_url}/stations_with_iata")

In [0]:
# Load flight data and get unique airports
df_airlines = spark.read.parquet("dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data/*")
origin_airports = df_airlines.select("ORIGIN").distinct().collect()
dest_airports = df_airlines.select("DEST").distinct().collect()
all_airports = set([o["ORIGIN"] for o in origin_airports] + [d["DEST"] for d in dest_airports])
unique_airports = spark.createDataFrame([[a] for a in sorted(all_airports)], ["AIRPORT"])
unique_airports.write.mode("overwrite").parquet(f"{team_blob_url}/unique_airports")

In [0]:
# Filter stations to only the airports from the full flights dataset
airports = {r["AIRPORT"] for r in unique_airports.select("AIRPORT").distinct().collect()}
print(f"Airports count in flights dataset = {len(airports)}")
stations_with_iata = stations_with_iata.filter(stations_with_iata.IATA.isin(airports))
print(f"Airports found in joined stations = {stations_with_iata.count()}")
airports_in_joined = {r["IATA"] for r in stations_with_iata.select("IATA").distinct().collect()}
airports_not_found = airports - airports_in_joined
print(f"Airports not found: {', '.join(sorted(airports_not_found))}")
display(stations_with_iata)


In [0]:
# Look for the missing airports in the external data
missing_icao = set(adf.loc[(adf["IATA"].isin(airports_not_found)), "ICAO"])
print('missing airports')
display(adf.loc[(adf["IATA"].isin(airports_not_found))])


missing airports


  Expected bytes, got a 'int' object
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


ID,ident,Type,Name,Latitude,Longitude,Elevation,Continent,Country,Region,County,scheduled_service,ICAO,IATA,local_code,home_link,wiki_link,keywords
12114,57A,seaplane_base,Tokeen Seaplane Base,55.9370994568,-133.32699585,,,US,US-AK,Tokeen,no,57A,TKI,57A,,https://en.wikipedia.org/wiki/Tokeen_Seaplane_Base,
20727,KOGS,medium_airport,Ogdensburg International Airport,44.6819,-75.4655,297.0,,US,US-NY,Ogdensburg,yes,KOGS,OGS,OGS,https://ogsair.com/,https://en.wikipedia.org/wiki/Ogdensburg_International_Airport,
4978,NSTU,medium_airport,Pago Pago International Airport,-14.331,-170.710007,32.0,OC,AS,AS-WT,Pago Pago,yes,NSTU,PPG,PPG,,https://en.wikipedia.org/wiki/Pago_Pago_International_Airport,
5431,PGSN,medium_airport,Saipan International Airport,15.119,145.729004,215.0,OC,MP,MP-U-A,"I Fadang, Saipan",yes,PGSN,SPN,GSN,,https://en.wikipedia.org/wiki/Saipan_International_Airport,Francisco C. Ada
5433,PGUM,large_airport,Antonio B. Won Pat International Airport,13.4834,144.796005,298.0,OC,GU,GU-U-A,Hagåtña,yes,PGUM,GUM,GUM,http://www.guamairport.com/,https://en.wikipedia.org/wiki/Antonio_B._Won_Pat_International_Airport,Agana
6383,TJPS,medium_airport,Mercedita Airport,18.00830078125,-66.56300354003906,29.0,,PR,PR-U-A,Ponce,yes,TJPS,PSE,PSE,,https://en.wikipedia.org/wiki/Mercedita_Airport,
6384,TJSJ,large_airport,Luis Munoz Marin International Airport,18.4393997192,-66.0018005371,9.0,,PR,PR-U-A,San Juan,yes,TJSJ,SJU,SJU,,https://en.wikipedia.org/wiki/Luis_Mu%C3%B1oz_Mar%C3%ADn_International_Airport,Isla Verde
324440,US-0571,medium_airport,Williston Basin International Airport,48.258387,-103.748797,2344.0,,US,US-ND,Williston,yes,KXWA,XWA,XWA,http://www.xwaproject.com/,https://en.wikipedia.org/wiki/Williston_Basin_International_Airport,


In [0]:
# Count of flights from airports not found
flights_from_bad_airports = df_airlines.filter(df_airlines["ORIGIN"].isin(airports_not_found))
print('Check counts of flights from airports not found')
display(flights_from_bad_airports.groupby("ORIGIN").count())


Check counts of flights from airports not found


ORIGIN,count
PSE,8508
PPG,1228
OGS,2618
SPN,1962
SJU,308194
GUM,6361
XWA,2173
ISN,15188
TKI,2


In [0]:
# Get the unique relevant station IDs from the final station table
station_ids = {r["station_id"] for r in stations_with_iata.select(col("station_id")).distinct().collect()}


In [0]:
# Load weather table
df_weather = spark.read.parquet("dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_weather_data/*")


In [0]:
original_weather_count = df_weather.count()
print(f"Original weather count = {original_weather_count}")


Original weather count = 898983399


In [0]:
# Filter the weather table to only the relevant stations
df_weather_filtered = df_weather.filter(df_weather.STATION.isin(station_ids))
filtered_weather_count = df_weather_filtered.count()
print(f"Filtered weather count = {filtered_weather_count}")
print(f"Weather data size reduced by {(1-(filtered_weather_count/original_weather_count))*100:.0f}%")


In [0]:
# Round weather data to nearest hour to merge with flights, and then shift by 2 hours.
# First shift by -1 minutes (so that rows exactly on the hour aren't shifted 3 hours),
# then shift by 3 hours so that each row is at least 2 hours from its original timestamp.

weather_original_columns = df_weather_filtered.columns

# Shift by 2 hours
df_weather_filtered = df_weather_filtered.withColumn(
"shifted_timestamp", df_weather_filtered["DATE"] + expr("INTERVAL -1 MINUTES")
)
df_weather_filtered = df_weather_filtered.withColumn(
"shifted_timestamp", df_weather_filtered["shifted_timestamp"] + expr("INTERVAL 3 HOURS")
)

# Truncate hour (i.e. set minutes and everything after to 0)
df_weather_filtered = df_weather_filtered.withColumn(
"final_timestamp", date_trunc("hour", df_weather_filtered.shifted_timestamp)
)

# Rearrange columns
df_weather_filtered = df_weather_filtered.select(
weather_original_columns[:2] + ["final_timestamp"] + weather_original_columns[2:]
)

In [0]:
# There will be lots of duplicates by station ID and final_timestamp. 
# Drop duplicates ordered by station ID and original datestamp, to keep
# the observation closest to the final_timestamp.

window = Window.partitionBy("STATION", "final_timestamp").orderBy("DATE", "tiebreak")
df_weather_deduped = df_weather_filtered\
.withColumn("tiebreak", monotonically_increasing_id())\
.withColumn("rank", rank().over(window))\
.filter(col("rank") == 1).drop("rank", "tiebreak")

print('Weather data after dropping duplicates')
display(df_weather_deduped)

In [0]:
# Merge relevant weather data with station info

weather_keep_columns = [
"STATION", "DATE", "final_timestamp", "LATITUDE", "LONGITUDE", "ELEVATION", "NAME", 
"HourlyWindDirection", "HourlyVisibility", "HourlyDryBulbTemperature", "HourlyDewPointTemperature", "HourlySeaLevelPressure", 
]

station_keep_cols = [
"station_id", "neighbor_state", "ICAO", "IATA", "Country", "Elevation"
]

stations_with_iata.select(station_keep_cols).createOrReplaceTempView("stations")
df_weather_deduped.select(weather_keep_columns).createOrReplaceTempView("weather")

query_weather_stations = f"""
SELECT * 
FROM 
(SELECT {', '.join(weather_keep_columns)} FROM weather) AS w
LEFT JOIN 
(SELECT {', '.join(station_keep_cols)} FROM stations) AS s
ON w.STATION = s.station_id
"""

joined_weather_stations = spark.sql(query_weather_stations)
print('Joined weather and station data')
display(joined_weather_stations) 


In [0]:
df_airlines = spark.read.parquet("dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data/*")


In [0]:
# Drop irrelevant flight table columns

flights_keep_columns = [
'QUARTER', 'MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'FL_DATE',
'OP_UNIQUE_CARRIER', 'OP_CARRIER_AIRLINE_ID', 'OP_CARRIER', 'TAIL_NUM', 'OP_CARRIER_FL_NUM',
'ORIGIN_AIRPORT_ID', 'ORIGIN_AIRPORT_SEQ_ID', 'ORIGIN_CITY_MARKET_ID', 'ORIGIN', 'ORIGIN_CITY_NAME',
'ORIGIN_STATE_ABR', 'ORIGIN_STATE_FIPS', 'ORIGIN_STATE_NM', 'ORIGIN_WAC', 
'DEST_AIRPORT_ID', 'DEST_AIRPORT_SEQ_ID', 'DEST_CITY_MARKET_ID', 'DEST', 'DEST_CITY_NAME', 
'DEST_STATE_ABR', 'DEST_STATE_FIPS', 'DEST_STATE_NM', 'DEST_WAC',
'CRS_DEP_TIME', 'DEP_TIME', 'DEP_DELAY', 'DEP_DELAY_NEW', 'DEP_DEL15', 'DEP_DELAY_GROUP', 'DEP_TIME_BLK', 
'TAXI_OUT', 'WHEELS_OFF', 'WHEELS_ON', 'TAXI_IN', 
'CRS_ARR_TIME', 'ARR_TIME', 'ARR_DELAY', 'ARR_DELAY_NEW', 'ARR_DEL15', 'ARR_DELAY_GROUP', 'ARR_TIME_BLK',
'CANCELLED', 'CANCELLATION_CODE', 'DIVERTED', 'CRS_ELAPSED_TIME', 'ACTUAL_ELAPSED_TIME', 'AIR_TIME', 
'FLIGHTS', 'DISTANCE', 'DISTANCE_GROUP', 
'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY'
]

flights = df_airlines.select(*flights_keep_columns) 

In [0]:
# Additional clean up to drop flight records:

# Assumption 1: Remove cancelled flights
"""When DEP_DEL15.isNull(), these are cancelled flights. Our Phase I results indicated that flights are cancelled due to various reasons. We can safely take out cancelled flights (null value) from the delayed flights ("DEP_DEL15") since they are not relevant and minimal.
"""
flights = flights.where(col("CANCELLED") != 1)


In [0]:
# Assumption 2: Remove diverted flights
"""A flight diversion is when an aircraft is unable to arrive at its final destination. Such as Aircraft emergency; Passenger emergency; Mechanical failure; and Poor weather conditions. We decided to remove this since it's not relevant to our analysis."""
flights = flights.where(col("DIVERTED") != 1)


In [0]:
# Drop any duplicate rows in full dataset:
flights = flights.dropDuplicates()

In [0]:
# There are an additional rows where for some reason the departure delay columns are null.
# On inspection, in all these rows the scheduled CRS_DEP_TIME is equal to the DEP_TIME, meaning the delay is 0 minutes.
# So we fill these columns with 0
flights = flights.fillna(value=0, subset=["DEP_DELAY", "DEP_DELAY_NEW", "DEP_DEL15", "DEP_DELAY_GROUP"])


In [0]:
# Convert flight Departure Times to UTC and round to nearest hour:

# Convert departure time integers to zero-padded strings, e.g. 607 -> 0000607:
# Modification: Use scheduled departure time instead
flights = flights.withColumn("PADDED_DEP_TIME", format_string("0000%d", "CRS_DEP_TIME"))
# Shorten the strings to the final 4 chars, e.g. 0000607 -> 0607:
flights = flights.withColumn("FORMATTED_DEP_TIME", substring("PADDED_DEP_TIME", -4,4))
# Concatenate string columns for departure date and time:
flights = flights.withColumn("DEPT_DT_STR", concat_ws(" ", flights.FL_DATE, flights.FORMATTED_DEP_TIME))
# Convert string datetime to timestamp:
flights = flights.withColumn("DEPT_DT", to_timestamp(flights.DEPT_DT_STR, "yyyy-MM-dd HHmm"))

# Remove minutes and round datetimes *down* to nearest hour. It is necessary to round
# down so that we don't join with weather data from less than 2 hours before:
# flights = flights.withColumn("DEPT_UTC_HOUR", date_trunc("HOUR", flights.DEPT_UTC))

In [0]:
# Calculate arrival time in UTC using departure time and elapsed time:
# Modification: Use scheduled elapsed time
flights = flights.withColumn("ARR_UTC", col("DEPT_DT") + (col("CRS_ELAPSED_TIME") * expr("Interval 1 Minutes")))  


In [0]:
# Join flights and weather data on airport IATA

flights.createOrReplaceTempView("flights")
joined_weather_stations.createOrReplaceTempView("joined_weather_stations")


final_df = spark.sql('''
  select *
  from flights as f
  inner join joined_weather_stations as w on f.ORIGIN = w.IATA
  ''')

final_n = final_df.count()
flights_n = flights.count()
print(f"Final dataset has {final_n:,} rows ({final_n-flights_n:,} dropped from original flights dataset)")



Final dataset has 2,530,970,592,598 rows (-2,530,929,136,001 dropped from original flights dataset)


In [0]:
# Save final dataset to blob storage
final_df.write.mode("overwrite").parquet(f"{team_blob_url}/team3-4_combined_dataset")