In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when
from pyspark.sql.window import Window

# Create (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("SCD2-Incremental")
    .getOrCreate()
)

# Existing SCD2 table: one row per version of a record; `Active` marks the
# current version. ID 07 has one superseded version and one active version.
columns = ["ID", "NAME", "ADDR", "DATE", "Active"]
existing_data = [
    ("07", "G", "Pune",   "2025-07-01", False),
    ("07", "G", "Mumbai", "2025-07-15", True),   # currently active
]
df_existing = spark.createDataFrame(existing_data, columns)

print("Current SCD2 Table:")
df_existing.show()

# Incoming batch: same attributes as the table but without the Active flag.
# It carries a new address for ID 07 and an ID (08) not yet in the table.
columns_new = ["ID", "NAME", "ADDR", "DATE"]
new_data = [
    ("07", "G", "Goa",  "2025-07-25"),
    ("08", "H", "Pune", "2025-08-25"),
]
df_new = spark.createDataFrame(new_data, columns_new)

print("New Data:")
df_new.show()


In [0]:
# Step 1: Get only ACTIVE records from existing
df_active = df_existing.filter(col("Active") == True)
df_active.show()

# Step 2: Join new data with active records (left join keeps brand-new IDs).
df_joined = df_new.alias("n").join(
    df_active.alias("e"), on="ID", how="left"
)
df_joined.show()

# Step 3: Keep only incoming rows that need a new SCD2 version.
# - New ID: test a right-side non-key column. e.Active is True for every
#   matched row (Step 1 filter), so it is NULL only when nothing matched.
#   Reading `e.ID` after a USING join (on="ID") depends on the Spark version
#   exposing the dropped right-hand key.
# - Changed data: compare null-safely. `!=` evaluates to NULL when either
#   side is NULL, and filter() drops NULL rows, so a change to/from NULL
#   would be silently missed.
# - DATE is deliberately not compared: it appears to be the version's
#   effective date rather than a tracked attribute, and comparing it would
#   create a duplicate version for a re-delivered but unchanged record.
is_new_id = col("e.Active").isNull()
attributes_changed = (
    ~col("n.NAME").eqNullSafe(col("e.NAME"))
    | ~col("n.ADDR").eqNullSafe(col("e.ADDR"))
)
df_changes = df_joined.filter(is_new_id | attributes_changed)
df_changes.show()

In [0]:

# Step 4: If changed, expire old and insert new.
# Done with set-based DataFrame operations instead of collect() plus one
# withColumn per row: nothing is pulled to the driver, and the plan does not
# grow with the number of changed records.

# New active versions: the incoming ("n") side of each changed row, projected
# back onto the SCD2 table schema.
# NOTE(review): if one batch contains several rows for the same ID, each one
# becomes an Active=True version — dedupe upstream if that can happen.
df_new_versions = df_changes.select(
    col("ID"),
    col("n.NAME").alias("NAME"),
    col("n.ADDR").alias("ADDR"),
    col("n.DATE").alias("DATE"),
    lit(True).alias("Active"),
)

# One marker row per changed ID; distinct() keeps the join below from
# duplicating existing rows.
changed_ids = (
    df_new_versions.select("ID")
    .distinct()
    .withColumn("_changed", lit(True))
)

# Expire the currently active row of every changed ID. The result goes to a
# new name so df_existing is not mutated and earlier cells can be re-run.
df_expired = (
    df_existing.join(changed_ids, on="ID", how="left")
    .withColumn(
        "Active",
        when(col("_changed") & col("Active"), lit(False)).otherwise(col("Active")),
    )
    .select(*df_existing.columns)  # drop the marker, restore column order
)

# Append the new versions; an empty df_new_versions simply adds no rows.
df_final = df_expired.unionByName(df_new_versions)

print("Updated SCD2 Table:")
df_final.orderBy("ID","DATE").show()