In [None]:
from pyspark.sql import functions as F, Window

# Load the zone/hour aggregate table.
az = spark.read.parquet("lake/gold/agg_zone_hour")

# Keep only the columns the baselines need (pickup_date already exists in the table).
data = az.select("service", "pulocationid", "hour", "pickup_date", "trips")

# Time-based split: rows before valid_start are train, the rest are validation.
valid_start = "2024-12-01"
train = data.filter(F.col("pickup_date") < F.lit(valid_start))
valid = data.filter(F.col("pickup_date") >= F.lit(valid_start))

# Baseline A (7-day lag): predict each validation row with the train value
# observed for the same service/zone/hour exactly 7 days earlier.
lag_keys = ["service", "pulocationid", "hour", "pickup_date"]

# Shift train forward by 7 days and rename its `trips` to `yhat` BEFORE joining.
# `trips` is not a join key, so leaving it unrenamed puts two `trips` columns in
# the join output and an unqualified F.col("trips") becomes an ambiguous reference.
train_lag7 = train.select(
    "service",
    "pulocationid",
    "hour",
    F.date_add("pickup_date", 7).alias("pickup_date"),
    F.col("trips").alias("yhat"),
)

# Left join keeps every validation row; rows with no lagged match get yhat = NULL.
# Output columns: service, pulocationid, hour, pickup_date, trips, yhat.
valid_lag = valid.join(train_lag7, on=lag_keys, how="left")

# NOTE(review): train ends the day before valid_start, so after the +7 day shift
# it only overlaps the first 7 validation days. Later rows have yhat = NULL and
# are skipped by avg(), i.e. MAE/MAPE below describe that first week only.
lag_error = F.col("trips") - F.col("yhat")
results = (valid_lag
    .withColumn("ae", F.abs(lag_error))
    # APE is undefined when the actual is 0, so those rows stay NULL.
    .withColumn("ape", F.when(F.col("trips") > 0, F.abs(lag_error / F.col("trips"))))
)

metrics = results.agg(F.avg("ae").alias("MAE"), F.avg("ape").alias("MAPE")).toPandas()
metrics


In [None]:
# Baseline B (seasonal mean): predict each validation row with the average
# train `trips` for the same service / zone / hour / day-of-week.
_seasonal_keys = ["service", "pulocationid", "hour", "dow"]

# Day of week as returned by F.dayofweek: 1=Sun..7=Sat.
_dow = F.dayofweek("pickup_date")

train_stats = (
    train
    .withColumn("dow", _dow)
    .groupBy(*_seasonal_keys)
    .agg(F.avg("trips").alias("seasonal_mean"))
)

valid_feat = valid.withColumn("dow", _dow)

# Left join keeps every validation row; key combinations absent from train
# end up with seasonal_mean (and therefore yhat) = NULL.
pred = valid_feat.join(train_stats, on=_seasonal_keys, how="left")
pred = pred.withColumn("yhat", F.col("seasonal_mean"))

# Reusable column expressions for the error terms.
_actual = F.col("trips")
_error = _actual - F.col("yhat")

res = pred.withColumn("ae", F.abs(_error))
# APE is only defined for a positive actual; other rows stay NULL.
res = res.withColumn("ape", F.when(_actual > 0, F.abs(_error / _actual)))

metrics_B = res.agg(
    F.avg("ae").alias("MAE"),
    F.avg("ape").alias("MAPE"),
).toPandas()
metrics_B