# COMP3610 Assignment 1

- **Name:** Sonali Maharaj
- **Student ID:** 816034459
- **Course:** COMP3610  
- **Assignment:** Assignment 1  

---

### Part 1: Data Ingestion

- Programatically download the files 

In [None]:
from pathlib import Path
import requests

# Create raw data directory if it doesn't exist
RAW_DIR = Path("data/raw")
RAW_DIR.mkdir(parents=True, exist_ok=True)

# Required files
FILES = [
    (
        "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet",
        RAW_DIR / "yellow_tripdata_2024-01.parquet",
    ),
    (
        "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv",
        RAW_DIR / "taxi_zone_lookup.csv",
    ),
]

def download_file(url, output_path, chunk_size=1024 * 1024):
    """
    Downloads a file from a URL and saves it locally.
    Uses streaming to handle large files efficiently.
    """
    print(f"\nDownloading: {url}")

    with requests.get(url, stream=True, timeout=60) as response:
        response.raise_for_status()

        with open(output_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)

    # Validation check
    if not output_path.exists() or output_path.stat().st_size == 0:
        raise RuntimeError(f"Download failed or file is empty: {output_path}")

    print(f"Saved to: {output_path}")
    print(f"File size: {output_path.stat().st_size / 1e6:.2f} MB")


# Download both required files
for url, path in FILES:
    if path.exists() and path.stat().st_size > 0:
        print(f"File already exists, skipping: {path}")
    else:
        download_file(url, path)


File already exists, skipping: data\raw\yellow_tripdata_2024-01.parquet
File already exists, skipping: data\raw\taxi_zone_lookup.csv


- Data Validation

In [None]:
import polars as pl
from pathlib import Path 

PARQUET_PATH = Path("data/raw/yellow_tripdata_2024-01.parquet")

# Check file exists 
if not PARQUET_PATH.exists():
    raise FileNotFoundError(f"Missing file: {PARQUET_PATH}. Run the download step first.")

# Load dataset 
lf = pl.scan_parquet(str(PARQUET_PATH))
schema = lf.schema
actual_columns = list(schema.keys())

# Expected columns 
EXPECTED_COLUMNS = [
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "PULocationID",
    "DOLocationID",
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "tip_amount",
    "total_amount",
    "payment_type",
]

DATETIME_COLUMNS = [
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
]

# a) Verify required columns exist 
missing_columns = [col for col in EXPECTED_COLUMNS if col not in actual_columns]

if missing_columns:
    raise ValueError(f"Validation failed: Missing required columns: {missing_columns}")

# b) Check datetime columns are correct type
for col in DATETIME_COLUMNS:
    if schema[col] != pl.Datetime:
        raise TypeError(f"Validation failed: Column '{col}' is not datetime type.")

# c) Report row count and summary 
row_count = lf.select(pl.len()).collect().item()

print("Validation Passed")
print(f"Total rows: {row_count:,}")
print(f"Total columns: {len(actual_columns)}")

# Print summary statistics 
df = lf.collect()
print("\n--- Dataset Summary ---")
print(df.describe())


Validation Passed
Total rows: 2,964,624
Total columns: 19

--- Dataset Summary ---
shape: (9, 20)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ statistic ┆ VendorID  ┆ tpep_pick ┆ tpep_drop ┆ … ┆ improveme ┆ total_amo ┆ congestio ┆ Airport_ │
│ ---       ┆ ---       ┆ up_dateti ┆ off_datet ┆   ┆ nt_surcha ┆ unt       ┆ n_surchar ┆ fee      │
│ str       ┆ f64       ┆ me        ┆ ime       ┆   ┆ rge       ┆ ---       ┆ ge        ┆ ---      │
│           ┆           ┆ ---       ┆ ---       ┆   ┆ ---       ┆ f64       ┆ ---       ┆ f64      │
│           ┆           ┆ str       ┆ str       ┆   ┆ f64       ┆           ┆ f64       ┆          │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ count     ┆ 2.964624e ┆ 2964624   ┆ 2964624   ┆ … ┆ 2.964624e ┆ 2.964624e ┆ 2.824462e ┆ 2.824462 │
│           ┆ 6         ┆           ┆           ┆   ┆ 6         ┆ 6         ┆ 6         ┆ e6  

### Part 2: Data Transformation & Analysis

- Data Cleaning

In [None]:
import polars as pl
from pathlib import Path

PARQUET_PATH = Path("data/raw/yellow_tripdata_2024-01.parquet")
if not PARQUET_PATH.exists():
    raise FileNotFoundError(f"Missing file: {PARQUET_PATH}. Run the download step first.")

lf = pl.scan_parquet(str(PARQUET_PATH))

def count_rows(lazyframe: pl.LazyFrame) -> int:
    return lazyframe.select(pl.len()).collect().item()

start_rows = count_rows(lf)
print(f"Starting rows: {start_rows:,}")

# e) Remove rows with nulls in critical columns
CRITICAL_COLS = [
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "PULocationID",
    "DOLocationID",
    "fare_amount",
]

null_crit_rows = count_rows(lf.filter(pl.any_horizontal([pl.col(c).is_null() for c in CRITICAL_COLS])))
lf1 = lf.drop_nulls(CRITICAL_COLS)
after_nulls = count_rows(lf1)

print("\n(e) Nulls in critical columns")
print(f"Removed: {null_crit_rows:,}")
print(f"Remaining: {after_nulls:,}")

# f) Filter invalid trips:

invalid_distance_rows = count_rows(lf1.filter(pl.col("trip_distance") <= 0))
invalid_fare_rows = count_rows(lf1.filter(pl.col("fare_amount") < 0))
too_high_fare_rows = count_rows(lf1.filter(pl.col("fare_amount") > 500))

lf2 = lf1.filter(
    (pl.col("trip_distance") > 0) &
    (pl.col("fare_amount") >= 0) &
    (pl.col("fare_amount") <= 500)
)
after_invalids = count_rows(lf2)

print("\n(f) Invalid distance/fare filters")
print(f"Removed (distance <= 0): {invalid_distance_rows:,}")
print(f"Removed (fare < 0): {invalid_fare_rows:,}")
print(f"Removed (fare > 500): {too_high_fare_rows:,}")
print(f"Remaining: {after_invalids:,}")

# g) Remove trips where dropoff time is before pickup time
bad_time_rows = count_rows(lf2.filter(pl.col("tpep_dropoff_datetime") < pl.col("tpep_pickup_datetime")))

lf3 = lf2.filter(pl.col("tpep_dropoff_datetime") >= pl.col("tpep_pickup_datetime"))
after_time = count_rows(lf3)

print("\n(g) Dropoff before pickup")
print(f"Removed: {bad_time_rows:,}")
print(f"Remaining: {after_time:,}")

# h) Final summary
total_removed = start_rows - after_time
print("\n(h) Cleaning summary")
print(f"Total removed: {total_removed:,}")
print(f"Final cleaned rows: {after_time:,}")

df_clean = lf3.collect()
df_clean

Starting rows: 2,964,624

(e) Nulls in critical columns
Removed: 0
Remaining: 2,964,624

(f) Invalid distance/fare filters
Removed (distance <= 0): 60,371
Removed (fare < 0): 37,448
Removed (fare > 500): 46
Remaining: 2,870,158

(g) Dropoff before pickup
Removed: 56
Remaining: 2,870,102

(h) Cleaning summary
Total removed: 94,522
Final cleaned rows: 2,870,102


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
i32,datetime[ns],datetime[ns],i64,f64,i64,str,i32,i32,i64,f64,f64,f64,f64,f64,f64,f64,f64,f64
2,2024-01-01 00:57:55,2024-01-01 01:17:43,1,1.72,1,"""N""",186,79,2,17.7,1.0,0.5,0.0,0.0,1.0,22.7,2.5,0.0
1,2024-01-01 00:03:00,2024-01-01 00:09:36,1,1.8,1,"""N""",140,236,1,10.0,3.5,0.5,3.75,0.0,1.0,18.75,2.5,0.0
1,2024-01-01 00:17:06,2024-01-01 00:35:01,1,4.7,1,"""N""",236,79,1,23.3,3.5,0.5,3.0,0.0,1.0,31.3,2.5,0.0
1,2024-01-01 00:36:38,2024-01-01 00:44:56,1,1.4,1,"""N""",79,211,1,10.0,3.5,0.5,2.0,0.0,1.0,17.0,2.5,0.0
1,2024-01-01 00:46:51,2024-01-01 00:52:57,1,0.8,1,"""N""",211,148,1,7.9,3.5,0.5,3.2,0.0,1.0,16.1,2.5,0.0
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
2,2024-01-31 23:45:59,2024-01-31 23:54:36,,3.18,,,107,263,0,15.77,0.0,0.5,2.0,0.0,1.0,21.77,,
1,2024-01-31 23:13:07,2024-01-31 23:27:52,,4.0,,,114,236,0,18.4,1.0,0.5,2.34,0.0,1.0,25.74,,
2,2024-01-31 23:19:00,2024-01-31 23:38:00,,3.33,,,211,25,0,19.97,0.0,0.5,0.0,0.0,1.0,23.97,,
2,2024-01-31 23:07:23,2024-01-31 23:25:14,,3.06,,,107,13,0,23.88,0.0,0.5,5.58,0.0,1.0,33.46,,


- Feature Engineering

In [None]:
import polars as pl

if "df_clean" not in globals():
    raise RuntimeError("df_clean not found. Run the data cleaning section first.")

# Duration in minutes 
duration_minutes_expr = (
    (pl.col("tpep_dropoff_datetime") - pl.col("tpep_pickup_datetime"))
    .dt.total_seconds() / 60
)

df_features = df_clean.with_columns([
    # i) trip_duration_minutes
    duration_minutes_expr.alias("trip_duration_minutes"),

    # j) trip_speed_mph 
    pl.when(duration_minutes_expr > 0)
      .then(pl.col("trip_distance") / (duration_minutes_expr / 60))
      .otherwise(0)
      .alias("trip_speed_mph"),

    # k) pickup_hour 
    pl.col("tpep_pickup_datetime").dt.hour().alias("pickup_hour"),

    # l) pickup day of the week
    pl.col("tpep_pickup_datetime").dt.strftime("%A").alias("pickup_day_of_week"),
])

print("Feature engineering complete.")
df_features.select(["trip_duration_minutes", "trip_speed_mph", "pickup_hour", "pickup_day_of_week"]).head()


Feature engineering complete.


trip_duration_minutes,trip_speed_mph,pickup_hour,pickup_day_of_week
f64,f64,i8,str
19.8,5.212121,0,"""Monday"""
6.6,16.363636,0,"""Monday"""
17.916667,15.739535,0,"""Monday"""
8.3,10.120482,0,"""Monday"""
6.1,7.868852,0,"""Monday"""


- SQL Analysis

In [7]:
# Setup to use duckdb
import duckdb
import polars as pl
from pathlib import Path

LOOKUP_PATH = Path("data/raw/taxi_zone_lookup.csv")
if not LOOKUP_PATH.exists():
    raise FileNotFoundError(f"Missing file: {LOOKUP_PATH}. Run the download step first.")

# Load lookup with Polars
zones_df = pl.read_csv(str(LOOKUP_PATH))

# DuckDB in-memory connection
con = duckdb.connect(database=":memory:")

# Register Polars dataframes via Arrow 
con.register("trips", df_features.to_arrow())
con.register("zones", zones_df.to_arrow())

print("Trips rows:", con.sql("SELECT COUNT(*) FROM trips").fetchone()[0])
print("Zones rows:", con.sql("SELECT COUNT(*) FROM zones").fetchone()[0])


Trips rows: 2870102
Zones rows: 265


### m) Top 10 busiest pickup zones
This query finds the pickup zones with the highest number of trips by joining trip pickup location IDs to the taxi zone lookup table and counting trips per zone.


In [8]:
con.sql("""
SELECT
  z."Zone" AS pickup_zone,
  z."Borough" AS pickup_borough,
  COUNT(*) AS trip_count
FROM trips t
JOIN zones z
  ON t."PULocationID" = z."LocationID"
GROUP BY 1, 2
ORDER BY trip_count DESC
LIMIT 10;
""").df()


Unnamed: 0,pickup_zone,pickup_borough,trip_count
0,Midtown Center,Manhattan,140161
1,Upper East Side South,Manhattan,140134
2,JFK Airport,Queens,138478
3,Upper East Side North,Manhattan,133975
4,Midtown East,Manhattan,104356
5,Times Sq/Theatre District,Manhattan,102972
6,Penn Station/Madison Sq West,Manhattan,102161
7,Lincoln Square East,Manhattan,101800
8,LaGuardia Airport,Queens,87715
9,Upper West Side South,Manhattan,86475


### n) Average fare amount by hour of day
This query computes the average fare amount for each pickup hour (0–23) and orders results by hour.


In [9]:
con.sql("""
SELECT
  t.pickup_hour,
  AVG(t.fare_amount) AS avg_fare_amount
FROM trips t
GROUP BY 1
ORDER BY 1;
""").df()


Unnamed: 0,pickup_hour,avg_fare_amount
0,0,19.67925
1,1,17.732032
2,2,16.621723
3,3,18.530033
4,4,23.435229
5,5,27.492713
6,6,22.026585
7,7,18.749879
8,8,17.822939
9,9,17.943989


### o) Percentage of trips by payment type
This query counts trips for each payment_type and converts counts into percentages of the total trip count.


In [11]:
con.sql("""
WITH counts AS (
  SELECT
    payment_type,
    COUNT(*) AS trips
  FROM trips
  GROUP BY 1
),
total AS (
  SELECT SUM(trips) AS total_trips FROM counts
)
SELECT
  CASE payment_type
    WHEN 1 THEN 'Credit Card'
    WHEN 2 THEN 'Cash'
    WHEN 3 THEN 'No Charge'
    WHEN 4 THEN 'Dispute'
    WHEN 5 THEN 'Unknown'
    WHEN 6 THEN 'Voided Trip'
    ELSE 'Other'
  END AS payment_type_name,
  trips,
  ROUND(100.0 * trips / total_trips, 2) AS percent_of_trips
FROM counts
CROSS JOIN total
ORDER BY percent_of_trips DESC;

""").df()


Unnamed: 0,payment_type_name,trips,percent_of_trips
0,Credit Card,2298412,80.08
1,Cash,422921,14.74
2,Other,115245,4.02
3,Dispute,22876,0.8
4,No Charge,10648,0.37


### p) Average tip percentage by day of week for credit card payments
This query filters to credit card payments only and computes average tip percentage (tip_amount / fare_amount) by day of week.


In [12]:
con.sql("""
SELECT
  pickup_day_of_week,
  ROUND(AVG(100.0 * tip_amount / fare_amount), 2) AS avg_tip_percent
FROM trips
WHERE payment_type = 1
  AND fare_amount > 0
GROUP BY 1
ORDER BY
  CASE pickup_day_of_week
    WHEN 'Monday' THEN 1
    WHEN 'Tuesday' THEN 2
    WHEN 'Wednesday' THEN 3
    WHEN 'Thursday' THEN 4
    WHEN 'Friday' THEN 5
    WHEN 'Saturday' THEN 6
    WHEN 'Sunday' THEN 7
    ELSE 8
  END;
""").df()


Unnamed: 0,pickup_day_of_week,avg_tip_percent
0,Monday,25.51
1,Tuesday,25.73
2,Wednesday,25.71
3,Thursday,29.73
4,Friday,25.6
5,Saturday,26.29
6,Sunday,25.1


### q) Top 5 pickup-dropoff zone pairs
This query joins the lookup table twice (pickup and dropoff) to find the most common origin-destination zone pairs by trip count.


In [17]:
con.sql("""
SELECT
  pu."Zone" AS pickup_zone,
  dz."Zone" AS dropoff_zone,
  COUNT(*) AS trip_count
FROM trips t
JOIN zones pu
  ON t."PULocationID" = pu."LocationID"
JOIN zones dz
  ON t."DOLocationID" = dz."LocationID"
GROUP BY 1, 2
ORDER BY trip_count DESC
LIMIT 5;
""").df()


Unnamed: 0,pickup_zone,dropoff_zone,trip_count
0,Upper East Side South,Upper East Side North,21642
1,Upper East Side North,Upper East Side South,19199
2,Upper East Side North,Upper East Side North,15200
3,Upper East Side South,Upper East Side South,14119
4,Midtown Center,Upper East Side South,10139


## Part 3: Dashboard Development
Prototype and test the visualizations here before implementing them in `app.py`.
