# **COMP 3610 - ASSIGNMENT 1**
### _**Samuel Soman - 816039318**_
Data Pipeline & Visualization Dashboard
- This notebook builds an end-to-end data pipeline that ingests, transforms, and analyzes the NYC Yellow Taxi Trip dataset (January 2024). 
- We download the data programmatically, clean and validate it, perform SQL-based analysis with DuckDB, and prototype interactive visualizations for the Streamlit dashboard.

## _**Part 1: Data Ingestion & Storage**_
This part covers programmatic download, data validation, and file organisation.

In [29]:
# Import all libraries required for the assignment
import os
import requests
import pandas as pd
import numpy as np
import duckdb
import plotly.express as px
import plotly.graph_objects as go

print('All Libraries Imported Successfully!')

All Libraries Imported Successfully!


### 1. Programmatic Download

In [30]:
TRIP_DATA_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet"
ZONE_LOOKUP_URL = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"

RAW_DIR = os.path.join("data", "raw")
os.makedirs(RAW_DIR, exist_ok=True)

TRIP_DATA_PATH = os.path.join(RAW_DIR, "yellow_tripdata_2024-01.parquet")
ZONE_LOOKUP_PATH = os.path.join(RAW_DIR, "taxi_zone_lookup.csv")

In [31]:
def download_file(url, dest_path):
    if os.path.exists(dest_path):
        print(f"Already exists, skipping: {dest_path}")
        return

    print(f"Downloading {url} ...")
    response = requests.get(url, stream=True, timeout=60)  # time out stalled connections instead of hanging
    response.raise_for_status()

    with open(dest_path, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)

    size_mb = os.path.getsize(dest_path) / (1024 * 1024)
    print(f"Saved to {dest_path} ({size_mb:.1f} MB)")


# Download both datasets
download_file(TRIP_DATA_URL, TRIP_DATA_PATH)
download_file(ZONE_LOOKUP_URL, ZONE_LOOKUP_PATH)

Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet ...
Saved to data\raw\yellow_tripdata_2024-01.parquet (47.6 MB)
Downloading https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv ...
Saved to data\raw\taxi_zone_lookup.csv (0.0 MB)
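
`download_file` above writes straight to the destination path, so an interrupted download could leave a partial file that the `os.path.exists` check would skip on the next run. One way to guard against that, as a minimal sketch, is to write to a temporary file and rename it only once every chunk is on disk. The helper name `write_atomically` is hypothetical and is not part of the notebook.

```python
import os
import tempfile


def write_atomically(chunks, dest_path):
    """Write an iterable of byte chunks to dest_path via a temporary file.

    The temporary file is created next to dest_path so that os.replace()
    is a rename within one filesystem; dest_path only appears once every
    chunk has been written.
    """
    dest_dir = os.path.dirname(dest_path) or "."
    os.makedirs(dest_dir, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=dest_dir, suffix=".part")
    try:
        with os.fdopen(fd, "wb") as f:
            for chunk in chunks:
                f.write(chunk)
        os.replace(tmp_path, dest_path)
    except BaseException:
        # Remove the partial file so a later run does not treat it as complete
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

Inside `download_file`, the `with open(...)` loop could then be swapped for `write_atomically(response.iter_content(chunk_size=8192), dest_path)`.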


### 2. Data Validation
Implement validation checks that:
    
    a) Verify all expected columns exist in the dataset

    b) Check that date columns are valid datetime types

    c) Report total row count and print a summary to the console

    d) Raise an exception or exit with an error message if validation fails

In [32]:
# Validate that the downloads completed successfully
for label, path in [("Trip data", TRIP_DATA_PATH), ("Zone lookup", ZONE_LOOKUP_PATH)]:
    if os.path.exists(path):
        size_mb = os.path.getsize(path) / (1024 * 1024)
        print(f"{label}: {path} ({size_mb:.2f} MB)")
    else:
        # Requirement d): stop with an error instead of continuing without the file
        raise FileNotFoundError(f"{label}: {path} not found")

# Validate parquet file is readable
df_raw = pd.read_parquet(TRIP_DATA_PATH)
print(f"\nTrip dataset shape: {df_raw.shape}")
print(f"Columns: {list(df_raw.columns)}")

# Validate zone lookup CSV
zone_df = pd.read_csv(ZONE_LOOKUP_PATH)
print(f"\nZone lookup shape: {zone_df.shape}")
print(f"Columns: {list(zone_df.columns)}")

Trip data: data\raw\yellow_tripdata_2024-01.parquet (47.65 MB)
Zone lookup: data\raw\taxi_zone_lookup.csv (0.01 MB)

Trip dataset shape: (2964624, 19)
Columns: ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'Airport_fee']

Zone lookup shape: (265, 4)
Columns: ['LocationID', 'Borough', 'Zone', 'service_zone']
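
Checks a) to d) listed for this step could also be made explicit in a single function. The following is a minimal sketch, not the notebook's own implementation: the function name `validate_trips` and the choice of `EXPECTED_COLUMNS` (a subset of the column names printed above) are assumptions made for illustration.

```python
import pandas as pd

# Assumed subset of the columns printed above; extend as needed
EXPECTED_COLUMNS = [
    "tpep_pickup_datetime", "tpep_dropoff_datetime",
    "PULocationID", "DOLocationID",
    "trip_distance", "fare_amount", "total_amount",
]
DATETIME_COLUMNS = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]


def validate_trips(df, expected_columns=EXPECTED_COLUMNS, datetime_columns=DATETIME_COLUMNS):
    """Raise ValueError if a check fails; otherwise print a summary and return the row count."""
    # a) every expected column (including the datetime ones) must be present
    required = list(dict.fromkeys([*expected_columns, *datetime_columns]))
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"Missing expected columns: {missing}")

    # b) date columns must already be datetime typed
    not_datetime = [
        c for c in datetime_columns
        if not pd.api.types.is_datetime64_any_dtype(df[c])
    ]
    if not_datetime:
        raise ValueError(f"Columns are not datetime typed: {not_datetime}")

    # c) report the row count
    print(f"Validation passed: {len(df):,} rows, {len(df.columns)} columns")
    return len(df)
```

Calling `validate_trips(df_raw)` right after reading the parquet file would then satisfy d) by raising before any later cell runs on malformed data.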


### 3. File Organisation & Initial Inspection
Save downloaded files to a data/raw/ directory. Include a .gitignore file that excludes the data directory from version control.

In [33]:
print("data/ directory structure:")
for root, dirs, files in os.walk("data"):
    level = root.replace("data", "").count(os.sep)
    indent = "  " * level
    print(f"{indent}{os.path.basename(root)}/")
    for f in files:
        size_mb = os.path.getsize(os.path.join(root, f)) / (1024 * 1024)
        print(f"{indent}  {f} ({size_mb:.2f} MB)")

print("\n--- Data Types ---")
print(df_raw.dtypes)

print("\n--- First 5 Rows ---")
df_raw.head()

data/ directory structure:
data/
  raw/
    taxi_zone_lookup.csv (0.01 MB)
    yellow_tripdata_2024-01.parquet (47.65 MB)

--- Data Types ---
VendorID                          int32
tpep_pickup_datetime     datetime64[us]
tpep_dropoff_datetime    datetime64[us]
passenger_count                 float64
trip_distance                   float64
RatecodeID                      float64
store_and_fwd_flag               object
PULocationID                      int32
DOLocationID                      int32
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
Airport_fee                     float64
dtype: object

--- First 5 Rows ---


| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | Airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2 | 2024-01-01 00:57:55 | 2024-01-01 01:17:43 | 1.0 | 1.72 | 1.0 | N | 186 | 79 | 2 | 17.7 | 1.0 | 0.5 | 0.0 | 0.0 | 1.0 | 22.7 | 2.5 | 0.0 |
| 1 | 1 | 2024-01-01 00:03:00 | 2024-01-01 00:09:36 | 1.0 | 1.8 | 1.0 | N | 140 | 236 | 1 | 10.0 | 3.5 | 0.5 | 3.75 | 0.0 | 1.0 | 18.75 | 2.5 | 0.0 |
| 2 | 1 | 2024-01-01 00:17:06 | 2024-01-01 00:35:01 | 1.0 | 4.7 | 1.0 | N | 236 | 79 | 1 | 23.3 | 3.5 | 0.5 | 3.0 | 0.0 | 1.0 | 31.3 | 2.5 | 0.0 |
| 3 | 1 | 2024-01-01 00:36:38 | 2024-01-01 00:44:56 | 1.0 | 1.4 | 1.0 | N | 79 | 211 | 1 | 10.0 | 3.5 | 0.5 | 2.0 | 0.0 | 1.0 | 17.0 | 2.5 | 0.0 |
| 4 | 1 | 2024-01-01 00:46:51 | 2024-01-01 00:52:57 | 1.0 | 0.8 | 1.0 | N | 211 | 148 | 1 | 7.9 | 3.5 | 0.5 | 3.2 | 0.0 | 1.0 | 16.1 | 2.5 | 0.0 |
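
The file-organisation step also asks for a `.gitignore` that excludes the data directory. A minimal sketch of creating that entry from Python follows; the helper name `ensure_gitignore_entry` and the default entry `data/` are assumptions, and editing the file by hand works just as well.

```python
import os


def ensure_gitignore_entry(entry="data/", gitignore_path=".gitignore"):
    """Append entry to the .gitignore file unless an identical line already exists.

    Returns True if the file was modified, False otherwise.
    """
    content = ""
    if os.path.exists(gitignore_path):
        with open(gitignore_path, "r", encoding="utf-8") as f:
            content = f.read()

    if entry in (line.strip() for line in content.splitlines()):
        return False

    # Start on a fresh line in case the existing file lacks a trailing newline
    prefix = "" if content == "" or content.endswith("\n") else "\n"
    with open(gitignore_path, "a", encoding="utf-8") as f:
        f.write(f"{prefix}{entry}\n")
    return True
```

The function is idempotent, so re-running the notebook does not add duplicate lines.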


## _**Part 2: Data Transformation & Analysis**_
This part covers Data Cleaning, Feature Engineering, Saving the Processed Dataset, and Running SQL queries with DuckDB.

### 4. Data Cleaning

    e) Removing rows with null values in critical columns (pickup/dropoff times, locations,
    fare)

    f) Filtering out invalid trips: trips with zero or negative distance, negative fares, or fares
    exceeding $500

    g) Removing trips where dropoff time is before pickup time

    h) Documenting how many rows were removed and why (print summary to console)

In [None]:
df = df_raw.copy()
initial_rows = len(df)

# Ensure datetime columns are proper datetime type
df["tpep_pickup_datetime"] = pd.to_datetime(df["tpep_pickup_datetime"])
df["tpep_dropoff_datetime"] = pd.to_datetime(df["tpep_dropoff_datetime"])

# 1. Remove rows outside January 2024
df = df[
    (df["tpep_pickup_datetime"] >= "2024-01-01")
    & (df["tpep_pickup_datetime"] < "2024-02-01")
]
print(f"After date filter: {len(df):,} rows (removed {initial_rows - len(df):,})")

# 2. Remove rows where dropoff is not strictly after pickup
df = df[df["tpep_dropoff_datetime"] > df["tpep_pickup_datetime"]]
print(f"After dropoff > pickup filter: {len(df):,} rows")

# 3. Remove non-positive distances and fares
df = df[df["trip_distance"] > 0]
df = df[df["fare_amount"] > 0]
df = df[df["total_amount"] > 0]
print(f"After positive distance/fare filter: {len(df):,} rows")

# 4. Remove implausible trips (> 200 miles, > 5 hours, or < 1 minute)
df = df.copy()  # work on a copy so the column assignment below does not trigger SettingWithCopyWarning
df["trip_duration_min"] = (
    (df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]).dt.total_seconds() / 60
)
df = df[df["trip_distance"] <= 200]
df = df[df["trip_duration_min"] <= 300]  # 5 hours max
df = df[df["trip_duration_min"] >= 1]    # at least 1 min
print(f"After duration/distance cap: {len(df):,} rows")

# 5. Remove unreasonable passenger counts
df = df[(df["passenger_count"] >= 1) & (df["passenger_count"] <= 9)]
print(f"After passenger count filter: {len(df):,} rows")

# Summary
removed = initial_rows - len(df)
print("\n--- Cleaning Summary ---")
print(f"Original rows:  {initial_rows:,}")
print(f"Cleaned rows:   {len(df):,}")
print(f"Removed:        {removed:,} ({removed/initial_rows*100:.1f}%)")
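
Items e) to h) ask for a per-reason count of removed rows, including null handling and the $500 fare cap. A minimal sketch of one way to record that is shown below. It follows the wording of the list literally, so its thresholds differ slightly from the cell above (which uses strict inequalities and extra filters); the names `clean_with_log` and `CRITICAL_COLUMNS` are hypothetical.

```python
import pandas as pd

# Assumed reading of "critical columns" from item e)
CRITICAL_COLUMNS = [
    "tpep_pickup_datetime", "tpep_dropoff_datetime",
    "PULocationID", "DOLocationID", "fare_amount",
]


def clean_with_log(df, max_fare=500):
    """Apply the filters from items e) to g) one at a time.

    Returns the cleaned DataFrame and a list of (reason, rows_removed) pairs.
    """
    steps = [
        ("null in critical column", lambda d: d[CRITICAL_COLUMNS].notna().all(axis=1)),
        ("zero or negative distance", lambda d: d["trip_distance"] > 0),
        ("negative fare", lambda d: d["fare_amount"] >= 0),
        (f"fare above ${max_fare}", lambda d: d["fare_amount"] <= max_fare),
        ("dropoff before pickup",
         lambda d: d["tpep_dropoff_datetime"] >= d["tpep_pickup_datetime"]),
    ]
    log = []
    for reason, keep in steps:
        before = len(df)
        df = df[keep(df)]  # keep only rows that pass this step's mask
        log.append((reason, before - len(df)))
    return df.copy(), log
```

Printing the log, for example with `for reason, n in log: print(f"{reason}: {n:,} rows removed")`, would give the summary requested in item h).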

### 5. Feature Engineering

    i) trip_duration_minutes: calculated from pickup and dropoff timestamps

    j) trip_speed_mph: distance divided by duration (handle division by zero)

    k) pickup_hour: hour of day (0-23) extracted from pickup timestamp

    l) pickup_day_of_week: day name (Monday-Sunday) extracted from pickup
    timestamp

In the code below, these features are stored as `trip_duration_min` (already created during cleaning), `avg_speed_mph`, `pickup_hour`, and `pickup_day_of_week`.

In [None]:
# Extract useful time features
df["pickup_hour"] = df["tpep_pickup_datetime"].dt.hour
df["pickup_day_of_week"] = df["tpep_pickup_datetime"].dt.day_name()
df["pickup_date"] = df["tpep_pickup_datetime"].dt.date

# Speed in mph (distance / duration in hours); the cleaning step keeps only
# trips of at least 1 minute, so the divisor is never zero
df["avg_speed_mph"] = df["trip_distance"] / (df["trip_duration_min"] / 60)

# Treat speeds above 80 mph as invalid (not realistic in NYC): set them to NaN
df.loc[df["avg_speed_mph"] > 80, "avg_speed_mph"] = np.nan

# Tip percentage
df["tip_pct"] = np.where(
    df["fare_amount"] > 0,
    (df["tip_amount"] / df["fare_amount"]) * 100,
    0,
)

print("New features added:")
print(df[["pickup_hour", "pickup_day_of_week", "pickup_date", "trip_duration_min", "avg_speed_mph", "tip_pct"]].head(10))
print(f"\nDataset shape: {df.shape}")
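
Item j) asks for division by zero to be handled. The cell above relies on the earlier duration filter for that; if the speed calculation had to stand on its own, one option is to mask non-positive durations before dividing. This is a minimal sketch with a hypothetical helper name, `speed_mph`.

```python
import pandas as pd


def speed_mph(distance_miles, duration_minutes):
    """Return distance / duration in mph, with NaN wherever duration is not positive."""
    hours = duration_minutes / 60
    # where() keeps positive durations and replaces the rest with NaN,
    # so those rows yield NaN instead of inf or a negative speed
    return distance_miles / hours.where(hours > 0)
```

For example, `speed_mph(df["trip_distance"], df["trip_duration_min"])` would return a Series aligned with `df`.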

### Save Cleaned Dataset
Save the processed DataFrame as a parquet file for the Streamlit dashboard to read.

In [None]:
PROCESSED_DIR = os.path.join("data", "processed")
os.makedirs(PROCESSED_DIR, exist_ok=True)

clean_path = os.path.join(PROCESSED_DIR, "yellow_2024_01_clean.parquet")
df.to_parquet(clean_path, index=False)

size_mb = os.path.getsize(clean_path) / (1024 * 1024)
print(f"Cleaned dataset saved to {clean_path} ({size_mb:.1f} MB)")
print(f"{len(df):,} rows × {len(df.columns)} columns")

### 6. DuckDB SQL Analysis
Register the cleaned DataFrame as a DuckDB table and run five analytical queries. DuckDB lets us write SQL directly against pandas DataFrames without a separate database server.

In [None]:
# Create an in-memory DuckDB connection and register the DataFrame
con = duckdb.connect()
con.register("trips", df)
con.register("zones", zone_df)

# Quick check
result = con.execute("SELECT COUNT(*) AS total_trips FROM trips").fetchdf()
print(f"Trips registered in DuckDB: {result['total_trips'].iloc[0]:,}")

#### Query 1: Top 10 Busiest Pickup Zones (Number 6 - part m)
Which pickup zones generate the most trips? We join with the zone lookup table to get human-readable zone names.

In [None]:
q1 = con.execute("""
    SELECT
        z.Zone,
        z.Borough,
        COUNT(*) AS trip_count,
        ROUND(AVG(t.total_amount), 2) AS avg_total
    FROM trips t
    JOIN zones z ON t.PULocationID = z.LocationID
    GROUP BY z.Zone, z.Borough
    ORDER BY trip_count DESC
    LIMIT 10
""").fetchdf()

print("Query 1: Top 10 Busiest Pickup Zones")
q1

#### Query 2: Average Fare and Distance by Hour of Day (Number 6 - part n)
Do fares and distances vary throughout the day? This helps understand peak pricing patterns.

In [None]:
q2 = con.execute("""
    SELECT
        pickup_hour,
        COUNT(*) AS trip_count,
        ROUND(AVG(fare_amount), 2) AS avg_fare,
        ROUND(AVG(trip_distance), 2) AS avg_distance,
        ROUND(AVG(trip_duration_min), 2) AS avg_duration_min
    FROM trips
    GROUP BY pickup_hour
    ORDER BY pickup_hour
""").fetchdf()

print("Query 2: Average Fare and Distance by Hour of Day")
q2

#### Query 3: Payment Type Distribution (Number 6 - part o)
How do passengers pay? This query breaks trips down by payment type and shows each type's revenue contribution.

In [None]:
q3 = con.execute("""
    SELECT
        CASE payment_type
            WHEN 1 THEN 'Credit Card'
            WHEN 2 THEN 'Cash'
            WHEN 3 THEN 'No Charge'
            WHEN 4 THEN 'Dispute'
            ELSE 'Unknown'
        END AS payment_method,
        COUNT(*) AS trip_count,
        ROUND(SUM(total_amount), 2) AS total_revenue,
        ROUND(AVG(tip_amount), 2) AS avg_tip,
        ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) AS pct_of_trips
    FROM trips
    GROUP BY payment_type
    ORDER BY trip_count DESC
""").fetchdf()

print("Query 3: Payment Type Distribution")
q3
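
In Query 3, `SUM(COUNT(*)) OVER ()` sums the per-group counts across all groups, so `pct_of_trips` is each payment type's share of all trips. A quick cross-check in pandas, sketched here with a hypothetical helper name `payment_share`, should give the same percentages per `payment_type` code.

```python
import pandas as pd


def payment_share(payment_types):
    """Percentage of trips per payment type code, rounded to 2 decimals."""
    # normalize=True divides each count by the total number of non-null rows
    return (payment_types.value_counts(normalize=True) * 100).round(2)
```

Running `payment_share(df["payment_type"])` and comparing it with `q3["pct_of_trips"]` is a cheap way to confirm the window expression behaves as intended.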

#### Query 4: Daily Trip Volume and Revenue (Number 6 - part p)
Day-by-day trends for January 2024, useful for spotting weekday vs. weekend patterns and any anomalies.

In [None]:
q4 = con.execute("""
    SELECT
        pickup_date,
        pickup_day_of_week AS day_name,
        COUNT(*) AS trip_count,
        ROUND(SUM(total_amount), 2) AS total_revenue,
        ROUND(AVG(trip_distance), 2) AS avg_distance
    FROM trips
    GROUP BY pickup_date, pickup_day_of_week
    ORDER BY pickup_date
""").fetchdf()

print("Query 4: Daily Trip Volume and Revenue (first 10 days)")
q4.head(10)

#### Query 5: Trip Characteristics by Day of Week (Number 6 - part q)
This query compares weekday and weekend travel behaviour: average speed, distance, fare, and tip percentage.

In [None]:
q5 = con.execute("""
    SELECT
        pickup_day_of_week AS day_name,
        COUNT(*) AS trip_count,
        ROUND(AVG(trip_distance), 2) AS avg_distance,
        ROUND(AVG(fare_amount), 2) AS avg_fare,
        ROUND(AVG(tip_pct), 2) AS avg_tip_pct,
        ROUND(AVG(avg_speed_mph), 2) AS avg_speed
    FROM trips
    GROUP BY pickup_day_of_week
    ORDER BY
        CASE pickup_day_of_week
            WHEN 'Monday' THEN 1
            WHEN 'Tuesday' THEN 2
            WHEN 'Wednesday' THEN 3
            WHEN 'Thursday' THEN 4
            WHEN 'Friday' THEN 5
            WHEN 'Saturday' THEN 6
            WHEN 'Sunday' THEN 7
        END
""").fetchdf()

print("Query 5: Trip Characteristics by Day of Week")
q5

## _**Part 3: Visualization Prototypes**_
Below are prototypes of all 5 visualizations using Plotly. These are the same charts that appear in the Streamlit dashboard (pages/2_Visualizations.py), but rendered here in the notebook for development and grading purposes.