In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_date, col, when, expr
from pyspark.sql.types import StructType, StructField, StringType, DateType

spark = SparkSession.builder.appName("SCD2 Example").getOrCreate()

# Existing dimension data (target).
# The date columns are declared as strings so the sample rows can use plain
# literals; both are cast to DateType right after the DataFrame is created.
dim_schema = StructType([
    StructField("emp_id", StringType(), True),
    StructField("emp_name", StringType(), True),
    StructField("designation", StringType(), True),
    StructField("start_date", StringType(), True),
    StructField("end_date", StringType(), True),
    StructField("is_current", StringType(), True)
])

dim_data = [
    ("101", "John", "Developer", "2021-01-01", None, "Y"),
    ("102", "Alice", "Analyst", "2021-01-01", None, "Y"),
]

# Cast BOTH date columns. Casting only start_date left end_date as a string,
# while the expired/changed rows built later carry end_date as DateType, so
# unionByName had to coerce the merged end_date column instead of keeping it
# a date.
dim_df = (
    spark.createDataFrame(data=dim_data, schema=dim_schema)
    .withColumn("start_date", col("start_date").cast(DateType()))
    .withColumn("end_date", col("end_date").cast(DateType()))
)
dim_df.show()


+------+--------+-----------+----------+--------+----------+
|emp_id|emp_name|designation|start_date|end_date|is_current|
+------+--------+-----------+----------+--------+----------+
|   101|    John|  Developer|2021-01-01|    null|         Y|
|   102|   Alice|    Analyst|2021-01-01|    null|         Y|
+------+--------+-----------+----------+--------+----------+



In [10]:
# Incoming batch (source): one row per employee, every column a nullable string.
src_schema = StructType(
    [StructField(field_name, StringType(), True)
     for field_name in ("emp_id", "emp_name", "designation")]
)

src_data = [
    ("101", "John", "Lead Developer"),  # designation changed
    ("102", "Alice", "Analyst"),        # no change
]

src_df = spark.createDataFrame(data=src_data, schema=src_schema)
src_df.show()


+------+--------+--------------+
|emp_id|emp_name|   designation|
+------+--------+--------------+
|   101|    John|Lead Developer|
|   102|   Alice|       Analyst|
+------+--------+--------------+



In [11]:
# Join each incoming row to the *current* version of the same employee.
# The left join keeps source rows with no current dimension row (new
# employees); for those rows every tgt.* column is null.
joined_df = src_df.alias("src").join(
    dim_df.filter(col("is_current") == "Y").alias("tgt"),
    on="emp_id",
    how="left"
)

joined_df.show()

# Rows that need a new current version:
#   * new employees - no matched current row. Every matched tgt row has
#     is_current = 'Y' (see the filter above), so a null tgt.is_current
#     means "no match".
#   * existing employees whose designation differs.
# eqNullSafe is used because a plain != evaluates to null (and the row is
# filtered out) whenever either side is null, which silently dropped new
# employees and changes to/from a null designation.
is_new_employee = col("tgt.is_current").isNull()
designation_changed = ~col("src.designation").eqNullSafe(col("tgt.designation"))

changed_df = joined_df.filter(is_new_employee | designation_changed).select(
    "src.emp_id", "src.emp_name", "src.designation",
    current_date().alias("start_date"),
    lit(None).cast(DateType()).alias("end_date"),
    lit("Y").alias("is_current")
)
changed_df.show()


+------+--------+--------------+--------+-----------+----------+--------+----------+
|emp_id|emp_name|   designation|emp_name|designation|start_date|end_date|is_current|
+------+--------+--------------+--------+-----------+----------+--------+----------+
|   101|    John|Lead Developer|    John|  Developer|2021-01-01|    null|         Y|
|   102|   Alice|       Analyst|   Alice|    Analyst|2021-01-01|    null|         Y|
+------+--------+--------------+--------+-----------+----------+--------+----------+

+------+--------+--------------+----------+--------+----------+
|emp_id|emp_name|   designation|start_date|end_date|is_current|
+------+--------+--------------+----------+--------+----------+
|   101|    John|Lead Developer|2025-04-09|    null|         Y|
+------+--------+--------------+----------+--------+----------+



In [12]:
# Close out the previous current version of every employee whose designation
# changed: keep its start_date, stamp end_date with today's date and flip
# is_current to 'N'.
# Only matched rows qualify. The join cell keeps only is_current = 'Y' target
# rows, so a null tgt.is_current means a brand-new employee with nothing to
# expire.
# eqNullSafe makes a change to/from a null designation count as a change; a
# plain != evaluates to null there and the row would be dropped.
has_current_row = col("tgt.is_current").isNotNull()
designation_changed = ~col("src.designation").eqNullSafe(col("tgt.designation"))

expired_df = joined_df.filter(has_current_row & designation_changed).select(
    "tgt.emp_id", "tgt.emp_name", "tgt.designation",
    "tgt.start_date",
    current_date().alias("end_date"),
    lit("N").alias("is_current")
)
expired_df.show()


+------+--------+-----------+----------+----------+----------+
|emp_id|emp_name|designation|start_date|  end_date|is_current|
+------+--------+-----------+----------+----------+----------+
|   101|    John|  Developer|2021-01-01|2025-04-09|         N|
+------+--------+-----------+----------+----------+----------+



In [13]:
# Assemble the new dimension from:
#   * unchanged_df      - current rows whose designation did not change
#   * expired_df        - previous current rows, now closed out
#   * changed_df        - new current rows (changed or new employees)
#   * history_df        - existing non-current (historical) versions
#   * absent_current_df - current rows for employees not in this batch
# The last two are not touched by the source batch. Without them, every run
# would silently delete history and any employee missing from src_df.
# NOTE(review): if src_df is meant to be a full snapshot where absence means
# "terminated", those rows should be expired explicitly rather than kept.
has_current_row = col("tgt.is_current").isNotNull()

# Null-safe equality: two null designations count as "unchanged"; a plain ==
# would evaluate to null and drop the row entirely.
unchanged_df = joined_df.filter(
    has_current_row & col("src.designation").eqNullSafe(col("tgt.designation"))
).select("tgt.*")
unchanged_df.show()

history_df = dim_df.filter(~col("is_current").eqNullSafe("Y"))
absent_current_df = dim_df.filter(col("is_current") == "Y").join(
    src_df.select("emp_id"), on="emp_id", how="left_anti"
)

final_df = (
    unchanged_df
    .unionByName(expired_df)
    .unionByName(changed_df)
    .unionByName(history_df)
    .unionByName(absent_current_df)
)
final_df.show()


+------+--------+-----------+----------+--------+----------+
|emp_id|emp_name|designation|start_date|end_date|is_current|
+------+--------+-----------+----------+--------+----------+
|   102|   Alice|    Analyst|2021-01-01|    null|         Y|
+------+--------+-----------+----------+--------+----------+

+------+--------+--------------+----------+----------+----------+
|emp_id|emp_name|   designation|start_date|  end_date|is_current|
+------+--------+--------------+----------+----------+----------+
|   102|   Alice|       Analyst|2021-01-01|      null|         Y|
|   101|    John|     Developer|2021-01-01|2025-04-09|         N|
|   101|    John|Lead Developer|2025-04-09|      null|         Y|
+------+--------+--------------+----------+----------+----------+



In [8]:
# Show the dimension grouped by employee, oldest version first, untruncated.
final_df.sort(col("emp_id"), col("start_date")).show(truncate=False)


+------+--------+--------------+----------+----------+----------+
|emp_id|emp_name|designation   |start_date|end_date  |is_current|
+------+--------+--------------+----------+----------+----------+
|101   |John    |Developer     |2021-01-01|2025-04-09|N         |
|101   |John    |Lead Developer|2025-04-09|null      |Y         |
|102   |Alice   |Analyst       |2021-01-01|null      |Y         |
+------+--------+--------------+----------+----------+----------+

