In [9]:
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.sql.window import Window
from datetime import timedelta

# Start the Spark session, or attach to the one already running in this kernel.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [10]:
# 1. Read the flight_tracker CSV (first row is the header, column types are
#    inferred) and cache it, because it is scanned several times below
#    (date range, airport codes, and the flight side of the join).
INPUT_PATH = "/content/FPT.csv"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(INPUT_PATH)
    .cache()
)

In [11]:
# 2. Calculate the date range the interval grid must cover.
#    The grid has to contain every departure AND every arrival, so take the
#    earliest day seen in either timestamp column, and one day past the latest
#    day seen in either column. (Local times mean an arrival can fall on an
#    earlier day than the first departure, and a departure with no recorded
#    arrival can fall after the last arrival day.)
#    least/greatest ignore nulls, so one all-null column does not null the result.
dep_day = F.date_trunc('day', F.col('actl_dep_lcl_tms'))
arr_day = F.date_trunc('day', F.col('actl_arr_lcl_tms'))

date_range_row = df.agg(
    F.least(F.min(dep_day), F.min(arr_day)).alias('start_date'),
    (F.greatest(F.max(dep_day), F.max(arr_day)) + F.expr('INTERVAL 1 DAY')).alias('end_date'),
).collect()[0]

start_date = date_range_row['start_date']
end_date = date_range_row['end_date']

# Fail here with a clear message instead of a TypeError in the next cell.
if start_date is None or end_date is None:
    raise ValueError(
        "No non-null actl_dep_lcl_tms / actl_arr_lcl_tms values found; "
        "cannot build the interval grid"
    )

In [12]:
# 3. Generate the 15-minute interval grid on the driver:
#    start_date, start_date + 15 min, start_date + 30 min, ... up to and
#    including the last grid point that is <= end_date.
INTERVAL_MINUTES = 15
interval_step = timedelta(minutes=INTERVAL_MINUTES)

total_minutes = int((end_date - start_date).total_seconds() // 60)
num_intervals = total_minutes // INTERVAL_MINUTES + 1

intervals = [(start_date + i * interval_step,) for i in range(num_intervals)]
intervals_df = spark.createDataFrame(intervals, ['interval_ts']).cache()

In [13]:
# 4. Get every airport code that appears as an origin or a destination.
#    Null codes are dropped. They can never satisfy a port_code == orig/dest
#    comparison (null == x is null), so they would only add all-zero rows.
#    A single distinct after the union is enough; per-side distincts are redundant.
airport_codes_df = (
    df.select(F.col('orig').alias('port_code'))
    .union(df.select(F.col('dest').alias('port_code')))
    .where(F.col('port_code').isNotNull())
    .distinct()
    .cache()
)

In [14]:
# 5. Build the reporting grid: every (airport, 15-minute interval) pair,
#    including pairs that end up with no flights at all.
port_intervals_df = (
    airport_codes_df
    .crossJoin(intervals_df)
    .cache()
)

In [15]:
# 6. Alias both join inputs so later expressions can write 'pi.<col>' for the
#    airport/interval grid and 'f.<col>' for flight rows without ambiguity.
pi, f = port_intervals_df.alias('pi'), df.alias('f')

In [16]:
# 7. Look-back window for each grid point. A flight event counts toward
#    interval_ts when its timestamp lies in [interval_ts - 2 hours, interval_ts].
#    Both ends are inclusive; the later cells compare with >= start and <= end.
WINDOW_LENGTH = F.expr('INTERVAL 2 HOURS')
window_end = F.col('pi.interval_ts')
window_start = window_end - WINDOW_LENGTH

In [17]:
# Join flight data for both departures and arrivals.
#
# Performance: the match condition below is an OR of two range predicates. On
# its own it has no equality key, so Spark must fall back to a nested-loop join
# of every (airport, interval) row against every flight. To give it a key, each
# flight is first expanded into one row per distinct airport it touches
# (orig and dest). The join then adds `pi.port_code == f.port_key` as a
# top-level equality and keeps the original predicate as a residual filter.
#
# array_distinct collapses flights with orig == dest to a single row. So for a
# given grid row, a flight contributes at most one joined row, exactly as with
# the plain OR join, and the aggregation cell sees the same rows (plus the
# extra f.port_key column).
f_by_port = (
    f.withColumn('port_key', F.explode(F.array_distinct(F.array(F.col('orig'), F.col('dest')))))
    .alias('f')
)

joined = pi.join(
    f_by_port,
    (F.col('pi.port_code') == F.col('f.port_key'))
    & (
        ((F.col('pi.port_code') == F.col('f.orig')) &
         (F.col('f.actl_dep_lcl_tms') >= window_start) &
         (F.col('f.actl_dep_lcl_tms') <= window_end))
        |
        ((F.col('pi.port_code') == F.col('f.dest')) &
         (F.col('f.actl_arr_lcl_tms') >= window_start) &
         (F.col('f.actl_arr_lcl_tms') <= window_end))
    ),
    # Left join keeps grid rows with no matching flight (their f.* columns are null).
    how='left'
)

In [18]:
# Aggregation: for each (airport, interval), count flights that departed from
# (out_cnt) or arrived at (in_cnt) the airport inside the look-back window.
# Grid rows with no matching flight carry null f.* columns. Both conditions are
# then null, the CASE falls through to 0, and such rows sum to zero counts.
# Column.between(lo, hi) is (col >= lo) & (col <= hi), i.e. inclusive on both ends.
departed_in_window = (
    (F.col('pi.port_code') == F.col('f.orig'))
    & F.col('f.actl_dep_lcl_tms').between(window_start, window_end)
)
arrived_in_window = (
    (F.col('pi.port_code') == F.col('f.dest'))
    & F.col('f.actl_arr_lcl_tms').between(window_start, window_end)
)

result = (
    joined
    .groupBy('pi.port_code', 'pi.interval_ts')
    .agg(
        F.sum(F.when(departed_in_window, 1).otherwise(0)).alias('out_cnt'),
        F.sum(F.when(arrived_in_window, 1).otherwise(0)).alias('in_cnt'),
    )
    .orderBy('pi.port_code', 'pi.interval_ts')
)

In [21]:
# Write the per-airport, per-interval counts as a single CSV part file with a
# header row. mode("overwrite") replaces whatever is at the target path, so
# the target must be a dedicated directory. Pointing it at "/content" itself
# would wipe the input /content/FPT.csv and the rest of the working directory.
OUTPUT_DIR = "/content/port_interval_counts"

result.cache()
result.coalesce(1).write.option("header", True).mode("overwrite").csv(OUTPUT_DIR)