# 📘 01_data_ingestion.ipynb

# This notebook loads NYC Yellow Taxi Parquet data and merges it into a single DataFrame for further processing.


# 1: Imports & Path Setup

In [2]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id, floor

# Set your project root and import config paths
import sys
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))

from src.config import RAW_DATA_PATH, EXTERNAL_DATA_PATH, PROCESSED_DATA_PATH


# 2: Initialize Spark Session

In [4]:
def get_spark_session():
    try:
        spark = SparkSession.builder \
            .master("local[*]") \
            .appName("MonthlyNYCIngestion") \
            .config("spark.sql.shuffle.partitions", "4") \
            .config("spark.driver.memory", "4g") \
            .getOrCreate()
        print(" Spark session initialized successfully.")
        return spark
    except Exception as e:
        print(f" Failed to create Spark session: {e}")
        raise

spark = get_spark_session()


 Spark session initialized successfully.


# 3:  Load Location Lookup CSV

In [5]:
def load_lookup_csv(spark):
    return spark.read.option("header", True).csv(EXTERNAL_DATA_PATH)

lookup_df = load_lookup_csv(spark)
lookup_df.show(3)


+----------+-------+--------------------+------------+-----------+------------+
|LocationID|Borough|                Zone|service_zone|   latitude|   longitude|
+----------+-------+--------------------+------------+-----------+------------+
|         1|    EWR|      Newark Airport|         EWR|40.69287997|-74.18544993|
|         2| Queens|         Jamaica Bay|   Boro Zone|    40.6057|    -73.8713|
|         3|  Bronx|Allerton/Pelham G...|   Boro Zone|40.86521003| -73.8435548|
+----------+-------+--------------------+------------+-----------+------------+
only showing top 3 rows



# 4: Enrichment Function

In [6]:
def enrich_with_location(df, lookup_df):
    pickup_lookup = lookup_df.drop("service_zone") \
        .withColumnRenamed("LocationID", "PULocationID") \
        .withColumnRenamed("Zone", "pickup_zone") \
        .withColumnRenamed("latitude", "pickup_lat") \
        .withColumnRenamed("longitude", "pickup_lon") \
        .withColumnRenamed("Borough", "pickup_borough")

    dropoff_lookup = lookup_df.drop("Borough") \
        .withColumnRenamed("LocationID", "DOLocationID") \
        .withColumnRenamed("Zone", "dropoff_zone") \
        .withColumnRenamed("latitude", "dropoff_lat") \
        .withColumnRenamed("longitude", "dropoff_lon") \
        .withColumnRenamed("service_zone", "dropoff_service_zone")

    return df.join(pickup_lookup, on="PULocationID", how="left") \
             .join(dropoff_lookup, on="DOLocationID", how="left")


# 5: Save CSV in Chunks

In [None]:
def save_csv_in_chunks(df, output_dir, max_rows=1_000_000, month_tag=""):
    df_count = df.count()
    num_chunks = (df_count // max_rows) + (1 if df_count % max_rows != 0 else 0)

    print(f"Saving {df_count} rows in ~{num_chunks} CSV chunks for {month_tag}...")

    df_with_index = df.withColumn("row_num", monotonically_increasing_id())
    df_with_chunk_id = df_with_index.withColumn("chunk_id", floor(col("row_num") / max_rows))

    os.makedirs(output_dir, exist_ok=True)

    for i in range(num_chunks):
        chunk = df_with_chunk_id.filter(col("chunk_id") == i).drop("row_num", "chunk_id")
        chunk_path = os.path.join(output_dir, f"{month_tag}_trip_data_part_{i+1}.csv")
        chunk.write.mode("overwrite").option("header", True).csv(chunk_path)
        print(f"Saved chunk {i+1} to: {chunk_path}")


# 6: Run Ingestion for All Monthly Parquet Files

In [8]:
input_files = sorted([f for f in os.listdir(RAW_DATA_PATH) if f.endswith(".parquet")])
if not input_files:
    raise FileNotFoundError("No Parquet files found in RAW_DATA_PATH.")

for idx, filename in enumerate(input_files, 1):
    print(f"\nProcessing file {idx}/{len(input_files)}: {filename}")
    file_path = os.path.join(RAW_DATA_PATH, filename)

    # Month tag from filename
    month_tag = f"month_{filename[-12:-8]}" if "-" in filename else f"month_{idx:02d}"

    # Step 1: Load Parquet
    df = spark.read.parquet(file_path)
    print(f" Loaded {df.count()} raw rows from {filename}")

    # Step 2: Clean
    cleaned_df = df.dropna().filter("trip_distance > 0 AND fare_amount > 0")

    # Step 3: Enrich
    enriched_df = enrich_with_location(cleaned_df, lookup_df)

    # Preview
    print(f" Sample rows for {month_tag}:")
    enriched_df.select("tpep_pickup_datetime", "pickup_zone", "dropoff_zone").show(3, truncate=False)

    # Step 4: Save Parquet
    parquet_output_path = os.path.join(PROCESSED_DATA_PATH, f"{month_tag}.parquet")
    enriched_df.coalesce(1).write.mode("overwrite").parquet(parquet_output_path)
    print(f"Saved cleaned Parquet to: {parquet_output_path}")

    # Step 5: Save CSV Chunks
    csv_output_dir = os.path.join(PROCESSED_DATA_PATH, "csv_chunks")
    save_csv_in_chunks(enriched_df, csv_output_dir, month_tag=month_tag)



Processing file 1/12: yellow_tripdata_2024-01.parquet
 Loaded 2964624 raw rows from yellow_tripdata_2024-01.parquet
 Sample rows for month_4-01:
+--------------------+----------------------------+---------------------+
|tpep_pickup_datetime|pickup_zone                 |dropoff_zone         |
+--------------------+----------------------------+---------------------+
|2024-01-01 00:57:55 |Penn Station/Madison Sq West|East Village         |
|2024-01-01 00:03:00 |Lenox Hill East             |Upper East Side North|
|2024-01-01 00:17:06 |Upper East Side North       |East Village         |
+--------------------+----------------------------+---------------------+
only showing top 3 rows

Saved cleaned Parquet to: c:\Users\VaishnaviM\Desktop\BIG_DATA\data\processed\month_4-01.parquet
🔄 Saving 2754465 rows in ~3 CSV chunks for month_4-01...
📝 Saved chunk 1 to: c:\Users\VaishnaviM\Desktop\BIG_DATA\data\processed\csv_chunks\month_4-01_trip_data_part_1.csv
📝 Saved chunk 2 to: c:\Users\VaishnaviM\De

# 7: Stop Spark


In [11]:
spark.stop()
print("🛑 Spark session stopped.")


🛑 Spark session stopped.
