In [0]:
# COMMAND ----------  (cell 1) Widgets & helpers
# Optional backfill bounds supplied as notebook/job widget values.
# An empty or whitespace-only value means "unbounded on that side" (None).
# NOTE(review): the values are passed to Spark as literals and compared against
# timestamp_utc in the next cell; presumably ISO-8601 dates/timestamps — confirm.
dbutils.widgets.text("start_date", "")
dbutils.widgets.text("end_date",   "")
# strip() so stray whitespace is not mistaken for a real bound.
start = dbutils.widgets.get("start_date").strip() or None
end   = dbutils.widgets.get("end_date").strip()   or None

import pyspark.sql.functions as F
from datetime import date

# COMMAND ----------  (cell 2) Read Bronze, de‑dup, optional date filter
bronze_raw = spark.read.table("weather_bronze.hourly")

# De-duplicate on the (timestamp, location) key. Rows sharing a key need not be
# identical in every column: dropDuplicates keeps one of them, with no
# guarantee which.
bronze = bronze_raw.dropDuplicates(
    ["timestamp_utc", "location_lat", "location_lon"]
)

# Optional backfill window: `start` is inclusive, `end` is exclusive, and either
# bound may be omitted. Each bound is its own predicate, so a one-sided window
# never ANDs a non-comparison into the filter.
if start:
    bronze = bronze.filter(F.col("timestamp_utc") >= F.lit(start))
if end:
    bronze = bronze.filter(F.col("timestamp_utc") < F.lit(end))

# Daily roll-up: one row per (date, location_lat, location_lon).
# NOTE(review): to_date truncates using the Spark session time zone; the column
# name suggests UTC, so confirm spark.sql.session.timeZone is UTC or day
# boundaries will shift.
df_daily = (
    bronze
    .withColumn("date", F.to_date("timestamp_utc"))
    .groupBy("date", "location_lat", "location_lon")
    .agg(
        F.count("*").alias("row_count"),            # readings per day after de-dup
        F.avg("temp_c").alias("avg_temp_c"),
        F.max("wind_speed_kmh").alias("max_wind_kmh"),
        F.min("humidity_pct").alias("min_humidity_pct"),
    )
    .withColumn("process_ts", F.current_timestamp())
)

# COMMAND ----------  (cell 3) Data‑quality expectations
# Each expectation is a boolean column named `expect_*`. A comparison against a
# NULL aggregate (e.g. avg_temp_c is NULL when every temp_c that day is NULL)
# evaluates to NULL; mapping NULL to False makes such rows fail the expectation
# instead of slipping through with an unknown result.
def _expect(condition):
    """Return the boolean Column `condition` with NULL coalesced to False."""
    return F.coalesce(condition, F.lit(False))


# Rule 1: coverage – finished days must have 24 rows; today can be ≤ 24.
# "Today" is current_date(), evaluated in the Spark session time zone.
df_daily = df_daily.withColumn(
    "expect_row_count_ok",
    _expect(
        F.when(F.col("date") < F.current_date(), F.col("row_count") == 24)
        .otherwise(F.col("row_count") <= 24)
    ),
)

# Rule 2: temperature range (inclusive).
# NOTE(review): only the daily average is checked, so individual hourly
# outliers can average back into range.
df_daily = df_daily.withColumn(
    "expect_temp_ok", _expect(F.col("avg_temp_c").between(-60, 60))
)

# Rule 3: humidity range (inclusive).
# NOTE(review): only the daily minimum is aggregated, so readings above 100 are
# not caught by this rule.
df_daily = df_daily.withColumn(
    "expect_humidity_ok", _expect(F.col("min_humidity_pct").between(0, 100))
)

# Aggregate flag: logical AND of every expectation column, built as a Column
# expression rather than a SQL string (True if no expectations exist).
dq_cols = [c for c in df_daily.columns if c.startswith("expect_")]
all_passed = F.lit(True)
for c in dq_cols:
    all_passed = all_passed & F.col(c)
df_daily = df_daily.withColumn("dq_passed", all_passed)

# COMMAND ----------  (cell 4) Upsert into Delta Silver (1 row per date + location)
from delta.tables import DeltaTable

spark.sql("CREATE DATABASE IF NOT EXISTS weather_silver")
target = "weather_silver.daily"

if not spark.catalog.tableExists(target):
    # First run: create the Delta table from this batch, partitioned by day.
    # saveAsTable's default save mode raises if the table already exists.
    (df_daily.write
        .format("delta")
        .partitionBy("date")
        .saveAsTable(target))
else:
    # Subsequent runs: MERGE (upsert) keyed on (date, location_lat, location_lon).
    # `<=>` is null-safe equality: a key with a NULL component (e.g. a NULL date
    # derived from a NULL timestamp_utc) still matches its existing row instead
    # of being inserted again as a duplicate on every run.
    tgt = DeltaTable.forName(spark, target)

    (tgt.alias("t")
        .merge(
            source=df_daily.alias("s"),
            condition="""
                t.date          <=> s.date          AND
                t.location_lat  <=> s.location_lat  AND
                t.location_lon  <=> s.location_lon
            """,
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

# COMMAND ----------  (cell 5) Fail job if any DQ errors (so alerting fires)
# Rows whose dq_passed is False OR NULL are failures. `~NULL` is NULL and
# filter() discards NULL predicates, so the flag is coalesced to False first.
# The Silver table has already been written at this point (the expect_* flags
# are stored with the rows); this step only fails the job so alerting fires.
bad = df_daily.filter(~F.coalesce(F.col("dq_passed"), F.lit(False)))

# To inspect failures interactively, select "date", "row_count", "avg_temp_c"
# and the expect_* columns from `bad`, then .show(truncate=False).

# count() re-executes df_daily's whole lineage (including the Bronze read), so
# run it once and reuse the result.
n_bad = bad.count()
if n_bad > 0:
    raise ValueError(f"Data‑quality failed for {n_bad} day(s)")
