# Joins

In [None]:
from pyspark.sql.functions import col, max

# Azure Blob Storage settings. The four empty strings are placeholders that must be filled in
# before running; blob_url is built from them and is the root of every dataset this notebook saves.
blob_container = "" # The name of your container created in https://portal.azure.com
storage_account = "" # The name of your Storage account created in https://portal.azure.com
secret_scope = "" # The name of the scope created in your local computer using the Databricks CLI
secret_key = "" # The name of the secret key created in your local computer using the Databricks CLI 
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
# Mount point of the course data (not referenced again in the visible cells).
mount_path = "/mnt/mids-w261"

In [None]:
# Register the SAS credential for the container: the value is read from the Databricks
# secret scope/key configured above, so the token itself never appears in the notebook.
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

In [None]:
from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, datediff, when
print("Welcome to the W261 final project HEY") # greeting only; has no effect on the pipeline

Welcome to the W261 final project HEY


In [None]:
# List the datasets available in the final-project folder of the course mount.
# Please IGNORE dbutils.fs.cp("/mnt/mids-w261/datasets_final_project/stations_data/", "/mnt/mids-w261/datasets_final_project_2022/stations_data/", recurse=True)
data_BASE_DIR = "dbfs:/mnt/mids-w261/datasets_final_project_2022/"
display(dbutils.fs.ls(data_BASE_DIR))

path,name,size,modificationTime
dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data/,parquet_airlines_data/,0,1656618287000
dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data_1y/,parquet_airlines_data_1y/,0,1656630272000
dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data_3m/,parquet_airlines_data_3m/,0,1656630114000
dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_airlines_data_6m/,parquet_airlines_data_6m/,0,1656630205000
dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_weather_data/,parquet_weather_data/,0,1656622074000
dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_weather_data_1y/,parquet_weather_data_1y/,0,1656631614000
dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_weather_data_3m/,parquet_weather_data_3m/,0,1656630651000
dbfs:/mnt/mids-w261/datasets_final_project_2022/parquet_weather_data_6m/,parquet_weather_data_6m/,0,1656631047000
dbfs:/mnt/mids-w261/datasets_final_project_2022/stations_data/,stations_data/,0,1656713663000


In [None]:
# Set partitions
# Number of partitions Spark uses when shuffling data (joins, aggregations, distinct).
spark.conf.set("spark.sql.shuffle.partitions", 1000)
# Suggested minimum number of partitions when reading files.
spark.conf.set("spark.sql.files.minPartitionNum", 1000)

In [None]:
# Create and save the Aiport Dataset with IATA ICAO and Timezones

!pip install -U airportsdata
import airportsdata
import pandas as pd
#get timezone
gadb = airportsdata.load()
df_gadb = pd.DataFrame.from_dict(gadb).transpose()
#get rid of blank iata codes
df_gadb = df_gadb[df_gadb['iata'] != ""]
df_gadb = spark.createDataFrame(df_gadb)

df_airport_timezone = df_gadb.distinct()

# There are 12 airports missing in this dataset present in the whole flights datasets
values = [
  ("KLCK", "LCK", "x", "x", "x", "x", 0.0, 0.0, 0.0, "America/New_York"),
  ("KWYS", "WYS", "x", "x", "x", "x", 0.0, 0.0, 0.0, "America/Denver"),
  ("KSCK", "SCK", "x", "x", "x", "x", 0.0, 0.0, 0.0, "America/Los_Angeles"),
  ("KPGD", "PGD", "x", "x", "x", "x", 0.0, 0.0, 0.0, "America/New_York"),
  ("KVEL", "VEL", "x", "x", "x", "x", 0.0, 0.0, 0.0, "America/Denver"),
  ("KCNY", "CNY", "x", "x", "x", "x", 0.0, 0.0, 0.0, "America/Denver"),
  ("KHOB", "HOB", "x", "x", "x", "x", 0.0, 0.0, 0.0, "America/Denver"),
  ("KESC", "ESC", "x", "x", "x", "x", 0.0, 0.0, 0.0, "America/Detroit"),
  ("KRIW", "RIW", "x", "x", "x", "x", 0.0, 0.0, 0.0, "America/Denver"),
  ("KCRQ", "CLD", "x", "x", "x", "x", 0.0, 0.0, 0.0, "America/Los_Angeles"),
  ("KEFD", "EFD", "x", "x", "x", "x", 0.0, 0.0, 0.0, "America/Chicago"),
  ("KTKI", "TKI", "x", "x", "x", "x", 0.0, 0.0, 0.0, "America/Chicago")
]

df_missing_airports = spark.createDataFrame(values, df_airport_timezone.columns)

df_airport_timezone = df_airport_timezone.union(df_missing_airports)

df_airport_timezone = df_airport_timezone.withColumnRenamed("name", "airport_name") \
  .withColumnRenamed("city", "airport_city") \
  .withColumnRenamed("subd", "airport_subd") \
  .withColumnRenamed("country", "airport_country") \
  .withColumnRenamed("lat", "airport_lat") \
  .withColumnRenamed("lon", "airport_lon") \
  .withColumnRenamed("tz", "airport_tz")

df_airport_timezone.write.mode("overwrite").parquet(f"{blob_url}/airport_codes_timezone")

You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-a9095a0e-7b7e-49d6-9a3e-0ad47f2d4298/bin/python -m pip install --upgrade pip' command.[0m


In [None]:
# Read the full flights dataset, drop exact duplicate rows, and persist the
# de-duplicated copy so that later cells can start from it.
df_airlines = spark.read.parquet(f"{data_BASE_DIR}parquet_airlines_data/").distinct()
df_airlines.write.mode("overwrite").parquet(f"{blob_url}/df_airlines_distinct")

In [None]:
# Load all 4 datasets for initial joins
# The first two were produced by the earlier cells of this notebook; weather and stations
# come straight from the course mount.
df_airlines = spark.read.parquet(f"{blob_url}/df_airlines_distinct")
df_airport_timezone = spark.read.parquet(f"{blob_url}/airport_codes_timezone")
df_weather = spark.read.parquet(f"{data_BASE_DIR}parquet_weather_data/")
df_stations = spark.read.parquet(f"{data_BASE_DIR}stations_data/*")

In [None]:
df_airlines.count() # recorded row counts: 3m 1403471, 1y 7422037, all 42430592 (presumably the 3-month/1-year/full datasets; trailing "3.66" in the original note is unexplained)

Out[171]: 42430592

In [None]:
# Keep only the station id and its call sign, renamed to icao_station, one row per pair
df_stations_icao = df_stations.select('neighbor_id', 'neighbor_call').withColumnRenamed("neighbor_call", "icao_station").distinct()

# Rename weather columns so they remain distinguishable after being joined with flights
df_weather = df_weather.withColumnRenamed("NAME", "STATION_NAME") \
  .withColumnRenamed("STATION", "STATION_ID") \
  .withColumnRenamed("ELEVATION", "STATION_ELEVATION") \
  .withColumnRenamed("LATITUDE", "STATION_LATITUDE") \
  .withColumnRenamed("LONGITUDE", "STATION_LONGITUDE") \
  .withColumnRenamed("DATE", "WEATHER_DATE") \
  .withColumnRenamed("REPORT_TYPE", "WEATHER_REPORT_TYPE") \
  .withColumnRenamed("SOURCE", "WEATHER_SOURCE")

columns_to_keep = ["HourlyDewPointTemperature", "HourlyDryBulbTemperature", "HourlyPrecipitation", "HourlyPresentWeatherType", "HourlyStationPressure", "HourlyVisibility", "HourlyWetBulbTemperature", "HourlyWindDirection", "HourlyWindSpeed", "YEAR"]
# Drop weather reports that are not FM-15 or FM-16 and keep only the station fields plus the hourly measurements
df_weather = df_weather.where("WEATHER_REPORT_TYPE in ('FM-15', 'FM-16')").select("STATION_ID", "STATION_NAME", "STATION_ELEVATION", "STATION_LATITUDE", "STATION_LONGITUDE", "WEATHER_DATE", "WEATHER_REPORT_TYPE", "WEATHER_SOURCE", *columns_to_keep)

# Add valid ICAO to weather data (stations on the left; neighbor_id is only the join key and is dropped)
df_weather_icao = df_stations_icao.join(df_weather,\
                                           df_weather.STATION_ID == df_stations_icao.neighbor_id,\
                                           'left').where('neighbor_id IS NOT NULL').drop("neighbor_id")

# Pull in the airport attributes (including the time zone) for each weather station via its ICAO code
df_weather_icao_tz = df_weather_icao.join(df_airport_timezone,\
                                     df_weather_icao.icao_station == df_airport_timezone.icao,\
                                     'left').drop("icao_station")


# Merge airlines and timezones: attach the ORIGIN airport attributes to every flight
flights_icao_tz = df_airlines.join(df_airport_timezone,\
                                     df_airlines.ORIGIN == df_airport_timezone.iata,\
                                     'left')

# Surrogate key per flight row; later cells group and de-duplicate on it
flights_icao_tz = flights_icao_tz.withColumn("flight_id", monotonically_increasing_id())


# Create the set of unique airports that appear as origin or destination
origin_airports = df_airlines.select("ORIGIN").distinct().withColumnRenamed("ORIGIN", "AIRPORT_IATA")
# Fixed: the column selected here is DEST, so DEST (not ORIGIN) is the one to rename.
# The previous rename was a no-op and the union only worked because it matches columns by position.
dest_airports = df_airlines.select("DEST").distinct().withColumnRenamed("DEST", "AIRPORT_IATA")
all_airports = origin_airports.union(dest_airports).distinct()

# Map every airport used by the flights to its ICAO code
all_airports_icao = all_airports.join(df_airport_timezone, df_airport_timezone.iata == all_airports.AIRPORT_IATA).where("iata is not NULL").select("AIRPORT_IATA", "icao").withColumnRenamed("icao", "icao_flight")

# Drop weather entries whose station is not an airport present in the flights data
df_weather_icao_needed_tz = df_weather_icao_tz.join(all_airports_icao,\
                                           df_weather_icao_tz.icao == all_airports_icao.icao_flight,\
                                           'left').where('AIRPORT_IATA IS NOT NULL').drop("AIRPORT_IATA", "icao_flight")

# Save the new datasets
flights_icao_tz.write.mode("overwrite").parquet(f"{blob_url}/flights_with_icao_tz")
df_weather_icao_needed_tz.write.mode("overwrite").parquet(f"{blob_url}/weather_with_icao_tz")

In [None]:
# Load the new datasets (reading them back from storage instead of reusing the in-memory plans)
flights_icao_tz = spark.read.parquet(f"{blob_url}/flights_with_icao_tz") # 42430592 rows recorded
df_weather_icao_needed_tz = spark.read.parquet(f"{blob_url}/weather_with_icao_tz")

In [None]:
from datetime import  datetime, timedelta
from pytz import timezone

def get_hour_minutes(depature_time):
    """Split a time written as an hhmm number (e.g. 5, 715, 1715) into hour and minute strings.

    Args:
        depature_time: The time as an int or string of 1 to 4 digits; leading zeros may be
            missing (715 means 07:15). Float-like whole values such as 1715.0 or "1715.0"
            are accepted as well.

    Returns:
        A ``(hour, minutes)`` tuple of strings. Values of one or two digits are treated as
        minutes past midnight, so the hour is ``'00'``.

    Raises:
        ValueError: If the value is missing, not a non-negative whole number, or longer
            than four digits. Previously such inputs ended in an UnboundLocalError or in
            nonsense hour/minute strings.
    """
    text = str(depature_time).strip()
    if not text.isdigit():
        # Accept float-like representations of whole numbers (e.g. "1715.0"); reject the rest.
        try:
            number = float(text)
        except ValueError:
            raise ValueError(f"Cannot parse departure time {depature_time!r}") from None
        if not number.is_integer() or number < 0:
            raise ValueError(f"Cannot parse departure time {depature_time!r}")
        text = str(int(number))
    if len(text) > 4:
        raise ValueError(f"Departure time {depature_time!r} has more than 4 digits")
    if len(text) <= 2:
        # Only minutes were given: the time is within the first hour of the day.
        return '00', text
    # The last two digits are the minutes, whatever precedes them is the hour.
    return text[:-2], text[-2:]
  
def get_timestamp(year, month, day, hour_minutes, tz):
  """Convert a local date plus hhmm time in time zone `tz` into a UTC datetime.

  `hour_minutes` is split with get_hour_minutes; `tz` is a time zone name understood
  by pytz (e.g. "America/Chicago"). Returns a timezone-aware datetime in UTC.
  """
  hh, mm = get_hour_minutes(hour_minutes)
  utc_zone = timezone('UTC')
  local_zone = timezone(tz)
  naive_local = datetime(int(year), int(month), int(day), hour=int(hh), minute=int(mm))
  return local_zone.localize(naive_local).astimezone(utc_zone)

def get_hour_timestamp(year, month, day, hour_minutes, tz):
  """Same as get_timestamp, but truncated to the start of the local hour.

  The minutes part of `hour_minutes` is parsed and then ignored. Returns a
  timezone-aware datetime in UTC.
  """
  hh, _ignored_minutes = get_hour_minutes(hour_minutes)
  utc_zone = timezone('UTC')
  local_zone = timezone(tz)
  naive_local_hour = datetime(int(year), int(month), int(day), hour=int(hh))
  return local_zone.localize(naive_local_hour).astimezone(utc_zone)

def get_weather_timestamp(depature_timestamp, hours = 2):
  """Return the moment `hours` hours (2 by default) before `depature_timestamp`."""
  lookback = timedelta(hours=hours)
  return depature_timestamp - lookback

def go_back_an_hour(depature_timestamp):
  """Return the moment exactly one hour before `depature_timestamp`."""
  one_hour = timedelta(hours=1)
  return depature_timestamp - one_hour
 
def get_string_timestamp(timestamp):
  """Format `timestamp` as a minute-resolution string, e.g. 201712042315."""
  minute_layout = "%Y%m%d%H%M"
  return timestamp.strftime(minute_layout)

def get_hourly_string_timestamp(timestamp):
  """Format `timestamp` as an hour-resolution string, e.g. 2017120423."""
  hour_layout = "%Y%m%d%H"
  return timestamp.strftime(hour_layout)
 
# Wrap the helpers as Spark UDFs. Each name is rebound from the plain Python function to its
# UDF wrapper, so after this point the plain functions can no longer be called directly.
# NOTE(review): `udf` is not imported in the visible cells (presumably provided by the
# Databricks notebook environment), and no return type is passed although several helpers
# return datetime objects — confirm the intermediate columns behave as intended.
get_timestamp = udf(get_timestamp)
get_hour_timestamp = udf(get_hour_timestamp)
get_weather_timestamp = udf(get_weather_timestamp)
get_string_timestamp = udf(get_string_timestamp)
get_hourly_string_timestamp = udf(get_hourly_string_timestamp)
go_back_an_hour = udf(go_back_an_hour)

# Create the timestamp columns for the flight data
# Scheduled departure in UTC (minute resolution) and truncated to the hour.
flights_icao_tz = flights_icao_tz.withColumn("FLIGHT_TIMESTAMP", get_timestamp("YEAR", "MONTH", "DAY_OF_MONTH", "CRS_DEP_TIME", "airport_tz"))
flights_icao_tz = flights_icao_tz.withColumn("HOUR_WEATHER_TIMESTAMP", get_hour_timestamp("YEAR", "MONTH", "DAY_OF_MONTH", "CRS_DEP_TIME", "airport_tz"))
# Weather cut-off: 2 hours (the helper's default) before departure.
flights_icao_tz = flights_icao_tz.withColumn("WEATHER_TIMESTAMP", get_weather_timestamp("FLIGHT_TIMESTAMP"))
# The hourly column is overwritten with its own value shifted 2 hours back ...
flights_icao_tz = flights_icao_tz.withColumn("HOUR_WEATHER_TIMESTAMP", get_weather_timestamp("HOUR_WEATHER_TIMESTAMP"))
# ... and two further hourly buckets are derived, each one hour earlier than the previous.
flights_icao_tz = flights_icao_tz.withColumn("TWO_HOUR_WEATHER_TIMESTAMP", go_back_an_hour("HOUR_WEATHER_TIMESTAMP"))
flights_icao_tz = flights_icao_tz.withColumn("THREE_HOUR_WEATHER_TIMESTAMP", go_back_an_hour("TWO_HOUR_WEATHER_TIMESTAMP"))
# Render everything as strings: yyyyMMddHHmm for the exact timestamps, yyyyMMddHH for the hourly buckets.
flights_icao_tz = flights_icao_tz.withColumn("FLIGHT_TIMESTAMP", get_string_timestamp("FLIGHT_TIMESTAMP"))
flights_icao_tz = flights_icao_tz.withColumn("WEATHER_TIMESTAMP", get_string_timestamp("WEATHER_TIMESTAMP"))
flights_icao_tz = flights_icao_tz.withColumn("HOUR_WEATHER_TIMESTAMP", get_hourly_string_timestamp("HOUR_WEATHER_TIMESTAMP"))
flights_icao_tz = flights_icao_tz.withColumn("TWO_HOUR_WEATHER_TIMESTAMP", get_hourly_string_timestamp("TWO_HOUR_WEATHER_TIMESTAMP"))
flights_icao_tz = flights_icao_tz.withColumn("THREE_HOUR_WEATHER_TIMESTAMP", get_hourly_string_timestamp("THREE_HOUR_WEATHER_TIMESTAMP"))
# The two exact timestamps are parsed back into Spark timestamp columns; the hourly buckets stay strings.
flights_icao_tz = flights_icao_tz.withColumn("FLIGHT_TIMESTAMP", to_timestamp(col("FLIGHT_TIMESTAMP"), "yyyyMMddHHmm"))
flights_icao_tz = flights_icao_tz.withColumn("WEATHER_TIMESTAMP", to_timestamp(col("WEATHER_TIMESTAMP"), "yyyyMMddHHmm"))

# Update the file with timestamps
# NOTE(review): this overwrites the same path the DataFrame was loaded from — confirm the
# storage layer makes read-then-overwrite of one path safe.
flights_icao_tz.write.mode("overwrite").parquet(f"{blob_url}/flights_with_icao_tz")

In [None]:
def get_utc_difference(tz):
  """Look up the fixed number of hours to add to a time in zone `tz` to obtain UTC.

  Returns 0 for any time zone name that is not listed.
  """
  hours_behind_utc = {
    "America/Chicago": 6,
    "Pacific/Honolulu": 10,
    "America/Phoenix": 7,
    "America/Denver": 7,
    "America/Los_Angeles": 8,
    "America/New_York": 5,
    "America/Anchorage": 9,
    "America/Boise": 7,
    "America/Detroit": 5,
    "America/St_Thomas": 4,
    "America/Puerto_Rico": 4,
    "America/Adak": 10,
    "America/Juneau": 9,
    "America/Kentucky/Louisville": 5,
    "America/Indiana/Indianapolis": 5,
  }
  # Unknown zones fall back to a zero offset rather than raising.
  return hours_behind_utc.get(tz, 0)

def get_timestamp_weather_df(date, tz):
  """Shift an ISO-format observation time by the fixed UTC offset of `tz`.

  Returns the shifted time as a yyyymmddHHMM string, or the literal 'empty' when
  either argument is None.
  NOTE(review): presumably `date` is local standard time at the station — confirm.
  """
  if date is not None and tz is not None:
    offset_hours = get_utc_difference(tz)
    observed = timezone('UTC').localize(datetime.fromisoformat(date))
    shifted = observed + timedelta(hours=offset_hours)
    return shifted.strftime("%Y%m%d%H%M")
  return 'empty'

def get_hour_timestamp_weather_df(date, tz):
  """Hour-resolution variant of get_timestamp_weather_df.

  Returns the offset-shifted observation time as a yyyymmddHH string, or the
  literal 'empty' when either argument is None.
  """
  if date is not None and tz is not None:
    offset_hours = get_utc_difference(tz)
    observed = timezone('UTC').localize(datetime.fromisoformat(date))
    shifted = observed + timedelta(hours=offset_hours)
    return shifted.strftime("%Y%m%d%H")
  return 'empty'

# Wrap both helpers as Spark UDFs (the names are rebound to the UDF wrappers).
get_timestamp_weather_df = udf(get_timestamp_weather_df)
get_hour_timestamp_weather_df = udf(get_hour_timestamp_weather_df)
# Create the timestamp for the weather data
# STATION_TIMESTAMP: observation time as a yyyyMMddHHmm string; HOUR_TIMESTAMP: its yyyyMMddHH hour bucket.
df_weather_icao_needed_tz = df_weather_icao_needed_tz.withColumn("STATION_TIMESTAMP", get_timestamp_weather_df('WEATHER_DATE', 'airport_tz'))
df_weather_icao_needed_tz = df_weather_icao_needed_tz.withColumn("HOUR_TIMESTAMP", get_hour_timestamp_weather_df('WEATHER_DATE', 'airport_tz'))
# Parse STATION_TIMESTAMP into a Spark timestamp. Rows where the helper returned 'empty' do not
# match the pattern (presumably they become null — confirm).
df_weather_icao_needed_tz = df_weather_icao_needed_tz.withColumn("STATION_TIMESTAMP", to_timestamp(col("STATION_TIMESTAMP"), "yyyyMMddHHmm"))


In [None]:
# Clean the values of hourly precipitation
def clean_hourly_precipitation(line, value= 0.0):
  """Turn a raw HourlyPrecipitation string into a float.

  - None, the empty string, and any value containing 'T' or '*' give 0.0.
  - Every 's' character is removed before parsing.
  - When the raw value is at least 8 characters long it is treated as two readings
    stuck together (first four characters + the rest) and their mean is returned.
  - Anything else is parsed as a single float.

  The `value` argument is kept for compatibility; it never influences the result.
  """
  if line is None:
    return 0.0
  if 'T' in line or line == "" or '*' in line:
    return 0.0
  digits = line.replace('s', '') if 's' in line else line
  # The length test deliberately looks at the raw value, not at the cleaned one.
  if len(line) >= 8:
    first_reading = float(digits[:4])
    second_reading = float(digits[4:])
    return (first_reading + second_reading) / 2.0
  return float(digits)

# Wrap the cleaner as a Spark UDF (the name is rebound to the UDF wrapper).
# NOTE(review): no return type is passed to udf(), so the cleaned column is presumably stored
# as a string rather than a number — confirm, since it is not part of the later float cast.
clean_hourly_precipitation = udf(clean_hourly_precipitation)
# Replace HourlyPrecipitation in place with its cleaned value
df_weather_icao_needed_tz = df_weather_icao_needed_tz.withColumn("HourlyPrecipitation", clean_hourly_precipitation('HourlyPrecipitation'))

In [None]:
# Convert the hourly measurement columns to float
from pyspark.sql.functions import col
columns_to_cast = ["HourlyDewPointTemperature","HourlyDryBulbTemperature","HourlyWetBulbTemperature", "HourlyStationPressure", "HourlyWindDirection", "HourlyWindSpeed", "HourlyVisibility"]
# Columns that are left alone keep their relative order; the cast columns are appended
# after them, in the order listed above.
untouched_columns = [name for name in df_weather_icao_needed_tz.columns if name not in columns_to_cast]
float_columns = [col(name).cast("float").alias(name) for name in columns_to_cast]
df_weather_icao_needed_tz = df_weather_icao_needed_tz.select(*untouched_columns, *float_columns)

In [None]:
# Derive weather-event indicator columns from HourlyPresentWeatherType
from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, datediff, when, split

# The raw value is split on "|" and its first three parts become AU_code, AW_code and MW_code.
weather_type_parts = split(col('HourlyPresentWeatherType'), '\\|')
for part_index, code_prefix in enumerate(('AU', 'AW', 'MW')):
  df_weather_icao_needed_tz = df_weather_icao_needed_tz.withColumn(f'{code_prefix}_code', weather_type_parts.getItem(part_index))

# For each code part: the flag to create and the substrings that switch it on.
# The AU part is matched against different substrings than AW and MW.
event_markers = {
  'AU': [('TS', ("+TS", "FC")), ('ICE', ("IC",)), ('SNOW', ("+SN",)), ('FOG', ("FG",))],
  'AW': [('TS', ("TS", "FC")), ('ICE', ("FZRA",)), ('SNOW', ("SN",)), ('FOG', ("FG",))],
  'MW': [('TS', ("TS", "FC")), ('ICE', ("FZRA",)), ('SNOW', ("SN",)), ('FOG', ("FG",))],
}
for code_prefix, flag_specs in event_markers.items():
  code_column = col(f'{code_prefix}_code')
  for event_name, markers in flag_specs:
    # A flag is 1 when the code contains any of its markers, otherwise 0.
    is_match = code_column.contains(markers[0])
    for extra_marker in markers[1:]:
      is_match = is_match | code_column.contains(extra_marker)
    df_weather_icao_needed_tz = df_weather_icao_needed_tz.withColumn(f'{code_prefix}_{event_name}', when(is_match, 1).otherwise(0))

# Overall flag per event: 1 when any of the three parts raised it.
for event_name in ('TS', 'ICE', 'SNOW', 'FOG'):
  raised_anywhere = (col(f'AU_{event_name}') == 1) | (col(f'AW_{event_name}') == 1) | (col(f'MW_{event_name}') == 1)
  df_weather_icao_needed_tz = df_weather_icao_needed_tz.withColumn(event_name, when(raised_anywhere, 1).otherwise(0))

# Update weather dataset
df_weather_icao_needed_tz.write.mode("overwrite").parquet(f"{blob_url}/weather_with_icao_tz")

In [None]:
# Load the new datasets (trailing numbers are the row counts recorded by the authors)
flights_icao_tz = spark.read.parquet(f"{blob_url}/flights_with_icao_tz") # 42430592
df_weather_icao_needed_tz = spark.read.parquet(f"{blob_url}/weather_with_icao_tz") # 31717569

42430592
31717569


In [None]:
def create_composite_key(code, timestamp):
  """Build a join key by gluing `code` and `timestamp` together with an underscore."""
  return '{}_{}'.format(code, timestamp)

# Wrap the helper as a Spark UDF (the name is rebound to the UDF wrapper).
create_composite_key = udf(create_composite_key)

# Create composite key for both datasets
# Weather rows are keyed by ICAO code + observation hour bucket; flights get one key per
# look-back hour bucket so each can be matched against the weather key.
df_weather_icao_needed_tz = df_weather_icao_needed_tz.withColumn("CODE_STATION_TIMESTAMP", create_composite_key('icao', 'HOUR_TIMESTAMP'))
flights_icao_tz = flights_icao_tz.withColumn("CODE_TIMESTAMP", create_composite_key('icao', 'HOUR_WEATHER_TIMESTAMP')) \
  .withColumn("TWO_CODE_TIMESTAMP", create_composite_key('icao', 'TWO_HOUR_WEATHER_TIMESTAMP')) \
  .withColumn("THREE_CODE_TIMESTAMP", create_composite_key('icao', 'THREE_HOUR_WEATHER_TIMESTAMP'))

# Update the datasets
# NOTE(review): both writes overwrite the paths these DataFrames were loaded from — confirm
# the storage layer makes read-then-overwrite of one path safe.
flights_icao_tz.write.mode("overwrite").parquet(f"{blob_url}/flights_with_icao_tz")
df_weather_icao_needed_tz.write.mode("overwrite").parquet(f"{blob_url}/weather_with_icao_tz")

In [None]:
# Load the new datasets (trailing numbers are row counts noted by the authors)
flights_icao_tz = spark.read.parquet(f"{blob_url}/flights_with_icao_tz") # 42430592
df_weather_icao_needed_tz = spark.read.parquet(f"{blob_url}/weather_with_icao_tz") # 1057832
# Check that no flights were lost by the previous steps
print(flights_icao_tz.count())

42430592


In [None]:
# Drop unnecessary columns
# The airport attributes are already present on the flights side (joined in an earlier cell),
# so they are removed from the weather side before joining. Names in this list that do not
# exist in the weather DataFrame are presumably ignored by drop() — confirm.
columns_to_drop = ['airport_name', 'airport_city', 'airport_country', 'airport_tz', 'year', 'airport_subd', 'country', 'elevation', 'iata', 'airport_lon', 'airport_lat', 'icao']
df_weather_icao_needed_tz = df_weather_icao_needed_tz.drop(*columns_to_drop)

# Join the datasets
# One left join per look-back hour bucket: each flight is matched with the weather
# observations of its origin airport that fall in that hour.
joined_df_one = flights_icao_tz.join(df_weather_icao_needed_tz, flights_icao_tz.CODE_TIMESTAMP == df_weather_icao_needed_tz.CODE_STATION_TIMESTAMP, 'left')
joined_df_two = flights_icao_tz.join(df_weather_icao_needed_tz, flights_icao_tz.TWO_CODE_TIMESTAMP == df_weather_icao_needed_tz.CODE_STATION_TIMESTAMP, 'left')
joined_df_three = flights_icao_tz.join(df_weather_icao_needed_tz, flights_icao_tz.THREE_CODE_TIMESTAMP == df_weather_icao_needed_tz.CODE_STATION_TIMESTAMP, 'left')

# Stack the three candidate sets; a later cell keeps the closest observation for each flight.
combined_df = joined_df_one.union(joined_df_two).union(joined_df_three) # 3m 6204054
# NOTE(review): the saved output of this cell shows a Py4JJavaError raised by this write.
combined_df.write.mode("overwrite").parquet(f"{blob_url}/combined")

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-1013197717890732>[0m in [0;36m<cell line: 11>[0;34m()[0m
[1;32m      9[0m [0;34m[0m[0m
[1;32m     10[0m [0mcombined_df[0m [0;34m=[0m [0mjoined_df_one[0m[0;34m.[0m[0munion[0m[0;34m([0m[0mjoined_df_two[0m[0;34m)[0m[0;34m.[0m[0munion[0m[0;34m([0m[0mjoined_df_three[0m[0;34m)[0m [0;31m# 3m 6204054[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 11[0;31m [0mcombined_df[0m[0;34m.[0m[0mwrite[0m[0;34m.[0m[0mmode[0m[0;34m([0m[0;34m"overwrite"[0m[0;34m)[0m[0;34m.[0m[0mparquet[0m[0;34m([0m[0;34mf"{blob_url}/combined"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m             [0mstart[0m [0;34m=[0m [0mtime[0m[0;34m.[0

In [None]:
# Read the stacked flight/weather candidates back from storage and count them.
# NOTE(review): the number in the trailing comment differs from the count shown in the saved output.
combined_df = spark.read.parquet(f"{blob_url}/combined") # 184162262
combined_df.count()

Out[42]: 156764783

In [None]:
from pyspark.sql.functions import min

# distance_time = WEATHER_TIMESTAMP - STATION_TIMESTAMP, both cast to long (presumably epoch
# seconds — confirm). Keeping only values >= 0 retains observations taken at or before the
# flight's weather cut-off; rows without a weather match have a null distance and are dropped too.
joined_df_pos = combined_df.withColumn("distance_time",col("WEATHER_TIMESTAMP").cast("long") - col("STATION_TIMESTAMP").cast("long")).where("distance_time >= 0")
# Smallest distance per flight, i.e. the observation closest to the cut-off.
joined_df_pos_min = joined_df_pos.groupBy("flight_id").agg(min("distance_time").alias('minimum_distance')).withColumnRenamed("flight_id", "flight_id_min")
# Keep, for every flight, only the candidate rows that reach that minimum.
joined_df_pos_min_final = joined_df_pos.join(joined_df_pos_min, (joined_df_pos_min.flight_id_min == joined_df_pos.flight_id) & (joined_df_pos_min.minimum_distance == joined_df_pos.distance_time), 'inner') # 3m 1396256 unique

# Several rows can share the minimum; keep one row per flight.
# NOTE(review): which of the tied rows survives is presumably arbitrary — confirm this is acceptable.
joined_df_clean = joined_df_pos_min_final.dropDuplicates(["flight_id"])
joined_df_clean.write.mode("overwrite").parquet(f"{blob_url}/joined_df_full")

In [None]:
# Drop temporary columns (join keys and the helper columns of the closest-observation step)
columns_to_drop = ['CODE_STATION_TIMESTAMP', 'distance_time', 'flight_id_min', 'minimum_distance', 'CODE_TIMESTAMP', 'TWO_CODE_TIMESTAMP', 'THREE_CODE_TIMESTAMP']

clean_full = spark.read.parquet(f"{blob_url}/joined_df_full") # 42227239
clean_full = clean_full.drop(*columns_to_drop)

# Rename the weather features so it's clear they belong to the ORIGIN airport
# (HOUR_TIMESTAMP is not in this list, so it keeps its name)
columns_to_rename = ['STATION_ID', 'STATION_NAME', 'STATION_ELEVATION', 'STATION_LATITUDE', 'STATION_LONGITUDE', 'WEATHER_DATE', 'WEATHER_REPORT_TYPE', 'WEATHER_SOURCE', 'HourlyPrecipitation', 'HourlyPresentWeatherType', 'STATION_TIMESTAMP', 'HourlyDewPointTemperature', 'HourlyDryBulbTemperature', 'HourlyWetBulbTemperature', 'HourlyStationPressure', 'HourlyWindDirection', 'HourlyWindSpeed', 'HourlyVisibility', 'AU_code', 'AW_code', 'MW_code', 'AU_TS', 'AU_ICE', 'AU_SNOW', 'AU_FOG', 'AW_TS', 'AW_ICE', 'AW_SNOW', 'AW_FOG', 'MW_TS', 'MW_ICE', 'MW_SNOW', 'MW_FOG', 'TS', 'ICE', 'SNOW', 'FOG', 'icao', 'iata', 'airport_name', 'airport_city', 'airport_subd', 'airport_country', 'elevation', 'airport_lat', 'airport_lon', 'airport_tz', 'FLIGHT_TIMESTAMP']


# Prefix every listed column with ORIGIN_
for column in columns_to_rename:
  clean_full = clean_full.withColumnRenamed(column, "ORIGIN_" + column)

# NOTE(review): this overwrites the same path that clean_full was read from above — confirm
# the storage layer makes read-then-overwrite of one path safe.
clean_full.write.mode("overwrite").parquet(f"{blob_url}/joined_df_full")

In [None]:
# Sanity check: list the airports that appear as a destination but never as an origin.
clean_full = spark.read.parquet(f"{blob_url}/joined_df_full") # 42227239
origin_airports = clean_full.select("ORIGIN").distinct()
dest_airports = clean_full.select("DEST").distinct()

# Left join from destinations; a null ORIGIN means the destination has no matching origin row.
joined = dest_airports.join(origin_airports, origin_airports.ORIGIN == dest_airports.DEST, 'left').where("ORIGIN is null")
display(joined)

DEST,ORIGIN
PSE,
PPG,
OGS,
SJU,
GUM,
XWA,
FNL,


In [None]:
# Repeat the weather join for the DESTINATION airport.
# Relies on df_airport_timezone and the create_composite_key UDF defined in earlier cells.
clean_full = spark.read.parquet(f"{blob_url}/joined_df_full") # 42227239
# Attach the destination airport attributes; the origin ones already carry the ORIGIN_ prefix.
full_dest_tz = clean_full.join(df_airport_timezone, clean_full.DEST == df_airport_timezone.iata, 'left')

# Build the join keys from the destination ICAO code and the same three hour buckets that
# were used for the origin (they derive from the scheduled departure time).
full_dest_tz = full_dest_tz.withColumn("CODE_TIMESTAMP", create_composite_key('icao', 'HOUR_WEATHER_TIMESTAMP')) \
  .withColumn("TWO_CODE_TIMESTAMP", create_composite_key('icao', 'TWO_HOUR_WEATHER_TIMESTAMP')) \
  .withColumn("THREE_CODE_TIMESTAMP", create_composite_key('icao', 'THREE_HOUR_WEATHER_TIMESTAMP'))

# Drop unnecessary columns
# (this time HOUR_TIMESTAMP is dropped from the weather side as well)
df_weather_icao_needed_tz = spark.read.parquet(f"{blob_url}/weather_with_icao_tz") # 1057832
columns_to_drop = ['airport_name', 'airport_city', 'airport_country', 'airport_tz', 'year', 'airport_subd', 'country', 'elevation', 'iata', 'airport_lon', 'airport_lat', 'icao', 'HOUR_TIMESTAMP']
df_weather_icao_needed_tz = df_weather_icao_needed_tz.drop(*columns_to_drop)

# Join the datasets
# One left join per look-back hour bucket, as for the origin airport.
joined_df_one = full_dest_tz.join(df_weather_icao_needed_tz, full_dest_tz.CODE_TIMESTAMP == df_weather_icao_needed_tz.CODE_STATION_TIMESTAMP, 'left')
joined_df_two = full_dest_tz.join(df_weather_icao_needed_tz, full_dest_tz.TWO_CODE_TIMESTAMP == df_weather_icao_needed_tz.CODE_STATION_TIMESTAMP, 'left')
joined_df_three = full_dest_tz.join(df_weather_icao_needed_tz, full_dest_tz.THREE_CODE_TIMESTAMP == df_weather_icao_needed_tz.CODE_STATION_TIMESTAMP, 'left')

# Stack the three candidate sets and persist them for the closest-observation step.
combined_df = joined_df_one.union(joined_df_two).union(joined_df_three) # 3m 6204054
combined_df.write.mode("overwrite").parquet(f"{blob_url}/combined_dest")


In [None]:
# Same closest-observation selection as for the origin airport, applied to the destination candidates.
combined_df = spark.read.parquet(f"{blob_url}/combined_dest") # 184162262

from pyspark.sql.functions import min

# Keep observations taken at or before the weather cut-off (distance_time >= 0);
# rows without a weather match have a null distance and are dropped as well.
joined_df_pos = combined_df.withColumn("distance_time",col("WEATHER_TIMESTAMP").cast("long") - col("STATION_TIMESTAMP").cast("long")).where("distance_time >= 0")
# Smallest distance per flight ...
joined_df_pos_min = joined_df_pos.groupBy("flight_id").agg(min("distance_time").alias('minimum_distance')).withColumnRenamed("flight_id", "flight_id_min")
# ... and the candidate rows that reach it.
joined_df_pos_min_final = joined_df_pos.join(joined_df_pos_min, (joined_df_pos_min.flight_id_min == joined_df_pos.flight_id) & (joined_df_pos_min.minimum_distance == joined_df_pos.distance_time), 'inner') # 3m 1396256 unique

# One row per flight (NOTE(review): the choice among tied rows is presumably arbitrary — confirm).
joined_df_clean = joined_df_pos_min_final.dropDuplicates(["flight_id"])
joined_df_clean.write.mode("overwrite").parquet(f"{blob_url}/joined_df_full_dest")

In [None]:
# Drop temporary columns (join keys and the helper columns of the closest-observation step)
columns_to_drop = ['CODE_STATION_TIMESTAMP', 'distance_time', 'flight_id_min', 'minimum_distance', 'CODE_TIMESTAMP', 'TWO_CODE_TIMESTAMP', 'THREE_CODE_TIMESTAMP']

join_df_origin_dest = spark.read.parquet(f"{blob_url}/joined_df_full_dest") # 184162262
join_df_origin_dest = join_df_origin_dest.drop(*columns_to_drop)

# Rename the weather features so it's clear they belong to the DEST airport.
# Columns that already received the ORIGIN_ prefix no longer match these names, so only the
# newly joined destination columns are affected.
columns_to_rename = ['STATION_ID', 'STATION_NAME', 'STATION_ELEVATION', 'STATION_LATITUDE', 'STATION_LONGITUDE', 'WEATHER_DATE', 'WEATHER_REPORT_TYPE', 'WEATHER_SOURCE', 'HourlyPrecipitation', 'HourlyPresentWeatherType', 'STATION_TIMESTAMP', 'HourlyDewPointTemperature', 'HourlyDryBulbTemperature', 'HourlyWetBulbTemperature', 'HourlyStationPressure', 'HourlyWindDirection', 'HourlyWindSpeed', 'HourlyVisibility', 'AU_code', 'AW_code', 'MW_code', 'AU_TS', 'AU_ICE', 'AU_SNOW', 'AU_FOG', 'AW_TS', 'AW_ICE', 'AW_SNOW', 'AW_FOG', 'MW_TS', 'MW_ICE', 'MW_SNOW', 'MW_FOG', 'TS', 'ICE', 'SNOW', 'FOG', 'icao', 'iata', 'airport_name', 'airport_city', 'airport_subd', 'airport_country', 'elevation', 'airport_lat', 'airport_lon', 'airport_tz', 'FLIGHT_TIMESTAMP']


# Prefix every listed column with DEST_
for column in columns_to_rename:
  join_df_origin_dest = join_df_origin_dest.withColumnRenamed(column, "DEST_" + column)

# NOTE(review): this overwrites the same path the DataFrame was read from above — confirm
# the storage layer makes read-then-overwrite of one path safe.
join_df_origin_dest.write.mode("overwrite").parquet(f"{blob_url}/joined_df_full_dest")

In [None]:
# Read back the final flights + origin weather + destination weather dataset and preview it.
join_df_origin_dest = spark.read.parquet(f"{blob_url}/joined_df_full_dest") # 184162262
display(join_df_origin_dest)

QUARTER,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,OP_UNIQUE_CARRIER,OP_CARRIER_AIRLINE_ID,OP_CARRIER,TAIL_NUM,OP_CARRIER_FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,ORIGIN,ORIGIN_CITY_NAME,ORIGIN_STATE_ABR,ORIGIN_STATE_FIPS,ORIGIN_STATE_NM,ORIGIN_WAC,DEST_AIRPORT_ID,DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,DEST,DEST_CITY_NAME,DEST_STATE_ABR,DEST_STATE_FIPS,DEST_STATE_NM,DEST_WAC,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,DEP_DELAY_NEW,DEP_DEL15,DEP_DELAY_GROUP,DEP_TIME_BLK,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,ARR_DELAY_NEW,ARR_DEL15,ARR_DELAY_GROUP,ARR_TIME_BLK,CANCELLED,CANCELLATION_CODE,DIVERTED,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,FLIGHTS,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,FIRST_DEP_TIME,TOTAL_ADD_GTIME,LONGEST_ADD_GTIME,DIV_AIRPORT_LANDINGS,DIV_REACHED_DEST,DIV_ACTUAL_ELAPSED_TIME,DIV_ARR_DELAY,DIV_DISTANCE,DIV1_AIRPORT,DIV1_AIRPORT_ID,DIV1_AIRPORT_SEQ_ID,DIV1_WHEELS_ON,DIV1_TOTAL_GTIME,DIV1_LONGEST_GTIME,DIV1_WHEELS_OFF,DIV1_TAIL_NUM,DIV2_AIRPORT,DIV2_AIRPORT_ID,DIV2_AIRPORT_SEQ_ID,DIV2_WHEELS_ON,DIV2_TOTAL_GTIME,DIV2_LONGEST_GTIME,DIV2_WHEELS_OFF,DIV2_TAIL_NUM,DIV3_AIRPORT,DIV3_AIRPORT_ID,DIV3_AIRPORT_SEQ_ID,DIV3_WHEELS_ON,DIV3_TOTAL_GTIME,DIV3_LONGEST_GTIME,DIV3_WHEELS_OFF,DIV3_TAIL_NUM,DIV4_AIRPORT,DIV4_AIRPORT_ID,DIV4_AIRPORT_SEQ_ID,DIV4_WHEELS_ON,DIV4_TOTAL_GTIME,DIV4_LONGEST_GTIME,DIV4_WHEELS_OFF,DIV4_TAIL_NUM,DIV5_AIRPORT,DIV5_AIRPORT_ID,DIV5_AIRPORT_SEQ_ID,DIV5_WHEELS_ON,DIV5_TOTAL_GTIME,DIV5_LONGEST_GTIME,DIV5_WHEELS_OFF,DIV5_TAIL_NUM,YEAR,ORIGIN_icao,ORIGIN_iata,ORIGIN_airport_name,ORIGIN_airport_city,ORIGIN_airport_subd,ORIGIN_airport_country,ORIGIN_elevation,ORIGIN_airport_lat,ORIGIN_airport_lon,ORIGIN_airport_tz,flight_id,ORIGIN_FLIGHT_TIMESTAMP,HOUR_WEATHER_TIMESTAMP,WEATHER_TIMESTAMP,TWO_HOUR_WEATHER_TIMESTAMP,THREE_HOUR_WEATHER_TIMESTAMP,ORIGIN_STATION_ID,ORIGIN_STATION_NAME,ORIGIN_STATION_ELEVATION,ORIGIN_STATION_LATITUDE,ORIGIN_STA
TION_LONGITUDE,ORIGIN_WEATHER_DATE,ORIGIN_WEATHER_REPORT_TYPE,ORIGIN_WEATHER_SOURCE,ORIGIN_HourlyPrecipitation,ORIGIN_HourlyPresentWeatherType,ORIGIN_STATION_TIMESTAMP,HOUR_TIMESTAMP,ORIGIN_HourlyDewPointTemperature,ORIGIN_HourlyDryBulbTemperature,ORIGIN_HourlyWetBulbTemperature,ORIGIN_HourlyStationPressure,ORIGIN_HourlyWindDirection,ORIGIN_HourlyWindSpeed,ORIGIN_HourlyVisibility,ORIGIN_AU_code,ORIGIN_AW_code,ORIGIN_MW_code,ORIGIN_AU_TS,ORIGIN_AU_ICE,ORIGIN_AU_SNOW,ORIGIN_AU_FOG,ORIGIN_AW_TS,ORIGIN_AW_ICE,ORIGIN_AW_SNOW,ORIGIN_AW_FOG,ORIGIN_MW_TS,ORIGIN_MW_ICE,ORIGIN_MW_SNOW,ORIGIN_MW_FOG,ORIGIN_TS,ORIGIN_ICE,ORIGIN_SNOW,ORIGIN_FOG,DEST_icao,DEST_iata,DEST_airport_name,DEST_airport_city,DEST_airport_subd,DEST_airport_country,DEST_elevation,DEST_airport_lat,DEST_airport_lon,DEST_airport_tz,DEST_STATION_ID,DEST_STATION_NAME,DEST_STATION_ELEVATION,DEST_STATION_LATITUDE,DEST_STATION_LONGITUDE,DEST_WEATHER_DATE,DEST_WEATHER_REPORT_TYPE,DEST_WEATHER_SOURCE,DEST_HourlyPrecipitation,DEST_HourlyPresentWeatherType,DEST_STATION_TIMESTAMP,DEST_HourlyDewPointTemperature,DEST_HourlyDryBulbTemperature,DEST_HourlyWetBulbTemperature,DEST_HourlyStationPressure,DEST_HourlyWindDirection,DEST_HourlyWindSpeed,DEST_HourlyVisibility,DEST_AU_code,DEST_AW_code,DEST_MW_code,DEST_AU_TS,DEST_AU_ICE,DEST_AU_SNOW,DEST_AU_FOG,DEST_AW_TS,DEST_AW_ICE,DEST_AW_SNOW,DEST_AW_FOG,DEST_MW_TS,DEST_MW_ICE,DEST_MW_SNOW,DEST_MW_FOG,DEST_TS,DEST_ICE,DEST_SNOW,DEST_FOG
4,12,4,1,2017-12-04,UA,19977,UA,N77537,1688,14771,1477104,32457,SFO,"San Francisco, CA",CA,6,California,91,14747,1474703,30559,SEA,"Seattle, WA",WA,53,Washington,93,1715,1708.0,-7.0,0.0,0.0,-1.0,1700-1759,17.0,1725.0,1908.0,8.0,1927,1916.0,-11.0,0.0,0.0,-1.0,1900-1959,0.0,,0.0,132.0,128.0,103.0,1.0,679.0,3,,,,,,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2017,KSFO,SFO,San Francisco International Airport,San Francisco,California,US,13.0,37.6189994812,-122.375,America/Los_Angeles,499,2017-12-05T01:15:00.000+0000,2017120423,2017-12-04T23:15:00.000+0000,2017120422,2017120421,72494023234,"SAN FRANCISCO INTERNATIONAL AIRPORT, CA US",2.4,37.6197,-122.3647,2017-12-04T14:56:00,FM-15,7,0.0,,2017-12-04T22:56:00.000+0000,2017120422,34.0,61.0,48.0,30.08,30.0,25.0,10.0,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,KSEA,SEA,Seattle Tacoma International Airport,Seattle,Washington,US,433.0,47.4490013123,-122.3089981079,America/Los_Angeles,72793024233,"SEATTLE TACOMA AIRPORT, WA US",112.8,47.4444,-122.3138,2017-12-04T14:53:00,FM-15,7,0.0,,2017-12-04T22:53:00.000+0000,37.0,42.0,40.0,30.1,360.0,5.0,7.0,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
4,11,9,1,2020-11-09 00:00:00,WN,19393,WN,N8318F,1292,13830,1383002,33830,OGG,"Kahului, HI",HI,15,Hawaii,2,14831,1483106,32457,SJC,"San Jose, CA",CA,6,California,91,1455,1447.0,-8.0,0.0,0.0,-1.0,1400-1459,11.0,1458.0,2153.0,3.0,2150,2156.0,6.0,6.0,0.0,0.0,2100-2159,0.0,,0.0,295.0,309.0,295.0,1.0,2355.0,10,,,,,,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2020,PHOG,OGG,Kahului Airport,Kahului,Hawaii,US,54.0,20.8985996246,-156.4299926758,Pacific/Honolulu,1473,2020-11-10T00:55:00.000+0000,2020110922,2020-11-09T22:55:00.000+0000,2020110921,2020110920,91190022516,"KAHULUI AIRPORT, HI US",15.5,20.89972,-156.42861,2020-11-09T12:54:00,FM-15,7,0.0,,2020-11-09T22:54:00.000+0000,2020110922,65.0,91.0,74.0,29.9,90.0,28.0,10.0,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,KSJC,SJC,Norman Y. Mineta San Jose International Airport,San Jose,California,US,62.0,37.3625984192,-121.9290008545,America/Los_Angeles,72494523293,"SAN JOSE INTERNATIONAL AIRPORT, CA US",15.5,37.3591,-121.924,2020-11-09T14:53:00,FM-15,7,0.0,,2020-11-09T22:53:00.000+0000,19.0,59.0,43.0,30.13,330.0,6.0,10.0,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,1,1,5,2021-01-01 00:00:00,OO,20304,OO,N761ND,3239,14107,1410702,30466,PHX,"Phoenix, AZ",AZ,4,Arizona,81,11413,1141307,30285,DRO,"Durango, CO",CO,8,Colorado,82,2010,2002.0,-8.0,0.0,0.0,-1.0,2000-2059,18.0,2020.0,2116.0,3.0,2139,2119.0,-20.0,0.0,0.0,-2.0,2100-2159,0.0,,0.0,89.0,77.0,56.0,1.0,351.0,2,,,,,,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2021,KPHX,PHX,Phoenix Sky Harbor International Airport,Phoenix,Arizona,US,1135.0,33.434299469,-112.0120010376,America/Phoenix,1716,2021-01-02T03:10:00.000+0000,2021010201,2021-01-02T01:10:00.000+0000,2021010200,2021010123,72278023183,"PHOENIX AIRPORT, AZ US",339.2,33.4278,-112.00365,2021-01-01T17:51:00,FM-15,7,0.0,,2021-01-02T00:51:00.000+0000,2021010200,21.0,60.0,44.0,28.92,250.0,5.0,10.0,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,KDRO,DRO,Durango La Plata County Airport,Durango,Colorado,US,6685.0,37.1515007019,-107.753997803,America/Denver,72462593005,"DURANGO LA PLATA CO AIRPORT, CO US",2020.6,37.1431,-107.76023,2021-01-01T17:53:00,FM-15,7,0.0,,2021-01-02T00:53:00.000+0000,19.0,28.0,25.0,23.54,10.0,6.0,10.0,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,1,26,2,2021-01-26 00:00:00,QX,19687,QX,N639QX,2174,11884,1188402,31884,GEG,"Spokane, WA",WA,53,Washington,93,14831,1483106,32457,SJC,"San Jose, CA",CA,6,California,91,730,723.0,-7.0,0.0,0.0,-1.0,0700-0759,29.0,752.0,944.0,3.0,945,947.0,2.0,2.0,0.0,0.0,0900-0959,0.0,,0.0,135.0,144.0,112.0,1.0,742.0,3,,,,,,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2021,KGEG,GEG,Spokane International Airport,Spokane,Washington,US,2376.0,47.6198997498,-117.533996582,America/Los_Angeles,1783,2021-01-26T15:30:00.000+0000,2021012613,2021-01-26T13:30:00.000+0000,2021012612,2021012611,72785024157,"SPOKANE INTERNATIONAL AIRPORT, WA US",717.7,47.62168,-117.52796,2021-01-26T04:53:00,FM-15,7,0.0,-SN:03 BR:1 |SN |,2021-01-26T12:53:00.000+0000,2021012612,24.0,27.0,26.0,27.31,210.0,3.0,3.0,-SN:03 BR:1,SN,,0,0,0,0,0,0,1,0,0,0,0,0,0,0,1,0,KSJC,SJC,Norman Y. Mineta San Jose International Airport,San Jose,California,US,62.0,37.3625984192,-121.9290008545,America/Los_Angeles,72494523293,"SAN JOSE, CA US",15.0,37.35938,-121.92444,2021-01-26T04:53:00,FM-15,7,0.0,,2021-01-26T12:53:00.000+0000,30.0,34.0,32.0,29.89,150.0,3.0,10.0,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
4,11,15,7,2015-11-15,WN,19393,WN,N7704B,2821,13342,1334205,33342,MKE,"Milwaukee, WI",WI,55,Wisconsin,45,13204,1320402,31454,MCO,"Orlando, FL",FL,12,Florida,33,2115,2114.0,-1.0,0.0,0.0,-1.0,2100-2159,9.0,2123.0,39.0,5.0,105,44.0,-21.0,0.0,0.0,-2.0,0001-0559,0.0,,0.0,170.0,150.0,136.0,1.0,1066.0,5,,,,,,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2015,KMKE,MKE,General Mitchell International Airport,Milwaukee,Wisconsin,US,723.0,42.9472007751,-87.8965988159,America/Chicago,2556,2015-11-16T03:15:00.000+0000,2015111601,2015-11-16T01:15:00.000+0000,2015111600,2015111523,72640014839,"MILWAUKEE MITCHELL AIRPORT, WI US",203.3,42.95489,-87.90457,2015-11-15T18:52:00,FM-15,7,0.0,,2015-11-16T00:52:00.000+0000,2015111600,34.0,51.0,43.0,29.39,210.0,7.0,10.0,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,KMCO,MCO,Orlando International Airport,Orlando,Florida,US,96.0,28.4293994904,-81.3089981079,America/New_York,72205012815,"ORLANDO INTERNATIONAL AIRPORT, FL US",27.4,28.4339,-81.325,2015-11-15T19:53:00,FM-15,7,0.0,,2015-11-16T00:53:00.000+0000,61.0,70.0,65.0,30.13,20.0,8.0,10.0,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,1,16,5,2015-01-16,UA,19977,UA,N512UA,502,14771,1477101,32457,SFO,"San Francisco, CA",CA,6,California,91,12478,1247802,31703,JFK,"New York, NY",NY,36,New York,22,1646,1645.0,-1.0,0.0,0.0,-1.0,1600-1659,22.0,1707.0,58.0,6.0,114,104.0,-10.0,0.0,0.0,-1.0,0001-0559,0.0,,0.0,328.0,319.0,291.0,1.0,2586.0,11,,,,,,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2015,KSFO,SFO,San Francisco International Airport,San Francisco,California,US,13.0,37.6189994812,-122.375,America/Los_Angeles,2825,2015-01-17T00:46:00.000+0000,2015011622,2015-01-16T22:46:00.000+0000,2015011621,2015011620,72494023234,"SAN FRANCISCO INTERNATIONAL AIRPORT, CA US",2.4,37.6197,-122.3647,2015-01-16T13:56:00,FM-15,7,0.0,BR:1 ||,2015-01-16T21:56:00.000+0000,2015011621,53.0,56.0,54.0,30.2,40.0,3.0,4.0,BR:1,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,KJFK,JFK,John F Kennedy International Airport,New York,New York,US,13.0,40.63980103,-73.77890015,America/New_York,74486094789,"JFK INTERNATIONAL AIRPORT, NY US",3.4,40.63915,-73.76401,2015-01-16T16:51:00,FM-15,7,0.0,,2015-01-16T21:51:00.000+0000,13.0,34.0,27.0,29.93,310.0,31.0,10.0,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,3,15,1,2021-03-15 00:00:00,DL,19790,DL,N356DN,1151,13487,1348702,31650,MSP,"Minneapolis, MN",MN,27,Minnesota,63,15304,1530402,33195,TPA,"Tampa, FL",FL,12,Florida,33,1855,1852.0,-3.0,0.0,0.0,-1.0,1800-1859,97.0,2029.0,8.0,5.0,2258,13.0,75.0,75.0,1.0,5.0,2200-2259,0.0,,0.0,183.0,261.0,159.0,1.0,1306.0,6,0.0,0.0,75.0,0.0,0.0,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2021,KMSP,MSP,Minneapolis-St Paul International/Wold-Chamberlain Airport,Minneapolis,Minnesota,US,841.0,44.8819999695,-93.2218017578,America/Chicago,3139,2021-03-15T23:55:00.000+0000,2021031521,2021-03-15T21:55:00.000+0000,2021031520,2021031519,72658014922,"MINNEAPOLIS ST. PAUL INTERNATIONAL AIRPORT, MN US",254.5,44.88523,-93.23133,2021-03-15T15:53:00,FM-15,7,0.06,-SN:03 BR:1 |SN |,2021-03-15T21:53:00.000+0000,2021031521,27.0,29.0,28.0,29.15,80.0,15.0,1.5,-SN:03 BR:1,SN,,0,0,0,0,0,0,1,0,0,0,0,0,0,0,1,0,KTPA,TPA,Tampa International Airport,Tampa,Florida,US,26.0,27.9755001068,-82.533203125,America/New_York,72211012842,"TAMPA INTERNATIONAL AIRPORT, FL US",1.8,27.96331,-82.54,2021-03-15T16:53:00,FM-15,7,0.0,,2021-03-15T21:53:00.000+0000,62.0,80.0,69.0,30.07,250.0,6.0,10.0,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
4,11,25,3,2020-11-25 00:00:00,OO,20304,OO,N713SK,3170,13930,1393007,30977,ORD,"Chicago, IL",IL,17,Illinois,41,10372,1037205,30372,ASE,"Aspen, CO",CO,8,Colorado,82,1005,1024.0,19.0,19.0,1.0,1.0,1000-1059,16.0,1040.0,1203.0,2.0,1201,1205.0,4.0,4.0,0.0,0.0,1200-1259,0.0,,0.0,176.0,161.0,143.0,1.0,1013.0,5,,,,,,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2020,KORD,ORD,Chicago O'Hare International Airport,Chicago,Illinois,US,672.0,41.97859955,-87.90480042,America/Chicago,4098,2020-11-25T16:05:00.000+0000,2020112514,2020-11-25T14:05:00.000+0000,2020112513,2020112512,72530094846,"CHICAGO OHARE INTERNATIONAL AIRPORT, IL US",201.8,41.96019,-87.93162,2020-11-25T07:51:00,FM-15,7,0.01,-DZ:01 BR:1 |DZ |DZ,2020-11-25T13:51:00.000+0000,2020112513,43.0,45.0,44.0,29.1,120.0,6.0,1.0,-DZ:01 BR:1,DZ,DZ,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,KASE,ASE,Aspen-Pitkin Co/Sardy Field,Aspen,Colorado,US,7820.0,39.22320175,-106.8690033,America/Denver,72467693073,"ASPEN PITKIN CO AIRPORT SARDY FIELD, CO US",2353.1,39.23,-106.87056,2020-11-25T06:53:00,FM-15,7,0.0,,2020-11-25T13:53:00.000+0000,5.0,11.0,9.0,22.59,190.0,6.0,10.0,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
3,8,27,7,2017-08-27,HA,19690,HA,N391HA,34,13830,1383002,33830,OGG,"Kahului, HI",HI,15,Hawaii,2,12892,1289206,32575,LAX,"Los Angeles, CA",CA,6,California,91,1340,1336.0,-4.0,0.0,0.0,-1.0,1300-1359,10.0,1346.0,2129.0,13.0,2155,2142.0,-13.0,0.0,0.0,-1.0,2100-2159,0.0,,0.0,315.0,306.0,283.0,1.0,2486.0,10,,,,,,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2017,PHOG,OGG,Kahului Airport,Kahului,Hawaii,US,54.0,20.8985996246,-156.4299926758,Pacific/Honolulu,4939,2017-08-27T23:40:00.000+0000,2017082721,2017-08-27T21:40:00.000+0000,2017082720,2017082719,91190022516,"KAHULUI AIRPORT, HI US",15.5,20.89972,-156.42861,2017-08-27T10:54:00,FM-15,7,0.0,,2017-08-27T20:54:00.000+0000,2017082720,70.0,86.0,75.0,29.96,40.0,22.0,10.0,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,KLAX,LAX,Los Angeles International Airport,Los Angeles,California,US,125.0,33.94250107,-118.4079971,America/Los_Angeles,72295023174,"LOS ANGELES INTERNATIONAL AIRPORT, CA US",29.6,33.938,-118.3888,2017-08-27T12:53:00,FM-15,7,0.0,,2017-08-27T20:53:00.000+0000,65.0,71.0,67.0,29.55,250.0,15.0,8.0,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
3,8,20,7,2017-08-20,UA,19977,UA,N37281,1279,11298,1129804,30194,DFW,"Dallas/Fort Worth, TX",TX,48,Texas,74,12264,1226402,30852,IAD,"Washington, DC",VA,51,Virginia,38,1140,1138.0,-2.0,0.0,0.0,-1.0,1100-1159,13.0,1151.0,1519.0,9.0,1546,1528.0,-18.0,0.0,0.0,-2.0,1500-1559,0.0,,0.0,186.0,170.0,148.0,1.0,1172.0,5,,,,,,,,,0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2017,KDFW,DFW,Dallas Fort Worth International Airport,Dallas-Fort Worth,Texas,US,607.0,32.8968009949,-97.0380020142,America/Chicago,5052,2017-08-20T16:40:00.000+0000,2017082014,2017-08-20T14:40:00.000+0000,2017082013,2017082012,72259003927,"DAL FTW WSCMO AIRPORT, TX US",170.7,32.8978,-97.0189,2017-08-20T07:53:00,FM-15,7,0.0,,2017-08-20T13:53:00.000+0000,2017082013,70.0,85.0,75.0,29.35,190.0,7.0,10.0,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,KIAD,IAD,Washington Dulles International Airport,Dulles,Virginia,US,312.0,38.94449997,-77.45580292,America/New_York,72403093738,"WASHINGTON DULLES INTERNATIONAL AIRPORT, VA US",88.4,38.93486,-77.44728,2017-08-20T08:52:00,FM-15,7,0.0,,2017-08-20T13:52:00.000+0000,63.0,76.0,68.0,29.79,20.0,6.0,10.0,,,,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [None]:
# Count the flights whose ORIGIN is one of these seven IATA codes.
df_airlines.filter("ORIGIN in ('PSE', 'PPG', 'OGS', 'SPN', 'SJU', 'GUM', 'XWA')").count()

Out[16]: 193037

In [None]:
from pyspark.sql.functions import countDistinct
import numpy as np
from math import radians, cos, sin, asin, sqrt

# --- Find airports that are in the raw flights but absent from clean_full ---

# Distinct origins present in clean_full; the column is renamed so it can be
# told apart from ORIGIN in the join below.
matched_airpoorts = (
    clean_full
    .select("ORIGIN")
    .withColumnRenamed("ORIGIN", "MATCHED_ORIGIN")
    .distinct()
)

# Distinct origins present in the airlines data.
original_airports = df_airlines.select("ORIGIN").distinct()

# Left join, then keep only the rows that found no partner.
missing_airports = (
    original_airports
    .join(
        matched_airpoorts,
        original_airports.ORIGIN == matched_airpoorts.MATCHED_ORIGIN,
        "left",
    )
    .where("MATCHED_ORIGIN is NULL")
)

# --- Inputs for the nearest-station search ---

# One row per distinct (station_id, lon, lat) combination.
just_stations = df_stations.select("station_id", "lon", "lat").distinct()

# Attach the airport metadata (matched on IATA code) to every missing airport.
missing_airports_coords = missing_airports.join(
    df_airport_timezone,
    missing_airports.ORIGIN == df_airport_timezone.iata,
    "left",
)

def haversine_distance(lon1, lat1, lon2, lat2):
    """Great-circle (haversine) distance between two points.

    Args:
        lon1, lat1: longitude and latitude of the first point, in decimal degrees.
        lon2, lat2: longitude and latitude of the second point, in decimal degrees.

    Returns:
        A ``(kilometres, miles)`` tuple, computed with an Earth radius of
        6371 km and 3956 miles respectively. NaN inputs yield NaN distances.
    """
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    d_lon = lon2 - lon1
    d_lat = lat2 - lat1
    a = sin(d_lat/2)**2 + cos(lat1) * cos(lat2) * sin(d_lon/2)**2
    # Mathematically a <= 1, but rounding can push it a hair above 1 for
    # (near-)antipodal points, which would make asin() raise ValueError.
    # An explicit comparison (rather than min()) leaves NaN untouched.
    if a > 1.0:
        a = 1.0
    c = 2 * asin(sqrt(a))
    rkm = 6371  # Earth radius in kilometres
    # NOTE(review): 3956 mi is not the exact conversion of 6371 km (~3958.8 mi);
    # kept as-is so previously reported distances do not change.
    rm = 3956
    return c * rkm, c * rm

# Collect every row of just_stations and broadcast the resulting list;
# calc_distances reads it through stations_sc.value.
stations_sc = sc.broadcast(just_stations.rdd.collect())
    
def calc_distances(airport):
  """Yield the distance from one airport to every broadcast station.

  Args:
    airport: row-like object indexable by 'iata', 'airport_lon' and
      'airport_lat'.

  Yields:
    ``(iata, (station_id, (km, miles)))`` for each station in
    ``stations_sc.value`` that has both coordinates. Nothing is yielded when
    the airport itself has no coordinates (possible after the left join that
    attaches airport metadata).
  """
  def _coord(value):
    # Only None / empty string mean "missing"; 0 is a valid coordinate
    # (equator / prime meridian) and must not be skipped.
    if value is None or value == "":
      return None
    return float(value)

  airport_lon = _coord(airport['airport_lon'])
  airport_lat = _coord(airport['airport_lat'])
  if airport_lon is None or airport_lat is None:
    # No coordinates to measure from; float(None) would otherwise raise.
    return

  for station in stations_sc.value:
    station_lon = _coord(station['lon'])
    station_lat = _coord(station['lat'])
    if station_lon is None or station_lat is None:
      continue
    station_id = station['station_id']
    yield (airport['iata'], (station_id, haversine_distance(airport_lon, airport_lat, station_lon, station_lat)))

def find_minimum(a, b):
  """Return whichever of two candidates has the smaller distance.

  Args:
    a, b: ``(station_id, (km, miles))`` tuples as emitted by calc_distances;
      the comparison uses the kilometre value ``[1][0]``.

  Returns:
    ``a`` or ``b`` unchanged. Ties return ``a``. A NaN distance never beats a
    real one (np.argmin would have picked the NaN entry); if both are NaN,
    ``a`` is returned.
  """
  dist_a = a[1][0]
  dist_b = b[1][0]
  # x != x is True only for NaN.
  if dist_b != dist_b:
    return a
  if dist_a != dist_a:
    return b
  return a if dist_a <= dist_b else b
    
def map_values(a):
  """Flatten ``(key, (station_id, (km, miles)))`` into a 4-tuple.

  Returns ``(key, station_id, km, miles)``.
  """
  key, nearest = a[0], a[1]
  distances = nearest[1]
  return (key, nearest[0], distances[0], distances[1])
# Per airport: emit a distance to every station, keep the closest one,
# flatten the record, and bring the rows back to the driver.
result = (
    missing_airports_coords.rdd
    .flatMap(calc_distances)
    .reduceByKey(find_minimum)
    .map(map_values)
    .collect()
)

# Turn the collected tuples into a DataFrame and print it.
new_stations = spark.createDataFrame(
    result,
    ["IATA", "CLOSEST_STATION_ID", "DISTANCE in KM", "DISTANCE in Miles"],
)
new_stations.show()

+----+------------------+------------------+------------------+
|IATA|CLOSEST_STATION_ID|    DISTANCE in KM| DISTANCE in Miles|
+----+------------------+------------------+------------------+
| GUM|       70414045715| 5074.661619495768| 3151.053424380044|
| XWA|       72767094014|12.720358338679711| 7.898561856508701|
| PPG|       91197521510|4113.6394938583635| 2554.317664056457|
| SPN|       70414045715| 4867.467580900914| 3022.398642292264|
| OGS|       72622394725| 56.45093250281826|35.052564586587515|
| SJU|       78526500494|10.334183951503903| 6.416894005987983|
| PSE|       78514511653| 67.68333842958074| 42.02719931367468|
+----+------------------+------------------+------------------+

