In [0]:
# Team blob storage: readable and writable by team members only.
# The SAS token stops working once its TTL expires; after that, generate a new
# SAS token and store it again as a secret with the Databricks CLI.
blob_container  = "final-project-summer24-team3"   # container created in https://portal.azure.com
storage_account = "summer2024team3"                # storage account created in https://portal.azure.com
secret_scope    = "summer24_team_3_2_scope"        # secret scope created locally with the Databricks CLI
secret_key      = "final-project-summer24-team3"   # secret (holding the SAS token) created with the Databricks CLI
blob_host       = f"{storage_account}.blob.core.windows.net"
team_blob_url   = f"wasbs://{blob_container}@{blob_host}"  # root of the team storage container

# The w261 course blob storage is mounted here.
mids261_mount_path = "/mnt/mids-w261"

# Register the SAS token so Spark can read/write the team container.
sas_token = dbutils.secrets.get(scope=secret_scope, key=secret_key)
spark.conf.set(f"fs.azure.sas.{blob_container}.{blob_host}", sas_token)

# List the contents of the container root.
display(dbutils.fs.ls(team_blob_url))

path,name,size,modificationTime
wasbs://final-project-summer24-team3@summer2024team3.blob.core.windows.net/TP/,TP/,0,1722323705000


In [0]:
from pyspark.sql.functions import col, isnan, count, when, split, concat, lit, min, row_number, lower, lpad, udf, first, countDistinct, coalesce, to_timestamp, monotonically_increasing_id, explode, array
from pyspark.sql.types import StringType, StructField, StructType, FloatType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from datetime import datetime, timedelta

In [0]:
# Cleaned flight + station + weather datasets (suffixes 3m / 1y / all), all
# stored under the team container's TP/ folder. Paths are derived from
# team_blob_url instead of repeating the full wasbs:// root literal.
tp_root = f"{team_blob_url}/TP"
merged_3m = spark.read.parquet(f"{tp_root}/flight_stations_weather_3m_cleaned")
merged_1y = spark.read.parquet(f"{tp_root}/flight_stations_weather_1y_cleaned")
merged_all = spark.read.parquet(f"{tp_root}/flight_stations_weather_all_cleaned")

In [0]:
# Dataset to engineer features on: "3m", "1y" or "all" (the frames loaded above).
# NOTE: the output path in the write cell further down is hard-coded to the
# "3m" name; keep it in sync if you change this.
DATASET_KEY = "3m"
_datasets = {"3m": merged_3m, "1y": merged_1y, "all": merged_all}
if DATASET_KEY not in _datasets:
    raise ValueError(f"DATASET_KEY must be one of {sorted(_datasets)}, got {DATASET_KEY!r}")
df = _datasets[DATASET_KEY]

In [0]:
# Julia's cell: rolling departure- and arrival-delay rate features

def create_delay_percentage_features(df, window_hours=(12, 6, 4), gap_hours=2):
    """
    Add rolling delay-rate features for departures and arrivals.

    For every flight, look back over windows of scheduled departure time that
    end `gap_hours` before the flight's own scheduled departure. For each
    window, compute the share of flights in it that are flagged as delayed.

    Input:
        df: dataframe to which features are added. Requires columns
            "CRS_DEP_TIME", "FL_DATE", "DEP_DEL15", "ARR_DEL15",
            "ORIGIN_AIRPORT_ID", "DEST_AIRPORT_ID".
        window_hours: look-back window lengths in hours (default 12, 6, 4).
            Each must be greater than gap_hours.
        gap_hours: hours between the end of each window and the current
            flight's scheduled departure (default 2).

    Output:
        dataframe with new columns, added in this order:
          - "DEP_{h}hr" for each h in window_hours: share of flights from the
            same ORIGIN_AIRPORT_ID in the window with DEP_DEL15 == 1;
          - "ARR_{h}hr" for each h in window_hours: share of flights into the
            same DEST_AIRPORT_ID in the window with ARR_DEL15 == 1.
        With the defaults these are DEP_12hr, DEP_6hr, DEP_4hr, ARR_12hr,
        ARR_6hr, ARR_4hr. A window containing no flights yields 0.0.

    Raises:
        ValueError: if any window length is not greater than gap_hours.
    """
    if any(hours <= gap_hours for hours in window_hours):
        raise ValueError(
            f"every window length must exceed gap_hours={gap_hours}, got {tuple(window_hours)}"
        )

    def update_df_unixtime(df, timestring, datestring):
        """
        Add "DEP_TIMESTAMP_UNIX": seconds since epoch (Spark session time zone)
        built from an HHMM time column and a date column.
        """
        # e.g. 905 -> "0905" -> "09:05"
        hhmm = F.lpad(F.col(str(timestring)).cast("string"), 4, "0")
        hh_colon_mm = F.concat_ws(":", hhmm.substr(1, 2), hhmm.substr(3, 2))
        # First 10 characters of the date ("yyyy-MM-dd") + " " + "HH:mm"
        date_time = F.concat_ws(" ", F.col(str(datestring)).cast("string").substr(0, 10), hh_colon_mm)
        return df.withColumn("DEP_TIMESTAMP_UNIX", F.unix_timestamp(date_time, "yyyy-MM-dd HH:mm"))

    df = update_df_unixtime(df, "CRS_DEP_TIME", "FL_DATE")

    # NOTE(review): the ARR_* windows are partitioned by DEST_AIRPORT_ID but
    # ordered by each flight's scheduled *departure* time, and that time comes
    # from CRS_DEP_TIME at different origin airports. If CRS_DEP_TIME is local
    # time at the origin (presumably; TODO confirm), flights from different time
    # zones are not on a common clock here. Also, ARR_DEL15 of a flight that
    # departed just `gap_hours` earlier may not be known yet at prediction time.
    feature_groups = [
        # (column prefix, 0/1 delay flag, airport column the window is scoped to)
        ("DEP", "DEP_DEL15", "ORIGIN_AIRPORT_ID"),
        ("ARR", "ARR_DEL15", "DEST_AIRPORT_ID"),
    ]
    gap_seconds = gap_hours * 3600
    for prefix, flag_col, airport_col in feature_groups:
        for hours in window_hours:
            # Range offsets are in the units of the ordering column (seconds),
            # relative to the current row's value.
            window_spec = (
                Window.partitionBy(airport_col)
                .orderBy("DEP_TIMESTAMP_UNIX")
                .rangeBetween(-(hours * 3600), -gap_seconds)
            )
            delayed = F.count(F.when(F.col(flag_col) == 1, 1)).over(window_spec)
            total = F.count("*").over(window_spec)
            # Guard the empty-window case explicitly instead of dividing 0/0.
            # 0/0 gives null (later filled) in non-ANSI mode but raises when
            # spark.sql.ansi.enabled is on.
            df = df.withColumn(
                f"{prefix}_{hours}hr",
                F.when(total > 0, delayed / total).otherwise(F.lit(0.0)),
            )

    return df.drop("DEP_TIMESTAMP_UNIX")




In [0]:
# Add the rolling departure/arrival delay-rate features (DEP_*hr / ARR_*hr).
# TODO(Jordan): confirm which dataframe these features should be applied to.
df_delay = create_delay_percentage_features(df=df)

In [0]:
# Ian's cell: features from each aircraft's previous flight (lagged delays)

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, concat_ws, lpad, expr, unix_timestamp, from_unixtime, when, count, lag, greatest
import pyspark.sql.functions as F
from datetime import datetime, timedelta

def flight_lag_transformation(df, keep_crs_depart_diff=False):
    """
    Add features describing each aircraft's previous flight, as known at a
    leakage cut-off 2 hours before the current flight's scheduled departure.

    Input:
        df: dataframe to which features are added. Requires columns
            "CRS_DEP_TIME", "FL_DATE", "DEP_DEL15", "CANCELLED", "DISTANCE",
            "TAIL_NUM", "DEP_DELAY".
        keep_crs_depart_diff: if True, also keep "CRS_DEPART_DIFF" in the
            output. It is always computed but dropped by default.

    Output: dataframe with these new columns
        "DISTANCE_LAG": DISTANCE of the same plane's (TAIL_NUM) previous flight.
        "REALIZED_DELAY_MIN": minutes from the previous flight's scheduled
            departure to the earlier of (its actual departure, the cut-off).
        "REALIZED_DEL15": 1 if the previous flight has DEP_DEL15 == 1 and the
            cut-off is at least 15 minutes after its scheduled departure,
            otherwise 0 (also 0 when there is no previous flight).
        "CRS_DEPART_DIFF" (only if keep_crs_depart_diff=True): minutes between
            the current and previous scheduled departures, rounded.

    The existing "DEP_DEL15" column is replaced by
    greatest(DEP_DEL15, CANCELLED), so cancelled flights count as delayed.
    """
    # NOTE(review): this overwrites DEP_DEL15 in the returned frame, not just a
    # helper copy. If DEP_DEL15 is the model label, cancelled flights become
    # positives downstream. Confirm this is intended.
    df = df.withColumn("DEP_DEL15", F.greatest(col("DEP_DEL15"), col("CANCELLED")))

    # Scheduled departure as a timestamp: HHMM -> "HH:mm", prefixed with the
    # first 10 characters of FL_DATE ("yyyy-MM-dd").
    df = df.withColumn("MOD_DEP_TIME", lpad(col("CRS_DEP_TIME").cast("string"), 4, "0"))
    df = df.withColumn("MOD_DEP_TIME", concat_ws(":", df.MOD_DEP_TIME.substr(1, 2), df.MOD_DEP_TIME.substr(3, 2)))
    df = df.withColumn("MOD_DATE_TIME", concat_ws(" ", col("FL_DATE").cast("string").substr(0, 10), col("MOD_DEP_TIME")))
    df = df.withColumn("MOD_TIMESTAMP", unix_timestamp(col("MOD_DATE_TIME"), "yyyy-MM-dd HH:mm").cast("timestamp"))

    # Actual departure = scheduled + DEP_DELAY minutes (null when DEP_DELAY is null).
    # unix_timestamp on a timestamp column needs no format string.
    df = df.withColumn("DEL_TRUE_TS", (unix_timestamp(col("MOD_TIMESTAMP")) + col("DEP_DELAY") * 60).cast("timestamp"))
    # Leakage cut-off: only information available 2 hours before the scheduled departure may be used.
    df = df.withColumn("LEAKAGE_TS", col("MOD_TIMESTAMP") - F.expr("INTERVAL 2 HOURS"))

    # Previous flight of the same aircraft, ordered by scheduled departure.
    # NOTE(review): MOD_TIMESTAMP comes from CRS_DEP_TIME/FL_DATE clock fields.
    # Consecutive legs of one aircraft normally depart from different airports.
    # If those fields are local times in different zones (TODO confirm), the
    # time differences below mix clocks.
    window_spec = Window.partitionBy("TAIL_NUM").orderBy("MOD_TIMESTAMP")
    for source_col in ("DEP_DEL15", "DISTANCE", "MOD_TIMESTAMP", "DEL_TRUE_TS"):
        df = df.withColumn(f"{source_col}_LAG", lag(source_col, 1).over(window_spec))

    df = df.withColumn("CRS_DEPART_DIFF", F.round((unix_timestamp(col("MOD_TIMESTAMP")) - unix_timestamp(col("MOD_TIMESTAMP_LAG"))) / 60.0))

    # What is known about the previous departure at the cut-off: its actual
    # departure if that happened before the cut-off, else the cut-off itself.
    # F.least skips nulls, so a null DEL_TRUE_TS_LAG falls back to the cut-off.
    df = df.withColumn("REALIZED_DEL_TS", F.least(col("DEL_TRUE_TS_LAG"), col("LEAKAGE_TS")))
    # NOTE(review): this is negative when the previous flight is scheduled
    # after the cut-off. In that case it measures time-until-departure, not delay.
    df = df.withColumn("REALIZED_DELAY_MIN", (unix_timestamp(col("REALIZED_DEL_TS")) - unix_timestamp(col("MOD_TIMESTAMP_LAG"))) / 60.0)

    # Same rule the former Python UDF applied, as a native Spark expression.
    # This avoids per-row Python serialization. A null LEAKAGE_TS now yields 0
    # instead of a TypeError from comparing None with a datetime.
    cutoff_past_prev_plus_15 = col("LEAKAGE_TS") >= col("MOD_TIMESTAMP_LAG") + F.expr("INTERVAL 15 MINUTES")
    is_realized_del15 = (
        col("MOD_TIMESTAMP_LAG").isNotNull()
        & col("DEP_DEL15_LAG").isNotNull()
        & cutoff_past_prev_plus_15
        & (col("DEP_DEL15_LAG") == 1)
    )
    df = df.withColumn("REALIZED_DEL15", F.when(is_realized_del15, 1).otherwise(0).cast("int"))

    helper_cols = [
        "MOD_DEP_TIME", "MOD_DATE_TIME", "MOD_TIMESTAMP", "DEL_TRUE_TS", "LEAKAGE_TS",
        "DEP_DEL15_LAG", "MOD_TIMESTAMP_LAG", "DEL_TRUE_TS_LAG", "REALIZED_DEL_TS",
    ]
    if not keep_crs_depart_diff:
        helper_cols.append("CRS_DEPART_DIFF")
    return df.drop(*helper_cols)

## Because these features are lagged, the first flight of each plane (TAIL_NUM) has no previous flight,
## so its DISTANCE_LAG and REALIZED_DELAY_MIN are null. Imputation ideas:
## DISTANCE_LAG: average of that plane's (TAIL_NUM) future scheduled distances (e.g., over the next month?), since the schedule is known ahead of time.
## REALIZED_DELAY_MIN: average from that airport over the past day?

In [0]:
# Add previous-flight (same TAIL_NUM) features on top of the delay-rate features.
df_delay_lag = flight_lag_transformation(df=df_delay)

In [0]:
# Persist the engineered features (the "_3m_" dataset path) and list the TP/ folder.
# mode("overwrite") lets the notebook be re-run end to end. The default
# save mode raises once the path already exists from a previous run.
# NOTE: this replaces whatever is currently stored at this path.
df_delay_lag.write.mode("overwrite").parquet(f"{team_blob_url}/TP/flight_stations_weather_3m_cleaned2")
display(dbutils.fs.ls(f"{team_blob_url}/TP"))

path,name,size,modificationTime
wasbs://final-project-summer24-team3@summer2024team3.blob.core.windows.net/TP/_SUCCESS,_SUCCESS,0,1720561571000
wasbs://final-project-summer24-team3@summer2024team3.blob.core.windows.net/TP/_committed_1002670972388615845,_committed_1002670972388615845,625,1720560531000
wasbs://final-project-summer24-team3@summer2024team3.blob.core.windows.net/TP/_committed_5669257934384103852,_committed_5669257934384103852,221,1720561571000
wasbs://final-project-summer24-team3@summer2024team3.blob.core.windows.net/TP/_committed_6618439955609603938,_committed_6618439955609603938,419,1720561337000
wasbs://final-project-summer24-team3@summer2024team3.blob.core.windows.net/TP/_committed_9167039456723159873,_committed_9167039456723159873,318,1720559468000
wasbs://final-project-summer24-team3@summer2024team3.blob.core.windows.net/TP/_committed_vacuum825530481471543349,_committed_vacuum825530481471543349,95,1720561338000
wasbs://final-project-summer24-team3@summer2024team3.blob.core.windows.net/TP/_started_1002670972388615845,_started_1002670972388615845,0,1720560531000
wasbs://final-project-summer24-team3@summer2024team3.blob.core.windows.net/TP/_started_5669257934384103852,_started_5669257934384103852,0,1720561570000
wasbs://final-project-summer24-team3@summer2024team3.blob.core.windows.net/TP/_started_6618439955609603938,_started_6618439955609603938,0,1720561337000
wasbs://final-project-summer24-team3@summer2024team3.blob.core.windows.net/TP/df_1y_cleaned_transformed/,df_1y_cleaned_transformed/,0,1722046865000
