In [0]:
# SCD Type 2 (slowly changing dimension) implementation logic
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_date, to_date, hash

# Create (or reuse) the Spark session that every step below runs on.
spark = (
    SparkSession.builder
    .appName("SCD2")
    .getOrCreate()
)

# Column layout shared by both loads. NOTE: "salry" (sic) is the column name
# the rest of this script relies on; values are supplied as strings (or None).
columns = ['id', 'name', 'location', 'salry']

# Initial full load: the existing (historical) dimension contents.
data_full = [
    ["1001", "gaurav", "hyderabad", "42000"],
    ["1002", "vijay", "hyderabad", "45565"],
    ["1003", "akanksha", "hyderabad", "52000"],
    ["1004", "niharika", "hyderabad", "35000"],
]
df_full = spark.createDataFrame(data_full, columns)

# Incremental / daily load to be merged into the dimension.
data_daily = [
    ["1003", "akanksha", "delhi", "65000"],    # changed: location and salary
    ["1004", "niharika", "bihar", None],       # changed: location, salary now null
    ["1005", "murali", "vijaywada", "80000"],  # new id
    ["1002", "vijay", "hyderabad", "45565"],   # identical to the full load
]
df_daily = spark.createDataFrame(data_daily, columns)

# Add SCD2 columns: both loads receive the same three tracking columns.
def _with_scd2_columns(df):
    """Return ``df`` with the SCD2 tracking columns appended.

    Active_Flag is set to "Y", From_Date to the current date, and To_Date to
    a null value cast to the date type (the row has not been expired yet).
    """
    flagged = df.withColumn("Active_Flag", lit("Y"))
    dated = flagged.withColumn("From_Date", to_date(current_date()))
    return dated.withColumn("To_Date", lit(None).cast("date"))


df_full = _with_scd2_columns(df_full)
df_daily = _with_scd2_columns(df_daily)

print("==== Full Load Data ====")
df_full.show()

print("==== Daily Load Data ====")
df_daily.show()

# ---------------------------------------------------------------------------
# SCD2 merge of the daily load (df_daily) into the dimension (df_full).
#
# df_full is split into two disjoint parts:
#   * active  - rows with Active_Flag == "Y"; these are compared with the
#               daily load;
#   * history - every other row (previously expired versions). These are
#               carried over untouched, so re-running the merge on its own
#               output never drops earlier versions of a record.
# NOTE(review): assumes df_daily holds at most one row per id — confirm.
# ---------------------------------------------------------------------------
active = df_full.filter(col("Active_Flag") == "Y")
history = df_full.filter(~col("Active_Flag").eqNullSafe("Y"))

# Step 1: Find updated records (changed data) and expire their old version.
# Tracked columns are compared with null-safe equality instead of hash():
# Spark's 32-bit hash can collide, and it skips null inputs, so e.g.
# (a, null, b) and (a, b, null) hash identically and a real change is missed.
changed = ~(
    active["name"].eqNullSafe(df_daily["name"])
    & active["location"].eqNullSafe(df_daily["location"])
    & active["salry"].eqNullSafe(df_daily["salry"])
)
update_ds = (
    active.join(df_daily, active["id"] == df_daily["id"], "inner")
    .filter(changed)
    .select(
        active["id"],
        active["name"],
        active["location"],
        active["salry"],
        lit("N").alias("Active_Flag"),  # expire old record
        active["From_Date"],
        current_date().alias("To_Date"),
    )
)

print("==== Updated Records (Expired Old) ====")
update_ds.show()

# Step 2: Active rows that stay active (unchanged, or absent from the daily load).
no_change = active.join(update_ds, "id", "left_anti")
print("==== Unchanged Records ====")
no_change.show()

# Step 3: Rows to insert as the new active version. This is every daily row
# whose id has no unchanged active row: brand-new ids, ids expired in Step 1,
# and ids that previously existed only as history.
insert_ds = df_daily.join(no_change, "id", "left_anti")
print("==== Insert Records (New/Changed Versions) ====")
insert_ds.show()

# Step 4: Final SCD2 table. unionByName matches columns by name rather than
# position, so a column-order difference cannot shift values between columns.
df_final = (
    update_ds.unionByName(insert_ds)
    .unionByName(no_change)
    .unionByName(history)
)

print("==== Final SCD2 Table ====")
df_final.show(truncate=False)
