In [1]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

df_src_raw = spark.table("silver_customers")

# Rank each customer's source records newest-first; rank 1 is the record we keep.
# NOTE(review): the ordering uses updated_ts only, so if a customer has several
# rows sharing the newest updated_ts, which one survives is not deterministic.
window_latest = Window.partitionBy("customer_id").orderBy(f.desc("updated_ts"))

df_src_latest = (
    df_src_raw
    .withColumn("rn", f.row_number().over(window_latest))
    .where(f.col("rn") == 1)
    .drop("rn")
)


StatementMeta(, 385b08c8-3536-42df-942d-3b1594f19863, 3, Finished, Available, Finished)

In [2]:
# Only dimension rows flagged is_current = 1 take part in change detection.
current_flag = f.col("is_current") == f.lit(1)
df_dim_current = spark.table("dim_customer").where(current_flag)


StatementMeta(, 385b08c8-3536-42df-942d-3b1594f19863, 4, Finished, Available, Finished)

In [3]:
# Alias both sides so later cells can address columns as src.* / dim.*.
src = df_src_latest.alias("src")
dim = df_dim_current.alias("dim")

# Left join: every latest source row is kept; dim.* columns are NULL when the
# customer has no current dimension row.
# NOTE(review): later cells reference dim.customer_id after this using-column
# join, which relies on Spark resolving the right-side join key as a hidden
# column — confirm on the target runtime.
df_joined = src.join(dim, ["customer_id"], "left")


StatementMeta(, 385b08c8-3536-42df-942d-3b1594f19863, 5, Finished, Available, Finished)

In [4]:
# Change set: source customers that either have no current dimension row
# (new customers) or whose risk_rating differs from the current row.
#
# eqNullSafe treats NULL vs NULL as equal and NULL vs a value as different.
# A plain `!=` evaluates to NULL when either side is NULL, and filter() drops
# NULL predicates, so a rating changing to or from NULL would be missed.
df_changes = df_joined.filter(
    f.col("dim.customer_id").isNull()
    | ~f.col("src.risk_rating").eqNullSafe(f.col("dim.risk_rating"))
)


StatementMeta(, 385b08c8-3536-42df-942d-3b1594f19863, 6, Finished, Available, Finished)

In [5]:
# Rows to close out: changed customers that currently have a dimension row.
# Each gets an end date taken from the newer source record and is_current = 0.
# NOTE(review): the replacement row built in a later cell uses the same
# to_date(updated_ts) as its effective_startdate, so the old and new versions
# share that boundary date — confirm this convention is intended.
has_current_row = f.col("dim.customer_id").isNotNull()

df_expire = (
    df_changes
    .where(has_current_row)
    .select(
        f.col("dim.customer_sk").alias("customer_sk"),
        f.to_date(f.col("src.updated_ts")).alias("effective_enddate"),
        f.lit(0).alias("is_current"),
    )
)


StatementMeta(, 385b08c8-3536-42df-942d-3b1594f19863, 7, Finished, Available, Finished)

In [6]:
# Preview the rows that the next cell will expire in dim_customer.
df_expire.show(n=20, truncate=True)

StatementMeta(, 385b08c8-3536-42df-942d-3b1594f19863, 8, Finished, Available, Finished)

+-----------+-----------------+----------+
|customer_sk|effective_enddate|is_current|
+-----------+-----------------+----------+
+-----------+-----------------+----------+



In [7]:
from delta.tables import DeltaTable

dim_table = DeltaTable.forName(spark, "dim_customer")

# Expire superseded versions: each matched row gets the end date from
# df_expire and is_current = 0.
#
# The condition also requires dim.is_current = 1, so only the active version
# of a surrogate key can be updated. Historical rows are never rewritten, even
# if a customer_sk value were ever duplicated in the table.
(
    dim_table.alias("dim")
    .merge(
        df_expire.alias("exp"),
        "dim.customer_sk = exp.customer_sk AND dim.is_current = 1",
    )
    .whenMatchedUpdate(
        set={
            "effective_enddate": "exp.effective_enddate",
            "is_current": "exp.is_current",
        }
    )
    .execute()
)


StatementMeta(, 385b08c8-3536-42df-942d-3b1594f19863, 9, Finished, Available, Finished)

In [8]:
# New current versions for every customer in the change set (new customers
# and customers whose tracked attribute changed).
#
# Surrogate keys continue after the largest customer_sk already stored in
# dim_customer (current and expired rows alike). monotonically_increasing_id()
# produces the same values again on every run, so keys from a later run could
# collide with existing ones and break the customer_sk-based MERGE.
# NOTE(review): assumes customer_sk is a numeric (bigint) column — confirm.
max_existing_sk = (
    spark.table("dim_customer")
    .agg(f.max("customer_sk"))
    .first()[0]
)
next_sk_offset = 0 if max_existing_sk is None else max_existing_sk

# row_number over customer_id gives dense, deterministic keys 1..n for this
# batch. A window without partitionBy runs in a single partition, which is
# acceptable for a change set but should be revisited if batches grow large.
window_new_sk = Window.orderBy("customer_id")

df_insert = (
    df_changes
    .select("src.*")
    .withColumn(
        "customer_sk",
        # cast to long before adding to avoid 32-bit overflow and to keep the
        # bigint type the previous monotonically_increasing_id() produced
        f.lit(next_sk_offset).cast("long")
        + f.row_number().over(window_new_sk).cast("long"),
    )
    .withColumn("effective_startdate", f.to_date("updated_ts"))
    .withColumn("effective_enddate", f.lit("9999-12-31").cast("date"))
    .withColumn("is_current", f.lit(1))
)


StatementMeta(, 385b08c8-3536-42df-942d-3b1594f19863, 10, Finished, Available, Finished)

In [9]:
# Append the new current versions to the dimension table.
writer = df_insert.write.format("delta").mode("append")
writer.saveAsTable("dim_customer")


StatementMeta(, 385b08c8-3536-42df-942d-3b1594f19863, 11, Finished, Available, Finished)