In [2]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("patrickzel/flight-delay-and-cancellation-dataset-2019-2023")

print("Path to dataset files:", path)

Downloading from https://www.kaggle.com/api/v1/datasets/download/patrickzel/flight-delay-and-cancellation-dataset-2019-2023?dataset_version_number=7...


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 140M/140M [00:03<00:00, 46.2MB/s] 

Extracting files...





Path to dataset files: /Users/katarinadvornak/.cache/kagglehub/datasets/patrickzel/flight-delay-and-cancellation-dataset-2019-2023/versions/7


In [3]:

import os
import glob 

In [7]:
# 1. Imports and Environment Setup
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import os
import glob
from pyspark.sql.functions import col

# Initialize Spark Session (Big Data Safe)
# This MUST run before any command that uses the 'spark' object.
spark = (SparkSession.builder
    .appName("FlightDelayGraphAnalysis")
    .getOrCreate())

print(f"Spark Version: {spark.version}")
print("Spark Session Initialized.")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/01 14:32:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Version: 4.0.1
Spark Session Initialized.


In [16]:
pyspark_data_path = "/Users/katarinadvornak/.cache/kagglehub/datasets/patrickzel/flight-delay-and-cancellation-dataset-2019-2023/versions/7/flights_sample_3m.csv"

try:
    raw_flights_df = (spark.read.csv(
        pyspark_data_path,
        header=True,
        schema=final_flight_schema, 
        ignoreLeadingWhiteSpace=True)
    )

    print(f"\n‚úÖ Data Successfully Ingested. Total Records: {raw_flights_df.count():,}")

except Exception as e:
    print(f"\n‚ùå Ingestion Error: {e}")




‚úÖ Data Successfully Ingested. Total Records: 3,000,000


                                                                                

In [21]:
# 1. Inspect the downloaded directory to find the actual data files
# The 'path' variable holds the local directory where the data was downloaded.
print(f"Inspecting directory: {path}")

# Assuming the data files are CSVs (e.g., 'flights.csv' or multiple monthly files)
# Adjust the pattern if your files are named differently (e.g., '*.parquet' or 'flight_data_*.csv')
file_pattern = os.path.join(path, "*.csv")
data_files = glob.glob(file_pattern)

if not data_files:
    print("\nüö® ERROR: No CSV files found in the downloaded directory.")
    print("Please check the directory structure or the file pattern.")
    # Exit or raise an error if critical files are missing
else:
    print(f"\nFound {len(data_files)} data files. First file: {data_files[0]}")
    
    # 2. Define the PySpark Ingestion Path
    # --- CRITICAL FIX: OVERRIDE WILDCARD WITH EXACT PATH ---
    # Since we know the file is 'flights_sample_3m.csv', we must use the exact path found by glob.
    pyspark_data_path = data_files[0] # Use the first (and only) file found by glob
    
    # Original logic (now overridden but left for context):
    # pyspark_data_path = file_pattern
    
    print(f"PySpark ingestion path (FIXED): {pyspark_data_path}")
    
    # --- 3. Data Ingestion using Pre-defined Schema (Big Data Safe) ---
    # We rely on the 'spark' session and 'final_flight_schema' (assuming it's defined correctly)
    
    try:
        # NOTE: Ensure 'final_flight_schema' is used instead of 'flight_schema' if you updated the name.
        raw_flights_df = (spark.read.csv(
            pyspark_data_path, # Now using the fixed path
            header=True,
            schema=final_flight_schema, # Use the correct, fixed schema
            ignoreLeadingWhiteSpace=True)
        )

        # 4. Initial Inspection
        print(f"\n‚úÖ Total Records Loaded (Estimated): {raw_flights_df.count():,}")
        raw_flights_df.printSchema()
        # raw_flights_df.limit(5).toPandas() # Uncomment if you want to see a preview
        
    except Exception as e:
        print(f"\n‚ùå Error during PySpark Ingestion. Check column names/types against the schema.")
        print(f"Error: {e}")

Inspecting directory: /Users/katarinadvornak/.cache/kagglehub/datasets/patrickzel/flight-delay-and-cancellation-dataset-2019-2023/versions/7

Found 1 data files. First file: /Users/katarinadvornak/.cache/kagglehub/datasets/patrickzel/flight-delay-and-cancellation-dataset-2019-2023/versions/7/flights_sample_3m.csv
PySpark ingestion path (FIXED): /Users/katarinadvornak/.cache/kagglehub/datasets/patrickzel/flight-delay-and-cancellation-dataset-2019-2023/versions/7/flights_sample_3m.csv





‚úÖ Total Records Loaded (Estimated): 3,000,000
root
 |-- FL_DATE: string (nullable = true)
 |-- AIRLINE_CODE: string (nullable = true)
 |-- DOT_CODE: integer (nullable = true)
 |-- FL_NUMBER: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY: string (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- DELAY_DUE_CARRIER: double (nullable = true)
 |-- DELAY_DUE_WEATHER: double (nullable = true)
 |-- DELAY_DUE_NAS: double (nullable = true)
 |-- DELAY_DUE_SECURITY: double (nullable = true)
 |-- DELAY_DUE_LATE_AIRCRAFT: double (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- W

                                                                                

In [18]:
# 1. Total Row Count (Volume Check)
total_rows = raw_flights_df.count()
print(f"Total Rows in raw_flights_df: {total_rows:,}")

# 2. Schema Check (Data Types)
# Verifies the types were correctly applied by the final_flight_schema.
print("\n--- DataFrame Schema (Data Types) ---")
raw_flights_df.printSchema()

# 3. Head Object (First 5 Rows)
# Displays the first 5 rows for visual verification of data integrity.
print("\n--- Head Object (First 5 Rows) ---")
# Only select key columns for a cleaner display:
key_columns = ["FL_DATE", "ORIGIN", "DEST", "DEP_DELAY", "ARR_DELAY", "DISTANCE", "AIRLINE_CODE"]

raw_flights_df.select(*key_columns).limit(5).toPandas()

25/12/01 14:46:53 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: FL_DATE, AIRLINE, DOT_CODE, ORIGIN, DEST, DEST_CITY, TAXI_OUT
 Schema: FL_DATE, AIRLINE_CODE, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, DISTANCE
Expected: AIRLINE_CODE but found: AIRLINE
CSV file: file:///Users/katarinadvornak/.cache/kagglehub/datasets/patrickzel/flight-delay-and-cancellation-dataset-2019-2023/versions/7/flights_sample_3m.csv


Total Rows in raw_flights_df: 3,000,000

--- DataFrame Schema (Data Types) ---
root
 |-- FL_DATE: string (nullable = true)
 |-- AIRLINE_CODE: string (nullable = true)
 |-- DOT_CODE: integer (nullable = true)
 |-- FL_NUMBER: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY: string (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- DELAY_DUE_CARRIER: double (nullable = true)
 |-- DELAY_DUE_WEATHER: double (nullable = true)
 |-- DELAY_DUE_NAS: double (nullable = true)
 |-- DELAY_DUE_SECURITY: double (nullable = true)
 |-- DELAY_DUE_LATE_AIRCRAFT: double (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- TAXI_OUT: d

Unnamed: 0,FL_DATE,ORIGIN,DEST,DEP_DELAY,ARR_DELAY,DISTANCE,AIRLINE_CODE
0,2019-01-09,19977,FLL,,,19.0,United Air Lines Inc.
1,2022-11-19,19790,MSP,,,9.0,Delta Air Lines Inc.
2,2022-07-22,19977,DEN,,,20.0,United Air Lines Inc.
3,2023-03-06,19790,MSP,,,27.0,Delta Air Lines Inc.
4,2020-02-23,20416,MCO,,,15.0,Spirit Air Lines


In [19]:
## üìù Define Final Schema (Big Data Safe Practice)

# Define the definitive schema based on the provided dictionary ("Updated Header")
flight_schema = StructType([
    # Key Identifiers
    StructField("FL_DATE", StringType(), True),         
    StructField("AIRLINE", StringType(), True),    # Used to be OP_UNIQUE_CARRIER
    StructField("DOT_CODE", IntegerType(), True),
    StructField("FL_NUMBER", IntegerType(), True),
    
    # Graph Vertices
    StructField("ORIGIN", StringType(), True),           # Origin Airport Code (CRUCIAL)
    StructField("ORIGIN_CITY", StringType(), True),      # Useful property for Vertices
    StructField("DEST", StringType(), True),             # Destination Airport Code (CRUCIAL)
    StructField("DEST_CITY", StringType(), True),        # Useful property for Vertices
    
    # Time and Delay Metrics (Using DoubleType for flexibility with nulls and floats)
    StructField("DEP_DELAY", DoubleType(), True),
    StructField("ARR_DELAY", DoubleType(), True),      # Key analysis column
    StructField("CANCELLED", DoubleType(), True),
    StructField("DIVERTED", DoubleType(), True),
    StructField("AIR_TIME", DoubleType(), True),       # Key Edge Property
    StructField("DISTANCE", DoubleType(), True),       # Key Edge Property
    
    # Detailed Delay Attributions
    StructField("DELAY_DUE_CARRIER", DoubleType(), True),
    StructField("DELAY_DUE_WEATHER", DoubleType(), True),
    StructField("DELAY_DUE_NAS", DoubleType(), True),
    StructField("DELAY_DUE_SECURITY", DoubleType(), True),
    StructField("DELAY_DUE_LATE_AIRCRAFT", DoubleType(), True),
    
    # Include other columns needed for ETL/cleaning
    StructField("CRS_DEP_TIME", IntegerType(), True),
    StructField("DEP_TIME", DoubleType(), True),
    StructField("TAXI_OUT", DoubleType(), True),
    StructField("WHEELS_OFF", DoubleType(), True),
    StructField("WHEELS_ON", DoubleType(), True),
    StructField("TAXI_IN", DoubleType(), True),
    StructField("CRS_ARR_TIME", IntegerType(), True),
    StructField("ARR_TIME", DoubleType(), True),
    StructField("CANCELLATION_CODE", StringType(), True),
    StructField("CRS_ELAPSED_TIME", DoubleType(), True),
    StructField("ELAPSED_TIME", DoubleType(), True)
])

print("Final Flight Schema Defined based on the data dictionary.")

Final Flight Schema Defined based on the data dictionary.


In [23]:
## ‚öôÔ∏è Cell 11: Cleaning, Feature Engineering, and Graph Setup

from pyspark.sql.functions import when, col, lit, count, desc
from graphframes import GraphFrame

# --- 1. Data Cleaning and Feature Engineering (Preparing Edges) ---

# Filter out non-essential nulls and ill-defined data points.
# We must have ORIGIN/DEST, ARR_DELAY, and DISTANCE for graph analysis.
cleaned_flights_df = (raw_flights_df
    .filter(col("ORIGIN").isNotNull())
    .filter(col("DEST").isNotNull())
    .filter(col("ARR_DELAY").isNotNull()) # We must know the delay to analyze the edge
    .filter(col("DISTANCE") > 0)          # Distance must be positive
    .cache() # Cache this key intermediate result! (Big Data Safe Practice for reuse)
)
print(f"Records remaining after critical cleaning: {cleaned_flights_df.count():,}")

# Create the final Edge DataFrame with necessary GraphFrames column names and features
edges_df = (cleaned_flights_df
    .select(
        col("ORIGIN").alias("src"),    # Source airport (MANDATORY)
        col("DEST").alias("dst"),      # Destination airport (MANDATORY)
        col("DISTANCE").alias("distance"),
        col("ARR_DELAY").alias("arr_delay"),
        
        # FEATURE ENGINEERING: Create the is_delayed flag (Edge Property)
        # This is a critical edge property for motif finding and analysis.
        when(col("ARR_DELAY") > 15, 1).otherwise(0).alias("is_delayed"),
        
        col("AIRLINE_CODE").alias("carrier"), 
        col("FL_DATE").alias("date")
    )
    .cache()
)

print("\nEdges DataFrame (Preview):")
edges_df.limit(3).show()


# --- 2. Create the Vertex DataFrame (Nodes) ---
# Vertices must have an 'id' column. We include city information as a property.
all_origins = cleaned_flights_df.select(
    col("ORIGIN").alias("id"), 
    col("ORIGIN_CITY").alias("city")
).distinct()

all_destinations = cleaned_flights_df.select(
    col("DEST").alias("id"), 
    col("DEST_CITY").alias("city")
).distinct()

# Union all origins and destinations to get all unique airports (Big Data Safe Union)
# Note: UnionByName handles cases where columns might be slightly misaligned (though here they are clean).
vertices_df = all_origins.unionByName(all_destinations).distinct().cache()

print(f"\nTotal Unique Airports (Vertices): {vertices_df.count():,}")
print("Vertices DataFrame (Preview):")
vertices_df.limit(5).show(truncate=False)


# --- 3. Instantiate the GraphFrame ---
print("\n--- Instantiating GraphFrame ---")
g = GraphFrame(vertices_df, edges_df)

print(f"GraphFrame created with {g.vertices.count():,} Vertices and {g.edges.count():,} Edges.")
print("Graph Ready for Analysis!")

25/12/01 14:51:33 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/12/01 14:51:34 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 32, schema size: 30
CSV file: file:///Users/katarinadvornak/.cache/kagglehub/datasets/patrickzel/flight-delay-and-cancellation-dataset-2019-2023/versions/7/flights_sample_3m.csv
                                                                                

Records remaining after critical cleaning: 0

Edges DataFrame (Preview):
+---+---+--------+---------+----------+-------+----+
|src|dst|distance|arr_delay|is_delayed|carrier|date|
+---+---+--------+---------+----------+-------+----+
+---+---+--------+---------+----------+-------+----+


Total Unique Airports (Vertices): 0
Vertices DataFrame (Preview):
+---+----+
|id |city|
+---+----+
+---+----+


--- Instantiating GraphFrame ---




Py4JJavaError: An error occurred while calling o212.loadClass.
: java.lang.ClassNotFoundException: org.graphframes.GraphFramePythonAPI
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:593)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:1583)
