In [0]:
from delta.tables import DeltaTable
from pyspark.sql import functions as F

In [0]:
import datetime as _dt
import re as _re


def _widget(name, default):
    """Return the Databricks widget value ``name``, or ``default`` if unavailable.

    ``dbutils`` only exists inside a Databricks runtime, and ``widgets.get`` raises
    when the widget was never defined; in both cases (and when the widget is set
    to a blank value) the supplied ``default`` is used instead.
    """
    try:
        value = dbutils.widgets.get(name)
    except Exception:  # NameError off-Databricks, or widget not defined
        return default
    return value.strip() or default


def _require_identifier(label, value):
    """Raise ValueError unless ``value`` is a plain unquoted Spark SQL identifier."""
    if not _re.fullmatch(r"[A-Za-z0-9_]+", value):
        raise ValueError(f"Invalid {label} identifier: {value!r}")
    return value


arrival_date = _widget("arrival_date", _dt.date.today().strftime("%Y-%m-%d"))
catalog = _widget("catalog", "travel_bookings")
schema = _widget("schema", "default")

# These values are interpolated into SQL text by later cells, so fail fast on
# anything that is not a YYYY-MM-DD date or a plain identifier. A malformed date
# would otherwise become to_date(...) = NULL and silently load nothing.
arrival_date = _dt.datetime.strptime(arrival_date, "%Y-%m-%d").strftime("%Y-%m-%d")
catalog = _require_identifier("catalog", catalog)
schema = _require_identifier("schema", schema)

In [0]:
# Only the bronze customer rows whose business_date equals the requested arrival date.
src = (
    spark.table(f"{catalog}.bronze.customer_inc")
    .where(F.col("business_date") == F.to_date(F.lit(arrival_date)))
)
# Fully-qualified name of the SCD2 dimension table this notebook maintains.
dim_full_name = f"{catalog}.{schema}.customer_dim"


In [0]:
# Ensure the target schema and the customer dimension table exist before loading.
# customer_sk is an identity surrogate key; valid_from / valid_to / is_current hold
# the SCD2 history columns written by the merge cell.
schema_ddl = f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}"
customer_dim_ddl = f"""
CREATE TABLE IF NOT EXISTS {dim_full_name} (
  customer_sk BIGINT GENERATED ALWAYS AS IDENTITY,
  customer_id INT,
  customer_name STRING,
  customer_address STRING,
  email STRING,
  valid_from DATE,
  valid_to DATE,
  is_current BOOLEAN
) USING DELTA
"""
for ddl_statement in (schema_ddl, customer_dim_ddl):
    spark.sql(ddl_statement)


In [0]:
# SCD Type 2 load of customer_dim from the day's bronze slice (`src`).
#   1. MERGE: close the current row of every customer whose tracked attributes
#      changed (valid_to <- new valid_from, is_current <- false), and insert
#      customers not yet present as current rows.
#   2. Append: add a fresh current row for each source customer that no longer
#      has a current row, i.e. exactly the ones closed in step 1.
# The previous cell creates the table unconditionally, so no initial-load branch
# is needed here.
# NOTE(review): Delta MERGE fails if `src` contains several rows for the same
# customer_id — confirm the bronze feed is unique per customer per business_date.
tracked_cols = ["customer_name", "customer_address", "email"]
# Null-safe comparison: a plain `<>` yields NULL when either side is NULL, so a
# NULL -> value (or value -> NULL) change would never be versioned.
attrs_changed = " OR ".join(f"NOT (d.{c} <=> s.{c})" for c in tracked_cols)

dim = DeltaTable.forName(spark, dim_full_name)
(
    dim.alias("d")
    .merge(src.alias("s"), "d.customer_id = s.customer_id AND d.is_current = true")
    .whenMatchedUpdate(
        condition=attrs_changed,
        set={"valid_to": "s.valid_from", "is_current": "false"},
    )
    .whenNotMatchedInsert(values={
        "customer_id": "s.customer_id",
        "customer_name": "s.customer_name",
        "customer_address": "s.customer_address",
        "email": "s.email",
        "valid_from": "s.valid_from",
        "valid_to": "s.valid_to",
        "is_current": "true",
    })
    .execute()
)

# After the merge, a source customer without a current row is one whose row was
# just closed. Matching on customer_id alone (not valid_from) stops unchanged
# customers from receiving a second current row. It also keeps re-runs of the
# same arrival_date idempotent.
current_dim = spark.table(dim_full_name).where(F.col("is_current"))
changed = (
    src.join(current_dim, on="customer_id", how="left_anti")
    .select(
        "customer_id", "customer_name", "customer_address", "email",
        "valid_from", "valid_to", F.lit(True).alias("is_current"),
    )
)
if changed.count() > 0:
    # customer_sk is GENERATED ALWAYS, so it is omitted and filled by Delta.
    changed.write.mode("append").format("delta").saveAsTable(dim_full_name)

print("SCD2 (dimension) merge complete")
