In [None]:
# Install PySpark in Colab
!pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, count, avg, to_timestamp
import pandas as pd
import time
import requests

# Download datasets from URLs
airport_url = "https://raw.githubusercontent.com/databricks/LearningSparkV2/master/databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"
delays_url = "https://raw.githubusercontent.com/databricks/LearningSparkV2/master/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"

# Fetch and save files locally.
# The download happens *before* the target file is opened, and
# raise_for_status() aborts on an HTTP error (404, 500, ...), so a failed
# request can neither leave an empty file behind nor save an error page as if
# it were the dataset. The timeout keeps a stalled connection from hanging the
# cell indefinitely.
for url, filename in (
    (airport_url, "airport-codes-na.txt"),
    (delays_url, "departuredelays.csv"),
):
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    with open(filename, "wb") as f:
        f.write(response.content)

# Create (or reuse) the Spark session used by the rest of the notebook
spark = (
    SparkSession.builder
    .appName("Simulated Real-Time Flight Delay Processing")
    .getOrCreate()
)

# Static reference data: the airport-codes file is tab-separated with a
# header row, so configure the reader accordingly before loading it.
airport_reader = spark.read.options(sep="\t", header="true")
airport_df = airport_reader.csv("airport-codes-na.txt")

# Load departure delays into Pandas for simulation
delays_pd = pd.read_csv("departuredelays.csv")

# Convert 'date' to a timestamp (assuming year 2019 for simplicity).
# The column is parsed as month/day/hour/minute (MMddHHmm) and carries no
# year. Parsing it with a year-less format makes every timestamp fall in 1900,
# so the assumed year is prepended explicitly and parsed with %Y.
# zfill(8) restores the leading zero that is lost when the column is read as
# an integer; values that still fail to parse become NaT (errors='coerce').
ASSUMED_YEAR = 2019
delays_pd['timestamp'] = pd.to_datetime(
    str(ASSUMED_YEAR) + delays_pd['date'].astype(str).str.zfill(8),
    format="%Y%m%d%H%M",
    errors='coerce',
)

# Simulate streaming by processing delays in chunks
chunk_size = 50  # Process 50 flights at a time
total_rows = len(delays_pd)

# The airport lookup (code -> city) is the same for every chunk, so build the
# projection once instead of re-deriving it on each iteration.
airport_lookup = airport_df.select("IATA", "City")

try:
    for chunk_number, start in enumerate(range(0, total_rows, chunk_size), start=1):
        end = min(start + chunk_size, total_rows)
        chunk = delays_pd.iloc[start:end]

        # Convert chunk to Spark DataFrame
        spark_chunk = spark.createDataFrame(chunk)

        # Join with airport data to enrich with origin city; a left join keeps
        # flights whose origin code has no match in the airport table.
        enriched_chunk = spark_chunk.join(
            airport_lookup,
            spark_chunk["origin"] == airport_lookup["IATA"],
            "left"
        ).drop("IATA")

        # Process the "stream": compute per-city metrics over 1-hour windows.
        # No slide duration is passed to window(), so these are tumbling
        # (non-overlapping) windows rather than sliding ones.
        metrics = enriched_chunk \
            .withColumn("timestamp", to_timestamp(col("timestamp"))) \
            .groupBy(window("timestamp", "1 hour"), "City") \
            .agg(
                count("delay").alias("flight_count"),
                avg("delay").alias("avg_delay")
            )

        # Show metrics for this chunk (simulates real-time output)
        print(f"Metrics for chunk {chunk_number} (rows {start}-{end-1}):")
        metrics.show(truncate=False)

        # Simulate delay between chunks
        time.sleep(2)
finally:
    # Stop the Spark session even if a chunk fails or the cell is interrupted,
    # so the session is not left running.
    spark.stop()

Streaming output truncated to the last 5000 lines.
+------------------------------------------+----------+------------+---------+
|window                                    |City      |flight_count|avg_delay|
+------------------------------------------+----------+------------+---------+
|{1900-01-15 17:00:00, 1900-01-15 18:00:00}|Alexandria|1           |27.0     |
|{1900-01-14 05:00:00, 1900-01-14 06:00:00}|Alexandria|1           |-11.0    |
|{1900-01-16 10:00:00, 1900-01-16 11:00:00}|Alexandria|1           |-8.0     |
|{1900-01-14 14:00:00, 1900-01-14 15:00:00}|Alexandria|1           |-7.0     |
|{1900-01-16 11:00:00, 1900-01-16 12:00:00}|Alexandria|2           |-6.5     |
|{1900-01-14 18:00:00, 1900-01-14 19:00:00}|Alexandria|1           |-10.0    |
|{1900-01-15 06:00:00, 1900-01-15 07:00:00}|Alexandria|1           |-4.0     |
|{1900-01-14 10:00:00, 1900-01-14 11:00:00}|Alexandria|1           |-14.0    |
|{1900-01-15 14:00:00, 1900-01-15 15:00:00}|Alexandria|1          