
# DAMO630 — Assignment 1 (Part B Only: NYC Taxi with PySpark, FPGrowth Fixed)

**Generated:** 2025-10-04 02:38  

This standalone notebook contains only **Part B** with a robust FPGrowth setup:
- B1: Download & Load TLC Yellow Taxi data (Jan 2024)
- B2: MapReduce-equivalent in Spark (total fare per pickup zone)
- B3: Frequent pattern mining with FPGrowth (✅ items as Array[String])
- B4: Rider segmentation with KMeans
- B5: Business interpretation


In [None]:

# Install PySpark and helpers into the kernel's environment (run once per session).
# NOTE(review): package versions are unpinned, so a later session may pull different
# releases; consider pinning them once the target environment is settled.
%pip install --quiet pyspark pandas numpy


## B1. Download & Load TLC Yellow Trip Data (Jan 2024)

In [None]:

import os
import shutil
import urllib.request

parquet_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet"
local_path = "yellow_tripdata_2024-01.parquet"

# Download only when no usable copy exists. A zero-byte file counts as missing,
# because an interrupted or failed transfer can leave an empty file behind that
# would otherwise make every re-run skip the download.
if not os.path.exists(local_path) or os.path.getsize(local_path) == 0:
    tmp_path = local_path + ".part"
    try:
        # urlopen raises on HTTP errors, so a failed request never reaches the
        # rename below; the data is streamed to disk rather than held in memory.
        with urllib.request.urlopen(parquet_url) as response, open(tmp_path, "wb") as out_file:
            shutil.copyfileobj(response, out_file)
        # Move into place only after the full transfer succeeded.
        os.replace(tmp_path, local_path)
    finally:
        # Remove a partial download so it cannot be mistaken for the real file.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
print("File ready:", local_path, os.path.getsize(local_path), "bytes")

from pyspark.sql import SparkSession, functions as F, types as T

# Reuse an existing session if the kernel already has one.
spark = SparkSession.builder.appName("NYC-Taxi-PartB-Only").getOrCreate()

# cache() marks the frame for reuse by later sections; the count() below is the
# first action and therefore the point where the file is actually read.
df_taxi = spark.read.parquet(local_path).cache()
print("Rows:", df_taxi.count())
df_taxi.printSchema()
df_taxi.show(5, truncate=False)


## B2. MapReduce-equivalent in Spark: Total Fare per Pickup Zone

In [None]:

# Keep only the fields the later sections rely on; a field that is absent from
# the loaded file is skipped here rather than raising at select time.
cols_needed = [
    "tpep_pickup_datetime",
    "PULocationID",
    "DOLocationID",
    "trip_distance",
    "fare_amount",
    "passenger_count",
]
available = set(df_taxi.columns)

# Basic sanity filter: fare, distance and passenger count must all be >= 0.
# Rows where any of the three is null are excluded too, because the comparison
# does not evaluate to true for them.
non_negative = (
    (F.col("fare_amount") >= 0)
    & (F.col("trip_distance") >= 0)
    & (F.col("passenger_count") >= 0)
)
df_clean = (
    df_taxi
    .select(*(name for name in cols_needed if name in available))
    .filter(non_negative)
)

# "Map" = key each trip by pickup zone, "reduce" = sum the fares per key;
# the result is ranked from highest to lowest total.
zone_totals = df_clean.groupBy("PULocationID").agg(
    F.sum("fare_amount").alias("total_fare")
)
fare_by_pu = zone_totals.orderBy(F.col("total_fare").desc())
fare_by_pu.show(10)

# Persist as a single CSV part file with a header row, replacing earlier output.
out_dir = "out_fare_by_pu"
fare_by_pu.coalesce(1).write.csv(out_dir, mode="overwrite", header=True)
print("Saved:", out_dir)


## B3. Frequent Travel Patterns with FPGrowth (Fixed Array[String])

In [None]:

from pyspark.ml.fpm import FPGrowth

# Build one transaction per trip as an explicit Array[String] 'items' column.
#
# Zone IDs are tagged with their role ("PU_" / "DO_"). FPGrowth rejects any
# transaction that contains the same item twice, which is exactly what an
# untagged same-zone trip (pickup == dropoff) would produce. The tags also keep
# "picked up in zone X" and "dropped off in zone X" apart as distinct items, so
# itemsets and rules stay interpretable.
df_basket = (df_clean
    .withColumn("hour", F.hour("tpep_pickup_datetime"))
    # between() is inclusive: morning = 06:00-10:59, evening = 16:00-19:59.
    .withColumn("time_bucket",
                F.when(F.col("hour").between(6,10), F.lit("morning"))
                 .when(F.col("hour").between(16,19), F.lit("evening"))
                 .otherwise(F.lit("other")))
    # concat() yields null when the zone ID is null, so such trips are still
    # removed by na.drop() below.
    .withColumn("pickup_zone", F.concat(F.lit("PU_"), F.col("PULocationID").cast("string")))
    .withColumn("dropoff_zone", F.concat(F.lit("DO_"), F.col("DOLocationID").cast("string")))
    .select("pickup_zone", "dropoff_zone", "time_bucket")
    .na.drop()
)

df_basket = df_basket.withColumn("items", F.array("pickup_zone","dropoff_zone","time_bucket")).select("items")

# Optional: sample for speed (uncomment to speed up)
# df_basket = df_basket.sample(False, 0.25, seed=42)

df_basket.printSchema()
df_basket.show(5, truncate=False)

# minSupport=0.001: an itemset must appear in at least 0.1% of trips.
# minConfidence=0.1: only rules with confidence >= 10% are generated.
fp = FPGrowth(itemsCol="items", minSupport=0.001, minConfidence=0.1)
fp_model = fp.fit(df_basket)

freq_items = fp_model.freqItemsets.orderBy(F.desc("freq"))
rules = fp_model.associationRules.orderBy(F.desc("lift"))

print("Top frequent itemsets:")
freq_items.show(10, truncate=False)

print("Top association rules:")
rules.select("antecedent","consequent","confidence","lift","support").show(10, truncate=False)

# Save outputs.
# The CSV writer cannot serialize array columns, so the item arrays are joined
# into '|'-separated strings first; the in-memory frames above keep their arrays.
freq_items_csv = freq_items.withColumn("items", F.concat_ws("|", "items"))
rules_csv = (rules
    .withColumn("antecedent", F.concat_ws("|", "antecedent"))
    .withColumn("consequent", F.concat_ws("|", "consequent")))

freq_items_csv.coalesce(1).write.mode("overwrite").option("header", True).csv("out_fp_itemsets")
rules_csv.coalesce(1).write.mode("overwrite").option("header", True).csv("out_fp_rules")
print("Saved: out_fp_itemsets, out_fp_rules")


## B4. Rider Segmentation with KMeans

In [None]:

from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans

# Per-trip feature table: distance, fare and pickup hour. Rows with a missing
# value are dropped, as are trips with zero distance or a negative fare.
df_seg = (
    df_clean
    .withColumn("hour", F.hour("tpep_pickup_datetime"))
    .select("trip_distance", "fare_amount", "hour")
    .dropna()
    .filter((F.col("trip_distance") > 0) & (F.col("fare_amount") >= 0))
)

# Pack the three numeric columns into a single vector column.
vec = VectorAssembler(
    inputCols=["trip_distance", "fare_amount", "hour"],
    outputCol="features_raw",
)
df_vec = vec.transform(df_seg)

# Standardize each feature (centered, unit standard deviation) so that no
# single feature's scale dominates the distance computation.
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=True,
)
scaler_model = scaler.fit(df_vec)
df_scaled = scaler_model.transform(df_vec)

# Four clusters with a fixed seed; predictions are cached because they feed
# the summary aggregation below.
kmeans = KMeans(featuresCol="features", k=4, seed=42)
km_model = kmeans.fit(df_scaled)
preds = km_model.transform(df_scaled).cache()

print("Cluster centers (standardized):")
centers = km_model.clusterCenters()
for i, c in enumerate(centers):
    print(f"Cluster {i}: {c}")

# Describe each cluster in original units: size plus the mean of every feature.
mean_columns = [
    ("trip_distance", "avg_distance"),
    ("fare_amount", "avg_fare"),
    ("hour", "avg_hour"),
]
summary = (
    preds
    .groupBy("prediction")
    .agg(
        F.count("*").alias("n"),
        *[F.avg(source).alias(label) for source, label in mean_columns],
    )
    .orderBy("prediction")
)
summary.show()

# Single CSV part file with header, replacing any earlier output.
summary.coalesce(1).write.csv("out_kmeans_summary", mode="overwrite", header=True)
print("Saved: out_kmeans_summary")



## B5. Business Interpretation (for your write-up)

- **Fare by pickup zone** pinpoints revenue-dense areas (e.g., airports, midtown) — guide driver positioning and supply allocation.  
- **Frequent patterns** (PU→DO with time buckets) support flat-fare windows, pre-positioning, and targeted promos.  
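- **KMeans segments** (trip-level clusters on distance, fare, and pickup hour — e.g., short-hop commuters vs. long-distance/airport vs. late-night; confirm these labels against the B4 cluster summary) inform fleet mix, pricing caps, and safety features.  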
- **Operationalization**: refresh nightly; track drift/seasonality and adjust strategies accordingly.
