In [0]:
# Environment sanity check for the notebook.
# Optional: uncomment the next line to display the Spark version.
#spark.version
# List the contents of /databricks-datasets to confirm storage is reachable.
# NOTE(review): `dbutils` is not defined in this notebook; presumably it is
# injected by the Databricks runtime — confirm before running elsewhere.
dbutils.fs.ls("/databricks-datasets")  # sanity check storage


In [0]:
from pyspark.sql import functions as F, types as T

def _build_customer_df(rows):
    """Return a customer DataFrame built from ``rows``.

    Each row is a tuple ``(id, entry_code, country, name, effective_date)``.
    ``effective_date`` is supplied as a date string and converted to a DATE
    column with ``F.to_date``. Relies on the notebook-level ``spark`` session.
    """
    customer_schema = "id INT, entry_code STRING, country STRING, name STRING, effective_date STRING"
    raw_df = spark.createDataFrame(rows, schema=customer_schema)
    return raw_df.withColumn("effective_date", F.to_date("effective_date"))


# Base (current) dimension table
dim_df = _build_customer_df([
    (1, "EC100", "MY", "Alice",  "2025-09-01"),
    (2, "EC200", "SG", "Bob",    "2025-09-01"),
    (3, "EC300", "IN", "Carlos", "2025-09-01"),
])

# Incoming changes (new snapshot); id 3 is not present in this snapshot
stg_df = _build_customer_df([
    (1, "EC100", "MY", "Alice",     "2025-09-15"),  # unchanged name
    (2, "EC200", "SG", "Bobby",     "2025-09-15"),  # name changed
    (4, "EC400", "TH", "Diana",     "2025-09-15"),  # brand new record
])


In [0]:

# Storage locations for the two Delta tables. Nothing is written in this cell;
# the paths are consumed by the Delta writes and CREATE TABLE statements in a
# later cell.
dim_path = "/tmp/demo/dim_customers"
stg_path = "/tmp/demo/stg_customers"


In [0]:

# One entry per demo table: (source DataFrame, Delta path, metastore table name).
delta_targets = [
    (dim_df, dim_path, "demo_dim_customers"),
    (stg_df, stg_path, "demo_stg_customers"),
]

# 1) Persist each DataFrame as Delta files, replacing any previous contents.
for frame, location, _ in delta_targets:
    frame.write.format("delta").mode("overwrite").save(location)

# 2) Remove any stale table registrations so the CREATE statements below succeed.
for _, _, table_name in delta_targets:
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")

# 3) Register tables that point at the Delta locations written above.
for _, location, table_name in delta_targets:
    spark.sql(f"CREATE TABLE {table_name} USING DELTA LOCATION '{location}'")

# 4) Read each table back through the metastore and print its rows.
for _, _, table_name in delta_targets:
    spark.table(table_name).show()
