In [1]:
%pip install pandas requests

StatementMeta(, 8c0784f0-3b1a-43df-8c40-71e85a58cb0e, 7, Finished, Available, Finished)


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.



In [7]:
# Fetch from open-meteo and write it in a delta table
# Retries + chunked pulls for Open-Meteo in Fabric

import pandas as pd, requests, time
from urllib.parse import urlencode
from datetime import datetime, date, timedelta
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

LAT, LON = 42.314, -71.038
START_DATE, END_DATE = "2025-07-01", "2025-08-31"   # edit freely
TIMEZONE = "America/New_York"
CHUNK_DAYS = 14                                     # pull 2 weeks at a time

# robust session with retries/backoff (covers timeouts & 5xx/429)
retry = Retry(
    total=6, connect=3, read=6,
    backoff_factor=1.5,
    status_forcelist=[429, 500, 502, 503, 504],
    allowed_methods=["GET"]
)
session = requests.Session()
session.mount("https://", HTTPAdapter(max_retries=retry))

def fetch_chunk(start_str, end_str):
    base = "https://archive-api.open-meteo.com/v1/archive"  # more stable endpoint
    params = {
        "latitude": LAT, "longitude": LON,
        "start_date": start_str, "end_date": end_str,
        "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum",
        "timezone": TIMEZONE
    }
    url = f"{base}?{urlencode(params)}"
    r = session.get(url, timeout=(10, 180))  # longer read timeout
    r.raise_for_status()
    j = r.json().get("daily", {})
    return pd.DataFrame({
        "date": j.get("time", []),
        "tmax_c": j.get("temperature_2m_max", []),
        "tmin_c": j.get("temperature_2m_min", []),
        "precip_mm": j.get("precipitation_sum", []),
    })

# chunk the period
s = datetime.fromisoformat(START_DATE).date()
e = datetime.fromisoformat(END_DATE).date()
dfs = []
cur = s
while cur <= e:
    chunk_end = min(cur + timedelta(days=CHUNK_DAYS-1), e)
    dfc = fetch_chunk(cur.isoformat(), chunk_end.isoformat())
    dfs.append(dfc)
    time.sleep(0.3)  # be polite
    cur = chunk_end + timedelta(days=1)

pdf = pd.concat(dfs, ignore_index=True)
pdf["is_rain"] = (pdf["precip_mm"].fillna(0) > 0).astype("int64")
pdf["date"] = pd.to_datetime(pdf["date"])

sdf = spark.createDataFrame(pdf)
(sdf.write.mode("overwrite").format("delta").saveAsTable("bronze_weather_raw"))

print("✅ bronze_weather_raw written:", spark.table("bronze_weather_raw").count(), "rows")


StatementMeta(, 8c0784f0-3b1a-43df-8c40-71e85a58cb0e, 17, Finished, Available, Finished)

✅ bronze_weather_raw written: 62 rows


In [2]:
%pip install icalendar pandas

import glob, pandas as pd, numpy as np, re
from icalendar import Calendar
from datetime import datetime
from dateutil.tz import gettz
import pyspark.sql.functions as F

TZ = gettz("America/New_York")

# 1) align to weather date range (so joins are clean)
mm = spark.table("bronze_weather_raw") \
    .select(F.min("date").alias("start"), F.max("date").alias("end")) \
    .collect()[0]
start, end = pd.to_datetime(mm["start"]).date(), pd.to_datetime(mm["end"]).date()

# 2) helper: home/away heuristics for Sidearm Sports feeds
def is_home_game(summary: str, location: str) -> bool:
    s = (summary or "").lower()
    loc = (location or "").lower()
    # "vs" typically = home, "at" = away; location hints are a backup
    home_kw = [" vs ", "vs ", "v. "]  # include common variants
    campus_kw = ["umass boston", "clark athletic", "bc high", "boston, mass", "beacons"]
    return any(k in s for k in home_kw) or any(k in loc for k in campus_kw)

# 3) parse all ICS files
rows = []
for path in glob.glob("/lakehouse/default/Files/calendars/*.ics"):
    with open(path, "rb") as f:
        cal = Calendar.from_ical(f.read())
    for ev in cal.walk("VEVENT"):
        dt = ev.get("dtstart").dt
        # normalize to local date
        d = dt.astimezone(TZ).date() if isinstance(dt, datetime) else dt
        if d is None:
            continue
        if not (start <= d <= end):
            continue
        summary = str(ev.get("summary", "") or "")
        location = str(ev.get("location", "") or "")
        home = int(is_home_game(summary, location))
        sport_guess = re.sub(r"[^A-Za-z ]","", summary).strip().split(" ")[-1] if summary else ""
        rows.append({
            "date": d,
            "event_name": summary,
            "location": location,
            "is_home": home,
            "source_file": path.split("/")[-1],
            "sport_guess": sport_guess
        })

events = pd.DataFrame(rows)

# 4) load scoring (tuneable)
def load_score(name: str, is_home: int) -> int:
    n = (name or "").lower()
    # future: union registrar "orientation", "homecoming", etc. as score 2
    if any(k in n for k in ["championship", "tournament final", "homecoming"]):
        return 2
    if any(k in n for k in ["game", "match", "tournament", "vs "]) and is_home:
        return 1
    return 0

if not events.empty:
    events["event_load_score"] = [load_score(n, h) for n, h in zip(events["event_name"], events["is_home"])]
    # day-level rollup across all sports
    events_day = (
        events.groupby("date", as_index=False)
              .agg(
                  event_count_all=("event_name", "count"),
                  event_count_home=("is_home", "sum"),
                  event_load_score=("event_load_score", "max")
              )
    )
    events_day["major_event_flag"] = (events_day["event_load_score"] >= 2).astype(int)
else:
    events_day = pd.DataFrame(columns=["date","event_count_all","event_count_home","event_load_score","major_event_flag"])

# 5) write to lakehouse (flat table name = no schema creation)
sdf = spark.createDataFrame(events_day.assign(date=pd.to_datetime(events_day["date"])))
(sdf.write.mode("overwrite").format("delta").saveAsTable("bronze_events_raw"))

print("✅ bronze_events_raw rows:", spark.table("bronze_events_raw").count())
spark.table("bronze_events_raw").orderBy("date").limit(10).show(truncate=False)


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 15, Finished, Available, Finished)


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.
✅ bronze_events_raw rows: 2
+-------------------+---------------+----------------+----------------+----------------+
|date               |event_count_all|event_count_home|event_load_score|major_event_flag|
+-------------------+---------------+----------------+----------------+----------------+
|2025-08-29 00:00:00|3              |2               |1               |0               |
|2025-08-30 00:00:00|5              |2               |1               |0               |
+-------------------+---------------+----------------+----------------+----------------+




In [5]:
import pandas as pd
import pyspark.sql.functions as F
from datetime import timedelta

mm = spark.table("bronze_weather_raw").select(F.min("date").alias("start"), F.max("date").alias("end")).collect()[0]
start, end = pd.to_datetime(mm["start"]).date(), pd.to_datetime(mm["end"]).date()

BLOCKS = [
    ("MWF","09:00-09:50"),("MWF","10:00-10:50"),("MWF","11:00-11:50"),("MWF","12:00-12:50"),
    ("TTh","11:00-12:15"),("TTh","12:30-13:45"),("TTh","14:00-15:15"),
    ("MW","13:00-14:15"), ("MW","14:30-15:45"),
]
LUNCH_WIN=("11:30","14:30"); DINNER_WIN=("17:00","19:30")

def in_window(t, win): s,e=t.split("-"); return not (e < win[0] or s > win[1])
def runs(dow, days): return ((dow in [0,2,4] and days=="MWF") or (dow in [1,3] and days=="TTh") or (dow in [0,2] and days=="MW"))

rows=[]; d=start
while d<=end:
    dow=d.weekday()
    todays=[b for b in BLOCKS if runs(dow,b[0])]
    rows.append({
        "date": d,
        "block_count": len(todays),
        "lunch_window_blocks": sum(1 for _,t in todays if in_window(t,LUNCH_WIN)),
        "dinner_window_blocks": sum(1 for _,t in todays if in_window(t,DINNER_WIN))
    })
    d+=timedelta(days=1)

pdf = pd.DataFrame(rows)
spark.createDataFrame(pdf.assign(date=pd.to_datetime(pdf["date"]))) \
     .write.mode("overwrite").format("delta").saveAsTable("bronze_classblocks_raw")

print("✅ bronze_classblocks_raw", spark.table("bronze_classblocks_raw").count())


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 19, Finished, Available, Finished)

✅ bronze_classblocks_raw 62


In [8]:
# Bronze Academic: UMass Boston Academic Calendar → day-level tags
import pandas as pd
from datetime import date, timedelta
from pyspark.sql import functions as F

rows = []

def add_day(dt, tag, weight=1, holiday=0, brk=0, finals=0, study=0, class_start=0):
    rows.append(dict(date=pd.to_datetime(dt), acad_tag=tag, acad_weight=weight,
                     is_holiday=holiday, is_break=brk, is_finals=finals,
                     is_study=study, is_classes_start=class_start))

def add_range(start, end, tag, weight=1, holiday=0, brk=0, finals=0, study=0):
    d = pd.to_datetime(start); e = pd.to_datetime(end)
    cur = d
    while cur <= e:
        add_day(cur, tag, weight, holiday, brk, finals, study)
        cur += pd.Timedelta(days=1)

# -------- FALL 2025 --------
add_day("2025-09-01", "labor_day", weight=2, holiday=1)                  # Holiday
add_day("2025-09-02", "classes_start_fall", weight=2, class_start=1)     # Semester begins
add_day("2025-09-11", "convocation", weight=1)
add_day("2025-10-13", "indigenous_peoples_day", weight=2, holiday=1)
add_day("2025-11-11", "veterans_day", weight=2, holiday=1)
add_range("2025-11-27", "2025-11-30", "thanksgiving_recess", weight=2, brk=1)  # Recess
add_day("2025-12-01", "classes_resume_post_thanks", weight=1)
add_day("2025-12-12", "last_day_of_classes_fall", weight=1)
add_range("2025-12-13", "2025-12-14", "study_period_fall", weight=1, study=1)
add_range("2025-12-15", "2025-12-19", "final_exams_fall", weight=2, finals=1)

# -------- WINTER 2026 --------
add_day("2025-12-30", "new_years_observed", weight=2, holiday=1)
add_day("2025-12-31", "new_years_observed", weight=2, holiday=1)
add_day("2026-01-01", "new_years_day", weight=2, holiday=1)
add_day("2026-01-05", "winter_classes_begin", weight=1, class_start=1)
add_day("2026-01-19", "mlk_day", weight=2, holiday=1)

# -------- SPRING 2026 --------
add_day("2026-01-26", "classes_start_spring", weight=2, class_start=1)
add_day("2026-02-16", "presidents_day", weight=2, holiday=1)
add_range("2026-03-15", "2026-03-22", "spring_break", weight=2, brk=1)   # No classes
add_day("2026-03-23", "classes_resume_spring", weight=1)
add_day("2026-04-20", "patriots_day", weight=2, holiday=1)
add_day("2026-05-13", "last_day_of_classes_spring", weight=1)
add_range("2026-05-14", "2026-05-17", "study_period_spring", weight=1, study=1)
add_range("2026-05-18", "2026-05-22", "final_exams_spring", weight=2, finals=1)
add_day("2026-05-25", "memorial_day", weight=2, holiday=1)
add_day("2026-05-27", "graduate_commencement", weight=1)
add_day("2026-05-28", "undergraduate_commencement", weight=1)

# -------- SUMMER 2026 (holiday markers most relevant to dining demand) --------
add_day("2026-06-19", "juneteenth", weight=2, holiday=1)
add_day("2026-07-04", "independence_day", weight=2, holiday=1)

acad_pdf = pd.DataFrame(rows)

sdf = spark.createDataFrame(acad_pdf) \
           .withColumn("date", F.to_date("date")) \
           .withColumn("acad_tag", F.coalesce(F.col("acad_tag"), F.lit("unknown"))) \
           .withColumn("acad_weight", F.coalesce(F.col("acad_weight"), F.lit(0)))
(sdf.write.mode("overwrite").format("delta").saveAsTable("bronze_academic_raw"))

print("✅ bronze_academic_raw rows:", spark.table("bronze_academic_raw").count())
spark.table("bronze_academic_raw").orderBy("date").limit(10).show(truncate=False)


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 22, Finished, Available, Finished)

✅ bronze_academic_raw rows: 50
+----------+--------------------------+-----------+----------+--------+---------+--------+----------------+
|date      |acad_tag                  |acad_weight|is_holiday|is_break|is_finals|is_study|is_classes_start|
+----------+--------------------------+-----------+----------+--------+---------+--------+----------------+
|2025-09-01|labor_day                 |2          |1         |0       |0        |0       |0               |
|2025-09-02|classes_start_fall        |2          |0         |0       |0        |0       |1               |
|2025-09-11|convocation               |1          |0         |0       |0        |0       |0               |
|2025-10-13|indigenous_peoples_day    |2          |1         |0       |0        |0       |0               |
|2025-11-11|veterans_day              |2          |1         |0       |0        |0       |0               |
|2025-11-27|thanksgiving_recess       |2          |0         |1       |0        |0       |0              

In [9]:
# --- Bronze: Dining Ops schedule from fixed hours ---
import pandas as pd
import pyspark.sql.functions as F
from datetime import timedelta

# pull the model date window from weather
mm = spark.table("bronze_weather_raw") \
          .select(F.min("date").alias("start"), F.max("date").alias("end")).collect()[0]
start, end = pd.to_datetime(mm["start"]).date(), pd.to_datetime(mm["end"]).date()

def hours_to_float(hhmm):
    """'HH:MM' -> float hours since midnight"""
    h, m = map(int, hhmm.split(":")); return h + m/60.0

def span_hours(start_hhmm, end_hhmm):
    return round(hours_to_float(end_hhmm) - hours_to_float(start_hhmm), 2)

# fixed hours you provided
WK_BF = ("07:00","11:00")
WK_LN = ("11:00","14:00")
WK_DN = ("17:00","20:00")

WE_BR = ("10:00","14:00")   # brunch
WE_DN = ("17:00","19:30")

rows = []
d = start
while d <= end:
    dow = d.weekday()              # 0=Mon ... 6=Sun
    is_weekend = 1 if dow in (5,6) else 0

    if not is_weekend:
        # Mon–Fri
        bf_hrs = span_hours(*WK_BF)
        ln_hrs = span_hours(*WK_LN)
        dn_hrs = span_hours(*WK_DN)
        br_hrs = 0.0
        open_breakfast = 1; open_lunch = 1; open_dinner = 1; open_brunch = 0
    else:
        # Sat–Sun (brunch instead of bf/lunch)
        bf_hrs = 0.0
        ln_hrs = 0.0
        dn_hrs = span_hours(*WE_DN)
        br_hrs = span_hours(*WE_BR)
        open_breakfast = 0; open_lunch = 0; open_dinner = 1; open_brunch = 1

    total_hrs = round(bf_hrs + ln_hrs + dn_hrs + br_hrs, 2)

    rows.append({
        "date": pd.to_datetime(d),
        "service_model": "weekend" if is_weekend else "weekday",
        "open_breakfast": open_breakfast,
        "open_lunch": open_lunch,
        "open_dinner": open_dinner,
        "open_brunch": open_brunch,
        "breakfast_hours": bf_hrs,
        "lunch_hours": ln_hrs,
        "dinner_hours": dn_hrs,
        "brunch_hours": br_hrs,
        "open_hours_total": total_hrs,
        "is_closed": 1 if total_hrs == 0 else 0
    })
    d += timedelta(days=1)

ops_pdf = pd.DataFrame(rows)


sdf = spark.createDataFrame(ops_pdf).withColumn("date", F.to_date("date"))
(sdf.write.mode("overwrite").format("delta").saveAsTable("bronze_ops_raw"))

print("✅ bronze_ops_raw rows:", spark.table("bronze_ops_raw").count())
spark.table("bronze_ops_raw").orderBy("date").limit(5).show(truncate=False)


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 23, Finished, Available, Finished)

✅ bronze_ops_raw rows: 62
+----------+-------------+--------------+----------+-----------+-----------+---------------+-----------+------------+------------+----------------+---------+
|date      |service_model|open_breakfast|open_lunch|open_dinner|open_brunch|breakfast_hours|lunch_hours|dinner_hours|brunch_hours|open_hours_total|is_closed|
+----------+-------------+--------------+----------+-----------+-----------+---------------+-----------+------------+------------+----------------+---------+
|2025-07-01|weekday      |1             |1         |1          |0          |4.0            |3.0        |3.0         |0.0         |10.0            |0        |
|2025-07-02|weekday      |1             |1         |1          |0          |4.0            |3.0        |3.0         |0.0         |10.0            |0        |
|2025-07-03|weekday      |1             |1         |1          |0          |4.0            |3.0        |3.0         |0.0         |10.0            |0        |
|2025-07-04|weekday      |

In [10]:
# UPDATED demand simulator: academics + ops aware (Fabric)
import pandas as pd, numpy as np
from pyspark.sql import functions as F

# ---------- Load features ----------
def load_features():
    # Preferred: use the consolidated Gold features if present
    if spark.catalog.tableExists("gold_features_day"):
        g = spark.table("gold_features_day").toPandas()
        # Ensure required columns exist with defaults
        defaults = {
            "event_count_all":0, "event_count_home":0, "event_load_score":0, "major_event_flag":0,
            "major_academic_flag":0, "minor_academic_flag":0, "holiday_flag":0, "break_flag":0,
            "finals_flag":0, "study_flag":0, "classes_start_flag":0,
            "class_block_count":0, "lunch_window_blocks":0, "dinner_window_blocks":0,
            "tmax_c":0.0, "precip_mm":0.0,
            "is_closed":0, "is_reduced":0, "open_hours_total":10.0,
            "open_breakfast":1, "open_lunch":1, "open_dinner":1, "open_brunch":0
        }
        for col, val in defaults.items():
            if col not in g.columns:
                g[col] = val
        g["date"] = pd.to_datetime(g["date"])
        if "dow" not in g.columns:
            g["dow"] = g["date"].dt.weekday
        return g.sort_values("date")

    # Otherwise build features from Bronze/Silver tables
    w = spark.table("bronze_weather_raw").toPandas()
    e = spark.table("bronze_events_raw").toPandas() if spark.catalog.tableExists("bronze_events_raw") else pd.DataFrame(
        columns=["date","event_count_all","event_count_home","event_load_score","major_event_flag"])
    c = spark.table("bronze_classblocks_raw").toPandas()
    a = spark.table("silver_academic_day").toPandas() if spark.catalog.tableExists("silver_academic_day") else pd.DataFrame(
        columns=["date","major_academic_flag","minor_academic_flag","holiday_flag","break_flag","finals_flag","study_flag","classes_start_flag"])

    # Ops preferred from Silver; if missing, derive defaults from your hours
    if spark.catalog.tableExists("silver_ops_day"):
        o = spark.table("silver_ops_day").toPandas()
    else:
        # derive ops from weekday/weekend rule-of-thumb (weekday=10h; weekend brunch 4h + dinner 2.5h)
        mm = spark.table("bronze_weather_raw").select(F.min("date").alias("start"), F.max("date").alias("end")).collect()[0]
        dates = pd.date_range(pd.to_datetime(mm["start"]), pd.to_datetime(mm["end"]), freq="D")
        o = pd.DataFrame({"date": dates})
        o["dow"] = o["date"].dt.weekday
        o["open_brunch"] = (o["dow"].isin([5,6])).astype(int)
        o["open_breakfast"] = (1 - o["open_brunch"]).astype(int)
        o["open_lunch"] = (1 - o["open_brunch"]).astype(int)
        o["open_dinner"] = 1
        o["open_hours_total"] = np.where(o["open_brunch"]==1, 4.0+2.5, 10.0)
        o["is_closed"] = 0
        o["is_reduced"] = (o["open_hours_total"] < 10.0).astype(int)
        o = o.drop(columns=["dow"])

    # Join and fill
    g = (w.merge(e, on="date", how="left")
           .merge(c, on="date", how="left")
           .merge(a, on="date", how="left")
           .merge(o, on="date", how="left"))
    g = g.fillna({
        "event_count_all":0,"event_count_home":0,"event_load_score":0,"major_event_flag":0,
        "class_block_count":0,"lunch_window_blocks":0,"dinner_window_blocks":0,
        "tmax_c":0.0,"precip_mm":0.0,
        "major_academic_flag":0,"minor_academic_flag":0,"holiday_flag":0,"break_flag":0,"finals_flag":0,"study_flag":0,"classes_start_flag":0,
        "is_closed":0,"is_reduced":0,"open_hours_total":10.0,
        "open_breakfast":1,"open_lunch":1,"open_dinner":1,"open_brunch":0
    })
    g["date"] = pd.to_datetime(g["date"])
    g["dow"] = g["date"].dt.weekday
    return g.sort_values("date")

g = load_features()

# ---------- Demand model (rules-based; safe coefficients) ----------
def base_by_dow(dow: int) -> int:
    # Mon..Sun baseline covers
    return [220, 240, 260, 250, 230, 180, 160][dow]

COEF_LUNCHBLK    = 2.0
COEF_DINNERBLK   = 1.0
COEF_HOME        = 15.0
COEF_MAJOR_ATH   = 25.0
COEF_AC_MAJOR    = 20.0
COEF_AC_MINOR    = 10.0
COEF_HOLIDAY     = -60.0
COEF_BREAK       = -80.0
COEF_FINALS      = -10.0
COEF_TMAX        = -0.4
COEF_RAIN        = -20.0

WEEKDAY_BASE_HRS = 10.0  # 7–11 + 11–14 + 17–20

# Weather flags
is_rain = (g["precip_mm"].fillna(0) > 0).astype(int)

# Mean demand (mu)
mu = g["dow"].map(base_by_dow).astype("float64")
mu += COEF_LUNCHBLK * g["lunch_window_blocks"].fillna(0)
mu += COEF_DINNERBLK * g["dinner_window_blocks"].fillna(0)
mu += COEF_HOME * g["event_count_home"].fillna(0)
mu += COEF_MAJOR_ATH * g["major_event_flag"].fillna(0)
mu += COEF_AC_MAJOR * g["major_academic_flag"].fillna(0) + COEF_AC_MINOR * g["minor_academic_flag"].fillna(0)
mu += COEF_HOLIDAY * g["holiday_flag"].fillna(0) + COEF_BREAK * g["break_flag"].fillna(0) + COEF_FINALS * g["finals_flag"].fillna(0)
mu += COEF_TMAX * g["tmax_c"].astype("float64").fillna(0) + COEF_RAIN * is_rain

# Scale by open hours; zero if closed
open_frac = (g["open_hours_total"].fillna(0) / WEEKDAY_BASE_HRS).clip(0.0, 1.2)
mu = mu * open_frac
mu = np.where(g["is_closed"].fillna(0).astype(int).values == 1, 0.0, mu)

# Clean and cap for Poisson
mu = np.nan_to_num(mu, nan=120.0, posinf=1500.0, neginf=0.0).clip(0.0, 1500.0)

print("μ stats → min / max / mean:", float(np.min(mu)), float(np.max(mu)), float(np.mean(mu)))

# Poisson draw
rng = np.random.default_rng(42)
total = rng.poisson(mu)

# ---------- Meal allocation ----------
bf = np.zeros_like(total)
ln = np.zeros_like(total)
dn = np.zeros_like(total)

# Weekday shares (if all three meals open)
BF_SHARE, LN_SHARE, DN_SHARE = 0.18, 0.42, 0.40
# Weekend shares (brunch + dinner)
BR_SHARE_WE, DN_SHARE_WE = 0.55, 0.45

for i in range(len(total)):
    if total[i] == 0 or int(g.loc[i, "is_closed"]) == 1:
        continue

    if int(g.loc[i, "open_brunch"]) == 1:
        # Weekend: brunch goes into "lunch" column
        br = int(round(total[i] * BR_SHARE_WE))
        dn[i] = total[i] - br
        ln[i] = br
        bf[i] = 0
    else:
        # Weekday: respect which meals are open
        opens = np.array([
            int(g.loc[i, "open_breakfast"]),
            int(g.loc[i, "open_lunch"]),
            int(g.loc[i, "open_dinner"])
        ], dtype=float)
        shares = np.array([BF_SHARE, LN_SHARE, DN_SHARE]) * opens
        if shares.sum() == 0:
            # Edge: not marked closed, but all meals closed → push to dinner for visibility
            dn[i] = total[i]
        else:
            shares = shares / shares.sum()
            bf[i] = int(round(total[i] * shares[0]))
            ln[i] = int(round(total[i] * shares[1]))
            dn[i] = total[i] - bf[i] - ln[i]

out = pd.DataFrame({
    "date": pd.to_datetime(g["date"]),
    "breakfast": bf.astype(int),
    "lunch": ln.astype(int),
    "dinner": dn.astype(int)
})
out["covers_total"] = out["breakfast"] + out["lunch"] + out["dinner"]

# ---------- Write Bronze demand ----------
sdf = spark.createDataFrame(out).withColumn("date", F.to_date("date"))
(sdf.write.mode("overwrite").format("delta").saveAsTable("bronze_demand_raw"))

print("✅ bronze_demand_raw rows:", spark.table("bronze_demand_raw").count())
spark.table("bronze_demand_raw").orderBy("date").limit(5).show(truncate=False)

# (Optional) peek largest μ days to sanity-check drivers
top_idx = np.argsort(-mu)[:5]
print(pd.concat([
    g.loc[top_idx, ["date","event_count_home","major_event_flag","major_academic_flag",
                    "holiday_flag","break_flag","lunch_window_blocks","dinner_window_blocks",
                    "tmax_c","precip_mm","open_hours_total","is_closed"]],
    pd.Series(mu[top_idx], name="mu")
], axis=1).to_string(index=False))


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 24, Finished, Available, Finished)

μ stats → min / max / mean: 82.26400000000001 258.44 189.5747741935484
✅ bronze_demand_raw rows: 62
+----------+---------+-----+------+------------+
|date      |breakfast|lunch|dinner|covers_total|
+----------+---------+-----+------+------------+
|2025-07-01|41       |95   |90    |226         |
|2025-07-02|46       |107  |101   |254         |
|2025-07-03|36       |84   |80    |200         |
|2025-07-04|42       |99   |94    |235         |
|2025-07-05|0        |53   |43    |96          |
+----------+---------+-----+------+------------+

      date  event_count_home  major_event_flag  major_academic_flag  holiday_flag  break_flag  lunch_window_blocks  dinner_window_blocks  tmax_c  precip_mm  open_hours_total  is_closed     mu
2025-07-29               0.0               0.0                  0.0           0.0         0.0                  3.0                   0.0    37.0        0.0              10.0        0.0    NaN
2025-08-14               0.0               0.0                  0.0       

In [11]:
%%sql
-- ========= SILVER LAYER =========

-- 1) Date spine (based on your weather range)
CREATE OR REPLACE TABLE silver_dim_date AS
SELECT d AS date
FROM (
  SELECT sequence(min(CAST(date AS DATE)), max(CAST(date AS DATE)), interval 1 day) AS seq
  FROM bronze_weather_raw
) s
LATERAL VIEW explode(seq) e AS d;

-- 2) Weather (day grain)
CREATE OR REPLACE TABLE silver_weather_day AS
SELECT CAST(date AS DATE) AS date,
       tmax_c, tmin_c, precip_mm,
       CASE WHEN COALESCE(precip_mm,0) > 0 THEN 1 ELSE 0 END AS is_rain,
       0 AS is_snow
FROM bronze_weather_raw;

-- 3) Athletics / events (rolled to day)
CREATE OR REPLACE TABLE silver_events_day AS
SELECT CAST(date AS DATE) AS date,
       COALESCE(event_count_all,  0) AS event_count_all,
       COALESCE(event_count_home, 0) AS event_count_home,
       COALESCE(event_load_score, 0) AS event_load_score,
       COALESCE(major_event_flag, 0) AS major_event_flag
FROM bronze_events_raw;

-- 4) Class load (day grain)
CREATE OR REPLACE TABLE silver_class_load_day AS
SELECT CAST(date AS DATE) AS date,
       COALESCE(block_count,0)          AS class_block_count,
       COALESCE(lunch_window_blocks,0)  AS lunch_window_blocks,
       COALESCE(dinner_window_blocks,0) AS dinner_window_blocks
FROM bronze_classblocks_raw;

-- 5) Academic calendar (flags)
CREATE OR REPLACE TABLE silver_academic_day AS
SELECT CAST(date AS DATE) AS date,
       MAX(CASE WHEN COALESCE(acad_weight,0) >= 2 THEN 1 ELSE 0 END) AS major_academic_flag,
       MAX(CASE WHEN COALESCE(acad_weight,0) =  1 THEN 1 ELSE 0 END) AS minor_academic_flag,
       MAX(CASE WHEN COALESCE(is_holiday,0) = 1 THEN 1 ELSE 0 END)   AS holiday_flag,
       MAX(CASE WHEN COALESCE(is_break,0)   = 1 THEN 1 ELSE 0 END)   AS break_flag,
       MAX(CASE WHEN COALESCE(is_finals,0)  = 1 THEN 1 ELSE 0 END)   AS finals_flag,
       MAX(CASE WHEN COALESCE(is_study,0)   = 1 THEN 1 ELSE 0 END)   AS study_flag,
       MAX(CASE WHEN COALESCE(is_classes_start,0) = 1 THEN 1 ELSE 0 END) AS classes_start_flag
FROM bronze_academic_raw
GROUP BY CAST(date AS DATE);

-- 6) Dining ops / hours (day grain)
CREATE OR REPLACE TABLE silver_ops_day AS
SELECT CAST(date AS DATE) AS date,
       service_model,
       COALESCE(open_breakfast,0) AS open_breakfast,
       COALESCE(open_lunch,0)     AS open_lunch,
       COALESCE(open_dinner,0)    AS open_dinner,
       COALESCE(open_brunch,0)    AS open_brunch,
       COALESCE(breakfast_hours,0.0) AS breakfast_hours,
       COALESCE(lunch_hours,0.0)     AS lunch_hours,
       COALESCE(dinner_hours,0.0)    AS dinner_hours,
       COALESCE(brunch_hours,0.0)    AS brunch_hours,
       COALESCE(open_hours_total,0.0) AS open_hours_total,
       CASE WHEN COALESCE(open_hours_total,0.0) = 0 THEN 1 ELSE 0 END AS is_closed,
       CASE WHEN service_model='weekend' OR COALESCE(open_hours_total,0.0) < 10.0 THEN 1 ELSE 0 END AS is_reduced
FROM bronze_ops_raw;

-- 7) Demand (day grain from Bronze demand)
CREATE OR REPLACE TABLE silver_demand_day AS
SELECT CAST(date AS DATE) AS date,
       COALESCE(breakfast,0) AS breakfast,
       COALESCE(lunch,0)     AS lunch,
       COALESCE(dinner,0)    AS dinner,
       COALESCE(breakfast,0) + COALESCE(lunch,0) + COALESCE(dinner,0) AS covers_total
FROM bronze_demand_raw;


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 31, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [12]:
%%sql
-- ============ GOLD LAYER ============

-- 1) Feature mart (day grain)
CREATE OR REPLACE TABLE gold_features_day AS
SELECT
  d.date,
  dayofweek(d.date) AS dow,
  CASE WHEN dayofweek(d.date) IN (1,7) THEN 1 ELSE 0 END AS is_weekend,

  -- Athletics / Events
  COALESCE(e.event_count_all,0)  AS event_count_all,
  COALESCE(e.event_count_home,0) AS event_count_home,
  COALESCE(e.event_load_score,0) AS event_load_score,
  COALESCE(e.major_event_flag,0) AS major_event_flag,

  -- Academic
  COALESCE(a.major_academic_flag,0) AS major_academic_flag,
  COALESCE(a.minor_academic_flag,0) AS minor_academic_flag,
  COALESCE(a.holiday_flag,0)        AS holiday_flag,
  COALESCE(a.break_flag,0)          AS break_flag,
  COALESCE(a.finals_flag,0)         AS finals_flag,
  COALESCE(a.study_flag,0)          AS study_flag,
  COALESCE(a.classes_start_flag,0)  AS classes_start_flag,

  -- Class blocks
  COALESCE(c.class_block_count,0)      AS class_block_count,
  COALESCE(c.lunch_window_blocks,0)    AS lunch_window_blocks,
  COALESCE(c.dinner_window_blocks,0)   AS dinner_window_blocks,

  -- Weather
  w.tmax_c, w.tmin_c, w.precip_mm, w.is_rain, w.is_snow,

  -- Dining ops / hours
  COALESCE(o.is_closed,0)              AS is_closed,
  COALESCE(o.is_reduced,0)             AS is_reduced,
  COALESCE(o.open_hours_total,0.0)     AS open_hours_total,
  COALESCE(o.open_breakfast,0)         AS open_breakfast,
  COALESCE(o.open_lunch,0)             AS open_lunch,
  COALESCE(o.open_dinner,0)            AS open_dinner,
  COALESCE(o.open_brunch,0)            AS open_brunch,
  COALESCE(o.breakfast_hours,0.0)      AS breakfast_hours,
  COALESCE(o.lunch_hours,0.0)          AS lunch_hours,
  COALESCE(o.dinner_hours,0.0)         AS dinner_hours,
  COALESCE(o.brunch_hours,0.0)         AS brunch_hours

FROM silver_dim_date d
LEFT JOIN silver_events_day      e ON d.date = e.date
LEFT JOIN silver_class_load_day  c ON d.date = c.date
LEFT JOIN silver_weather_day     w ON d.date = w.date
LEFT JOIN silver_academic_day    a ON d.date = a.date
LEFT JOIN silver_ops_day         o ON d.date = o.date
;

-- 2) Add demand to features
CREATE OR REPLACE TABLE gold_demand_enriched AS
SELECT f.*, d.covers_total, d.breakfast, d.lunch, d.dinner
FROM gold_features_day f
LEFT JOIN silver_demand_day d USING(date)
;

-- 3) Price/elasticity assumptions (edit later or replace with measured values)
CREATE OR REPLACE TABLE gold_price_config AS
SELECT CAST(16.75 AS DOUBLE) AS price_current,   -- average ticket
       CAST(-0.60 AS DOUBLE) AS elasticity,      -- assumed; replace if you estimate it
       CAST(4.50  AS DOUBLE) AS avg_portion_cost,
       CAST(0.00  AS DOUBLE) AS discount_pct     -- live control for single-scenario table
;

-- 4) Single-scenario table (driven by gold_price_config.discount_pct)
CREATE OR REPLACE TABLE gold_scenario_revenue AS
WITH pc AS (SELECT * FROM gold_price_config)
SELECT
  e.date,
  pc.discount_pct,
  pc.price_current * (1 - pc.discount_pct)                             AS price_scn,
  e.covers_total                                                       AS base_covers,
  e.covers_total * POWER((1 - pc.discount_pct), pc.elasticity)         AS predicted_covers_scn,
  (pc.price_current * (1 - pc.discount_pct)) *
  (e.covers_total * POWER((1 - pc.discount_pct), pc.elasticity))       AS revenue_scn,
  (e.covers_total * POWER((1 - pc.discount_pct), pc.elasticity)) *
   pc.avg_portion_cost                                                 AS cogs_scn
FROM gold_demand_enriched e CROSS JOIN pc
;

-- 5) Full discount grid (0–30% in 1% steps) for charts/slicers
CREATE OR REPLACE TABLE gold_scenario_grid AS
WITH pc AS (SELECT * FROM gold_price_config),
disc AS (SELECT explode(sequence(0, 30)) AS pct_int),
d AS (SELECT CAST(pct_int/100.0 AS DOUBLE) AS discount_pct FROM disc)
SELECT
  e.date,
  d.discount_pct,
  pc.price_current * (1 - d.discount_pct)                              AS price_scn,
  e.covers_total * POWER((1 - d.discount_pct), pc.elasticity)          AS predicted_covers_scn,
  (pc.price_current * (1 - d.discount_pct)) *
  (e.covers_total * POWER((1 - d.discount_pct), pc.elasticity))        AS revenue_scn,
  (e.covers_total * POWER((1 - d.discount_pct), pc.elasticity)) *
   pc.avg_portion_cost                                                 AS cogs_scn
FROM gold_demand_enriched e
CROSS JOIN d
CROSS JOIN pc
;

-- 6) Best discount per day (argmax margin)
CREATE OR REPLACE TABLE gold_best_discount AS
WITH grid AS (
  SELECT *,
         (revenue_scn - cogs_scn) AS margin_scn,
         ROW_NUMBER() OVER (PARTITION BY date ORDER BY (revenue_scn - cogs_scn) DESC, discount_pct ASC) AS rk
  FROM gold_scenario_grid
)
SELECT date, discount_pct AS best_discount_pct, margin_scn AS best_margin
FROM grid
WHERE rk = 1
;


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 37, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [13]:
print("gold tables:",
      spark.catalog.tableExists("gold_features_day"),
      spark.catalog.tableExists("gold_demand_enriched"),
      spark.catalog.tableExists("gold_scenario_revenue"),
      spark.catalog.tableExists("gold_scenario_grid"),
      spark.catalog.tableExists("gold_best_discount"))

spark.table("gold_demand_enriched").orderBy("date").limit(5).show(truncate=False)
spark.table("gold_best_discount").orderBy("date").limit(5).show(truncate=False)


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 38, Finished, Available, Finished)

gold tables: True True True True True
+----------+---+----------+---------------+----------------+----------------+----------------+-------------------+-------------------+------------+----------+-----------+----------+------------------+-----------------+-------------------+--------------------+------+------+---------+-------+-------+---------+----------+----------------+--------------+----------+-----------+-----------+---------------+-----------+------------+------------+------------+---------+-----+------+
|date      |dow|is_weekend|event_count_all|event_count_home|event_load_score|major_event_flag|major_academic_flag|minor_academic_flag|holiday_flag|break_flag|finals_flag|study_flag|classes_start_flag|class_block_count|lunch_window_blocks|dinner_window_blocks|tmax_c|tmin_c|precip_mm|is_rain|is_snow|is_closed|is_reduced|open_hours_total|open_breakfast|open_lunch|open_dinner|open_brunch|breakfast_hours|lunch_hours|dinner_hours|brunch_hours|covers_total|breakfast|lunch|dinner|
+-----

In [14]:
%%sql
-- Margin safeguards + max discount policy (edit if you want)
CREATE OR REPLACE TABLE gold_policy_config AS
SELECT
  CAST(0.30 AS DOUBLE) AS max_discount_pct,   -- cap discounts at 30%
  CAST(0.15 AS DOUBLE) AS min_markup_pct,     -- require >=15% markup over cost
  CAST(0.50 AS DOUBLE) AS min_margin_abs;     -- OR at least $0.50 margin per cover

-- Prior over plausible elasticities (weights sum to 1)
CREATE OR REPLACE TABLE gold_elasticity_prior AS
SELECT CAST(-0.80 AS DOUBLE) AS elasticity, CAST(0.25 AS DOUBLE) AS weight UNION ALL
SELECT CAST(-1.40 AS DOUBLE),             CAST(0.50 AS DOUBLE) UNION ALL
SELECT CAST(-2.00 AS DOUBLE),             CAST(0.25 AS DOUBLE);


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 40, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [15]:
%%sql
-- Price floor from policy: max(cost*(1+markup), cost + abs_margin)
CREATE OR REPLACE TABLE gold_scenario_grid_robust AS
WITH pc   AS (SELECT * FROM gold_price_config),
pol  AS (SELECT * FROM gold_policy_config),
pri  AS (SELECT * FROM gold_elasticity_prior),
disc AS (SELECT explode(sequence(0, CAST(100*0.30 AS INT))) AS pct_int)  -- 0..30%
SELECT
  e.date,
  CAST(pct_int/100.0 AS DOUBLE)          AS discount_pct,
  pri.elasticity,
  pri.weight,
  pc.price_current * (1 - pct_int/100.0) AS price_scn,
  e.covers_total * POWER(1 - pct_int/100.0, pri.elasticity) AS covers_scn,
  (pc.price_current * (1 - pct_int/100.0)) *
     (e.covers_total * POWER(1 - pct_int/100.0, pri.elasticity))         AS revenue_scn,
  (e.covers_total * POWER(1 - pct_int/100.0, pri.elasticity)) *
     pc.avg_portion_cost                                                  AS cogs_scn
FROM gold_demand_enriched e
CROSS JOIN pc
CROSS JOIN pri
CROSS JOIN pol
CROSS JOIN disc
WHERE (pc.price_current * (1 - pct_int/100.0)) >=
      GREATEST(pc.avg_portion_cost*(1+pol.min_markup_pct), pc.avg_portion_cost + pol.min_margin_abs);


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 41, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [16]:
%%sql
CREATE OR REPLACE TABLE gold_best_discount_robust AS
WITH expm AS (
  SELECT date, discount_pct,
         SUM(weight * (revenue_scn - cogs_scn)) AS expected_margin
  FROM gold_scenario_grid_robust
  GROUP BY date, discount_pct
),
ranked AS (
  SELECT date, discount_pct, expected_margin,
         ROW_NUMBER() OVER (PARTITION BY date ORDER BY expected_margin DESC, discount_pct ASC) AS rk
  FROM expm
)
SELECT date, discount_pct AS best_discount_pct, expected_margin
FROM ranked WHERE rk = 1;


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 42, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [17]:
spark.table("gold_best_discount_robust").orderBy("date").limit(10).show(truncate=False)
# See the shape of the margin curve at a few discounts
spark.table("gold_scenario_grid_robust")\
     .filter("date >= '2025-07-01' AND date <= '2025-07-07'")\
     .groupBy("date","discount_pct")\
     .agg(F.sum((F.col("revenue_scn")-F.col("cogs_scn"))*F.col("weight")).alias("expected_margin"))\
     .orderBy("date","discount_pct").show(truncate=False)


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 43, Finished, Available, Finished)

+----------+-----------------+------------------+
|date      |best_discount_pct|expected_margin   |
+----------+-----------------+------------------+
|2025-07-01|0.09             |2772.7036935737383|
|2025-07-02|0.09             |3116.224505166945 |
|2025-07-03|0.09             |2453.720082808618 |
|2025-07-04|0.09             |2883.121097300126 |
|2025-07-05|0.09             |1177.7856397481366|
|2025-07-06|0.09             |969.219432709404  |
|2025-07-07|0.09             |2465.988683222661 |
|2025-07-08|0.09             |2576.406086949049 |
|2025-07-09|0.09             |2956.7326997843847|
|2025-07-10|0.09             |2993.5385010265145|
+----------+-----------------+------------------+

+----------+------------+------------------+
|date      |discount_pct|expected_margin   |
+----------+------------+------------------+
|2025-07-01|0.0         |2768.5            |
|2025-07-01|0.01        |2769.3632109877217|
|2025-07-01|0.02        |2770.141368080088 |
|2025-07-01|0.03        |2770

In [19]:
%%sql
-- Side-by-side vs baseline (0% discount) — FIXED JOIN
CREATE OR REPLACE TABLE gold_robust_comparison AS
WITH base AS (
  SELECT date, discount_pct, exp_covers, exp_revenue, exp_margin
  FROM gold_robust_agg
  WHERE discount_pct = 0.0
),
best AS (
  -- join on date AND match the chosen discount to the best_discount_pct
  SELECT a.date, a.discount_pct, a.exp_covers, a.exp_revenue, a.exp_margin
  FROM gold_robust_agg a
  INNER JOIN gold_best_discount_robust b
    ON a.date = b.date
   AND a.discount_pct = b.best_discount_pct
)
SELECT
  b.date,
  b.discount_pct                         AS best_discount_pct,
  b.exp_covers                           AS covers_best,
  base.exp_covers                        AS covers_base,
  (b.exp_covers - base.exp_covers)       AS covers_delta,
  b.exp_revenue                          AS revenue_best,
  base.exp_revenue                       AS revenue_base,
  (b.exp_revenue - base.exp_revenue)     AS revenue_delta,
  b.exp_margin                           AS margin_best,
  base.exp_margin                        AS margin_base,
  (b.exp_margin - base.exp_margin)       AS margin_delta
FROM best b
JOIN base ON b.date = base.date;


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 47, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [20]:
spark.table("gold_robust_comparison").orderBy("date").limit(10).show(truncate=False)


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 48, Finished, Available, Finished)

+----------+-----------------+------------------+-----------+------------------+------------------+------------+------------------+------------------+-----------+------------------+
|date      |best_discount_pct|covers_best       |covers_base|covers_delta      |revenue_best      |revenue_base|revenue_delta     |margin_best       |margin_base|margin_delta      |
+----------+-----------------+------------------+-----------+------------------+------------------+------------+------------------+------------------+-----------+------------------+
|2025-07-01|0.09             |258.10599893634986|226.0      |32.10599893634986 |3934.1806887873126|3785.5      |148.68068878731265|2772.7036935737383|2768.5     |4.203693573738292 |
|2025-07-02|0.09             |290.0837333178445 |254.0      |36.08373331784452 |4421.601305097245 |4254.5      |167.10130509724513|3116.224505166945 |3111.5     |4.724505166945164 |
|2025-07-03|0.09             |228.41238843924765|200.0      |28.41238843924765 |3481.57583

In [21]:
%%sql
CREATE OR REPLACE TABLE gold_policy_config AS
SELECT
  CAST(0.30 AS DOUBLE) AS max_discount_pct,    -- safety cap (already enforced)
  CAST(0.15 AS DOUBLE) AS min_markup_pct,      -- margin floor: >=15% over cost
  CAST(0.50 AS DOUBLE) AS min_margin_abs,      -- or >= $0.50 absolute margin
  -- NEW guardrails for *choosing* a discount
  CAST(5.0  AS DOUBLE) AS min_margin_delta,    -- require at least +$5 expected margin vs 0%
  CAST(10.0 AS DOUBLE) AS min_covers_delta,    -- and at least +10 covers vs 0%
  CAST(0.95 AS DOUBLE) AS low_demand_ratio,    -- only discount if base covers < 95% of weekday avg
  CAST(1    AS INT)    AS avoid_event_days,    -- 1 = avoid discount on home-event days
  CAST(0    AS INT)    AS avoid_weekends;      -- 1 = avoid Sat/Sun


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 49, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [23]:
%%sql
CREATE OR REPLACE TABLE gold_recommendations AS
WITH gf AS (
  SELECT date,
         dayofweek(date) AS dow,
         COALESCE(event_count_home,0) AS event_home,
         CASE WHEN dayofweek(date) IN (1,7) THEN 1 ELSE 0 END AS is_weekend
  FROM gold_features_day
),
base AS (
  SELECT r.date, r.best_discount_pct, r.covers_best, r.covers_base,
         r.covers_delta, r.revenue_best, r.revenue_base, r.revenue_delta,
         r.margin_best,  r.margin_base,  r.margin_delta,
         gf.dow, gf.event_home, gf.is_weekend
  FROM gold_robust_comparison r
  JOIN gf ON r.date = gf.date
),
weekday_avg AS (
  SELECT dow, AVG(covers_base) AS avg_covers_base
  FROM base GROUP BY dow
),
cfg AS (SELECT * FROM gold_policy_config)
SELECT
  b.date,
  b.best_discount_pct,
  -- eligibility checks
  (b.margin_delta >= cfg.min_margin_delta)                               AS ok_margin_delta,
  (b.covers_delta >= cfg.min_covers_delta)                               AS ok_covers_delta,
  (b.covers_base < cfg.low_demand_ratio * w.avg_covers_base)             AS ok_low_demand,
  (cfg.avoid_event_days = 0 OR b.event_home = 0)                         AS ok_event_rule,
  (cfg.avoid_weekends = 0 OR b.is_weekend = 0)                           AS ok_weekend_rule,
  -- final recommendation
  CASE
    WHEN b.margin_delta < cfg.min_margin_delta THEN 0.0
    WHEN b.covers_delta < cfg.min_covers_delta THEN 0.0
    WHEN b.covers_base >= cfg.low_demand_ratio * w.avg_covers_base THEN 0.0
    WHEN cfg.avoid_event_days = 1 AND b.event_home = 1 THEN 0.0
    WHEN cfg.avoid_weekends = 1  AND b.is_weekend  = 1 THEN 0.0
    ELSE b.best_discount_pct
  END AS recommended_discount_pct,
  CASE
    WHEN b.margin_delta < cfg.min_margin_delta THEN 'No: margin lift too small'
    WHEN b.covers_delta < cfg.min_covers_delta THEN 'No: covers lift too small'
    WHEN b.covers_base >= cfg.low_demand_ratio * w.avg_covers_base THEN 'No: not a low-demand day'
    WHEN cfg.avoid_event_days = 1 AND b.event_home = 1 THEN 'No: home event day'
    WHEN cfg.avoid_weekends = 1  AND b.is_weekend  = 1 THEN 'No: weekend blocked'
    ELSE 'Yes: passes policy'
  END AS recommendation_reason,
  -- context
  b.covers_base, b.covers_best, b.covers_delta,
  b.revenue_base, b.revenue_best, b.revenue_delta,
  b.margin_base,  b.margin_best,  b.margin_delta
FROM base b
JOIN weekday_avg w USING (dow)
CROSS JOIN cfg
ORDER BY b.date;


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 51, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [24]:
spark.table("gold_recommendations").orderBy("date").limit(12).show(truncate=False)


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 52, Finished, Available, Finished)

+----------+-----------------+---------------+---------------+-------------+-------------+---------------+------------------------+-------------------------+-----------+------------------+------------------+------------+------------------+------------------+-----------+------------------+------------------+
|date      |best_discount_pct|ok_margin_delta|ok_covers_delta|ok_low_demand|ok_event_rule|ok_weekend_rule|recommended_discount_pct|recommendation_reason    |covers_base|covers_best       |covers_delta      |revenue_base|revenue_best      |revenue_delta     |margin_base|margin_best       |margin_delta      |
+----------+-----------------+---------------+---------------+-------------+-------------+---------------+------------------------+-------------------------+-----------+------------------+------------------+------------+------------------+------------------+-----------+------------------+------------------+
|2025-07-01|0.09             |false          |true           |false      

In [25]:
%%sql
-- Policy with absolute OR relative margin lift
CREATE OR REPLACE TABLE gold_policy_config AS
SELECT
  CAST(0.30 AS DOUBLE) AS max_discount_pct,
  CAST(0.15 AS DOUBLE) AS min_markup_pct,
  CAST(0.50 AS DOUBLE) AS min_margin_abs,
  CAST(5.0  AS DOUBLE) AS min_margin_delta,      -- absolute $ lift floor
  CAST(0.015 AS DOUBLE) AS min_margin_delta_pct, -- +1.5% vs baseline
  CAST(10.0 AS DOUBLE) AS min_covers_delta,
  CAST(0.95 AS DOUBLE) AS low_demand_ratio,
  CAST(1    AS INT)    AS avoid_event_days,
  CAST(0    AS INT)    AS avoid_weekends;


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 53, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [26]:
%%sql
-- Rebuild recommendations using the OR rule on margin lift
CREATE OR REPLACE TABLE gold_recommendations AS
WITH gf AS (
  SELECT date, dayofweek(date) AS dow,
         COALESCE(event_count_home,0) AS event_home,
         CASE WHEN dayofweek(date) IN (1,7) THEN 1 ELSE 0 END AS is_weekend
  FROM gold_features_day
),
base AS (
  SELECT r.*, gf.dow, gf.event_home, gf.is_weekend
  FROM gold_robust_comparison r JOIN gf USING(date)
),
weekday_avg AS (SELECT dow, AVG(covers_base) AS avg_covers_base FROM base GROUP BY dow),
cfg AS (SELECT * FROM gold_policy_config)
SELECT
  b.date, b.best_discount_pct,
  (b.margin_delta >= cfg.min_margin_delta OR
   (b.margin_base > 0 AND b.margin_delta / b.margin_base >= cfg.min_margin_delta_pct)) AS ok_margin_delta,
  (b.covers_delta >= cfg.min_covers_delta)                               AS ok_covers_delta,
  (b.covers_base < cfg.low_demand_ratio * w.avg_covers_base)             AS ok_low_demand,
  (cfg.avoid_event_days = 0 OR b.event_home = 0)                         AS ok_event_rule,
  (cfg.avoid_weekends = 0 OR b.is_weekend = 0)                           AS ok_weekend_rule,
  CASE
    WHEN NOT (b.margin_delta >= cfg.min_margin_delta OR
              (b.margin_base > 0 AND b.margin_delta / b.margin_base >= cfg.min_margin_delta_pct)) THEN 0.0
    WHEN b.covers_delta < cfg.min_covers_delta THEN 0.0
    WHEN b.covers_base >= cfg.low_demand_ratio * w.avg_covers_base THEN 0.0
    WHEN cfg.avoid_event_days = 1 AND b.event_home = 1 THEN 0.0
    WHEN cfg.avoid_weekends = 1  AND b.is_weekend  = 1 THEN 0.0
    ELSE b.best_discount_pct
  END AS recommended_discount_pct,
  CASE
    WHEN NOT (b.margin_delta >= cfg.min_margin_delta OR
              (b.margin_base > 0 AND b.margin_delta / b.margin_base >= cfg.min_margin_delta_pct)) THEN 'No: margin lift too small'
    WHEN b.covers_delta < cfg.min_covers_delta THEN 'No: covers lift too small'
    WHEN b.covers_base >= cfg.low_demand_ratio * w.avg_covers_base THEN 'No: not a low-demand day'
    WHEN cfg.avoid_event_days = 1 AND b.event_home = 1 THEN 'No: home event day'
    WHEN cfg.avoid_weekends = 1  AND b.is_weekend  = 1 THEN 'No: weekend blocked'
    ELSE 'Yes: passes policy'
  END AS recommendation_reason,
  b.covers_base, b.covers_best, b.covers_delta,
  b.revenue_base, b.revenue_best, b.revenue_delta,
  b.margin_base,  b.margin_best,  b.margin_delta
FROM base b
JOIN weekday_avg w USING(dow)
CROSS JOIN cfg
ORDER BY b.date;


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 54, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [27]:
%%sql
SELECT recommendation_reason, COUNT(*) AS days
FROM gold_recommendations
GROUP BY recommendation_reason
ORDER BY days DESC;

SELECT AVG(best_discount_pct) AS avg_best, AVG(recommended_discount_pct) AS avg_recommended
FROM gold_recommendations;


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 56, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 2 fields>

<Spark SQL result set with 1 rows and 2 fields>

In [28]:
%%sql
-- soften the gates to match your current curve
CREATE OR REPLACE TABLE gold_policy_config AS
SELECT
  CAST(0.30 AS DOUBLE) AS max_discount_pct,
  CAST(0.15 AS DOUBLE) AS min_markup_pct,
  CAST(0.50 AS DOUBLE) AS min_margin_abs,
  CAST(3.0  AS DOUBLE) AS min_margin_delta,       -- was 5.0
  CAST(0.001 AS DOUBLE) AS min_margin_delta_pct,  -- was 0.015 (1.5%); now 0.1%
  CAST(10.0 AS DOUBLE) AS min_covers_delta,
  CAST(0.95 AS DOUBLE) AS low_demand_ratio,
  CAST(1    AS INT)    AS avoid_event_days,
  CAST(0    AS INT)    AS avoid_weekends;


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 57, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [29]:
%%sql
CREATE OR REPLACE TABLE gold_recommendations AS
WITH gf AS (
  SELECT date, dayofweek(date) AS dow,
         COALESCE(event_count_home,0) AS event_home,
         CASE WHEN dayofweek(date) IN (1,7) THEN 1 ELSE 0 END AS is_weekend
  FROM gold_features_day
),
base AS (
  SELECT r.*, gf.dow, gf.event_home, gf.is_weekend
  FROM gold_robust_comparison r JOIN gf USING(date)
),
weekday_avg AS (SELECT dow, AVG(covers_base) AS avg_covers_base FROM base GROUP BY dow),
cfg AS (SELECT * FROM gold_policy_config)
SELECT
  b.date, b.best_discount_pct,
  (b.margin_delta >= cfg.min_margin_delta OR
   (b.margin_base > 0 AND b.margin_delta / b.margin_base >= cfg.min_margin_delta_pct)) AS ok_margin_delta,
  (b.covers_delta >= cfg.min_covers_delta)                               AS ok_covers_delta,
  (b.covers_base < cfg.low_demand_ratio * w.avg_covers_base)             AS ok_low_demand,
  (cfg.avoid_event_days = 0 OR b.event_home = 0)                         AS ok_event_rule,
  (cfg.avoid_weekends = 0 OR b.is_weekend = 0)                           AS ok_weekend_rule,
  CASE
    WHEN NOT (b.margin_delta >= cfg.min_margin_delta OR
              (b.margin_base > 0 AND b.margin_delta / b.margin_base >= cfg.min_margin_delta_pct)) THEN 0.0
    WHEN b.covers_delta < cfg.min_covers_delta THEN 0.0
    WHEN b.covers_base >= cfg.low_demand_ratio * w.avg_covers_base THEN 0.0
    WHEN cfg.avoid_event_days = 1 AND b.event_home = 1 THEN 0.0
    WHEN cfg.avoid_weekends = 1  AND b.is_weekend  = 1 THEN 0.0
    ELSE b.best_discount_pct
  END AS recommended_discount_pct,
  CASE
    WHEN NOT (b.margin_delta >= cfg.min_margin_delta OR
              (b.margin_base > 0 AND b.margin_delta / b.margin_base >= cfg.min_margin_delta_pct)) THEN 'No: margin lift too small'
    WHEN b.covers_delta < cfg.min_covers_delta THEN 'No: covers lift too small'
    WHEN b.covers_base >= cfg.low_demand_ratio * w.avg_covers_base THEN 'No: not a low-demand day'
    WHEN cfg.avoid_event_days = 1 AND b.event_home = 1 THEN 'No: home event day'
    WHEN cfg.avoid_weekends = 1  AND b.is_weekend  = 1 THEN 'No: weekend blocked'
    ELSE 'Yes: passes policy'
  END AS recommendation_reason,
  b.covers_base, b.covers_best, b.covers_delta,
  b.revenue_base, b.revenue_best, b.revenue_delta,
  b.margin_base,  b.margin_best,  b.margin_delta
FROM base b
JOIN weekday_avg w USING(dow)
CROSS JOIN cfg
ORDER BY b.date;


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 58, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [30]:
%%sql
SELECT recommendation_reason, COUNT(*) AS days
FROM gold_recommendations
GROUP BY recommendation_reason
ORDER BY days DESC;

SELECT AVG(best_discount_pct) AS avg_best, AVG(recommended_discount_pct) AS avg_recommended
FROM gold_recommendations;

SELECT * FROM gold_recommendations WHERE recommended_discount_pct>0 ORDER BY date LIMIT 10;


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 61, Finished, Available, Finished)

<Spark SQL result set with 2 rows and 2 fields>

<Spark SQL result set with 1 rows and 2 fields>

<Spark SQL result set with 10 rows and 18 fields>

Date attributes (handy for PBI visuals)

In [31]:
%%sql
CREATE OR REPLACE TABLE gold_dim_date AS
SELECT
  date,
  year(date)        AS yyyy,
  month(date)       AS mm,
  date_format(date,'MMM') AS mon,
  weekofyear(date)  AS wk,
  dayofweek(date)   AS dow_num,            -- 1=Sun..7=Sat (Spark)
  date_format(date,'EEE')  AS dow,
  CASE WHEN dayofweek(date) IN (1,7) THEN 1 ELSE 0 END AS is_weekend
FROM silver_dim_date;


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 62, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

Daily recommendations with context (single table)

In [32]:
%%sql
CREATE OR REPLACE TABLE gold_export_daily AS
SELECT
  r.date,
  r.recommended_discount_pct,
  r.best_discount_pct,
  r.recommendation_reason,
  r.covers_base, r.covers_best, r.covers_delta,
  r.revenue_base, r.revenue_best, r.revenue_delta,
  r.margin_base,  r.margin_best,  r.margin_delta,
  f.event_count_home, f.major_event_flag,
  f.holiday_flag, f.break_flag, f.finals_flag,
  f.is_closed, f.open_hours_total
FROM gold_recommendations r
LEFT JOIN gold_features_day f USING(date)
ORDER BY r.date;


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 63, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

Curve for a slicer (discount → expected margin)

In [33]:
%%sql
CREATE OR REPLACE TABLE gold_export_curve AS
SELECT date, discount_pct,
       exp_covers, exp_revenue, exp_margin
FROM gold_robust_agg
ORDER BY date, discount_pct;


StatementMeta(, fffdbe37-8e50-4b9c-a8f7-53b9f5250630, 64, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>