## Problem Statement
The operations team wants answers to these questions:

1. How much revenue is generated by trips with 1, 2, 3, and 4+ passengers?
2. What are the average and maximum tip amounts for each passenger group?
3. How many trips occur per passenger group?

The dataset is large (hundreds of thousands of rows or more). Traditional row-wise processing is too slow, so parallel computation is needed to produce insights quickly.

In [2]:
import polars as pl
from concurrent.futures import ThreadPoolExecutor
import time

# ── 1. Load data ──────────────────────────────────────────────────────────────
# Read the whole trip file eagerly. Empty strings and "NA" are loaded as nulls,
# and Polars is asked to parse date-like columns while reading.
df = pl.read_csv(
    "yellow_tripdata.csv",
    null_values=["", "NA"],
    try_parse_dates=True,
)

# Report the size of what was loaded, then preview the first three rows.
print(f"Loaded {df.height:,} rows × {df.width} columns")
df.head(3)

Loaded 7,667,792 rows × 18 columns


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
i64,datetime[μs],datetime[μs],i64,f64,i64,str,i64,i64,i64,f64,f64,f64,f64,f64,f64,f64,str
1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.5,1,"""N""",151,239,1,7.0,0.5,0.5,1.65,0.0,0.3,9.95,
1,2019-01-01 00:59:47,2019-01-01 01:18:59,1,2.6,1,"""N""",239,246,1,14.0,0.5,0.5,1.0,0.0,0.3,16.3,
2,2018-12-21 13:48:30,2018-12-21 13:52:40,3,0.0,1,"""N""",236,236,1,4.5,0.5,0.5,0.0,0.0,0.3,5.8,


In [3]:
# ── 2. Define passenger groups ────────────────────────────────────────────────
# Maps each group label to the Polars expression that selects its rows.
# Rows whose passenger_count is 0 or null match none of these groups.
GROUPS = {
    "1 passenger": pl.col("passenger_count").eq(1),
    "2 passengers": pl.col("passenger_count").eq(2),
    "3 passengers": pl.col("passenger_count").eq(3),
    "4+ passengers": pl.col("passenger_count").ge(4),
}

# ── 3. Define the per-group analysis function ─────────────────────────────────
def analyze_group(label: str, mask: pl.Expr) -> dict:
    """Summarise the trips of one passenger group.

    Filters the module-level ``df`` with ``mask`` and aggregates the matching
    rows into a single record.

    Args:
        label: Name of the group; stored under the ``"group"`` key.
        mask: Boolean Polars expression selecting the group's rows.

    Returns:
        A dict with the keys ``group``, ``trip_count``, ``total_revenue``
        (sum of ``total_amount``, 2 decimals), ``avg_tip`` (mean of
        ``tip_amount``, 4 decimals) and ``max_tip`` (2 decimals).
    """
    trip_count = pl.len().alias("trip_count")
    total_revenue = pl.col("total_amount").sum().round(2).alias("total_revenue")
    avg_tip = pl.col("tip_amount").mean().round(4).alias("avg_tip")
    max_tip = pl.col("tip_amount").max().round(2).alias("max_tip")

    aggregated = df.filter(mask).select(trip_count, total_revenue, avg_tip, max_tip)

    # The aggregation always yields exactly one row; take it as a plain dict.
    record = {"group": label}
    record.update(aggregated.row(0, named=True))
    return record

# ── 4. Run analyses in parallel with ThreadPoolExecutor ───────────────────────
start = time.perf_counter()

# One worker per group; every task runs analyze_group on its (label, mask) entry.
with ThreadPoolExecutor(max_workers=len(GROUPS)) as pool:
    futures = [pool.submit(analyze_group, *entry) for entry in GROUPS.items()]
    # Collect in submission order so results line up with GROUPS.
    results = [task.result() for task in futures]

# The timing covers submission, execution and pool shutdown.
elapsed = time.perf_counter() - start
print(f"Parallel analysis completed in {elapsed:.3f}s\n")

# ── 5. Display results ────────────────────────────────────────────────────────
# One row per group, ordered by group label.
summary = pl.DataFrame(results).sort("group")

summary

Parallel analysis completed in 1.077s



group,trip_count,total_revenue,avg_tip,max_tip
str,i64,f64,f64,f64
"""1 passenger""",5456121,85168000.0,1.8283,787.25
"""2 passengers""",1114106,17638000.0,1.8339,300.0
"""3 passengers""",314721,4910900.0,1.7956,150.0
"""4+ passengers""",665463,10341000.0,1.8309,444.8
