In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType, StringType, DateType
from pyspark.sql.window import Window
import os
import traceback

# Widgets: notebook parameters supplied through dbutils.
# NOTE(review): the account key is entered as a plain-text widget value;
# consider reading it from a secret scope instead - confirm with the owners.
dbutils.widgets.text("storage_account", "", "Storage account (uten .blob.core)")
dbutils.widgets.text("container", "", "Container")
dbutils.widgets.text("account_key", "", "Account key")
dbutils.widgets.text("schema", "trafikk", "Database/schema")
dbutils.widgets.dropdown("overwrite_mode", "true", ["true","false"], "Overwrite Silver/Gold")

dbutils.widgets.text("year", "", "Year (YYYY)")
dbutils.widgets.text("month", "", "Month (MM)")
dbutils.widgets.text("day", "", "Day (DD)")

# Storage settings fall back to environment variables when the widget is empty.
# Surrounding whitespace is stripped from every free-text value so that a
# stray space pasted into a widget cannot end up inside a wasbs:// URL or a
# bronze partition path.
storage = (dbutils.widgets.get("storage_account") or os.environ.get("STORAGE_ACCOUNT", "")).strip()
container = (dbutils.widgets.get("container") or os.environ.get("CONTAINER_NAME", "")).strip()
account_key = (dbutils.widgets.get("account_key") or os.environ.get("ACCOUNT_KEY", "")).strip()
overwrite = (dbutils.widgets.get("overwrite_mode") == "true")
# Apply the default after stripping so a whitespace-only value still yields "trafikk".
schema_db = (dbutils.widgets.get("schema") or "").strip() or "trafikk"

# Optional date selection; all three must be non-empty for it to be used.
year  = (dbutils.widgets.get("year") or "").strip()
month = (dbutils.widgets.get("month") or "").strip()
day   = (dbutils.widgets.get("day") or "").strip()

if not (storage and container and account_key):
    raise ValueError("Mangler storage_account, container eller account_key.")

# Make the account key available to the Spark session for this storage account.
spark.conf.set(f"fs.azure.account.key.{storage}.blob.core.windows.net", account_key)

# Session-level storage/Delta settings.
# NOTE(review): presumably needed for wasbs:// access on this cluster -
# confirm that each of these takes effect when set at runtime.
spark.conf.set("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.AzureLogStore")
spark.conf.set("fs.azure.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
spark.conf.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
spark.conf.set("fs.azure.io.retry.max.retries", "20")
spark.conf.set("fs.azure.io.retry.backoff", "5s")
spark.conf.set("spark.databricks.delta.retryWriteConflict.enabled", "true")

# Base paths
bronze_base1 = f"wasbs://{container}@{storage}.blob.core.windows.net/bronze/traffic"
bronze_base2 = f"wasbs://{container}@{storage}.blob.core.windows.net/bronze/weather"
silver_base  = f"wasbs://{container}@{storage}.blob.core.windows.net/silver"
gold_base    = f"wasbs://{container}@{storage}.blob.core.windows.net/gold"

# Echo the effective configuration (the account key is deliberately not printed).
print("Using:")
print("  storage :", storage)
print("  container :", container)
print("  silver :", silver_base)
print("  gold :", gold_base)
print("  database :", schema_db)
print("  overwrite :", overwrite)
print("  Y/M/D :", year, month, day)

def _exists(path: str) -> bool:
    """Return True if ``dbutils.fs.ls`` can list *path*, otherwise False.

    Any ordinary error raised by the listing call is treated as "does not
    exist". Only ``Exception`` subclasses are caught, so an interrupt or
    interpreter exit is no longer swallowed.
    """
    try:
        dbutils.fs.ls(path)
    except Exception:
        return False
    return True

def _is_delta(path: str) -> bool:
    """Return True if *path* contains a listable ``_delta_log`` entry."""
    delta_log_dir = path + "/_delta_log"
    return _exists(delta_log_dir)

def register_external_delta(db: str, table: str, path: str, force_replace: bool = False):
    """Register the Delta table stored at *path* as ``db.table``.

    The database is created if missing and made the session's current
    database before the table statement runs.

    Args:
        db: Database (schema) name.
        table: Table name inside *db*.
        path: Storage location that must already contain a ``_delta_log``.
        force_replace: When True, drop any existing table entry and create it
            again; when False, create it only if it does not exist yet.

    Raises:
        RuntimeError: If *path* has no ``_delta_log``.
    """
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {db}")
    spark.sql(f"USE {db}")

    if not _is_delta(path):
        raise RuntimeError(f"Stien er ikke en Delta-tabell (mangler _delta_log): {path}")

    qualified_name = f"{db}.{table}"
    if not force_replace:
        spark.sql(f"CREATE TABLE IF NOT EXISTS {qualified_name} USING DELTA LOCATION '{path}'")
        return

    # Replace: remove the existing table entry first, then point it at the path again.
    spark.sql(f"DROP TABLE IF EXISTS {qualified_name}")
    spark.sql(f"CREATE TABLE {qualified_name} USING DELTA LOCATION '{path}'")

def probe_delta_write(base_path: str):
    """Check that Delta writes and reads work under *base_path*.

    Writes a one-row Delta table to ``<base_path>/_write_probe`` and reads it
    back. After a successful round trip the probe directory is removed again
    (best effort), so repeated runs do not pile up probe files and table
    versions next to the real tables.

    Args:
        base_path: Root location to test; a trailing ``/`` is ignored.

    Raises:
        Exception: Whatever the write or read raised, re-raised after the
            failure and its traceback have been printed.
    """
    probe = f"{base_path.rstrip('/')}/_write_probe"
    print(f"[probe] Writing Delta probe -> {probe}")
    try:
        (spark.createDataFrame([(1,)], ["x"])
              .write.format("delta").mode("overwrite").save(probe))
        # count() forces the read to actually execute.
        spark.read.format("delta").load(probe).count()
        print("[probe] OK")
    except Exception as e:
        print("[probe] FAIL at:", probe)
        print(e)
        traceback.print_exc()
        raise

    # Clean up only after success; after a failure the directory is left in
    # place so it can be inspected. A failed cleanup must not fail the job.
    try:
        dbutils.fs.rm(probe, recurse=True)
    except Exception as cleanup_err:
        print("[probe] Could not remove probe directory:", cleanup_err)

def write_delta_hardened(df, path: str, mode: str, overwrite_schema: bool = True, clean_on_conflict: bool = False):
    """Write *df* as Delta to *path*, optionally retrying once after a conflict.

    Args:
        df: DataFrame to write.
        path: Target location.
        mode: Save mode passed to the writer (e.g. "overwrite" or "append").
        overwrite_schema: Value for the writer's ``overwriteSchema`` option.
        clean_on_conflict: When True and *mode* is "overwrite", an error whose
            message contains one of the known conflict markers causes *path*
            to be deleted recursively and the write to be attempted once more.

    Raises:
        Exception: The original error when no retry applies, or the error of
            the second attempt when the retry fails as well.
    """
    schema_flag = "true" if overwrite_schema else "false"

    def _write_once():
        # One write attempt with the options requested by the caller.
        writer = df.write.format("delta").mode(mode)
        writer.option("overwriteSchema", schema_flag).save(path)

    try:
        _write_once()
    except Exception as err:
        text = str(err)
        markers = (
            "TASK_WRITE_FAILED", "FileAlreadyExists", "rename", "CommitFailedException",
            "ConcurrentModificationException", "already exists",
        )
        looks_like_conflict = any(marker in text for marker in markers)
        if not (clean_on_conflict and looks_like_conflict and mode == "overwrite"):
            # Not eligible for a retry: surface the original error unchanged.
            raise

        print(f"[write_delta_hardened] Conflict at {path}. Removing path and retrying once...")
        try:
            dbutils.fs.rm(path, recurse=True)
        except Exception as rm_err:
            # The retry is still attempted even if the removal failed.
            print("[write_delta_hardened] Failed to remove path:", rm_err)
        _write_once()

# Make sure the target database exists and is the session's current database.
spark.sql(f"CREATE DATABASE IF NOT EXISTS {schema_db}")
spark.sql(f"USE {schema_db}")
print("Active DB:", spark.catalog.currentDatabase())

# Fail early if Delta cannot be written below the silver root.
probe_delta_write(silver_base)

# Candidate bronze locations, most specific first: the requested day (only
# when year, month and day are all given), then a wildcard over every day.
_date_dirs = [f"{year}/{month}/{day}"] if (year and month and day) else []
_date_dirs.append("*/*/*")

cand_points = [f"{bronze_base1}/registration_points/{date_dir}/flat.parquet" for date_dir in _date_dirs]
cand_vols = [f"{bronze_base1}/volumes/{date_dir}/flat.parquet" for date_dir in _date_dirs]
cand_rain = [f"{bronze_base2}/rain/{date_dir}/rain.parquet" for date_dir in _date_dirs]

def _safe_read_any(globs):
    """Return the first Parquet DataFrame that can be read from *globs*.

    Each path/glob is tried in order. A candidate counts as readable when a
    one-row count on it succeeds; failures are printed and the next candidate
    is tried.

    Returns:
        The first readable DataFrame, or None when every candidate failed.
    """
    for pattern in globs:
        try:
            frame = spark.read.parquet(pattern)
            # Force a small action so missing or unreadable files fail here.
            frame.limit(1).count()
            print(f"[OK] Loaded {pattern}")
            return frame
        except Exception as err:
            print(f"[MISS] {pattern} -> {err}")
    return None

# Load bronze inputs; volumes and rain are optional and may come back as None.
df_points_bronze = _safe_read_any(cand_points)
df_vol_bronze = _safe_read_any(cand_vols)
df_rain_bronze = _safe_read_any(cand_rain)

if df_points_bronze is None:
    raise RuntimeError("Fant ingen points-parquet i Bronze (år/mnd/dag).")

# Optional inputs are compared against None explicitly instead of relying on
# the truthiness of a DataFrame object.
print("Bronze counts -> points:", df_points_bronze.count(),
      "| volumes:", (df_vol_bronze.count() if df_vol_bronze is not None else 0),
      "| rain:", (df_rain_bronze.count() if df_rain_bronze is not None else 0))


# Cast the point columns to fixed types and drop rows without id or coordinates.
df_points_clean = (
    df_points_bronze
    .withColumn("id", F.col("id").cast(StringType()))
    .withColumn("name", F.col("name").cast(StringType()))
    .withColumn("lat", F.col("lat").cast(DoubleType()))
    .withColumn("lon", F.col("lon").cast(DoubleType()))
    .withColumn("partition_date", F.to_date("partition_date"))
    .dropna(subset=["id","lat","lon"])
)
# Newest partition_date first per id, nulls last.
# NOTE(review): the fallback presumably targets runtimes without
# desc_nulls_last - confirm it is still needed.
try:
    w = Window.partitionBy("id").orderBy(F.col("partition_date").desc_nulls_last())
except Exception:
    w = Window.partitionBy("id").orderBy(F.desc("partition_date"))

# Keep one row per id: the first one in the window ordering above.
df_points_silver = (
    df_points_clean.withColumn("rn", F.row_number().over(w))
                   .filter("rn = 1").drop("rn")
)

# Save mode used for all silver writes.
# NOTE(review): in "append" mode rows written by earlier runs stay in the
# table, so silver traffic_points can hold more than one row per id.
writer_mode = "overwrite" if overwrite else "append"

points_path = f"{silver_base}/traffic_points"
write_delta_hardened(df_points_silver, points_path, writer_mode, clean_on_conflict=True)
register_external_delta(schema_db, "traffic_points", points_path, force_replace=True)
print("Silver points rows:", spark.read.format("delta").load(points_path).count())

# Volumes: typed silver copy of the bronze traffic volumes.
vols_path = f"{silver_base}/traffic_volumes"

if df_vol_bronze is None:
    # No bronze volumes were found: only make sure the table is declared.
    spark.sql(f"""
      CREATE TABLE IF NOT EXISTS {schema_db}.traffic_volumes
      (point_id STRING, date DATE, from_ts TIMESTAMP, to_ts TIMESTAMP, total_volume INT)
      USING DELTA LOCATION '{vols_path}'
    """)
else:
    # "date" is derived from the parsed start timestamp of each interval.
    interval_start = F.to_timestamp("from")
    df_vol_silver = df_vol_bronze.select(
        F.col("point_id").cast(StringType()).alias("point_id"),
        F.to_date(interval_start).alias("date"),
        interval_start.alias("from_ts"),
        F.to_timestamp("to").alias("to_ts"),
        F.col("total_volume").cast(IntegerType()).alias("total_volume"),
    ).dropna(subset=["point_id", "date"])
    write_delta_hardened(df_vol_silver, vols_path, writer_mode, clean_on_conflict=True)
    register_external_delta(schema_db, "traffic_volumes", vols_path, force_replace=overwrite)

# Report the row count, or 0 when no Delta table exists at the path.
silver_volume_rows = (
    spark.read.format("delta").load(vols_path).count() if _is_delta(vols_path) else 0
)
print("Silver volumes rows:", silver_volume_rows)

# Rain: typed silver copy of the bronze daily rain data.
rain_path = f"{silver_base}/rain_daily"

if df_rain_bronze is None:
    # No bronze rain data was found: only make sure the table is declared.
    spark.sql(f"""
      CREATE TABLE IF NOT EXISTS {schema_db}.rain_daily
      (point_id STRING, date DATE, precip_mm DOUBLE)
      USING DELTA LOCATION '{rain_path}'
    """)
else:
    # Cast the three known columns in place; any other bronze columns are kept.
    rain_typed = df_rain_bronze.withColumn("point_id", F.col("point_id").cast("string"))
    rain_typed = rain_typed.withColumn("date", F.to_date("date"))
    rain_typed = rain_typed.withColumn("precip_mm", F.col("precip_mm").cast("double"))
    df_rain_silver = rain_typed.dropna(subset=["point_id", "date"])
    write_delta_hardened(df_rain_silver, rain_path, writer_mode, clean_on_conflict=True)
    register_external_delta(schema_db, "rain_daily", rain_path, force_replace=overwrite)

# Report the row count, or 0 when no Delta table exists at the path.
silver_rain_rows = (
    spark.read.format("delta").load(rain_path).count() if _is_delta(rain_path) else 0
)
print("Silver rain rows:", silver_rain_rows)

# =========================
# 4) Gold
# =========================
# Silver points can contain several rows per id when the silver layer is
# written in append mode. Reduce to the latest row per id before joining so
# the left join below cannot multiply volume rows and inflate the sums.
_latest_point = Window.partitionBy("id").orderBy(F.col("partition_date").desc_nulls_last())
sp = (
    spark.read.format("delta").load(points_path)
         .withColumn("_rn", F.row_number().over(_latest_point))
         .filter(F.col("_rn") == 1)
         .drop("_rn")
)

# Fall back to an empty, correctly typed frame when no volumes Delta table
# exists at the path (same guard as used for rain further down).
sv = (spark.read.format("delta").load(f"{silver_base}/traffic_volumes")
      if _is_delta(f"{silver_base}/traffic_volumes")
      else spark.createDataFrame(
          [], "point_id STRING, date DATE, from_ts TIMESTAMP, to_ts TIMESTAMP, total_volume INT"))

# Volumes enriched with point name and coordinates; volumes without a
# matching point are kept (left join).
joined = (
    sv.join(sp, sv.point_id == sp.id, "left")
      .select(sv.point_id, sp.name.alias("point_name"), sp.lat, sp.lon,
              sv.date, sv.from_ts, sv.to_ts, sv.total_volume)
)

# Daily volume per point.
gold_daily = (
    joined.groupBy("point_id","point_name","lat","lon","date")
          .agg(F.sum("total_volume").alias("daily_volume"))
)
gd_path = f"{gold_base}/daily_volume_by_point"
write_delta_hardened(gold_daily, gd_path, "overwrite", clean_on_conflict=True)
register_external_delta(schema_db, "daily_volume_by_point", gd_path, force_replace=True)

# Daily volume summed over all points.
gold_total_daily = joined.groupBy("date").agg(F.sum("total_volume").alias("total_daily_volume"))
gtd_path = f"{gold_base}/total_daily_volume"
write_delta_hardened(gold_total_daily, gtd_path, "overwrite", clean_on_conflict=True)
register_external_delta(schema_db, "total_daily_volume", gtd_path, force_replace=True)

# Rain is optional: use an empty frame when no rain Delta table exists.
sr = (spark.read.format("delta").load(f"{silver_base}/rain_daily")
      if _is_delta(f"{silver_base}/rain_daily")
      else spark.createDataFrame([], "point_id STRING, date DATE, precip_mm DOUBLE"))

# Daily volume per point with that day's precipitation (null when no rain row matches).
gold_volume_vs_rain = (
    gold_daily.alias("gv")
    .join(sr.alias("r"), ["point_id","date"], "left")
    .select("gv.point_id","gv.point_name","gv.lat","gv.lon","gv.date","gv.daily_volume",
            F.col("r.precip_mm").alias("precip_mm"))
)
gvr_path = f"{gold_base}/volume_vs_rain_by_point"
write_delta_hardened(gold_volume_vs_rain, gvr_path, "overwrite", clean_on_conflict=True)
register_external_delta(schema_db, "volume_vs_rain_by_point", gvr_path, force_replace=True)

display(gold_daily.orderBy(F.col("date").desc(), F.col("daily_volume").desc()))
print("Gold ready -> tables: daily_volume_by_point, total_daily_volume, volume_vs_rain_by_point")

# Date dimension: one row per calendar day from 2025-01-01 up to the current
# date. date_key is the day formatted as yyyyMMdd and cast to INT; is_weekend
# is true when the abbreviated weekday name is 'Sat' or 'Sun'.
# NOTE(review): the start date is hard-coded, so days before 2025-01-01 get
# no dim_date row - confirm that no older traffic data is expected.
spark.sql(f"""
CREATE OR REPLACE TABLE {schema_db}.dim_date
USING DELTA AS
SELECT
  CAST(date_format(day, 'yyyyMMdd') AS INT) AS date_key,
  day AS date,
  year(day) AS year,
  month(day) AS month,
  day(day) AS day_of_month,
  weekofyear(day) AS week_of_year,
  date_format(day, 'E') AS weekday_name,
  CASE WHEN date_format(day, 'E') IN ('Sat','Sun') THEN true ELSE false END AS is_weekend
FROM (SELECT explode(sequence(to_date('2025-01-01'), current_date(), INTERVAL 1 DAY)) AS day) s
""")

# Point dimension built from silver traffic_points.
# traffic_points can hold several rows per id when the silver layer is written
# in append mode, so only the row with the latest partition_date is kept per
# id. This keeps point_id unique in the dimension.
spark.sql(f"""
CREATE OR REPLACE TABLE {schema_db}.dim_point
USING DELTA AS
SELECT point_id, point_name, lat, lon
FROM (
  SELECT id AS point_id, name AS point_name, lat, lon,
         row_number() OVER (PARTITION BY id ORDER BY partition_date DESC NULLS LAST) AS rn
  FROM {schema_db}.traffic_points
) ranked
WHERE rn = 1
""")

# Daily fact table: one output row per daily_volume_by_point row and matching
# rain_daily row (left join on point_id and date). date_key uses the same
# yyyyMMdd INT format as dim_date.
# NOTE(review): COALESCE turns a missing rain row into precip_mm = 0, so days
# without a rain observation look the same as dry days - confirm intended.
spark.sql(f"""
CREATE OR REPLACE TABLE {schema_db}.fact_traffic_day
USING DELTA AS
SELECT a.point_id,
       CAST(date_format(a.date,'yyyyMMdd') AS INT) AS date_key,
       a.date,
       a.daily_volume,
       COALESCE(r.precip_mm, 0) AS precip_mm
FROM {schema_db}.daily_volume_by_point a
LEFT JOIN {schema_db}.rain_daily r
  ON r.point_id = a.point_id AND r.date = a.date
""")

