In [24]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("patrickzel/flight-delay-and-cancellation-dataset-2019-2023")

print("Path to dataset files:", path)

Using Colab cache for faster access to the 'flight-delay-and-cancellation-dataset-2019-2023' dataset.
Path to dataset files: /kaggle/input/flight-delay-and-cancellation-dataset-2019-2023


In [2]:
# Imports
import os
import glob
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import os
import glob
from pyspark.sql.functions import col

In [3]:
# Initialize Spark Session

spark = (SparkSession.builder
    .appName("FlightDelayGraphAnalysis")
    .getOrCreate())

print(f"Spark Version: {spark.version}")
print("Spark Session Initialized.")

Spark Version: 3.5.1
Spark Session Initialized.


In [25]:
#download_dir = "/root/.cache/kagglehub/datasets/patrickzel/flight-delay-and-cancellation-dataset-2019-2023/versions/7"
download_dir = "/kaggle/input/flight-delay-and-cancellation-dataset-2019-2023"
file_name = "flights_sample_3m.csv"
pyspark_data_path = f"{download_dir}/{file_name}"

try:
    raw_flights_df = (spark.read.csv(
        pyspark_data_path,
        header=True,
        schema=flight_schema,
        ignoreLeadingWhiteSpace=True)
    )

    print(f"\n✅ Data Successfully Ingested. Total Records: {raw_flights_df.count():,}")

except Exception as e:
    print(f"\n Ingestion Error: {e}")


✅ Data Successfully Ingested. Total Records: 3,000,000


In [27]:
# 1. Inspect the downloaded directory

print(f"Inspecting directory: {path}")

data_files = glob.glob(path)

if not data_files:
    print("\n ERROR: No CSV files found in the downloaded directory.")

else:
    print(f"\nFound {len(data_files)} data files. First file: {data_files[0]}")

    # 2. Define the PySpark Ingestion Path
    pyspark_data_path = data_files[0]

    # 3. Data Ingestion using Pre-defined Schema

    try:
        raw_flights_df = (spark.read.csv(
            pyspark_data_path,
            header=True,
            schema=flight_schema,
            ignoreLeadingWhiteSpace=True)
        )

        # 4. Initial Inspection
        print(f"\n Total Records Loaded : {raw_flights_df.count():,}")
        raw_flights_df.printSchema()

    except Exception as e:
        print(f"\n Error during PySpark Ingestion. Check column names/types against the schema.")
        print(f"Error: {e}")

Inspecting directory: /kaggle/input/flight-delay-and-cancellation-dataset-2019-2023

Found 1 data files. First file: /kaggle/input/flight-delay-and-cancellation-dataset-2019-2023

 Total Records Loaded : 3,000,191
root
 |-- FL_DATE: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- DOT_CODE: integer (nullable = true)
 |-- FL_NUMBER: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY: string (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- DELAY_DUE_CARRIER: double (nullable = true)
 |-- DELAY_DUE_WEATHER: double (nullable = true)
 |-- DELAY_DUE_NAS: double (nullable = true)
 |-- DELAY_DUE_SECURITY: double (nullable = true)
 |-- DELAY_DUE_LATE_AIRCRA

In [14]:
# 1. Total Row Count (Volume Check)
total_rows = raw_flights_df.count()
print(f"Total Rows in raw_flights_df: {total_rows:,}")

# 2. Schema Check (Data Types)

print("\n--- DataFrame Schema (Data Types) ---")
raw_flights_df.printSchema()

# 3. Head Object (First 5 Rows)

print("\n--- Head Object (First 5 Rows) ---")
# Only select key columns for a cleaner display:
key_columns = ["FL_DATE", "ORIGIN", "DEST", "DEP_DELAY", "ARR_DELAY", "DISTANCE", "AIRLINE"]

raw_flights_df.select(*key_columns).limit(5).toPandas()

Total Rows in raw_flights_df: 3,000,191

--- DataFrame Schema (Data Types) ---
root
 |-- FL_DATE: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- DOT_CODE: integer (nullable = true)
 |-- FL_NUMBER: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY: string (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- DELAY_DUE_CARRIER: double (nullable = true)
 |-- DELAY_DUE_WEATHER: double (nullable = true)
 |-- DELAY_DUE_NAS: double (nullable = true)
 |-- DELAY_DUE_SECURITY: double (nullable = true)
 |-- DELAY_DUE_LATE_AIRCRAFT: double (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- TAXI_OUT: double

Unnamed: 0,FL_DATE,ORIGIN,DEST,DEP_DELAY,ARR_DELAY,DISTANCE,AIRLINE
0,2019-01-09,19977,FLL,,,19.0,United Air Lines Inc.
1,2022-11-19,19790,MSP,,,9.0,Delta Air Lines Inc.
2,2022-07-22,19977,DEN,,,20.0,United Air Lines Inc.
3,2023-03-06,19790,MSP,,,27.0,Delta Air Lines Inc.
4,2020-02-23,20416,MCO,,,15.0,Spirit Air Lines


In [26]:
##  Define Schema  based on the provided dictionary

flight_schema = StructType([
    # Key Identifiers
    StructField("FL_DATE", StringType(), True),
    StructField("AIRLINE", StringType(), True),
    StructField("DOT_CODE", IntegerType(), True),
    StructField("FL_NUMBER", IntegerType(), True),

    # Graph Vertices
    StructField("ORIGIN", StringType(), True),           # Origin Airport Code (CRUCIAL)
    StructField("ORIGIN_CITY", StringType(), True),      # Useful property for Vertices
    StructField("DEST", StringType(), True),             # Destination Airport Code (CRUCIAL)
    StructField("DEST_CITY", StringType(), True),        # Useful property for Vertices

    # Time and Delay Metrics (Using DoubleType for flexibility with nulls and floats)
    StructField("DEP_DELAY", DoubleType(), True),
    StructField("ARR_DELAY", DoubleType(), True),      # Key analysis column
    StructField("CANCELLED", DoubleType(), True),
    StructField("DIVERTED", DoubleType(), True),
    StructField("AIR_TIME", DoubleType(), True),       # Key Edge Property
    StructField("DISTANCE", DoubleType(), True),       # Key Edge Property

    # Detailed Delay Attributions
    StructField("DELAY_DUE_CARRIER", DoubleType(), True),
    StructField("DELAY_DUE_WEATHER", DoubleType(), True),
    StructField("DELAY_DUE_NAS", DoubleType(), True),
    StructField("DELAY_DUE_SECURITY", DoubleType(), True),
    StructField("DELAY_DUE_LATE_AIRCRAFT", DoubleType(), True),

    # Include other columns needed for ETL/cleaning
    StructField("CRS_DEP_TIME", IntegerType(), True),
    StructField("DEP_TIME", DoubleType(), True),
    StructField("TAXI_OUT", DoubleType(), True),
    StructField("WHEELS_OFF", DoubleType(), True),
    StructField("WHEELS_ON", DoubleType(), True),
    StructField("TAXI_IN", DoubleType(), True),
    StructField("CRS_ARR_TIME", IntegerType(), True),
    StructField("ARR_TIME", DoubleType(), True),
    StructField("CANCELLATION_CODE", StringType(), True),
    StructField("CRS_ELAPSED_TIME", DoubleType(), True),
    StructField("ELAPSED_TIME", DoubleType(), True)
])

print("Final Flight Schema Defined based on the data dictionary.")

Final Flight Schema Defined based on the data dictionary.


In [16]:
!pip install graphframes

Collecting graphframes
  Downloading graphframes-0.6-py2.py3-none-any.whl.metadata (934 bytes)
Collecting nose (from graphframes)
  Downloading nose-1.3.7-py3-none-any.whl.metadata (1.7 kB)
Downloading graphframes-0.6-py2.py3-none-any.whl (18 kB)
Downloading nose-1.3.7-py3-none-any.whl (154 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/154.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m153.6/154.7 kB[0m [31m6.3 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.7/154.7 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: nose, graphframes
Successfully installed graphframes-0.6 nose-1.3.7


In [17]:

from pyspark.sql.functions import when, col, lit, count, desc
from graphframes import GraphFrame

In [21]:
from pyspark.sql.functions import col, isnan

In [28]:
print("ARR_DELAY NaN:", raw_flights_df.filter(isnan("ARR_DELAY")).count())
print("DISTANCE NaN:", raw_flights_df.filter(isnan("DISTANCE")).count())
print("DISTANCE <= 0:", raw_flights_df.filter(col("DISTANCE") <= 0).count())
print("ORIGIN empty:", raw_flights_df.filter(col("ORIGIN") == "").count())
print("DEST empty:", raw_flights_df.filter(col("DEST") == "").count())


ARR_DELAY NaN: 0
DISTANCE NaN: 0
DISTANCE <= 0: 0
ORIGIN empty: 0
DEST empty: 0


In [31]:
# Create the final Edge DataFrame with necessary GraphFrames column names and features
edges_df = (
    raw_flights_df
        .select(
            col("ORIGIN").alias("src"),    # Source airport (MANDATORY)
            col("DEST").alias("dst"),      # Destination airport (MANDATORY)
            col("DISTANCE").alias("distance"),
            col("ARR_DELAY").alias("arr_delay"),

            # FEATURE ENGINEERING: Create the is_delayed flag
            when(col("ARR_DELAY") > 15, 1).otherwise(0).alias("is_delayed"),

            col("AIRLINE").alias("carrier"),
            col("FL_DATE").alias("date")
        )
        .cache()
)

print("\nEdges DataFrame (Preview):")
edges_df.limit(5).show()


Edges DataFrame (Preview):
+-----+---+--------+---------+----------+--------------------+----------+
|  src|dst|distance|arr_delay|is_delayed|             carrier|      date|
+-----+---+--------+---------+----------+--------------------+----------+
|19977|FLL|    19.0|     NULL|         0|United Air Lines ...|2019-01-09|
|19790|MSP|     9.0|     NULL|         0|Delta Air Lines Inc.|2022-11-19|
|19977|DEN|    20.0|     NULL|         0|United Air Lines ...|2022-07-22|
|19790|MSP|    27.0|     NULL|         0|Delta Air Lines Inc.|2023-03-06|
|20416|MCO|    15.0|     NULL|         0|    Spirit Air Lines|2020-02-23|
+-----+---+--------+---------+----------+--------------------+----------+



In [None]:


# Create the final Edge DataFrame with necessary GraphFrames column names and features
edges_df = (cleaned_flights_df
    .select(
        col("ORIGIN").alias("src"),    # Source airport (MANDATORY)
        col("DEST").alias("dst"),      # Destination airport (MANDATORY)
        col("DISTANCE").alias("distance"),
        col("ARR_DELAY").alias("arr_delay"),

        # FEATURE ENGINEERING: Create the is_delayed flag (Edge Property)
        # This is a critical edge property for motif finding and analysis.
        when(col("ARR_DELAY") > 15, 1).otherwise(0).alias("is_delayed"),

        col("AIRLINE_CODE").alias("carrier"),
        col("FL_DATE").alias("date")
    )
    .cache()
)

print("\nEdges DataFrame (Preview):")
edges_df.limit(3).show()


# --- 2. Create the Vertex DataFrame (Nodes) ---
# Vertices must have an 'id' column. We include city information as a property.
all_origins = cleaned_flights_df.select(
    col("ORIGIN").alias("id"),
    col("ORIGIN_CITY").alias("city")
).distinct()

all_destinations = cleaned_flights_df.select(
    col("DEST").alias("id"),
    col("DEST_CITY").alias("city")
).distinct()

# Union all origins and destinations to get all unique airports (Big Data Safe Union)
# Note: UnionByName handles cases where columns might be slightly misaligned (though here they are clean).
vertices_df = all_origins.unionByName(all_destinations).distinct().cache()

print(f"\nTotal Unique Airports (Vertices): {vertices_df.count():,}")
print("Vertices DataFrame (Preview):")
vertices_df.limit(5).show(truncate=False)


# --- 3. Instantiate the GraphFrame ---
print("\n--- Instantiating GraphFrame ---")
g = GraphFrame(vertices_df, edges_df)

print(f"GraphFrame created with {g.vertices.count():,} Vertices and {g.edges.count():,} Edges.")
print("Graph Ready for Analysis!")

25/12/01 14:51:33 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/12/01 14:51:34 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 32, schema size: 30
CSV file: file:///Users/katarinadvornak/.cache/kagglehub/datasets/patrickzel/flight-delay-and-cancellation-dataset-2019-2023/versions/7/flights_sample_3m.csv
                                                                                

Records remaining after critical cleaning: 0

Edges DataFrame (Preview):
+---+---+--------+---------+----------+-------+----+
|src|dst|distance|arr_delay|is_delayed|carrier|date|
+---+---+--------+---------+----------+-------+----+
+---+---+--------+---------+----------+-------+----+


Total Unique Airports (Vertices): 0
Vertices DataFrame (Preview):
+---+----+
|id |city|
+---+----+
+---+----+


--- Instantiating GraphFrame ---




Py4JJavaError: An error occurred while calling o212.loadClass.
: java.lang.ClassNotFoundException: org.graphframes.GraphFramePythonAPI
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:593)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:1583)
