In [None]:
import pandas as pd
import numpy as np
import boto3
import dask.dataframe as dd
from dask.distributed import Client

In [None]:
# Start a Dask distributed client with two workers, one thread each,
# and a 4GB memory limit per worker.
client = Client(
    n_workers=2,
    threads_per_worker=1,
    memory_limit="4GB",
)

In [None]:
# builds a boto3 client for the S3 service
def get_s3_client():
    """Create a new boto3 S3 client and hand it back to the caller."""
    s3 = boto3.client('s3')
    return s3

#lists all objects in an s3 bucket
def list_s3_objects(bucket_name, prefix=None):
    """Return the object entries stored in an S3 bucket.

    Args:
        bucket_name: Name of the bucket to list.
        prefix: Optional key prefix to restrict the listing to. When None,
            the whole bucket is listed.

    Returns:
        A list of the 'Contents' dicts returned by S3 (empty when nothing
        matches).
    """
    s3 = get_s3_client()

    # boto3 validates parameter types and rejects Prefix=None, so the
    # argument is only sent when the caller actually supplied one.
    request = {'Bucket': bucket_name}
    if prefix is not None:
        request['Prefix'] = prefix

    # A single list_objects_v2 call returns at most 1000 keys; the paginator
    # follows the continuation tokens so larger listings are not truncated.
    objects = []
    for page in s3.get_paginator('list_objects_v2').paginate(**request):
        objects.extend(page.get('Contents', []))
    return objects

# copies one object from an s3 bucket to a local path
def download_s3_object(bucket_name, object_key, download_path):
    """Download a single S3 object to the local filesystem.

    Args:
        bucket_name: Bucket that holds the object.
        object_key: Key of the object inside the bucket.
        download_path: Local path the object is written to.
    """
    get_s3_client().download_file(bucket_name, object_key, download_path)
    # Report the completed transfer on stdout.
    print(f"Downloaded {object_key} to {download_path}")

# loads the full content of an s3 object into memory
def read_s3_object(bucket_name, object_key):
    """Fetch an S3 object and return everything read from its body.

    Args:
        bucket_name: Bucket that holds the object.
        object_key: Key of the object inside the bucket.

    Returns:
        Whatever ``read()`` on the response 'Body' yields.
    """
    client = get_s3_client()
    body = client.get_object(Bucket=bucket_name, Key=object_key)['Body']
    return body.read()


def list_s3_objects(bucket_name, prefix=""):
    """Return the object entries under ``prefix`` in an S3 bucket.

    Uses the module-level ``s3_client``. Note that this definition replaces
    the earlier ``list_s3_objects`` defined in the same cell.

    Args:
        bucket_name: Name of the bucket to list.
        prefix: Key prefix to restrict the listing to ("" lists everything).

    Returns:
        A list of the 'Contents' dicts returned by S3 (empty when nothing
        matches).
    """
    # A single list_objects_v2 call returns at most 1000 keys; the paginator
    # follows the continuation tokens so larger folders are not truncated.
    paginator = s3_client.get_paginator('list_objects_v2')
    objects = []
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        objects.extend(page.get('Contents', []))
    return objects

# Module-level S3 client used by list_s3_objects above
s3_client = boto3.client('s3')

# Bucket and key prefix of the raw historical flight data
bucket_name = "historicalflightdata"
folder_name = "raw_hist_flight_data/"

In [None]:
# Explicit column dtypes handed to dd.read_csv.
# CANCELLATION_CODE stays 'object' because the column contains mixed types.
dtypes = dict(
    ARR_TIME='float64',
    CANCELLATION_CODE='object',
    DEP_TIME='float64',
)

# Location of the raw data: bucket name plus the folder (key prefix) inside it.
# The folder must be a valid string.
bucket_name = "historicalflightdata"
folder_name = "raw_hist_flight_data/"

# S3 client used by the listing helper in this cell
s3_client = boto3.client('s3')

# Function to list objects in S3 bucket
def list_s3_objects(bucket_name, prefix=""):
    """Return the object entries under ``prefix`` in an S3 bucket.

    Uses the module-level ``s3_client``.

    Args:
        bucket_name: Name of the bucket to list.
        prefix: Key prefix to restrict the listing to ("" lists everything).

    Returns:
        A list of the 'Contents' dicts returned by S3 (empty when nothing
        matches).
    """
    # A single list_objects_v2 call returns at most 1000 keys; the paginator
    # follows the continuation tokens so larger folders are not truncated.
    paginator = s3_client.get_paginator('list_objects_v2')
    objects = []
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        objects.extend(page.get('Contents', []))
    return objects

# Fetch the object listing for the configured folder
response = list_s3_objects(bucket_name, folder_name)

# Keep only the keys that end in '.csv'
file_keys = [
    key
    for key in (entry.get('Key') for entry in response)
    if key.endswith('.csv')
]

# Turn each key into a full s3:// URL
s3_paths = [f"s3://{bucket_name}/{key}" for key in file_keys]

# Build one lazy Dask DataFrame per CSV file, applying the explicit dtypes
dfs = [dd.read_csv(csv_url, dtype=dtypes) for csv_url in s3_paths]


In [None]:
# Stack the per-file frames into one Dask DataFrame and persist it,
# so later cells reuse the loaded data.
combined_df_dask = dd.concat(dfs).persist()

In [None]:
# Columns that contribute to the combined delay figure
delay_cols = [
    "DEP_DELAY",
    "ARR_DELAY",
    "CARRIER_DELAY",
    "WEATHER_DELAY",
    "NAS_DELAY",
    "SECURITY_DELAY",
    "LATE_AIRCRAFT_DELAY",
]

# Missing delay values are treated as 0
combined_df_dask = combined_df_dask.assign(
    **{col: combined_df_dask[col].fillna(0) for col in delay_cols}
)

# "Delay" is the row-wise total over all delay columns.
# NOTE(review): DEP_DELAY/ARR_DELAY are added together with the per-cause
# delay columns - confirm this combined total is the intended metric.
combined_df_dask = combined_df_dask.assign(
    Delay=combined_df_dask[delay_cols].sum(axis=1)
)

In [None]:
#cleaning up FL_DATE column

# Parse the raw strings with the explicit month/day/year 12-hour format;
# values that do not match are coerced to NaT instead of raising.
combined_df_dask["FL_DATE"] = dd.to_datetime(
    combined_df_dask["FL_DATE"],
    format="%m/%d/%Y %I:%M:%S %p",
    errors="coerce",
)

# Remove the timestamp by flooring every value to midnight. normalize() keeps
# the column as datetime64 throughout, avoiding the detour through Python
# date objects (.dt.date) and a second to_datetime pass.
combined_df_dask["FL_DATE"] = combined_df_dask["FL_DATE"].dt.normalize()

In [None]:
def assign_time_of_day(hour):
    """Map an hour value to a time-of-day label.

    Hours in [5, 12) are "morning", [12, 17) "afternoon", [17, 21) "evening";
    every other value (including anything that fails all comparisons, such
    as NaN) is "night".
    """
    windows = (
        (5, 12, "morning"),
        (12, 17, "afternoon"),
        (17, 21, "evening"),
    )
    for start, stop, label in windows:
        if start <= hour < stop:
            return label
    return "night"
    
# run on every partition of the dask df
def process_partition(partition):
    """Return a copy of ``partition`` with 'hour' and 'Time_of_Day' columns.

    'hour' is DEP_TIME integer-divided by 100 after missing values are
    replaced with 0; 'Time_of_Day' is the label assign_time_of_day gives
    for that hour. The input frame is left unmodified.
    """
    # NOTE(review): missing DEP_TIME becomes hour 0 and is therefore labelled
    # "night" - confirm that is the intended treatment.
    dep_hour = partition['DEP_TIME'].fillna(0).astype(int) // 100
    return partition.assign(
        hour=dep_hour,
        Time_of_Day=dep_hour.apply(assign_time_of_day),
    )

In [None]:
# Output schema handed to Dask as `meta`: the current frame plus the two new
# columns (presumably the literal values only serve to fix their dtypes).
time_of_day_meta = combined_df_dask.assign(hour=0, Time_of_Day='object')

# Add 'hour' and 'Time_of_Day' to every partition
combined_df_dask = combined_df_dask.map_partitions(
    process_partition, meta=time_of_day_meta
)

In [None]:
# maps a month number to its season
def get_season(month):
    """Return the season name for a month number.

    12, 1, 2 -> "Winter"; 3-5 -> "Spring"; 6-8 -> "Summer"; 9-11 -> "Fall".
    Any other value (for example NaN) yields the placeholder string "NA".
    """
    seasons = (
        ("Winter", (12, 1, 2)),
        ("Spring", (3, 4, 5)),
        ("Summer", (6, 7, 8)),
        ("Fall", (9, 10, 11)),
    )
    for season_name, season_months in seasons:
        if month in season_months:
            return season_name
    # Fallback for values that match no season
    return "NA"
    

# adds 'month' and 'Season' columns to one partition
def season_process_partition(partition):
    """Return a copy of ``partition`` with 'month' and 'Season' columns.

    'month' is taken from the FL_DATE datetime column and 'Season' is the
    label get_season gives for that month. The input frame is left
    unmodified.
    """
    flight_month = partition['FL_DATE'].dt.month
    return partition.assign(
        month=flight_month,
        Season=flight_month.apply(get_season),
    )

In [None]:
# Output schema handed to Dask as `meta`: the current frame plus the two new
# columns (presumably the literal values only serve to fix their dtypes).
season_meta = combined_df_dask.assign(month=0, Season='object')

# Add 'month' and 'Season' to every partition
combined_df_dask = combined_df_dask.map_partitions(
    season_process_partition, meta=season_meta
)

In [None]:
# Route_Pair joins the ORIGIN and DEST airports as "<ORIGIN> to <DEST>"
combined_df_dask = combined_df_dask.assign(
    Route_Pair=combined_df_dask["ORIGIN"] + " to " + combined_df_dask["DEST"]
)

# Mean, standard deviation and row count of Delay for every
# route / time-of-day / carrier / season combination
grouped_combined_df = (
    combined_df_dask
    .groupby(["Route_Pair", "Time_of_Day", "OP_CARRIER", "Season"])["Delay"]
    .agg(["mean", "std", "count"])
    .reset_index()
)

In [None]:
#"2nd half" of CI formula 
# 1.96 * standard error (std / sqrt(count)) for each group
ci_half_width = 1.96 * (
    grouped_combined_df["std"] / np.sqrt(grouped_combined_df["count"])
)

# Store the half-width together with the interval bounds around the mean
grouped_combined_df = grouped_combined_df.assign(
    CI_2nd_Half=ci_half_width,
    CI_Lower=grouped_combined_df["mean"] - ci_half_width,
    CI_Upper=grouped_combined_df["mean"] + ci_half_width,
)

In [None]:
# Collapse the aggregated result into a single partition
grouped_combined_df = grouped_combined_df.repartition(npartitions=1)

In [None]:
#optional
# Saves the Dask DataFrame as a Parquet dataset in the S3 bucket - possibly no longer needed, since the cell below computes the result and writes it directly.

# grouped_combined_df.to_parquet(
#     "s3://historicalflightdata/inference_outputs/",
#     engine="pyarrow",
#     write_index=False
# )

In [None]:
# Materialise the grouped result with .compute(), then write it as a single
# Parquet file to the S3 bucket (no index column).
output_path = "s3://historicalflightdata/inference_outputs/output.parquet"
grouped_result = grouped_combined_df.compute()
grouped_result.to_parquet(output_path, engine="pyarrow", index=False)