Because the dataset is large (around 5.8 million records), Spark is the default choice.

## Why Spark?

Apache Spark was chosen as the primary data processing framework for this flight delays analysis project for several key reasons:

1. **Large Dataset Handling**
   - The dataset contains approximately 5.8 million flight records
   - Spark's distributed computing capabilities allow efficient processing of large-scale data
   - In-memory processing keeps intermediate data in RAM where possible, which is usually faster than disk-based engines such as Hadoop MapReduce

2. **Scalability**
   - Spark can easily scale from local development to cluster deployment
   - Supports both vertical (more memory/CPU) and horizontal (more nodes) scaling
   - Efficient memory management with features like off-heap memory storage

3. **SQL and DataFrame Operations**
   - Native support for SQL queries and DataFrame operations
   - Familiar APIs for data scientists and analysts
   - Easy integration with PostgreSQL and other data sources

4. **Performance Optimization**
   - Built-in optimization engine (Catalyst) for query planning
   - Configurable parallelism and partitioning for better performance
   - Lazy evaluation enables efficient execution plans

5. **Integration Capabilities**
   - Seamless connection to PostgreSQL using JDBC
   - Compatible with Python ecosystem (pandas, numpy, etc.)
   - Support for various file formats (CSV, Parquet, JSON, etc.)

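The configuration used in this project targets local development while keeping settings that carry over to a cluster:
- `local[*]` runs one worker thread per available CPU core for parallel processing
- 4 GB of memory each for the driver and the executors (in local mode everything runs inside the driver JVM, so the executor settings only take effect on a cluster)
- 4 GB of off-heap memory to reduce garbage-collection pressure on the JVM heap
- 100 shuffle partitions (Spark's default is 200) for balanced data distribution on a single machine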

In [1]:
from pyspark.sql import SparkSession
import warnings
import google.generativeai as genai


spark = (
    SparkSession.builder.appName("FlightDelays")
    .master("local[*]")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "4")
    .config("spark.default.parallelism", "100")
    .config("spark.sql.shuffle.partitions", "100")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "4g")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")


25/02/24 18:27:43 WARN Utils: Your hostname, DESKTOP-U7R862J resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/02/24 18:27:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/suman/.conda/envs/documentai/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/suman/.ivy2/cache
The jars for the packages stored in: /home/suman/.ivy2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-df2d08b9-3842-4fdf-a4e9-4ae6e757622d;1.0
	confs: [default]
	found org.postgresql#postgresql;42.6.0 in central
	found org.checkerframework#checker-qual;3.31.0 in central
:: resolution report :: resolve 250ms :: artifacts dl 22ms
	:: modules in use:
	org.checkerframework#checker-qual;3.31.0 from central in [default]
	org.postgresql#postgresql;42.6.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	--------------------------------------------
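
To check which of the settings above the running session actually picked up, the effective values can be read back from the Spark configuration. This is a minimal sketch, assuming the `spark` session created above; the helper name `effective_settings` and the list `KEYS_TO_CHECK` are hypothetical and not part of the notebook.

```python
def effective_settings(spark, keys):
    """Return {key: value} for each key, read from the session's SparkConf.

    `spark` is assumed to be an active SparkSession. Keys that were never
    set are reported as None.
    """
    conf = spark.sparkContext.getConf()
    return {key: conf.get(key, None) for key in keys}


# Settings from the builder call that are worth double-checking.
KEYS_TO_CHECK = [
    "spark.master",
    "spark.driver.memory",
    "spark.sql.shuffle.partitions",
    "spark.memory.offHeap.size",
]

# Usage in the notebook (requires the running session):
# for key, value in effective_settings(spark, KEYS_TO_CHECK).items():
#     print(f"{key} = {value}")
```

If a Spark context already existed before the builder call, some values may not have been applied, so a `None` or unexpected value here is a hint to restart the kernel.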

## Backend Database Connection

In [2]:
import os

DB_PASS = os.getenv("DB_PASS")

postgres_url = "jdbc:postgresql://localhost:5432/text2sql"
postgres_properties = {
    "user": "postgres",
    "password": DB_PASS,
    "driver": "org.postgresql.Driver",
}
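
The cell above only defines the connection settings. A minimal sketch of how they might be used follows, assuming `DataFrameWriter.jdbc` and `DataFrameReader.jdbc` as the transport. The helper names and the table name `flights` are hypothetical, and failing fast on a missing `DB_PASS` is an added safeguard rather than something the notebook does.

```python
import os


def build_jdbc_config(host="localhost", port=5432, database="text2sql",
                      user="postgres", password=None):
    """Return (url, properties) for Spark's JDBC reader and writer."""
    if password is None:
        password = os.getenv("DB_PASS")
    if not password:
        # os.getenv returns None when the variable is unset; stop early
        # instead of failing later with a less obvious authentication error.
        raise ValueError("DB_PASS is not set")
    url = f"jdbc:postgresql://{host}:{port}/{database}"
    properties = {
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }
    return url, properties


def write_table(df, table, url, properties, mode="overwrite"):
    """Write a Spark DataFrame to a PostgreSQL table over JDBC."""
    df.write.jdbc(url=url, table=table, mode=mode, properties=properties)


def read_table(spark, table, url, properties):
    """Read a PostgreSQL table back as a Spark DataFrame."""
    return spark.read.jdbc(url=url, table=table, properties=properties)


# Usage in the notebook (needs a running PostgreSQL instance):
# url, props = build_jdbc_config()
# write_table(df, "flights", url, props)
# read_table(spark, "flights", url, props).show(5)
```

Note that `mode="overwrite"` replaces an existing table; `"append"` may be the safer default if the table already holds data.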

In [3]:
df = spark.read.csv(
    "/home/suman/.cache/kagglehub/datasets/usdot/flight-delays/versions/1/flights.csv",
    header=True,
)
df.show()

25/02/24 18:27:53 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-
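
With `header=True` alone, Spark reads every column as a string. If numeric types are needed, one option is to pass an explicit schema as a DDL string. This is a sketch under stated assumptions: the column names come from the header shown above, but the type assigned to each column is a guess (HHMM-style time columns are kept as strings, delays and durations as doubles), and `flights_schema_ddl` and `read_flights` are hypothetical helpers.

```python
# Column order taken from the header of flights.csv shown above.
FLIGHT_COLUMNS = [
    "YEAR", "MONTH", "DAY", "DAY_OF_WEEK", "AIRLINE", "FLIGHT_NUMBER",
    "TAIL_NUMBER", "ORIGIN_AIRPORT", "DESTINATION_AIRPORT",
    "SCHEDULED_DEPARTURE", "DEPARTURE_TIME", "DEPARTURE_DELAY", "TAXI_OUT",
    "WHEELS_OFF", "SCHEDULED_TIME", "ELAPSED_TIME", "AIR_TIME", "DISTANCE",
    "WHEELS_ON", "TAXI_IN", "SCHEDULED_ARRIVAL", "ARRIVAL_TIME",
    "ARRIVAL_DELAY", "DIVERTED", "CANCELLED", "CANCELLATION_REASON",
    "AIR_SYSTEM_DELAY", "SECURITY_DELAY", "AIRLINE_DELAY",
    "LATE_AIRCRAFT_DELAY", "WEATHER_DELAY",
]

# Assumed types: identifiers and clock times stay strings.
STRING_COLUMNS = {
    "AIRLINE", "TAIL_NUMBER", "ORIGIN_AIRPORT", "DESTINATION_AIRPORT",
    "CANCELLATION_REASON", "SCHEDULED_DEPARTURE", "DEPARTURE_TIME",
    "WHEELS_OFF", "WHEELS_ON", "SCHEDULED_ARRIVAL", "ARRIVAL_TIME",
}
# Assumed types: calendar fields, flight number, and 0/1 flags are integers.
INT_COLUMNS = {
    "YEAR", "MONTH", "DAY", "DAY_OF_WEEK", "FLIGHT_NUMBER",
    "DIVERTED", "CANCELLED",
}


def flights_schema_ddl():
    """Build a DDL schema string; any column not listed above becomes DOUBLE."""
    parts = []
    for name in FLIGHT_COLUMNS:
        if name in STRING_COLUMNS:
            spark_type = "STRING"
        elif name in INT_COLUMNS:
            spark_type = "INT"
        else:
            spark_type = "DOUBLE"
        # Backticks keep names such as YEAR or DAY from being parsed as keywords.
        parts.append(f"`{name}` {spark_type}")
    return ", ".join(parts)


def read_flights(spark, path):
    """Read the flights CSV with the explicit schema instead of all strings."""
    return spark.read.csv(path, header=True, schema=flights_schema_ddl())


# Usage in the notebook (requires the running session):
# df = read_flights(spark, "flights.csv")
# df.printSchema()
```

Passing `inferSchema=True` to `spark.read.csv` is a shorter alternative, at the cost of an extra pass over the file and less control over columns such as the HHMM times.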

In [4]:
%pip install kagglehub
import kagglehub

# Download latest version
path = kagglehub.dataset_download("usdot/flight-delays")

print("Path to dataset files:", path)


Note: you may need to restart the kernel to use updated packages.
Path to dataset files: /home/suman/.cache/kagglehub/datasets/usdot/flight-delays/versions/1


In [5]:
import pandas as pd
import os

# Assuming the path variable from the previous cell contains the directory
# where the CSV files are located
csv_files = [f for f in os.listdir(path) if f.endswith(".csv")]

# Create an empty dictionary to store the dataframes
dataframes = {}

# Loop through the files and load them into pandas dataframes
for file in csv_files:
    file_path = os.path.join(path, file)
    # Use the file name (without extension) as the key in the dictionary
    df_name = os.path.splitext(file)[0]
    dataframes[df_name] = pd.read_csv(file_path)




In [6]:
df_flights = dataframes.get("flights")
df_airlines = dataframes.get("airlines")
df_airports = dataframes.get("airports")

In [7]:
df_flights_sample = df_flights.sample(n=5000, random_state=42)
df_flights_sample.to_csv("flights.csv", index=False)
df_airlines.to_csv("airlines.csv", index=False)
df_airports.to_csv("airports.csv", index=False)
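
The cells above load the full file into pandas just to draw 5,000 rows. A comparable sample could also be drawn in Spark without materializing the data in pandas. This is a rough sketch, assuming a Spark DataFrame of the full flights file and roughly 5.8 million rows in total; `sample_fraction` and `sample_rows` are hypothetical helpers. Because `DataFrame.sample` returns only an approximate number of rows, the fraction is oversampled slightly and the result is trimmed with `limit`.

```python
def sample_fraction(n_rows, total_rows, oversample=1.2):
    """Fraction for DataFrame.sample that yields about n_rows * oversample rows."""
    if total_rows <= 0:
        raise ValueError("total_rows must be positive")
    return min(1.0, oversample * n_rows / total_rows)


def sample_rows(df, n_rows, total_rows, seed=42):
    """Draw at most n_rows rows from a Spark DataFrame.

    The sample is taken without replacement. Oversampling makes it likely,
    but not certain, that limit() has enough rows to reach n_rows.
    """
    fraction = sample_fraction(n_rows, total_rows)
    sampled = df.sample(withReplacement=False, fraction=fraction, seed=seed)
    return sampled.limit(n_rows)


# Usage in the notebook (requires the running session and the full dataset):
# sample_df = sample_rows(df, n_rows=5000, total_rows=5_800_000)
# sample_df.write.csv("flights_sample", header=True, mode="overwrite")
```

The rows selected will differ from the pandas sample even with the same seed, and trimming with `limit` can slightly favor earlier partitions, so this is best treated as a convenience for development data rather than a statistically careful sample.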


In [8]:
df = spark.read.csv("flights.csv", header=True)
df.show()

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-