# COMP 3610 — Assignment 1: Data Pipeline & Visualization Dashboard

**Student:** Nie-l Constance |
**Course:** COMP 3610 — Big Data Analytics |
**Semester:** II, 2025-2026 

**AI Assistance Disclosure:** This notebook used DeepSeek and VS Code autocompletions for code suggestions and structure. All work and final code were authored and reviewed by the student (myself).

## Part 1: Data Ingestion
This section downloads the required files into `data/raw/`, then validates the raw schema and the datetime columns.

In [None]:
# Part 1: Programmatic download and validation
import time
import requests
import pandas as pd
from pathlib import Path

RAW_DIR = Path("data/raw")
TRIP_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet"
ZONE_URL = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"
TRIP_PATH = RAW_DIR / "yellow_tripdata_2024-01.parquet"
ZONE_PATH = RAW_DIR / "taxi_zone_lookup.csv"

def ensure_raw_dir():
    RAW_DIR.mkdir(parents=True, exist_ok=True)

def download_file(url, dest, max_retries=3):
    """Download url to dest, retrying on network errors and HTTP error responses."""
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            resp = requests.get(url, timeout=30)
            resp.raise_for_status()  # raises for 4xx/5xx responses
        except requests.RequestException as exc:
            last_error = exc
            if attempt < max_retries:
                time.sleep(2 ** attempt)  # simple exponential backoff before retrying
            continue
        data = resp.content
        # quick sanity size check
        if len(data) < 100:
            raise ValueError(f"Downloaded content for {url} looks too small ({len(data)} bytes)")
        with open(dest, "wb") as f:
            f.write(data)
        return dest
    raise RuntimeError(f"Failed to download {url} after {max_retries} attempts") from last_error

def download_if_missing(url, dest):
    if not dest.exists():
        print(f"Downloading {url} ...")
        download_file(url, dest)
    else:
        print(f"Found existing file: {dest}")

# Create .gitignore entry for data/ if missing
GITIGNORE = Path(".gitignore")
if GITIGNORE.exists():
    txt = GITIGNORE.read_text()
    if "data/" not in txt:
        with open(GITIGNORE, "a") as f:
            f.write("\n# Ignore raw/processed data\ndata/\n")
else:
    with open(GITIGNORE, "w") as f:
        f.write("# Generated by assignment\n# Ignore raw/processed data\ndata/\n")

ensure_raw_dir()
download_if_missing(TRIP_URL, TRIP_PATH)
download_if_missing(ZONE_URL, ZONE_PATH)

print("Download complete. Files saved to", RAW_DIR)


### Data validation checks

In [None]:
# Validation helpers
import pandas as pd
import sys

# Define expected column set per assignment spec
EXPECTED_COLS = [
    "tpep_pickup_datetime","tpep_dropoff_datetime","PULocationID","DOLocationID",
    "passenger_count","trip_distance","fare_amount","tip_amount","total_amount","payment_type"
]

def validate_trip_df(df: pd.DataFrame):
    """
    Validate that the raw trip data contains all required columns and valid datetime fields.
    Raises exceptions if validation fails per assignment requirement.
    """
    # Check for missing columns
    missing = [c for c in EXPECTED_COLS if c not in df.columns]
    if missing:
        error_msg = f"VALIDATION FAILED: Missing required columns: {missing}"
        print(error_msg, file=sys.stderr)
        raise AssertionError(error_msg)
    
    # Validate datetime columns can be parsed
    for col in ["tpep_pickup_datetime","tpep_dropoff_datetime"]:
        df[col] = pd.to_datetime(df[col], errors="coerce")
        if df[col].isna().all():
            error_msg = f"VALIDATION FAILED: Column '{col}' could not be parsed as datetimes - all values are NaT"
            print(error_msg, file=sys.stderr)
            raise TypeError(error_msg)
        nan_count = df[col].isna().sum()
        if nan_count > 0:
            print(f"WARNING: {nan_count:,} NaT values found in {col} (will be removed in cleaning step)")
        if not pd.api.types.is_datetime64_any_dtype(df[col]):
            error_msg = f"VALIDATION FAILED: Column '{col}' is not a datetime type after parsing"
            print(error_msg, file=sys.stderr)
            raise TypeError(error_msg)
    
    # Report validation success
    print("✓ VALIDATION PASSED")
    print(f"✓ All {len(EXPECTED_COLS)} expected columns present")
    print(f"✓ Datetime columns are valid and parseable")
    print(f"\n--- Data Summary ---")
    print(f"Total rows in dataset: {len(df):,}")
    print(f"\nColumn data types:")
    for col in EXPECTED_COLS:
        print(f"  {col}: {df[col].dtype}")
    return True

# Load full dataset and validate
print("Loading dataset from:", TRIP_PATH)
trip_df = pd.read_parquet(TRIP_PATH)
print(f"Raw dataset loaded: {len(trip_df):,} rows\n")
validate_trip_df(trip_df)
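
The validation above relies on `pd.to_datetime(..., errors="coerce")`, which turns unparseable values into `NaT` instead of raising. The sketch below shows that behavior on a made-up three-value series; the sample values are illustrative only, since the Parquet file normally already stores these columns as datetimes.

```python
import pandas as pd

# Illustrative input: one valid timestamp, one junk string, one missing value
raw = pd.Series(["2024-01-01 10:00:00", "not a date", None])

# errors="coerce" converts anything unparseable to NaT rather than raising
parsed = pd.to_datetime(raw, errors="coerce")

nat_count = int(parsed.isna().sum())
all_nat = bool(parsed.isna().all())
print(f"{nat_count} of {len(parsed)} values became NaT; all NaT: {all_nat}")
```

This is why the validator treats a fully `NaT` column as a hard failure but only warns when some values are `NaT`.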


## Part 2: Data Transformation & Analysis
This section performs data cleaning and feature engineering using Pandas. Code is organized into reusable functions and tested with small examples where possible.

In [None]:
# Reuse trip_df loaded in Part 1 (no reload needed)
# Ensure datetimes are parsed
trip_df["tpep_pickup_datetime"] = pd.to_datetime(trip_df["tpep_pickup_datetime"], errors="coerce")
trip_df["tpep_dropoff_datetime"] = pd.to_datetime(trip_df["tpep_dropoff_datetime"], errors="coerce")

def cleaning_summary(orig, after_null, after_invalid, after_time):
    removed_nulls = orig - after_null
    removed_invalid = after_null - after_invalid
    removed_time = after_invalid - after_time
    return {
        "original_rows": orig,
        "removed_nulls": removed_nulls,
        "removed_invalid": removed_invalid,
        "removed_time": removed_time,
        "final_rows": after_time
    }

def clean_trip_df(df: pd.DataFrame):
    # Work on a copy to avoid side effects
    df = df.copy()
    orig = len(df)
    # Drop nulls in critical columns
    df = df.dropna(subset=["tpep_pickup_datetime","tpep_dropoff_datetime","PULocationID","DOLocationID","fare_amount"])
    after_null = len(df)
    # Filter out invalid trips
    df = df[(df["trip_distance"] > 0) & (df["fare_amount"] >= 0) & (df["fare_amount"] <= 500)]
    after_invalid = len(df)
    # Remove rows where dropoff is before pickup
    df = df[df["tpep_dropoff_datetime"] >= df["tpep_pickup_datetime"]]
    after_time = len(df)
    summary = cleaning_summary(orig, after_null, after_invalid, after_time)

    print("Cleaning summary (full dataset):")
    print(f"- Original rows: {summary['original_rows']:,}")
    print(f"- Removed nulls in critical columns: {summary['removed_nulls']:,}")
    print(f"- Removed invalid distance/fare rows: {summary['removed_invalid']:,}")
    print(f"- Removed dropoff-before-pickup rows: {summary['removed_time']:,}")
    print(f"- Final rows: {summary['final_rows']:,}")

    removals = {
        "null critical values": summary["removed_nulls"],
        "invalid distance/fare": summary["removed_invalid"],
        "dropoff before pickup": summary["removed_time"],
    }
    main_reason = max(removals, key=removals.get)
    print(f"Observation: The largest share of removals came from {main_reason}.")

    return df, summary

trip_clean, clean_summary = clean_trip_df(trip_df)
print("Cleaned rows:", len(trip_clean))
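
As a small check of the cleaning rules, the sketch below applies the same three filters to a four-row frame where each bad row fails exactly one rule. `apply_cleaning_rules` is a hypothetical helper written for this illustration and is not part of the notebook's pipeline.

```python
import pandas as pd

def apply_cleaning_rules(df):
    """Hypothetical mirror of the three cleaning filters; returns rows kept after each step."""
    critical = ["tpep_pickup_datetime", "tpep_dropoff_datetime",
                "PULocationID", "DOLocationID", "fare_amount"]
    step1 = df.dropna(subset=critical)
    step2 = step1[(step1["trip_distance"] > 0)
                  & (step1["fare_amount"] >= 0)
                  & (step1["fare_amount"] <= 500)]
    step3 = step2[step2["tpep_dropoff_datetime"] >= step2["tpep_pickup_datetime"]]
    return len(step1), len(step2), len(step3)

rows = pd.DataFrame({
    "tpep_pickup_datetime": pd.to_datetime(
        ["2024-01-01 10:00", "2024-01-01 11:00", "2024-01-01 12:00", "2024-01-01 13:00"]),
    "tpep_dropoff_datetime": pd.to_datetime(
        ["2024-01-01 10:10", "2024-01-01 11:10", "2024-01-01 11:50", "2024-01-01 13:10"]),
    "PULocationID": [1, 2, 3, 4],
    "DOLocationID": [2, 3, 4, 5],
    "trip_distance": [1.5, 0.0, 2.0, 3.0],   # second row has zero distance
    "fare_amount": [10.0, 8.0, 12.0, None],  # fourth row has a null fare
})

# Row 4 drops at the null step, row 2 at the distance/fare step, row 3 at the time step
counts = apply_cleaning_rules(rows)
print(counts)
```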


### Feature engineering (exactly 4 new columns)

In [None]:
def feature_engineer(df: pd.DataFrame):
    df = df.copy()
    # 1) trip_duration_minutes
    df["trip_duration_minutes"] = (df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]).dt.total_seconds() / 60.0
    df["trip_duration_minutes"] = df["trip_duration_minutes"].clip(lower=0)
    # 2) trip_speed_mph (distance / hours) - handle zero duration safely
    duration_hours = df["trip_duration_minutes"] / 60.0
    df["trip_speed_mph"] = df["trip_distance"].div(duration_hours)
    df["trip_speed_mph"] = df["trip_speed_mph"].replace([float("inf"), -float("inf")], 0).fillna(0)
    # 3) pickup_hour (0-23)
    df["pickup_hour"] = df["tpep_pickup_datetime"].dt.hour
    # 4) pickup_day_of_week (Monday - Sunday)
    df["pickup_day_of_week"] = df["tpep_pickup_datetime"].dt.day_name()
    return df

# Apply to full cleaned dataset
trip_fe = feature_engineer(trip_clean)
print(trip_fe[["trip_duration_minutes","trip_speed_mph","pickup_hour","pickup_day_of_week"]].head())

# Simple unit test on a tiny example
_test = pd.DataFrame([
    {
        "tpep_pickup_datetime": "2024-01-01 10:00:00",
        "tpep_dropoff_datetime": "2024-01-01 10:05:00",
        "PULocationID": 1,
        "DOLocationID": 2,
        "trip_distance": 1.2,
        "fare_amount": 5.0,
        "tip_amount": 1.0,
        "total_amount": 6.0,
        "payment_type": 1,
    }
])
_test["tpep_pickup_datetime"] = pd.to_datetime(_test["tpep_pickup_datetime"])
_test["tpep_dropoff_datetime"] = pd.to_datetime(_test["tpep_dropoff_datetime"])
_test_fe = feature_engineer(_test)
assert _test_fe.loc[0, "trip_duration_minutes"] == 5.0
assert _test_fe.loc[0, "pickup_hour"] == 10
print("Feature engineering tests passed")
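
The unit test above checks duration and pickup hour but not speed. A minimal scalar sketch of the same speed rule follows, using the test trip (1.2 miles in 5 minutes); `speed_mph` is a hypothetical helper for illustration, not a function from the notebook.

```python
import math

def speed_mph(distance_miles, duration_minutes):
    """Distance divided by hours, returning 0.0 when the duration is zero or negative."""
    if duration_minutes <= 0:
        return 0.0
    return distance_miles / (duration_minutes / 60.0)

example_speed = speed_mph(1.2, 5.0)        # 5 minutes is 1/12 of an hour
zero_duration_speed = speed_mph(1.2, 0.0)  # mirrors the inf -> 0 replacement

print(round(example_speed, 1), zero_duration_speed)
```

Returning 0 for zero-duration trips matches the notebook's choice; such rows could instead be flagged or dropped if they distort averages.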


## Part 2: SQL Analysis using DuckDB
We register the feature-engineered DataFrame and the zone lookup table with an in-memory DuckDB connection and run the 5 required queries.

In [None]:
import duckdb

# Register cleaned data into DuckDB for SQL queries
con = duckdb.connect(database=":memory:")

# Use full feature-engineered dataset
con.register("trips", trip_fe)

# Load zones table
zones_df_full = pd.read_csv(ZONE_PATH)
if "Zone" in zones_df_full.columns:
    zones_df_full = zones_df_full.rename(columns={"Zone": "zone"})
con.register("zones", zones_df_full)

print("DuckDB in-memory tables registered: trips and zones")


### Query 1: Top 10 busiest pickup zones by total number of trips
This query returns the top 10 pickup zones by trip count, including zone names from the taxi zone lookup table.

In [None]:
q1 = """
SELECT t.PULocationID, z.zone, count(*) AS trips
FROM trips AS t
JOIN zones AS z ON t.PULocationID = z.LocationID
GROUP BY t.PULocationID, z.zone
ORDER BY trips DESC
LIMIT 10;
"""
print("SQL Query 1 results:")
q1_df = con.execute(q1).df()
print(q1_df)
if not q1_df.empty:
    top_zone = q1_df.iloc[0]["zone"]
    top_trips = int(q1_df.iloc[0]["trips"])
    print(f"Interpretation: The busiest pickup zone is {top_zone} with {top_trips:,} trips, indicating a strong concentration of demand in that area.")


### Query 2: Average fare by hour of day
This query computes the average fare amount for each hour (0-23) and orders results by hour.

In [None]:
q2 = """
SELECT pickup_hour, AVG(fare_amount) AS avg_fare
FROM trips
GROUP BY pickup_hour
ORDER BY pickup_hour
"""
print("SQL Query 2 results:")
q2_df = con.execute(q2).df()
print(q2_df)
if not q2_df.empty:
    peak = q2_df.loc[q2_df["avg_fare"].idxmax()]
    low = q2_df.loc[q2_df["avg_fare"].idxmin()]
    print(f"Interpretation: The highest average fare occurs around {int(peak['pickup_hour'])}:00 (${peak['avg_fare']:.2f}), while the lowest is around {int(low['pickup_hour'])}:00 (${low['avg_fare']:.2f}).")


### Query 3: Payment type percentage
This query computes the count and percentage share of trips by payment type.

In [None]:
q3 = """
SELECT payment_type, COUNT(*) AS cnt, 100.0 * COUNT(*) / SUM(COUNT(*)) OVER() AS pct
FROM trips
GROUP BY payment_type
ORDER BY cnt DESC
"""
print("SQL Query 3 results:")
q3_df = con.execute(q3).df()
print(q3_df)
if not q3_df.empty:
    pay_map = {1: "Credit card", 2: "Cash", 3: "No charge", 4: "Dispute", 5: "Unknown"}
    top_code = int(q3_df.iloc[0]["payment_type"])
    top_label = pay_map.get(top_code, f"Code {top_code}")
    top_pct = q3_df.iloc[0]["pct"]
    print(f"Interpretation: {top_label} is the dominant payment method at {top_pct:.1f}% of trips.")
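
The `SUM(COUNT(*)) OVER()` expression divides each group's count by the total across all groups. A plain-Python sketch of the same arithmetic is shown below; `payment_shares` and the sample codes are made up for illustration.

```python
from collections import Counter

def payment_shares(payment_types):
    """Return (code, count, percent) tuples ordered by count, largest first."""
    counts = Counter(payment_types)
    total = sum(counts.values())  # plays the role of SUM(COUNT(*)) OVER()
    return [(code, cnt, 100.0 * cnt / total) for code, cnt in counts.most_common()]

# Eight hypothetical trips: five credit card (1), two cash (2), one dispute (4)
shares = payment_shares([1, 1, 1, 2, 2, 4, 1, 1])
for code, cnt, pct in shares:
    print(code, cnt, f"{pct:.1f}%")
```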


### Query 4: Average tip percentage by day (credit card payments only)
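This query computes the average tip percentage (tip_amount / fare_amount) by day of week, restricted to credit card payments (payment_type = 1). Rows with a zero fare are excluded from the average, and the result is a fraction that is formatted as a percentage when printed.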

In [None]:
q4 = """
SELECT pickup_day_of_week, AVG(CASE WHEN fare_amount > 0 THEN tip_amount / fare_amount ELSE NULL END) AS avg_tip_pct
FROM trips
WHERE payment_type = 1
GROUP BY pickup_day_of_week
ORDER BY array_position(['Monday','Tuesday','Wednesday','Thursday','Friday','Saturday','Sunday'], pickup_day_of_week)
"""
print("SQL Query 4 results:")
q4_df = con.execute(q4).df()
print(q4_df)
if not q4_df.empty:
    peak = q4_df.loc[q4_df["avg_tip_pct"].idxmax()]
    print(f"Interpretation: The highest average credit-card tip percentage occurs on {peak['pickup_day_of_week']} ({peak['avg_tip_pct']:.2%}).")


### Query 5: Top 5 pickup-dropoff zone pairs
This query finds the top 5 most common pickup-dropoff zone pairs and includes zone names for readability.

In [None]:
q5 = """
SELECT t.PULocationID AS pu_ID, p.zone AS pu_zone, t.DOLocationID AS do_ID, d.zone AS do_zone, COUNT(*) AS trips
FROM trips t
LEFT JOIN zones p ON t.PULocationID = p.LocationID
LEFT JOIN zones d ON t.DOLocationID = d.LocationID
GROUP BY pu_ID, pu_zone, do_ID, do_zone
ORDER BY trips DESC
LIMIT 5
"""
print("SQL Query 5 results:")
q5_df = con.execute(q5).df()
print(q5_df)
if not q5_df.empty:
    top_pair = q5_df.iloc[0]
    print(f"Interpretation: The most common pickup-dropoff pair is {top_pair['pu_zone']} to {top_pair['do_zone']} with {int(top_pair['trips']):,} trips.")


## Part 3: Dashboard Development (prototypes)
This section prototypes the visualizations required for the Streamlit app using Plotly. Each plot is followed by a short interpretation (2–3 sentences).

In [None]:
import plotly.express as px

# Use full feature-engineered dataset for prototypes
# If you want to speed up plotting, uncomment the sample line below
# df_plot = trip_fe.sample(n=min(len(trip_fe), 200000), random_state=42)

def center_titles(fig):
    """Center the figure title horizontally."""
    fig.update_layout(title_x=0.5)
    return fig

df_plot = trip_fe.copy()

# Add zone names using lookup table
zones_df_full = pd.read_csv(ZONE_PATH)
if "Zone" in zones_df_full.columns:
    zones_df_full = zones_df_full.rename(columns={"Zone": "zone"})
zone_map = zones_df_full.set_index("LocationID")["zone"].to_dict()

df_plot["PU_zone"] = df_plot["PULocationID"].map(zone_map)
df_plot["DO_zone"] = df_plot["DOLocationID"].map(zone_map)

# Visualization 1: Top 10 pickup zones (bar)
top_pu = df_plot["PU_zone"].value_counts().head(10).reset_index()
top_pu.columns = ["zone","trips"]
fig_bar = center_titles(
    px.bar(
        top_pu, 
        x="zone", 
        y="trips", 
        title="Top Pickup Zones",
        labels={"zone": "Pickup Zone", "trips": "Number of Trips"},
    )
)
fig_bar.show()
if not top_pu.empty:
    top_zone = top_pu.loc[0, "zone"]
    top_trips = int(top_pu.loc[0, "trips"])
    top_share = 100.0 * top_trips / len(df_plot)
    print(f"Interpretation: The busiest pickup zone is {top_zone} with {top_trips:,} trips ({top_share:.1f}% of all trips). The top 10 zones together account for {100.0 * top_pu['trips'].sum() / len(df_plot):.1f}% of trips, showing strong spatial concentration.")

# Visualization 2: Avg fare by hour (line)
hourly = df_plot.groupby("pickup_hour")["fare_amount"].mean().reset_index()
fig_line = center_titles(
    px.line(
        hourly, 
        x="pickup_hour", 
        y="fare_amount", 
        title="Average Fare by Hour", 
        markers=True,
        labels={"pickup_hour": "Pickup Hour", "fare_amount": "Average Fare ($)"},
    )
)
fig_line.update_xaxes(range=[0, 23]) # Set x-axis range to cover all hours
fig_line.update_xaxes(dtick=1) # Show every hour on x-axis
fig_line.show()
if not hourly.empty:
    peak = hourly.loc[hourly["fare_amount"].idxmax()]
    low = hourly.loc[hourly["fare_amount"].idxmin()]
    print(f"Interpretation: The highest average fare occurs around {int(peak['pickup_hour'])}:00 (${peak['fare_amount']:.2f}), while the lowest is around {int(low['pickup_hour'])}:00 (${low['fare_amount']:.2f}). Since fare_amount is the metered fare, this hourly pattern likely reflects the mix of trip lengths at each hour rather than time-based pricing.")

# Visualization 3: Histogram of trip distances
fig_hist = center_titles(
    px.histogram(
        df_plot, 
        x="trip_distance", 
        nbins=30, 
        title="Trip Distance Distribution",
        labels={"trip_distance": "Trip Distance (miles)"}
    )
)
fig_hist.show()
median_dist = df_plot["trip_distance"].median()
percentile_90 = df_plot["trip_distance"].quantile(0.9)
print(f"Interpretation: The median trip distance is {median_dist:.2f} miles, while 90% of trips are under {percentile_90:.2f} miles. This shows most trips are short with a long tail of longer journeys.")

# Visualization 4: Payment type breakdown (Pie Chart)
pay_map = {1:"Credit card",2:"Cash",3:"No charge",4:"Dispute",5:"Unknown"}
df_plot["payment_label"] = df_plot["payment_type"].map(pay_map).fillna("Other")
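# Pre-aggregate so Plotly receives one row per label instead of one row per trip
pay_counts = df_plot["payment_label"].value_counts(normalize=True) * 100
pay_share = pay_counts.rename_axis("payment_label").reset_index(name="pct")
fig_pie = center_titles(
    px.pie(
        pay_share,
        names="payment_label",
        values="pct",
        title="Payment Types",
        labels={"payment_label": "Payment Type", "pct": "Share of Trips (%)"}
    )
)
fig_pie.show()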
if not pay_counts.empty:
    top_pay = pay_counts.index[0]
    top_pct = pay_counts.iloc[0]
    print(f"Interpretation: {top_pay} is the dominant payment method at {top_pct:.1f}% of trips. Payment mix influences tip reporting and revenue capture, so digital payment performance is especially important.")

# Visualization 5: Heatmap day-of-week vs hour
heat = df_plot.groupby(["pickup_day_of_week","pickup_hour"]).size().reset_index(name="trips")
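day_order = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
heat_pivot = (
    heat.pivot(index="pickup_day_of_week", columns="pickup_hour", values="trips")
    .reindex(day_order)  # pivot sorts day names alphabetically; restore calendar order
    .fillna(0)
)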
fig_heat = center_titles(
    px.imshow(
        heat_pivot, 
        title="Trips by Day and Hour", 
        aspect="auto",
        labels={"x": "Pickup Hour", "y": "Day of Week", "color": "Number of Trips"}
    )
)
fig_heat.update_xaxes(dtick=1) # Show every hour on x-axis
fig_heat.show()
max_idx = heat_pivot.stack().idxmax()
max_val = int(heat_pivot.stack().max())
print(f"Interpretation: The busiest time slot is {max_idx[0]} around {int(max_idx[1])}:00 with {max_val:,} trips. Demand is concentrated at specific weekday-hour combinations, useful for staffing and pricing decisions.")


### Interactive filters (prototype)
Below is a reusable filter function that will later be used in the Streamlit app. It accepts a DataFrame and filter criteria and returns a filtered DataFrame.

In [None]:
def apply_filters(df, date_min=None, date_max=None, hour_min=0, hour_max=23, payment_types=None):
    out = df.copy()
    if date_min is not None:
        out = out[out['tpep_pickup_datetime'].dt.date >= date_min]
    if date_max is not None:
        out = out[out['tpep_pickup_datetime'].dt.date <= date_max]
    out = out[(out['pickup_hour'] >= hour_min) & (out['pickup_hour'] <= hour_max)]
    if payment_types is not None:
        out = out[out['payment_type'].isin(payment_types)]
    return out

# Example usage
from datetime import date
f = apply_filters(
    df_plot, 
    date_min=date(2024,1,1), 
    date_max=date(2024,1,31), 
    hour_min=7, 
    hour_max=19, 
    payment_types=[1,2]
)
print('Filtered rows:', len(f))
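
Both date bounds and both hour bounds in the filter are inclusive. The sketch below shows one possible variant that compares against `Timestamp` bounds instead of `.dt.date`, which may be faster on large frames; `filter_by_date_and_hour` is a hypothetical helper and the five sample rows are made up.

```python
import pandas as pd
from datetime import date

def filter_by_date_and_hour(df, date_min, date_max, hour_min, hour_max):
    """Hypothetical variant of the date/hour filter; all four bounds are inclusive."""
    start = pd.Timestamp(date_min)
    end = pd.Timestamp(date_max) + pd.Timedelta(days=1)  # exclusive bound covering all of date_max
    pickup = df["tpep_pickup_datetime"]
    mask = (pickup >= start) & (pickup < end) & df["pickup_hour"].between(hour_min, hour_max)
    return df[mask]

sample = pd.DataFrame({"tpep_pickup_datetime": pd.to_datetime([
    "2023-12-31 23:59:00",  # before the date range
    "2024-01-01 07:00:00",  # first day, first allowed hour
    "2024-01-31 19:30:00",  # last day, last allowed hour
    "2024-01-15 22:00:00",  # in range by date but outside the hour window
    "2024-02-01 08:00:00",  # after the date range
])})
sample["pickup_hour"] = sample["tpep_pickup_datetime"].dt.hour

kept = filter_by_date_and_hour(sample, date(2024, 1, 1), date(2024, 1, 31), 7, 19)
print(kept)
```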

## Part 3: Streamlit app (`app.py`)
The `app.py` file in the repository contains the deployed Streamlit app. It implements the same filters and visualizations as the prototypes above and displays the required metrics using `st.metric()`.

## Part 4: Documentation & Code Quality
- Notebook includes modular functions with comments and tests for core behaviors (cleaning and feature engineering).
- See `README.md` for setup, deployment, and testing instructions.


## AI Tools Used

This assignment used DeepSeek and VS Code autocompletions for code suggestions and outlining. All code was reviewed and finalized by the student (myself).
