In [1]:
# If you restarted the runtime, mount Drive again
from google.colab import drive
drive.mount('/content/drive')

# Reuse existing Spark session created in Notebook 01



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!rm -rf /content/spark-3.3.2-bin-hadoop3
!rm -rf spark-3.3.2-bin-hadoop3.tgz


In [3]:
!apt-get update -qq > /dev/null
!apt-get install -qq openjdk-11-jdk-headless > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar -xzf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [2]:
# Reset environment
import os, shutil
from google.colab import drive

drive.mount('/content/drive')

!pip install fastparquet pyarrow


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Collecting fastparquet
  Downloading fastparquet-2024.11.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.2 kB)
Downloading fastparquet-2024.11.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m31.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: fastparquet
Successfully installed fastparquet-2024.11.0


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

path = "/content/drive/MyDrive/flight_delay_analysis_project/Data_Processed/cleaned_flights.parquet"

df = spark.read.parquet(path)

df.printSchema()
df.show(10)
df.limit(5).toPandas()   # Only convert small part


root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: double (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: double (nullable

Unnamed: 0,YEAR,MONTH,AIRLINE,DAY,DAY_OF_WEEK,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,...,carrier_ct,weather_ct,nas_ct,security_ct,late_aircraft_ct,arr_cancelled,arr_diverted,agg_arr_delay,agg_carrier_delay,nas_delay
0,2015,1,OO,1,4,5467,N701BR,ONT,SFO,500,...,14.37,0.95,7.8,0.81,21.06,3.0,0.0,3084.0,1107.0,295.0
1,2015,1,OO,1,4,5467,N701BR,ONT,SFO,500,...,128.5,13.46,242.28,0.82,433.93,50.0,2.0,46448.0,10896.0,7322.0
2,2015,1,OO,1,4,5467,N701BR,ONT,SFO,500,...,42.19,1.0,19.47,0.0,63.34,5.0,0.0,6387.0,2052.0,624.0
3,2015,1,OO,1,4,5467,N701BR,ONT,SFO,500,...,8.15,0.0,7.14,0.0,11.7,4.0,1.0,1016.0,424.0,189.0
4,2015,1,OO,1,4,5467,N701BR,ONT,SFO,500,...,5.63,0.45,3.56,0.0,3.36,1.0,1.0,779.0,387.0,179.0


In [3]:
sample_df = df.sample(withReplacement=False, fraction=0.05)

sample_path = "/content/drive/MyDrive/flight_delay_analysis_project/Data_Processed/dashboard_sample.parquet"

sample_df.write.mode("overwrite").parquet(sample_path)

print("Sample ready at:", sample_path)
print("Rows in sample:", sample_df.count())


Sample ready at: /content/drive/MyDrive/flight_delay_analysis_project/Data_Processed/dashboard_sample.parquet
Rows in sample: 31113571


In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

In [5]:
import findspark
findspark.init()


In [6]:
from pyspark.sql import SparkSession
import os

# Stop any existing Spark session to ensure clean configuration
if 'spark' in locals() and spark.sparkContext._jsc is not None:
    spark.stop()

spark = (SparkSession.builder
    .appName("Quick_Preview")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "2")
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "2g")
    .config("spark.ui.showConsoleProgress", "false")
    .config("spark.local.dir", "/content/spark_tmp") # Ensure stable local directory
    .getOrCreate())

# Create the directory if it doesn't exist
os.makedirs("/content/spark_tmp", exist_ok=True)

print("✅ Spark ready")
print(f"Spark local directory set to: {spark.conf.get('spark.local.dir')}")

✅ Spark ready
Spark local directory set to: /content/spark_tmp


In [14]:
base_path = "/content/drive/MyDrive/flight_delay_analysis_project"
raw_path  = f"{base_path}/Data"
proc_path = f"{base_path}/Data_Processed"


In [18]:
# Strategy: keep rows where core fields exist; fill numeric delay columns with 0 only if truly missing.
# (You can relax/tighten this later if needed.)

from pyspark.sql.functions import col

# Drop flights with missing airline/year/month/origin/destination
required = ["YEAR","MONTH","AIRLINE","ORIGIN_AIRPORT","DESTINATION_AIRPORT"]
flights_clean = flights_df
for c in required:
    flights_clean = flights_clean.filter(col(c).isNotNull())

# Fill delay numeric columns (when null) with 0
flights_clean = flights_clean.fillna({
    "DEPARTURE_DELAY": 0.0,
    "ARRIVAL_DELAY":   0.0
})

print("Rows after basic cleaning:", flights_clean.count())

Rows after basic cleaning: 5819079


In [16]:
spark.range(5).show()


+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [17]:
flights_df = spark.read.csv(f"{raw_path}/flights.csv", header=True, inferSchema=True)
delay_df   = spark.read.csv(f"{raw_path}/Airline_Delay_Cause.csv", header=True, inferSchema=True)

print(f"flights_df rows: {flights_df.count():,}")
print(f"delay_df rows:   {delay_df.count():,}")

flights_df.printSchema()
delay_df.printSchema()

flights_df rows: 5,819,079
delay_df rows:   375,219
root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer

In [19]:
from pyspark.sql.functions import col, when, count, isnan

def nulls_per_col(df):
    exprs = [count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in df.columns]
    return df.select(exprs)

print("Nulls in flights_df:")
nulls_per_col(flights_df).show(truncate=False)

print("Nulls in delay_df:")
nulls_per_col(delay_df).show(truncate=False)

Nulls in flights_df:
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+--

In [20]:
from pyspark.sql.functions import upper

# Ensure key columns are consistent case/types
# flights: YEAR, MONTH, AIRLINE, ORIGIN_AIRPORT, DESTINATION_AIRPORT, DEPARTURE_DELAY, ARRIVAL_DELAY, DISTANCE, AIR_TIME
flights_df = (flights_df
              .withColumn("YEAR",  col("YEAR").cast("int"))
              .withColumn("MONTH", col("MONTH").cast("int"))
              .withColumn("AIRLINE", upper(col("AIRLINE")))
              .withColumn("DEPARTURE_DELAY", col("DEPARTURE_DELAY").cast("double"))
              .withColumn("ARRIVAL_DELAY",   col("ARRIVAL_DELAY").cast("double"))
              .withColumn("DISTANCE",        col("DISTANCE").cast("double"))
              .withColumn("AIR_TIME",        col("AIR_TIME").cast("double"))
             )

# delay_df uses (year, month, carrier, airport, ... cause counts/minutes)
delay_df = (delay_df
            .withColumnRenamed("year", "YEAR")
            .withColumnRenamed("month", "MONTH")
            .withColumnRenamed("carrier", "AIRLINE")
            .withColumnRenamed("airport", "AIRPORT")
            .withColumn("YEAR",  col("YEAR").cast("int"))
            .withColumn("MONTH", col("MONTH").cast("int"))
            .withColumn("AIRLINE", upper(col("AIRLINE")))
           )

flights_df.select("YEAR","MONTH","AIRLINE","ORIGIN_AIRPORT","DESTINATION_AIRPORT","DEPARTURE_DELAY","ARRIVAL_DELAY","DISTANCE").show(5)
delay_df.select("YEAR","MONTH","AIRLINE","AIRPORT").show(5)

+----+-----+-------+--------------+-------------------+---------------+-------------+--------+
|YEAR|MONTH|AIRLINE|ORIGIN_AIRPORT|DESTINATION_AIRPORT|DEPARTURE_DELAY|ARRIVAL_DELAY|DISTANCE|
+----+-----+-------+--------------+-------------------+---------------+-------------+--------+
|2015|    1|     AS|           ANC|                SEA|          -11.0|        -22.0|  1448.0|
|2015|    1|     AA|           LAX|                PBI|           -8.0|         -9.0|  2330.0|
|2015|    1|     US|           SFO|                CLT|           -2.0|          5.0|  2296.0|
|2015|    1|     AA|           LAX|                MIA|           -5.0|         -9.0|  2342.0|
|2015|    1|     AS|           SEA|                ANC|           -1.0|        -21.0|  1448.0|
+----+-----+-------+--------------+-------------------+---------------+-------------+--------+
only showing top 5 rows

+----+-----+-------+-------+
|YEAR|MONTH|AIRLINE|AIRPORT|
+----+-----+-------+-------+
|2025|    6|     9E|    ABE|
|202

In [21]:
# Strategy: keep rows where core fields exist; fill numeric delay columns with 0 only if truly missing.
# (You can relax/tighten this later if needed.)

from pyspark.sql.functions import col

# Drop flights with missing airline/year/month/origin/destination
required = ["YEAR","MONTH","AIRLINE","ORIGIN_AIRPORT","DESTINATION_AIRPORT"]
flights_clean = flights_df
for c in required:
    flights_clean = flights_clean.filter(col(c).isNotNull())

# Fill delay numeric columns (when null) with 0
flights_clean = flights_clean.fillna({
    "DEPARTURE_DELAY": 0.0,
    "ARRIVAL_DELAY":   0.0
})

print("Rows after basic cleaning:", flights_clean.count())

Rows after basic cleaning: 5819079


In [22]:
from pyspark.sql.functions import when, lit, floor

# TOTAL_DELAY and ON_TIME_FLAG (1 if ARRIVAL_DELAY<=0 else 0)
flights_enriched = (flights_clean
    .withColumn("TOTAL_DELAY", col("DEPARTURE_DELAY") + col("ARRIVAL_DELAY"))
    .withColumn("ON_TIME_FLAG", when(col("ARRIVAL_DELAY") <= 0, lit(1)).otherwise(lit(0)))
)

# Optional: Departure hour bucket (if SCHEDULED_DEPARTURE exists as minutes like 0..2359)
if "SCHEDULED_DEPARTURE" in flights_enriched.columns:
    flights_enriched = flights_enriched.withColumn(
        "DEP_HOUR", floor(col("SCHEDULED_DEPARTURE")/100).cast("int")
    )

flights_enriched.select("YEAR","MONTH","AIRLINE","ORIGIN_AIRPORT","ARRIVAL_DELAY","TOTAL_DELAY","ON_TIME_FLAG").show(10)

+----+-----+-------+--------------+-------------+-----------+------------+
|YEAR|MONTH|AIRLINE|ORIGIN_AIRPORT|ARRIVAL_DELAY|TOTAL_DELAY|ON_TIME_FLAG|
+----+-----+-------+--------------+-------------+-----------+------------+
|2015|    1|     AS|           ANC|        -22.0|      -33.0|           1|
|2015|    1|     AA|           LAX|         -9.0|      -17.0|           1|
|2015|    1|     US|           SFO|          5.0|        3.0|           0|
|2015|    1|     AA|           LAX|         -9.0|      -14.0|           1|
|2015|    1|     AS|           SEA|        -21.0|      -22.0|           1|
|2015|    1|     DL|           SFO|          8.0|        3.0|           0|
|2015|    1|     NK|           LAS|        -17.0|      -23.0|           1|
|2015|    1|     US|           LAX|        -10.0|        4.0|           1|
|2015|    1|     AA|           SFO|        -13.0|      -24.0|           1|
|2015|    1|     DL|           LAS|        -15.0|      -12.0|           1|
+----+-----+-------+-----

In [23]:
# We will join on YEAR, MONTH, AIRLINE and also try to map the origin airport -> AIRPORT in the aggregated table.
# Not all (YEAR,MONTH,AIRLINE,ORIGIN_AIRPORT) will exist in delay_df; left join keeps all flights.

join_keys_carrier = ["YEAR","MONTH","AIRLINE"]

# First: carrier-month level (many rows in delay_df also include AIRPORT; we'll keep both)
delay_carrier = delay_df.drop("AIRPORT") if "AIRPORT" in delay_df.columns else delay_df

joined_df = flights_enriched.join(delay_carrier, on=join_keys_carrier, how="left")

# Second (optional): if you want airport-specific aggregates too, do an additional left join on ORIGIN_AIRPORT↔AIRPORT
if "AIRPORT" in delay_df.columns:
    delay_airport = delay_df.select("YEAR","MONTH","AIRLINE","AIRPORT",
                                    *[c for c in delay_df.columns if c not in ["YEAR","MONTH","AIRLINE","AIRPORT"]])
    joined_df = joined_df.join(
        delay_airport.withColumnRenamed("AIRPORT","ORIGIN_AIRPORT"),
        on=["YEAR","MONTH","AIRLINE","ORIGIN_AIRPORT"],
        how="left"
    )

joined_df.select("YEAR","MONTH","AIRLINE","ORIGIN_AIRPORT","ARRIVAL_DELAY").show(25)


+----+-----+-------+--------------+-------------+
|YEAR|MONTH|AIRLINE|ORIGIN_AIRPORT|ARRIVAL_DELAY|
+----+-----+-------+--------------+-------------+
|2015|    1|     AA|           LAX|         -9.0|
|2015|    1|     AA|           LAX|         -9.0|
|2015|    1|     AA|           LAX|         -9.0|
|2015|    1|     AA|           LAX|         -9.0|
|2015|    1|     AA|           LAX|         -9.0|
|2015|    1|     AA|           LAX|         -9.0|
|2015|    1|     AA|           LAX|         -9.0|
|2015|    1|     AA|           LAX|         -9.0|
|2015|    1|     AA|           LAX|         -9.0|
|2015|    1|     AA|           LAX|         -9.0|
|2015|    1|     AA|           LAX|         -9.0|
|2015|    1|     AA|           LAX|         -9.0|
|2015|    1|     AA|           LAX|         -9.0|
|2015|    1|     AA|           LAX|         -9.0|
|2015|    1|     AA|           LAX|         -9.0|
|2015|    1|     AA|           LAX|         -9.0|
|2015|    1|     AA|           LAX|         -9.0|


In [24]:
!df -h


Filesystem      Size  Used Avail Use% Mounted on
overlay         108G   41G   68G  38% /
tmpfs            64M     0   64M   0% /dev
shm             5.8G     0  5.8G   0% /dev/shm
/dev/root       2.0G  1.2G  750M  62% /usr/sbin/docker-init
tmpfs           6.4G  1.3M  6.4G   1% /var/colab
/dev/sda1       114G  103G   12G  90% /kaggle/input
tmpfs           6.4G     0  6.4G   0% /proc/acpi
tmpfs           6.4G     0  6.4G   0% /proc/scsi
tmpfs           6.4G     0  6.4G   0% /sys/firmware
drive            15G  6.3G  8.8G  42% /content/drive


In [25]:
!rm -rf /tmp/*
!df -h

Filesystem      Size  Used Avail Use% Mounted on
overlay         108G   41G   68G  38% /
tmpfs            64M     0   64M   0% /dev
shm             5.8G     0  5.8G   0% /dev/shm
/dev/root       2.0G  1.2G  750M  62% /usr/sbin/docker-init
tmpfs           6.4G  1.3M  6.4G   1% /var/colab
/dev/sda1       114G  103G   12G  90% /kaggle/input
tmpfs           6.4G     0  6.4G   0% /proc/acpi
tmpfs           6.4G     0  6.4G   0% /proc/scsi
tmpfs           6.4G     0  6.4G   0% /sys/firmware
drive            15G  6.3G  8.8G  42% /content/drive


In [26]:
# ---- SAFE, FAST VERSION ----
from pyspark.sql.functions import col

# Drop duplicates based on key columns only (faster & memory safe)
joined_df = joined_df.dropDuplicates(["YEAR","MONTH","AIRLINE",
                                      "FLIGHT_NUMBER","ORIGIN_AIRPORT","DESTINATION_AIRPORT"])

print("Deduplication applied on key columns (no full count executed).")

# Apply basic sanity filters without triggering large scans
joined_df = joined_df.filter((col("DISTANCE").isNull()) | (col("DISTANCE") >= 0))
joined_df = joined_df.filter((col("AIR_TIME").isNull()) | (col("AIR_TIME") >= 0))

print("Sanity filters applied successfully.")

Deduplication applied on key columns (no full count executed).
Sanity filters applied successfully.


In [None]:
import pandas as pd

# Load your cleaned dataset
df = pd.read_parquet("/content/drive/MyDrive/flight_delay_analysis_project/Data_Processed/cleaned_flights.parquet")

# Define major hub airport codes
hub_codes = [
    "ATL", "ORD", "DFW", "DEN", "LAX", "CLT", "MCO", "LAS", "PHX", "SEA",
    "SFO", "EWR", "MIA", "IAH", "JFK", "BOS", "MSP", "DTW", "PHL", "LGA"
]

# Create new hub indicator columns
df["ORIGIN_IS_HUB"] = df["ORIGIN_AIRPORT"].isin(hub_codes).astype(int)
df["DEST_IS_HUB"] = df["DESTINATION_AIRPORT"].isin(hub_codes).astype(int)

# Save the final dataset (for your Streamlit Dashboard)
output = "/content/drive/MyDrive/flight_delay_analysis_project/Data_Processed/final_flights_with_hubs.parquet"
df.to_parquet(output, index=False)

print("✔ Final dataset created successfully at:")
print(output)
print("Rows:", len(df))


In [1]:
sample_df = cleaned_df.sample(withReplacement=False, fraction=0.05)
sample_df.write.mode("overwrite").parquet("/content/drive/MyDrive/.../sample_for_dashboard.parquet")


NameError: name 'cleaned_df' is not defined

In [51]:
from pyspark.sql.functions import when, lit, col

# List of major hub airport codes
hub_codes = ["ATL","ORD","DFW","DEN","LAX","CLT","MCO","LAS","PHX","SEA",
             "SFO","EWR","MIA","IAH","JFK","BOS","MSP","DTW","PHL","LGA"]

# Create hub flags using built-in Spark SQL functions (no UDF)
joined_df = (joined_df
    .withColumn("ORIGIN_IS_HUB", when(col("ORIGIN_AIRPORT").isin(hub_codes), lit(1)).otherwise(lit(0)))
    .withColumn("DEST_IS_HUB",   when(col("DESTINATION_AIRPORT").isin(hub_codes), lit(1)).otherwise(lit(0)))
)

joined_df.select("ORIGIN_AIRPORT","ORIGIN_IS_HUB","DESTINATION_AIRPORT","DEST_IS_HUB").show(10, truncate=False)

# Save the processed DataFrame to a stable location to avoid temporary file issues
output_path = f"{proc_path}/joined_flights_with_hubs.parquet"
print(f"Saving processed data to: {output_path}")
joined_df.write.mode("overwrite").parquet(output_path)
print("✅ Processed DataFrame saved successfully.")


+--------------+-------------+-------------------+-----------+
|ORIGIN_AIRPORT|ORIGIN_IS_HUB|DESTINATION_AIRPORT|DEST_IS_HUB|
+--------------+-------------+-------------------+-----------+
|ANC           |0            |SEA                |1          |
|LAX           |1            |PBI                |0          |
|SEA           |1            |ANC                |0          |
|SFO           |1            |MSP                |1          |
|LAS           |1            |MSP                |1          |
|LAS           |1            |ATL                |1          |
|DEN           |1            |ATL                |1          |
|LAS           |1            |MIA                |1          |
|ANC           |0            |SEA                |1          |
|SFO           |1            |IAH                |1          |
+--------------+-------------+-------------------+-----------+
only showing top 10 rows

Saving processed data to: /content/drive/MyDrive/flight_delay_analysis_project/Data_Processe

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/content/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/content/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/socket.py", line 720, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [52]:
!ps -aux | grep java


root        2702  125 17.0 5239512 2263896 ?     Sl   18:15  96:21 /usr/lib/jvm/java-11-openjdk-amd64/bin/java -cp /content/spark-3.5.1-bin-hadoop3/conf/:/content/spark-3.5.1-bin-hadoop3/jars/* -Xmx2g -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirec

In [53]:
import shutil

path = "/content/drive/MyDrive/flight_delay_analysis_project/Data_Processed/joined_flights_with_hubs.parquet"

shutil.rmtree(path, ignore_errors=True)
print("Deleted incomplete parquet folder successfully!")


Deleted incomplete parquet folder successfully!


In [57]:
delay_df = delay_df.withColumnRenamed("arr_delay", "agg_arr_delay") \
                   .withColumnRenamed("carrier_delay", "agg_carrier_delay") \
                   .withColumnRenamed("weather_delay", "agg_weather_delay") \
                   .withColumnRenamed("nas_delay", "agg_nas_delay") \
                   .withColumnRenamed("security_delay", "agg_security_delay") \
                   .withColumnRenamed("late_aircraft_delay", "agg_late_aircraft_delay") \
                   .withColumnRenamed("carrier_ct", "agg_carrier_ct") \
                   .withColumnRenamed("weather_ct", "agg_weather_ct") \
                   .withColumnRenamed("nas_ct", "agg_nas_ct") \
                   .withColumnRenamed("security_ct", "agg_security_ct") \
                   .withColumnRenamed("late_aircraft_ct", "agg_late_aircraft_ct")

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/content/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/content/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/content/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


Py4JError: An error occurred while calling o2337.withColumnRenamed

In [49]:
joined_df = flights_enriched.join(delay_df, ["YEAR","MONTH","AIRLINE"], how="left")

In [None]:
joined_df.select("YEAR","MONTH","AIRLINE","ORIGIN_AIRPORT","ARRIVAL_DELAY",
                 "agg_arr_delay","agg_carrier_delay").show(10)


In [None]:
out_path = f"{proc_path}/cleaned_flights.parquet"


In [None]:
print([col for col in joined_df.columns if joined_df.columns.count(col) > 1])


In [None]:
for c in joined_df.columns:
    print(repr(c))


In [None]:
!ls -lh /content/drive/MyDrive/flight_delay_analysis_project/Data_Processed/


In [None]:
!ls -lh "/content/drive/MyDrive/flight_delay_analysis_project/Data_Processed/cleaned_flights.parquet"


In [None]:
# Remove any previous local copy to avoid mixing files
!rm -rf /content/cleaned_flights.parquet
# Copy from Drive -> local disk
!cp -r "/content/drive/MyDrive/flight_delay_analysis_project/Data_Processed/cleaned_flights.parquet" /content/
# Sanity check
!ls -lh /content/cleaned_flights.parquet


In [None]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("Flight_Delay_Step2_Cleaning")
         .master("local[*]")
         .config("spark.sql.shuffle.partitions", "8")  # keep small on Colab
         .getOrCreate())

# Optional: slightly fewer shuffles if you still see slowness
# spark.conf.set("spark.sql.shuffle.partitions", "4")


In [None]:
!ls -lh /content/cleaned_flights.parquet


In [None]:
df_test = spark.read.option("mergeSchema", "false").parquet("/content/cleaned_flights.parquet")
df_test.printSchema()


In [None]:
# ⚡ read just 1 parquet part (fast preview)
sample_path = "/content/cleaned_flights.parquet/part-00000-b47ae07b-6e01-4502-a298-c34c0eeae787-c000.snappy.parquet"

df_sample = spark.read.parquet(sample_path)
print("✅ Loaded single parquet part")
df_sample.show(5)


In [None]:
print("Total records:", df_sample.count())


In [None]:
clean_df = spark.read.option("mergeSchema","false").parquet("/content/cleaned_flights.parquet")
print("Full cleaned records:", clean_df.count())


In [None]:
from pyspark.sql.functions import avg, round as sround

df_sample.groupBy("AIRLINE") \
    .agg(sround(avg("ARRIVAL_DELAY"),2).alias("AVG_ARR_DELAY")) \
    .orderBy("AVG_ARR_DELAY", ascending=False) \
    .show(5)


In [None]:
df_sample.write.mode("overwrite").parquet("/content/drive/MyDrive/flight_delay_analysis_project/Outputs/preview_sample.parquet")
print("✅ Sample saved to Outputs folder")
