In [0]:
ROOT   = "/Volumes/tabular/dataexpert/benchmarking_capstone"
SILVER = f"{ROOT}/silver"
GOLD   = f"{ROOT}/gold"

TX_SILVER_PATH   = f"{SILVER}/transactions_clean"
CUST_SILVER_PATH = f"{SILVER}/customers_clean"

KPIS_GOLD_PATH   = f"{GOLD}/store_day_kpis"
BENCH_LOG_PATH   = f"{GOLD}/benchmark_results_v2"
SUMMARY=f"{GOLD}/summary"


In [0]:
# One source of truth for windows
DATASETS = {
    "small_9m":   {"from": "2024-03-04", "to": "2024-03-10"},
    "large_142m": {"from": "2024-03-04", "to": "2024-06-11"},
    "xl_250m":    {"from": "2024-03-04", "to": "2024-08-26"},
}

dbutils.widgets.removeAll()
dbutils.widgets.dropdown("dataset_label", "small_9m", list(DATASETS.keys()))
dbutils.widgets.text("cluster_profile", "dev_small")
dbutils.widgets.text("hourly_rate_usd", "0.60")   # set per cluster
dbutils.widgets.text("notes", "")                 # optional free-form (e.g., "after OPTIMIZE")

label            = dbutils.widgets.get("dataset_label")
DATE_FROM        = DATASETS[label]["from"]
DATE_TO          = DATASETS[label]["to"]
cluster_profile  = dbutils.widgets.get("cluster_profile").strip()
hourly_rate      = float(dbutils.widgets.get("hourly_rate_usd"))
notes            = dbutils.widgets.get("notes").strip()

print(f"Selected: {label} -> [{DATE_FROM} , {DATE_TO}) on cluster '{cluster_profile}'")


Selected: xl_250m -> [2024-03-04 , 2024-08-26) on cluster 'dev_large'


In [0]:
from pyspark.sql import functions as F, Window as W

tx   = spark.read.format("delta").load(TX_SILVER_PATH) \
         .filter((F.col("ingest_day") >= DATE_FROM) & (F.col("ingest_day") < DATE_TO))
cust = spark.read.format("delta").load(CUST_SILVER_PATH)

rows_in = tx.count()
print(f"Rows in slice ({label}): {rows_in:,}")

# Small dim → broadcast is safe & faster
cust = cust.cache()
j = (tx.hint("broadcast")
       .join(cust, "customer_id", "left")
       .withColumn("day", F.to_date("ts")))


Rows in slice (xl_250m): 256,500,000


In [0]:
# ==== Auto-detect runtime flags & cluster facts (no manual typing) ====
def _get_bool(
    name: str,
    default="false"
) -> bool:
    try:
        return spark.conf.get(name, default).lower() in ("true", "1", "yes")
    except Exception:
        return default.lower() in ("true", "1", "yes")

def _get_str(
    name: str,
    default: str = ""
) -> str:
    try:
        return spark.conf.get(name, default)
    except Exception:
        return default

# Spark/Databricks flags
aqe_enabled    = _get_bool("spark.sql.adaptive.enabled", "true")
photon_enabled = _get_bool("spark.databricks.photon.enabled", "true")
shuffle_parts  = int(_get_str("spark.sql.shuffle.partitions", "1600"))

# Helpful cluster context (nice for your logs)
cluster_name   = spark.conf.get("spark.databricks.clusterUsageTags.clusterName", "unknown")
dbr_version    = spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion", "unknown")
node_type      = spark.conf.get("spark.databricks.clusterUsageTags.node_type_id", "unknown")
workers_hint   = spark.conf.get("spark.databricks.clusterUsageTags.clusterWorkers", "unknown")
total_cores    = spark.conf.get("spark.databricks.defaultParallelism", "unknown")  # May not be exact

print(
    f"[ENV] cluster={cluster_name} dbr={dbr_version} node={node_type} workers~={workers_hint} "
    f"| photon={photon_enabled} aqe={aqe_enabled} shuffle_parts={shuffle_parts} cores~={total_cores}"
)

[ENV] cluster=dev_large dbr=15.4.x-photon-scala2.12 node=unknown workers~=8 | photon=True aqe=True shuffle_parts=1600 cores~=unknown


In [0]:
import time
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DoubleType, BooleanType

t0 = time.time()

# Core KPIs by store/day (extend as needed)
by_store_day = (j.groupBy("store_id", "day")
                  .agg(F.count("*").alias("txn_cnt"),
                       F.sum("amount").alias("revenue"),
                       F.avg("amount").alias("avg_basket"),
                       F.expr("percentile_approx(amount, 0.95)").alias("p95_amount")))

# 7-day moving window
w = W.partitionBy("store_id").orderBy("day").rowsBetween(-6, 0)
kpis = (by_store_day
        .withColumn("rev_mv7", F.avg("revenue").over(w))
        .withColumn("dataset_label", F.lit(label))
        .withColumn("cluster_profile", F.lit(cluster_profile))
        .withColumn("run_ts", F.lit(datetime.utcnow().isoformat())))

# Write Gold for just this dataset_label
(kpis.write
   .format("delta")
   .mode("overwrite")
   .option("replaceWhere", f"dataset_label = '{label}'")
   .save(KPIS_GOLD_PATH))

duration_sec = time.time() - t0
cost_usd = (duration_sec / 3600.0) * hourly_rate

print(f"Gold duration: {round(duration_sec,2)}s  |  Estimated cost: ${cost_usd:.4f}")

# ---- Append a structured log row ----
schema = StructType([
    StructField("run_ts",          StringType(),  True),
    StructField("cluster_profile", StringType(),  True),
    StructField("dataset_label",   StringType(),  True),
    StructField("rows_in",         LongType(),    True),
    StructField("duration_sec",    DoubleType(),  True),
    StructField("hourly_rate_usd", DoubleType(),  True),
    StructField("cost_usd",        DoubleType(),  True),
    StructField("photon_enabled",  BooleanType(), True),
    StructField("aqe_enabled",     BooleanType(), True),
    StructField("shuffle_parts",   IntegerType(), True),
    StructField("date_from",       StringType(),  True),
    StructField("date_to",         StringType(),  True),
    StructField("notes",           StringType(),  True),
])

row = [(datetime.utcnow().isoformat(), cluster_profile, label, int(rows_in),
        float(round(duration_sec,2)), float(hourly_rate), float(round(cost_usd,4)),
        bool(photon_enabled), bool(aqe_enabled), int(shuffle_parts),
        DATE_FROM, DATE_TO, notes)]

spark.createDataFrame(row, schema=schema) \
     .write.mode("append").format("delta").save(BENCH_LOG_PATH)

print("Logged →", BENCH_LOG_PATH)


Gold duration: 6.83s  |  Estimated cost: $0.0046
Logged → /Volumes/tabular/dataexpert/benchmarking_capstone/gold/benchmark_results_v2


In [0]:
display(spark.read.format("delta").load(BENCH_LOG_PATH).orderBy(F.desc("run_ts")).limit(20))

# display(spark.read.format("delta").load(KPIS_GOLD_PATH)
#         .filter(F.col("dataset_label")==label)
#         .orderBy("store_id","day")
#         .limit(50))


run_ts,cluster_profile,dataset_label,rows_in,duration_sec,hourly_rate_usd,cost_usd,photon_enabled,aqe_enabled,shuffle_parts,date_from,date_to,notes
2025-08-18T03:43:15.910055,dev_large,xl_250m,256500000,6.83,2.4,0.0046,True,True,1600,2024-03-04,2024-08-26,
2025-08-18T03:41:20.775420,dev_large,xl_250m,256500000,11.27,2.4,0.0075,True,True,1600,2024-03-04,2024-08-26,
2025-08-18T03:39:51.413381,dev_large,large_142m,142500000,5.82,2.4,0.0039,True,True,1600,2024-03-04,2024-06-11,
2025-08-18T03:39:29.308986,dev_large,large_142m,142500000,8.36,2.4,0.0056,True,True,1600,2024-03-04,2024-06-11,
2025-08-18T03:39:07.977745,dev_large,large_142m,142500000,28.04,2.4,0.0187,True,True,1600,2024-03-04,2024-06-11,
2025-08-18T03:38:17.050987,dev_large,small_9m,9000000,4.11,2.4,0.0027,True,True,1600,2024-03-04,2024-03-10,
2025-08-18T03:37:44.989558,dev_large,small_9m,9000000,17.87,2.4,0.0119,True,True,1600,2024-03-04,2024-03-10,
2025-08-18T03:36:33.790120,dev_large,small_9m,9000000,8.53,2.4,0.0057,True,True,1600,2024-03-04,2024-03-10,
2025-08-18T03:36:03.057925,dev_large,small_9m,9000000,69.25,2.4,0.0462,True,True,1600,2024-03-04,2024-03-10,
2025-08-18T03:25:38.990599,dev_large,xl_250m,256500000,23.55,2.4,0.0157,False,True,1600,2024-03-04,2024-08-26,


In [0]:
from pyspark.sql import functions as F, Window as W

bench = spark.read.format("delta").load(BENCH_LOG_PATH)

summary_med = (bench
  .groupBy("cluster_profile","dataset_label","photon_enabled","aqe_enabled","shuffle_parts")
  .agg(
      F.count("*").alias("runs"),
      F.expr("percentile_approx(duration_sec, 0.5)").alias("median_sec"),
      F.avg("duration_sec").alias("avg_sec"),
      F.expr("percentile_approx(cost_usd, 0.5)").alias("median_cost_usd"),
      (F.expr("percentile_approx(cost_usd, 0.5)") / (F.avg("rows_in")/1e6)).alias("median_cost_per_million")
  )
  .orderBy("cluster_profile","dataset_label","photon_enabled"))

display(summary_med)


cluster_profile,dataset_label,photon_enabled,aqe_enabled,shuffle_parts,runs,median_sec,avg_sec,median_cost_usd,median_cost_per_million
dev_large,large_142m,False,True,1600,2,17.16,24.26,0.0114,8e-05
dev_large,large_142m,True,True,1600,3,8.36,14.073333333333332,0.0056,3.929824561403509e-05
dev_large,small_9m,False,True,1600,3,19.45,33.230000000000004,0.013,0.0014444444444444
dev_large,small_9m,True,True,1600,4,8.53,24.94,0.0057,0.0006333333333333334
dev_large,xl_250m,False,True,1600,2,23.55,25.35,0.0157,6.120857699805067e-05
dev_large,xl_250m,True,True,1600,2,6.83,9.05,0.0046,1.7933723196881093e-05
dev_medium,large_142m,False,True,1600,2,27.54,30.725,0.0092,6.456140350877194e-05
dev_medium,large_142m,True,True,1600,3,9.18,11.31,0.0031,2.175438596491228e-05
dev_medium,small_9m,False,True,1600,2,12.52,36.58,0.0042,0.00046666666666666666
dev_medium,small_9m,True,True,1600,2,9.86,30.625,0.0033,0.0003666666666666666


In [0]:
summary_med.write.mode("overwrite").format("delta").save(SUMMARY)