In [0]:
# ⭐ Silver → Gold – NYC Yellow Taxi (Analytics-Ready)

This notebook:
- Reads **Silver** Delta
- Applies **data quality checks** and **standardizes types & names**
- Creates derived features for BI (e.g., `trip_duration_minutes`, `tip_percentage`)
- Builds a **star schema** with **dimensions** and a **fact** table
- Saves & registers all tables as **Delta tables** in the `gold` schema

## Import Libraries

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import datediff, expr, to_date, hour, when, round
from pyspark.sql.functions import lit, trim, lower, initcap, when, monotonically_increasing_id
from pyspark.sql import Window
from pyspark.sql.functions import dense_rank
import logging

## Configurations

#### Logger

In [0]:
import logging

# Named logger shared by every cell in this notebook.
logger = logging.getLogger("SilverToGold-Analytics")

# hasHandlers() also inspects ancestor loggers, so basicConfig() only runs when
# nothing in the hierarchy (including the root logger) has a handler yet.
if not logger.hasHandlers():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Set the level on this logger explicitly. When handlers already exist, the
# basicConfig() call above is skipped and the logger would otherwise inherit
# the root level (WARNING by default), silently dropping every
# logger.info(...) call made in this notebook.
logger.setLevel(logging.INFO)
logger.info("Logger initialized.")

## Constants and Parameters

In [0]:
# Storage roots on ADLS Gen2 (abfss scheme); both end with a trailing slash.
silver_path = "abfss://silver@assportfolio.dfs.core.windows.net/nyc/taxi/yellow/"
gold_base = "abfss://gold@assportfolio.dfs.core.windows.net/nyc/taxi/yellow/"

# One Delta folder per Gold table, directly under gold_base.
gold_dim_vendor_path = gold_base + "dim_vendor/"
gold_dim_payment_type_path = gold_base + "dim_payment_type/"
gold_dim_rate_code_path = gold_base + "dim_rate_code/"
gold_fact_trips_path = gold_base + "fact_yellow_trips/"

# Fully qualified metastore names (<schema>.<table>) for the same four tables.
gold_schema = "gold"
tbl_dim_vendor = ".".join([gold_schema, "dim_vendor"])
tbl_dim_payment_type = ".".join([gold_schema, "dim_payment_type"])
tbl_dim_rate_code = ".".join([gold_schema, "dim_rate_code"])
tbl_fact_trips = ".".join([gold_schema, "fact_yellow_trips"])

## Main Process

#### Read Silver Data

In [0]:
# Load the Silver-layer Delta data from the configured storage path.
df_silver = spark.read.load(silver_path, format="delta")

# count() is a Spark action, so logging the row count runs a job over the input.
silver_row_count = df_silver.count()
logger.info(f"Loaded Silver rows: {silver_row_count}")

# Print the incoming schema in the cell output for reference.
df_silver.printSchema()


#### Minor Transformations

##### Data Quality Gates (lightweight)

In [0]:
# Columns this notebook relies on; the Silver step is expected to provide all of them.
required_cols = [
    "vendor_id",
    "pickup_datetime",
    "dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "ratecodeid",
    "store_and_fwd_flag",
    "payment_type",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
    "congestion_surcharge",
]

# Fail fast, listing every absent column, before any transformation is attempted.
missing = [name for name in required_cols if name not in df_silver.columns]
if missing:
    raise ValueError(f"Missing required columns in Silver: {missing}")

# Row-level sanity conditions. A row is kept only when:
#   - both pickup and dropoff timestamps are present,
#   - trip_distance casts to a double that is >= 0 (a NULL result fails the comparison),
#   - total_amount casts to a non-NULL double.
has_both_timestamps = col("pickup_datetime").isNotNull() & col("dropoff_datetime").isNotNull()
has_valid_distance = col("trip_distance").cast("double") >= 0.0
has_numeric_total = col("total_amount").cast("double").isNotNull()

df_sq = df_silver.where(has_both_timestamps).where(has_valid_distance).where(has_numeric_total)

logger.info(f"Rows after sanity filters: {df_sq.count()}")


##### Standardize Schema & Types

In [0]:
# Columns parsed as timestamps, in the order they are converted.
timestamp_cols = ("pickup_datetime", "dropoff_datetime")

# Target Spark SQL type for each numeric column, in the order the casts are applied.
numeric_casts = {
    "passenger_count": "int",
    "trip_distance": "double",
    "ratecodeid": "int",
    "payment_type": "int",
    "fare_amount": "double",
    "extra": "double",
    "mta_tax": "double",
    "tip_amount": "double",
    "tolls_amount": "double",
    "improvement_surcharge": "double",
    "total_amount": "double",
    "congestion_surcharge": "double",
}

# Start from the sanity-filtered frame and replace each column in place with
# its typed version; column names and positions are unchanged.
df_std = df_sq
for ts_name in timestamp_cols:
    df_std = df_std.withColumn(ts_name, to_timestamp(col(ts_name)))
for num_name, spark_type in numeric_casts.items():
    df_std = df_std.withColumn(num_name, col(num_name).cast(spark_type))

logger.info("Schema standardized.")

##### Derive Analytics Features

In [0]:
# Trip length in minutes: difference of the two timestamps in epoch seconds, divided by 60.
duration_minutes = (
    expr("unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime)") / 60.0
)

# Tip as a percentage of the fare; NULL whenever fare_amount is not greater than zero.
tip_pct = when(
    col("fare_amount") > 0,
    (col("tip_amount") / col("fare_amount")) * 100.0,
).otherwise(None)

# True/False when a surcharge value is present; stays NULL when congestion_surcharge is NULL.
surcharge_flag = (col("congestion_surcharge") > 0).cast("boolean")

# Collapse the flag to exactly "Y" or "N"; anything other than "Y" (including NULL) becomes "N".
store_fwd_normalized = when(col("store_and_fwd_flag") == "Y", "Y").otherwise("N")

df_derived = (
    df_std
    .withColumn("trip_duration_minutes", duration_minutes)
    .withColumn("trip_date", to_date(col("pickup_datetime")))
    .withColumn("pickup_hour", hour(col("pickup_datetime")))
    .withColumn("tip_percentage", tip_pct)
    .withColumn("had_congestion_surcharge", surcharge_flag)
    .withColumn("store_and_fwd_flag", store_fwd_normalized)
)

# Outlier guards for clean analytics: keep trips lasting 0..720 minutes (up to 12h)
# and with trip_distance of at most 200 (miles).
duration_in_range = (col("trip_duration_minutes") >= 0) & (col("trip_duration_minutes") <= 720)
distance_in_range = col("trip_distance") <= 200

df_feat = df_derived.where(duration_in_range).where(distance_in_range)

logger.info(f"Rows after feature derivation/filters: {df_feat.count()}")

##### Build Dimensions (conformed, small, slow-changing)

In [0]:
# Each dimension below follows the same recipe:
#   1. take the distinct non-NULL natural-key values seen in df_feat,
#   2. number them with dense_rank() ordered by the natural key (surrogate key),
#   3. attach a friendly name where a mapping is defined.
# The windows have no partitionBy, so ranking happens over the whole (small) frame.

# --- Vendor dimension ---
dim_vendor_src = (
    df_feat
    .select("vendor_id")
    .where(col("vendor_id").isNotNull())
    .dropDuplicates()
)
vendor_window = Window.orderBy(col("vendor_id"))
dim_vendor = dim_vendor_src.select(
    dense_rank().over(vendor_window).alias("vendor_key"),
    "vendor_id",
)

# --- Payment type dimension ---
# Friendly label per payment_type code; codes outside 1-6 are labelled "Other".
payment_name = (
    when(col("payment_type") == 1, "Credit Card")
    .when(col("payment_type") == 2, "Cash")
    .when(col("payment_type") == 3, "No Charge")
    .when(col("payment_type") == 4, "Dispute")
    .when(col("payment_type") == 5, "Unknown")
    .when(col("payment_type") == 6, "Voided Trip")
    .otherwise("Other")
)

dim_payment_type_src = (
    df_feat
    .select("payment_type")
    .where(col("payment_type").isNotNull())
    .dropDuplicates()
)
payment_window = Window.orderBy(col("payment_type"))
dim_payment_type = dim_payment_type_src.select(
    dense_rank().over(payment_window).alias("payment_type_key"),
    "payment_type",
    payment_name.alias("payment_type_name"),
)

# --- Rate code dimension ---
# Friendly label per ratecodeid; codes outside 1-6 are labelled "Other".
rate_name = (
    when(col("ratecodeid") == 1, "Standard rate")
    .when(col("ratecodeid") == 2, "JFK")
    .when(col("ratecodeid") == 3, "Newark")
    .when(col("ratecodeid") == 4, "Nassau/Westchester")
    .when(col("ratecodeid") == 5, "Negotiated fare")
    .when(col("ratecodeid") == 6, "Group ride")
    .otherwise("Other")
)

dim_rate_code_src = (
    df_feat
    .select("ratecodeid")
    .where(col("ratecodeid").isNotNull())
    .dropDuplicates()
)
rate_window = Window.orderBy(col("ratecodeid"))
dim_rate_code = dim_rate_code_src.select(
    dense_rank().over(rate_window).alias("rate_code_key"),
    "ratecodeid",
    rate_name.alias("rate_code_name"),
)

logger.info("Dimensions built (vendor, payment_type, rate_code).")

##### Build Fact Table

In [0]:
# Attach each dimension's surrogate key by left-joining on its natural key.
# Left joins keep every trip; rows whose natural key has no dimension match
# (for example a NULL key) end up with a NULL surrogate key.
dimension_joins = (
    (dim_vendor, "vendor_id"),
    (dim_payment_type, "payment_type"),
    (dim_rate_code, "ratecodeid"),
)

fact = df_feat
for dim_df, natural_key in dimension_joins:
    fact = fact.join(dim_df, on=natural_key, how="left")

# Curated, narrow column set for the fact table, grouped by role.
surrogate_key_cols = ["vendor_key", "payment_type_key", "rate_code_key"]
time_grain_cols = ["trip_date", "pickup_datetime", "dropoff_datetime", "pickup_hour"]
measure_cols = [
    "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax", "tip_amount",
    "tolls_amount", "improvement_surcharge", "congestion_surcharge", "total_amount",
    "trip_duration_minutes", "tip_percentage", "had_congestion_surcharge",
]
flag_cols = ["store_and_fwd_flag"]

fact_trips = fact.select(*surrogate_key_cols, *time_grain_cols, *measure_cols, *flag_cols)

logger.info(f"Fact row count: {fact_trips.count()}")

#### Write Dimensions & Fact as Delta (Partition fact by trip_date)

In [0]:
# Dimensions: full overwrite (data and schema) on every run.
# Suitable for a demo; an incremental pipeline would merge/upsert instead.
dimension_outputs = (
    (dim_vendor, gold_dim_vendor_path),
    (dim_payment_type, gold_dim_payment_type_path),
    (dim_rate_code, gold_dim_rate_code_path),
)
for dim_df, dim_path in dimension_outputs:
    (
        dim_df.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .save(dim_path)
    )

# Fact: same overwrite semantics, laid out in one partition per trip_date.
fact_writer = (
    fact_trips.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("trip_date")
)
fact_writer.save(gold_fact_trips_path)

logger.info("Delta written to Gold storage locations.")

#### Register Delta Tables in Metastore

In [0]:
# Create the schema named by `gold_schema` (the same constant the table names
# are built from) instead of a hard-coded literal, so the schema that gets
# created is always the one the tables below are registered in.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {gold_schema}")
logger.info(f"Schema `{gold_schema}` created or already exists.")

# Register each Gold Delta location as an external table. IF NOT EXISTS makes
# this safe to re-run: an already registered table is left untouched.
table_locations = (
    (tbl_dim_vendor, gold_dim_vendor_path),
    (tbl_dim_payment_type, gold_dim_payment_type_path),
    (tbl_dim_rate_code, gold_dim_rate_code_path),
    (tbl_fact_trips, gold_fact_trips_path),
)
for table_name, table_path in table_locations:
    spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} USING DELTA LOCATION '{table_path}'")

logger.info(f"Registered: {tbl_dim_vendor}, {tbl_dim_payment_type}, {tbl_dim_rate_code}, {tbl_fact_trips}")