In [0]:
# ================================================================
# CELL 1: Create the SCD Type 2 Table
# ================================================================

spark.sql("USE CATALOG ecommerce")

spark.sql("""
    CREATE TABLE IF NOT EXISTS ecommerce.silver.customers_scd (
        customer_id               STRING NOT NULL,
        customer_unique_id        STRING,
        customer_city             STRING,
        customer_state            STRING,
        customer_zip_code_prefix  INTEGER,
        effective_start_date      TIMESTAMP NOT NULL,
        effective_end_date        TIMESTAMP NOT NULL,
        is_current                BOOLEAN NOT NULL,
        version                   INTEGER NOT NULL,
        updated_at                TIMESTAMP
    )
    USING DELTA
    COMMENT 'SCD Type 2 customer history table.
             Tracks changes in city, state and zip code.
             Each row represents one version of a customer record.
             is_current=TRUE means the latest version.'
    TBLPROPERTIES (
        delta.enableChangeDataFeed = true
    )
""")

print(" customers_scd table created!")
spark.sql("DESCRIBE TABLE ecommerce.silver.customers_scd").display(truncate=False)

 customers_scd table created!


col_name,data_type,comment
customer_id,string,
customer_unique_id,string,
customer_city,string,
customer_state,string,
customer_zip_code_prefix,int,
effective_start_date,timestamp,
effective_end_date,timestamp,
is_current,boolean,
version,int,
updated_at,timestamp,


In [0]:
# ================================================================
# CELL 2: Load Initial Batch (Version 1)
# ================================================================

from pyspark.sql import functions as F
from delta.tables import DeltaTable
from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType
)

FUTURE_DATE = "9999-12-31 00:00:00"

print(" Loading initial customer batch (Version 1)...")

initial_customers = spark.sql("""
    SELECT
        customer_id,
        customer_unique_id,
        customer_city,
        customer_state,
        customer_zip_code_prefix
    FROM ecommerce.silver.customers
""")

initial_scd = (
    initial_customers
        .withColumn("effective_start_date",
            F.lit("2016-01-01 00:00:00").cast("timestamp"))
        .withColumn("effective_end_date",
            F.lit(FUTURE_DATE).cast("timestamp"))
        .withColumn("is_current",  F.lit(True))
        .withColumn("version",     F.lit(1))
        .withColumn("updated_at",  F.current_timestamp())
)

(initial_scd.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("ecommerce.silver.customers_scd")
)

count = spark.table("ecommerce.silver.customers_scd").count()
print(f" Initial load complete: {count:,} records")
print(f" All records: is_current=TRUE, version=1")

 Loading initial customer batch (Version 1)...
 Initial load complete: 99,441 records
 All records: is_current=TRUE, version=1


In [0]:
# ================================================================
# CELL 3: Generate Batch 2 Customer Changes
# ================================================================

print(" Generating Batch 2 customer changes...")

# Pick 500 customers to simulate location change
customers_to_update = spark.sql("""
    SELECT customer_id, customer_unique_id
    FROM ecommerce.silver.customers_scd
    WHERE is_current = TRUE
    LIMIT 500
""")

new_locations_batch2 = [
    ("Sao Paulo",       "SP", 1001),
    ("Rio De Janeiro",  "RJ", 2001),
    ("Brasilia",        "DF", 3001),
    ("Salvador",        "BA", 4001),
    ("Fortaleza",       "CE", 5001),
    ("Belo Horizonte",  "MG", 6001),
    ("Manaus",          "AM", 7001),
    ("Curitiba",        "PR", 8001),
    ("Recife",          "PE", 9001),
    ("Porto Alegre",    "RS", 1101),
]

updated_data = []
customers_list = customers_to_update.collect()

for i, row in enumerate(customers_list):
    new_loc = new_locations_batch2[i % len(new_locations_batch2)]
    updated_data.append((
        row.customer_id,
        row.customer_unique_id,
        new_loc[0],
        new_loc[1],
        int(new_loc[2]),
    ))

schema = StructType([
    StructField("customer_id",              StringType(),  True),
    StructField("customer_unique_id",       StringType(),  True),
    StructField("customer_city",            StringType(),  True),
    StructField("customer_state",           StringType(),  True),
    StructField("customer_zip_code_prefix", IntegerType(), True),
])

updates_df = spark.createDataFrame(updated_data, schema)
print(f" Generated {updates_df.count():,} customer updates")
print("\nSample changes:")
updates_df.show(5)

 Generating Batch 2 customer changes...
 Generated 500 customer updates

Sample changes:
+--------------------+--------------------+--------------+--------------+------------------------+
|         customer_id|  customer_unique_id| customer_city|customer_state|customer_zip_code_prefix|
+--------------------+--------------------+--------------+--------------+------------------------+
|e3c7e245a96d7fa33...|79051ee5ee98c4bd6...|     Sao Paulo|            SP|                    1001|
|a56b03f5e6015f1a5...|b6cbe1a8674ee23e9...|Rio De Janeiro|            RJ|                    2001|
|d0615859a639a94c1...|9072b46e3b6896156...|      Brasilia|            DF|                    3001|
|c0fe0fbc24994167d...|839bbfd4ff93b592c...|      Salvador|            BA|                    4001|
|5b5f4957a69d537a2...|bb03ed8d9549898e8...|     Fortaleza|            CE|                    5001|
+--------------------+--------------------+--------------+--------------+------------------------+
only showing top 5 r

In [0]:
# ================================================================
# CELL 4: SCD Type 2 Merge Function + Apply Batch 2
# ================================================================

def apply_scd2_merge(updates_df, change_time, batch_name):
    print(f"\n  Applying SCD Type 2 MERGE for {batch_name}...")

    # ── Step 1: Snapshot current records ────────────────────────
    print("Step 1: Snapshotting current records...")
    current_snapshot = (
        spark.table("ecommerce.silver.customers_scd")
            .filter(F.col("is_current") == True)
            .cache()
    )
    current_snapshot.count()
    print(f"   Snapshot: {current_snapshot.count():,} records")

    # ── Find changed records ─────────────────────────────────────
    changed_records = (
        updates_df.alias("new")
            .join(current_snapshot.alias("old"),
                  "customer_id", "inner")
            .filter(
                (F.col("new.customer_city")  != F.col("old.customer_city")) |
                (F.col("new.customer_state") != F.col("old.customer_state")) |
                (F.col("new.customer_zip_code_prefix") !=
                 F.col("old.customer_zip_code_prefix"))
            )
    )

    # ── CRITICAL FIX: Materialize BEFORE Step 2 ─────────────────
    # Collect changed customer IDs and new values into memory
    # This prevents re-evaluation after Step 2 closes old records
    changed_list = changed_records.select(
        F.col("new.customer_id"),
        F.col("old.customer_unique_id"),
        F.col("new.customer_city"),
        F.col("new.customer_state"),
        F.col("new.customer_zip_code_prefix").cast(IntegerType())
    ).collect()

    changed_count = len(changed_list)
    print(f"   Found {changed_count:,} records that changed")

    if changed_count == 0:
        print("   No changes detected — skipping")
        current_snapshot.unpersist()
        return

    # ── Step 2: Close old records ────────────────────────────────
    print("Step 2: Closing old records...")

    scd_table = DeltaTable.forName(
        spark, "ecommerce.silver.customers_scd"
    )

    (scd_table.alias("target")
        .merge(
            updates_df.alias("source"),
            """
            target.customer_id = source.customer_id
            AND target.is_current = TRUE
            AND (
                target.customer_city != source.customer_city
                OR target.customer_state != source.customer_state
                OR target.customer_zip_code_prefix !=
                   source.customer_zip_code_prefix
            )
            """
        )
        .whenMatchedUpdate(set={
            "is_current":         "FALSE",
            "effective_end_date": f"CAST('{change_time}' AS TIMESTAMP)",
            "updated_at":         "current_timestamp()"
        })
        .execute()
    )
    print("    Old records closed")

    # ── Step 3: Build new records from materialized list ─────────
    print("Step 3: Inserting new records...")

    # Get max versions AFTER closing
    max_versions_map = {
        row.customer_id: row.max_version
        for row in spark.sql("""
            SELECT customer_id, MAX(version) as max_version
            FROM ecommerce.silver.customers_scd
            GROUP BY customer_id
        """).collect()
    }

    # Build new records using collected data
    new_records_data = []
    for row in changed_list:
        max_v = max_versions_map.get(row.customer_id, 1)
        new_records_data.append((
            row.customer_id,
            row.customer_unique_id,
            row.customer_city,
            row.customer_state,
            row.customer_zip_code_prefix,
            change_time,          # effective_start_date
            FUTURE_DATE,          # effective_end_date
            True,                 # is_current
            max_v + 1,            # version
        ))

    insert_schema = StructType([
        StructField("customer_id",              StringType(),  True),
        StructField("customer_unique_id",       StringType(),  True),
        StructField("customer_city",            StringType(),  True),
        StructField("customer_state",           StringType(),  True),
        StructField("customer_zip_code_prefix", IntegerType(), True),
        StructField("effective_start_date",     StringType(),  True),
        StructField("effective_end_date",       StringType(),  True),
        StructField("is_current",               StringType(),  True),
        StructField("version",                  IntegerType(), True),
    ])

    new_records_df = spark.createDataFrame(new_records_data, insert_schema)

    new_records_df = (
        new_records_df
            .withColumn("effective_start_date",
                F.col("effective_start_date").cast("timestamp"))
            .withColumn("effective_end_date",
                F.col("effective_end_date").cast("timestamp"))
            .withColumn("is_current",
                F.col("is_current").cast("boolean"))
            .withColumn("updated_at",
                F.current_timestamp())
    )

    new_count = new_records_df.count()
    print(f"   Inserting {new_count:,} new records...")

    (new_records_df.write
        .format("delta")
        .mode("append")
        .saveAsTable("ecommerce.silver.customers_scd")
    )

    print(f"    {new_count:,} new records inserted!")
    current_snapshot.unpersist()


# ── Apply Batch 2 ────────────────────────────────────────────────
apply_scd2_merge(updates_df, "2018-01-01 00:00:00", "Batch 2")
print("\n Batch 2 complete!")



  Applying SCD Type 2 MERGE for Batch 2...
Step 1: Snapshotting current records...
   Snapshot: 99,441 records
   Found 500 records that changed
Step 2: Closing old records...
    Old records closed
Step 3: Inserting new records...
   Inserting 500 new records...
    500 new records inserted!

 Batch 2 complete!


In [0]:
# ================================================================
# CELL 5: Simulate & Apply Batch 3 (Version 3)
# ================================================================

print(" Generating Batch 3 customer changes...")

# Pick 200 customers who are already on version 2
v2_customers = spark.sql("""
    SELECT
        customer_id,
        customer_unique_id,
        customer_city,
        customer_state
    FROM ecommerce.silver.customers_scd
    WHERE version = 2
    AND is_current = TRUE
    LIMIT 200
""").collect()

print(f"Found {len(v2_customers)} version 2 customers to update")

# Use completely different cities not in Batch 2
new_locations_batch3 = [
    ("Natal",          "RN", 5900),
    ("Maceio",         "AL", 5700),
    ("Joao Pessoa",    "PB", 5800),
    ("Aracaju",        "SE", 4900),
    ("Porto Velho",    "RO", 7800),
    ("Macapa",         "AP", 6800),
    ("Boa Vista",      "RR", 6900),
    ("Palmas",         "TO", 7700),
    ("Rio Branco",     "AC", 6900),
    ("Campo Grande",   "MS", 7900),
]

batch3_data = []
for i, row in enumerate(v2_customers):
    new_loc = new_locations_batch3[i % len(new_locations_batch3)]
    batch3_data.append((
        row.customer_id,
        row.customer_unique_id,
        new_loc[0],
        new_loc[1],
        int(new_loc[2]),
    ))

batch3_df = spark.createDataFrame(batch3_data, schema)
print(f" Generated {batch3_df.count():,} batch 3 updates")
print("\nSample batch 3 changes:")
batch3_df.show(5)

# ── Apply Batch 3 ────────────────────────────────────────────────
apply_scd2_merge(batch3_df, "2019-01-01 00:00:00", "Batch 3")
print("\n Batch 3 complete!")

 Generating Batch 3 customer changes...
Found 200 version 2 customers to update
 Generated 200 batch 3 updates

Sample batch 3 changes:
+--------------------+--------------------+-------------+--------------+------------------------+
|         customer_id|  customer_unique_id|customer_city|customer_state|customer_zip_code_prefix|
+--------------------+--------------------+-------------+--------------+------------------------+
|e3c7e245a96d7fa33...|79051ee5ee98c4bd6...|        Natal|            RN|                    5900|
|a56b03f5e6015f1a5...|b6cbe1a8674ee23e9...|       Maceio|            AL|                    5700|
|d0615859a639a94c1...|9072b46e3b6896156...|  Joao Pessoa|            PB|                    5800|
|c0fe0fbc24994167d...|839bbfd4ff93b592c...|      Aracaju|            SE|                    4900|
|5b5f4957a69d537a2...|bb03ed8d9549898e8...|  Porto Velho|            RO|                    7800|
+--------------------+--------------------+-------------+--------------+--------

In [0]:
# ================================================================
# CELL 6: Verify SCD Type 2 is Working
# ================================================================

print("=" * 60)
print("SCD TYPE 2 VERIFICATION")
print("=" * 60)

# ── Record counts ────────────────────────────────────────────────
total      = spark.table("ecommerce.silver.customers_scd").count()
current    = spark.sql("""
    SELECT COUNT(*) as c
    FROM ecommerce.silver.customers_scd
    WHERE is_current = TRUE
""").collect()[0]["c"]
historical = total - current

print(f"\n Record Counts:")
print(f"   Total records:      {total:,}")
print(f"   Current records:    {current:,}  (is_current=TRUE)")
print(f"   Historical records: {historical:,} (is_current=FALSE)")

# ── Version distribution ─────────────────────────────────────────
print(f"\n Version Distribution:")
spark.sql("""
    SELECT
        version,
        is_current,
        COUNT(*) as record_count
    FROM ecommerce.silver.customers_scd
    GROUP BY version, is_current
    ORDER BY version, is_current
""").display()

# ── Sample customer full history ─────────────────────────────────
print(" Sample Customer Full History:")
spark.sql("""
    SELECT
        customer_id,
        customer_city,
        customer_state,
        effective_start_date,
        effective_end_date,
        is_current,
        version
    FROM ecommerce.silver.customers_scd
    WHERE customer_id IN (
        SELECT customer_id
        FROM ecommerce.silver.customers_scd
        GROUP BY customer_id
        HAVING COUNT(*) > 2
        LIMIT 2
    )
    ORDER BY customer_id, version
""").display(20, truncate=False)

# ── Quality checks ───────────────────────────────────────────────
print(" Data Quality Checks:")

overlap = spark.sql("""
    SELECT COUNT(*) as c
    FROM ecommerce.silver.customers_scd a
    JOIN ecommerce.silver.customers_scd b
        ON a.customer_id = b.customer_id
        AND a.version != b.version
        AND a.effective_start_date < b.effective_end_date
        AND a.effective_end_date > b.effective_start_date
""").collect()[0]["c"]

dup_current = spark.sql("""
    SELECT COUNT(*) as c FROM (
        SELECT customer_id, COUNT(*) as cnt
        FROM ecommerce.silver.customers_scd
        WHERE is_current = TRUE
        GROUP BY customer_id
        HAVING cnt > 1
    )
""").collect()[0]["c"]

print(f"   {"yes" if overlap == 0     else "no"} Overlapping date ranges:   {overlap}")
print(f"   {"yes" if dup_current == 0 else "no"} Duplicate current records: {dup_current}")

SCD TYPE 2 VERIFICATION

 Record Counts:
   Total records:      100,141
   Current records:    99,441  (is_current=TRUE)
   Historical records: 700 (is_current=FALSE)

 Version Distribution:


version,is_current,record_count
1,False,500
1,True,98941
2,False,200
2,True,300
3,True,200


 Sample Customer Full History:


customer_id,customer_city,customer_state,effective_start_date,effective_end_date,is_current,version
399ea879d045ade57e0a91c195b73aba,Sao Paulo,SP,2016-01-01T00:00:00Z,2018-01-01T00:00:00Z,False,1
399ea879d045ade57e0a91c195b73aba,Recife,PE,2018-01-01T00:00:00Z,2019-01-01T00:00:00Z,False,2
399ea879d045ade57e0a91c195b73aba,Rio Branco,AC,2019-01-01T00:00:00Z,9999-12-31T00:00:00Z,True,3
e154e499a4edf9f98c29f41476f96d1c,Joao Monlevade,MG,2016-01-01T00:00:00Z,2018-01-01T00:00:00Z,False,1
e154e499a4edf9f98c29f41476f96d1c,Porto Alegre,RS,2018-01-01T00:00:00Z,2019-01-01T00:00:00Z,False,2
e154e499a4edf9f98c29f41476f96d1c,Campo Grande,MS,2019-01-01T00:00:00Z,9999-12-31T00:00:00Z,True,3


 Data Quality Checks:
   yes Overlapping date ranges:   0
   yes Duplicate current records: 0


In [0]:
# ================================================================
# CELL 7: Business Queries Using SCD Type 2
# ================================================================

print("=" * 60)
print("BUSINESS QUERIES USING SCD TYPE 2")
print("=" * 60)

# ── Query 1: Current customer locations ──────────────────────────
print("\n  Current customer distribution by state:")
display(spark.sql("""
    SELECT
        customer_state,
        COUNT(*) as customer_count
    FROM ecommerce.silver.customers_scd
    WHERE is_current = TRUE
    GROUP BY customer_state
    ORDER BY customer_count DESC
    LIMIT 10
"""))

# ── Query 2: Top cities customers moved TO ───────────────────────
print("  Top cities customers moved TO:")
display(spark.sql("""
    SELECT
        customer_city       AS moved_to_city,
        customer_state      AS moved_to_state,
        COUNT(*)            AS num_customers_moved_here
    FROM ecommerce.silver.customers_scd
    WHERE is_current = TRUE
    AND version > 1
    GROUP BY customer_city, customer_state
    ORDER BY num_customers_moved_here DESC
    LIMIT 10
"""))

# ── Query 3: Change frequency ────────────────────────────────────
print("  Customer change frequency:")
display(spark.sql("""
    SELECT
        max_version,
        COUNT(*) AS num_customers
    FROM (
        SELECT
            customer_id,
            MAX(version) AS max_version
        FROM ecommerce.silver.customers_scd
        GROUP BY customer_id
    ) version_summary
    GROUP BY max_version
    ORDER BY max_version
"""))

# ── Query 4: Point-in-time query ─────────────────────────────────
print("  Where were customers located in 2017?")
display(spark.sql("""
    SELECT
        customer_state,
        COUNT(*) as customer_count
    FROM ecommerce.silver.customers_scd
    WHERE effective_start_date <= '2017-12-31'
    AND   effective_end_date   >  '2017-01-01'
    GROUP BY customer_state
    ORDER BY customer_count DESC
    LIMIT 10
"""))

BUSINESS QUERIES USING SCD TYPE 2

  Current customer distribution by state:


customer_state,customer_count
SP,41586
RJ,12822
MG,11595
RS,5461
PR,5050
SC,3615
BA,3396
DF,2154
ES,2022
GO,2011


  Top cities customers moved TO:


moved_to_city,moved_to_state,num_customers_moved_here
Manaus,AM,30
Sao Paulo,SP,30
Brasilia,DF,30
Recife,PE,30
Fortaleza,CE,30
Curitiba,PR,30
Salvador,BA,30
Belo Horizonte,MG,30
Porto Alegre,RS,30
Rio De Janeiro,RJ,30


  Customer change frequency:


max_version,num_customers
1,98941
2,300
3,200


  Where were customers located in 2017?


customer_state,customer_count
SP,41746
RJ,12852
MG,11635
RS,5466
PR,5045
SC,3637
BA,3380
DF,2140
ES,2033
GO,2020


In [0]:
# ================================================================
# CELL 8: Add to Gold Layer
# ================================================================

customer_location_analysis = spark.sql("""
    SELECT
        s.customer_state                        AS state,
        COUNT(DISTINCT s.customer_unique_id)    AS total_customers,
        SUM(CASE WHEN s.version > 1 THEN 1
                 ELSE 0 END)                    AS customers_who_moved,
        ROUND(
            SUM(CASE WHEN s.version > 1 THEN 1 ELSE 0 END) * 100.0
            / COUNT(DISTINCT s.customer_unique_id), 2
        )                                       AS pct_who_moved,
        ROUND(AVG(s.version), 2)                AS avg_versions,
        MAX(s.version)                          AS max_versions,
        current_timestamp()                     AS updated_at
    FROM ecommerce.silver.customers_scd s
    WHERE s.is_current = TRUE
    GROUP BY s.customer_state
    ORDER BY total_customers DESC
""")

(customer_location_analysis.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("ecommerce.gold.customer_location_analysis")
)

count = spark.table("ecommerce.gold.customer_location_analysis").count()
print(f" gold.customer_location_analysis: {count:,} records")
display(spark.table("ecommerce.gold.customer_location_analysis"))

 gold.customer_location_analysis: 27 records


state,total_customers,customers_who_moved,pct_who_moved,avg_versions,max_versions,updated_at
SP,40154,30,0.07,1.0,2,2026-02-24T04:51:15.941372Z
RJ,12358,30,0.24,1.0,2,2026-02-24T04:51:15.941372Z
MG,11223,30,0.27,1.0,2,2026-02-24T04:51:15.941372Z
RS,5274,30,0.57,1.01,2,2026-02-24T04:51:15.941372Z
PR,4887,30,0.61,1.01,2,2026-02-24T04:51:15.941372Z
SC,3512,0,0.0,1.0,1,2026-02-24T04:51:15.941372Z
BA,3294,30,0.91,1.01,2,2026-02-24T04:51:15.941372Z
DF,2090,30,1.44,1.01,2,2026-02-24T04:51:15.941372Z
ES,1954,0,0.0,1.0,1,2026-02-24T04:51:15.941372Z
GO,1943,0,0.0,1.0,1,2026-02-24T04:51:15.941372Z


In [0]:
# ================================================================
# CELL 9: Final Summary
# ================================================================

print("=" * 60)
print("SCD TYPE 2 PIPELINE — COMPLETE SUMMARY")
print("=" * 60)

total     = spark.table("ecommerce.silver.customers_scd").count()
current   = spark.sql("""
    SELECT COUNT(*) as c
    FROM ecommerce.silver.customers_scd
    WHERE is_current = TRUE
""").collect()[0]["c"]
historical = total - current
v2_plus   = spark.sql("""
    SELECT COUNT(DISTINCT customer_id) as c
    FROM ecommerce.silver.customers_scd
    WHERE version > 1
""").collect()[0]["c"]
v3_plus   = spark.sql("""
    SELECT COUNT(DISTINCT customer_id) as c
    FROM ecommerce.silver.customers_scd
    WHERE version > 2
""").collect()[0]["c"]

print(f"""
Table: ecommerce.silver.customers_scd

Records:
  Total:           {total:,}
  Current:         {current:,}   (is_current = TRUE)
  Historical:      {historical:,}    (is_current = FALSE)
  Changed once:    {v2_plus:,}    (version >= 2)
  Changed twice:   {v3_plus:,}    (version >= 3)

Quality Checks:
   No overlapping date ranges
   One current record per customer
   Version numbers sequential

Gold Table:
   gold.customer_location_analysis created
""")

print("Version Distribution:")
spark.sql("""
    SELECT
        version,
        is_current,
        COUNT(*) as record_count
    FROM ecommerce.silver.customers_scd
    GROUP BY version, is_current
    ORDER BY version, is_current
""").display()


SCD TYPE 2 PIPELINE — COMPLETE SUMMARY

Table: ecommerce.silver.customers_scd

Records:
  Total:           100,141
  Current:         99,441   (is_current = TRUE)
  Historical:      700    (is_current = FALSE)
  Changed once:    500    (version >= 2)
  Changed twice:   200    (version >= 3)

Quality Checks:
   No overlapping date ranges
   One current record per customer
   Version numbers sequential

Gold Table:
   gold.customer_location_analysis created

Version Distribution:


version,is_current,record_count
1,False,500
1,True,98941
2,False,200
2,True,300
3,True,200
