In [0]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable

In [0]:
# Incoming customer snapshot that acts as the source side of the SCD2 load.
src_df = spark.createDataFrame(
    data=[
        (1, "Alice", "London"),
        (2, "Bob", "Paris"),
        (3, "Carol", "Berlin"),  # New customer
    ],
    schema=["cust_id", "name", "city"],
)

In [0]:
# Stamp every source row with the SCD2 bookkeeping columns.
# NOTE(review): F.current_timestamp() is a column expression, not a fixed
# value, so it is presumably resolved each time a query over src_df runs —
# confirm this is acceptable for valid_from / load_ts.
load_ts = F.current_timestamp()

# Sentinel "end of time" marking a version that is still open.
open_ended_ts = F.lit("9999-12-31").cast("timestamp")

src_df = src_df.withColumn("valid_from", load_ts)
src_df = src_df.withColumn("is_current", F.lit(True))
src_df = src_df.withColumn("valid_to", open_ended_ts)
src_df = src_df.withColumn("load_ts", load_ts)

In [0]:
# Preview the staged source rows together with their SCD2 bookkeeping columns
# (valid_from, is_current, valid_to, load_ts).
src_df.display()

In [0]:
%python
# Storage location of the customer SCD2 dimension (Delta format).
dim_path = "/Volumes/workspace/delta/customer_scd2"

# Seed the dimension with the first load only when it truly does not exist yet.
# The write below overwrites whatever is stored at dim_path, so checking the
# catalog name alone is not enough: if the table was unregistered (or the
# current catalog/schema differs) while Delta data still lives at the path,
# the accumulated history would be wiped. The path check runs only when the
# catalog lookup misses.
dim_exists = (
    spark.catalog.tableExists("customer_scd2")
    or DeltaTable.isDeltaTable(spark, dim_path)
)

if not dim_exists:
    (
        src_df.write
        .format("delta")
        .option("overwriteSchema", "true")
        .mode("overwrite")
        .save(dim_path)
    )

# Register the path in the catalog; a no-op when the table is already known.
spark.sql(
    f"""
    CREATE TABLE IF NOT EXISTS customer_scd2
    USING DELTA
    LOCATION '{dim_path}'
    """
)

In [0]:
# Open the dimension twice over: as a DeltaTable handle (for DML such as
# update/merge) and as a plain DataFrame view (for joins against the source).
dim_tbl = DeltaTable.forPath(sparkSession=spark, path=dim_path)
dim_df = dim_tbl.toDF()

In [0]:
# Left-join the source onto the dimension by customer key. The "src"/"tgt"
# aliases let later steps tell the two sides' same-named columns apart.
joined_df = (
    src_df.alias("src")
    .join(
        dim_df.alias("tgt"),
        on=["cust_id"],
        how="left",
    )
)

In [0]:
# Source rows whose *current* dimension version carries different attributes.
#
# * Null-safe equality (<=>) is used because a plain != evaluates to NULL when
#   either side is NULL, which would silently drop changes to/from NULL.
# * Only the source columns are kept. Also selecting tgt.valid_from would give
#   the frame two columns named valid_from, making any later unqualified
#   reference to "valid_from" ambiguous; the new version must start at the
#   source's valid_from anyway.
changed = (
    joined_df
    .filter("tgt.cust_id IS NOT NULL AND tgt.is_current = true")
    .filter("NOT (src.name <=> tgt.name) OR NOT (src.city <=> tgt.city)")
    .select("src.*")
)

In [0]:
# Source rows that found no partner in the dimension at all, i.e. customers
# that have never been loaded before. Only the source-side columns are kept.
new_rows = (
    joined_df
    .where(F.col("tgt.cust_id").isNull())
    .select("src.*")
)

In [0]:
# ---------- 5. Expire old versions + insert new versions (one atomic MERGE) ----------
# Expiring and inserting must happen in a single Delta transaction:
#   * `changed` is a lazy DataFrame over the dimension table. If the current
#     rows are expired first and the new versions appended afterwards, the
#     append re-reads the table, no longer finds those rows as current, and
#     the new versions are never written.
#   * A failure between two separate writes would leave customers without any
#     current row.
#   * No customer ids need to be collected to the driver.
scd_cols = ["cust_id", "name", "city", "load_ts", "valid_from", "valid_to", "is_current"]

# Address the changed rows through the "src" alias so the selection stays
# unambiguous even if `changed` also carries a target-side valid_from column.
# distinct() collapses duplicates caused by join fan-out.
changed_src = changed.select(*[F.col(f"src.{c}") for c in scd_cols]).distinct()
new_src = new_rows.select(*scd_cols)

# Each changed customer is staged twice:
#   expires_current = true  -> matches the current dimension row and closes it
#   expires_current = false -> never matches, so it is inserted as the new version
# Brand-new customers are staged once, as inserts.
staged_df = (
    changed_src.withColumn("expires_current", F.lit(True))
    .unionByName(changed_src.withColumn("expires_current", F.lit(False)))
    .unionByName(new_src.withColumn("expires_current", F.lit(False)))
)

(
    dim_tbl.alias("tgt")
    .merge(
        staged_df.alias("stg"),
        "tgt.cust_id = stg.cust_id "
        "AND tgt.is_current = true "
        "AND stg.expires_current = true",
    )
    # Close the superseded version; its valid_to is taken from the same staged
    # value that becomes the new version's valid_from.
    .whenMatchedUpdate(
        set={
            "is_current": F.lit(False),
            "valid_to": F.col("stg.valid_from"),
        }
    )
    # Insert only rows staged as inserts; an "expire" row that unexpectedly
    # finds no current partner must not be written as data.
    .whenNotMatchedInsert(
        condition="stg.expires_current = false",
        values={c: F.col(f"stg.{c}") for c in scd_cols},
    )
    .execute()
)