In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_timestamp, expr, cast
from delta.tables import DeltaTable
import datetime

In [14]:
from ntbk_logger import get_notebook_logger

logger = get_notebook_logger(logfile="logs/scd.log")

In [15]:
spark = SparkSession.builder \
    .appName("scd") \
    .master("local[*]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

In [16]:
new_data = [
    {"id": 9,  "name": "Ivy", "age": 30, "department": "Marketing", "salary": 66000},
    {"id": 10, "name": "Jack", "age": 38, "department": "IT", "salary": 50000},
    {"id": 11, "name": "Karen", "age": 27, "department": "HR", "salary": 51000},
    {"id": 12, "name": "Leo", "age": 41, "department": "Engineering", "salary": 99000},
    {"id": 13, "name": "Maya", "age": 34, "department": "Sales", "salary": 61000},
    {"id": 14, "name": "Nina", "age": 29, "department": "Sales", "salary": 54000},
    {"id": 15, "name": "Oscar", "age": 32, "department": "HR", "salary": 54000},
    {"id": 16, "name": "Paul", "age": 36, "department": "IT", "salary": 87000},
    {"id": 17, "name": "Queenie", "age": 25, "department": "Marketing", "salary": 23445},
    {"id": 18, "name": "Raj", "age": 39, "department": "Sales", "salary": 53000},
    {"id": 19, "name": "Sara", "age": 28, "department": "HR", "salary": 50000},
    {"id": 20, "name": "Tom", "age": 45, "department": "Engineering", "salary": 223344}
]


new_data = spark.createDataFrame(new_data)
# FAR_FUTURE_DATE = "9999-12-31"

# incoming_scd2_df = new_data.withColumn("start_time", current_timestamp()) \
#                               .withColumn("end_time", lit(FAR_FUTURE_DATE).cast("timestamp")) \
#                               .withColumn("cur_status", lit("active"))

# incoming_scd2_df.show()

In [17]:
try:
    delta_table_path = "data/employee"

    # Set new records to be active
    FAR_FUTURE_DATE = "9999-12-31"
    incoming_scd2_df = new_data.withColumn("start_time", current_timestamp()) \
                                  .withColumn("end_time", lit(FAR_FUTURE_DATE).cast("timestamp")) \
                                  .withColumn("cur_status", lit("active"))

    if DeltaTable.isDeltaTable(spark, delta_table_path):
        delta_table = DeltaTable.forPath(spark, delta_table_path)
        existing_df = delta_table.toDF().filter("cur_status = 'active'")

        # Check for changed records for salary/department or new ids
        compare_df = incoming_scd2_df.alias("incoming").join(
            existing_df.alias("existing"), on="id", how="left_outer"
        ).filter(
            (col("existing.id").isNull()) | 
            (col("incoming.salary") != col("existing.salary")) |
            (col("incoming.department") != col("existing.department"))
        ).select("incoming.*")  # Only changed or new records


        # If any changes, run SCD2 merge
        if compare_df.count():
            logger.debug("------ Changes found. Proceeding with SCD2 merge.")
            changed_ids = [row["id"] for row in compare_df.select("id").distinct().collect()]
            logger.info(f"Changes detected for IDs: {changed_ids}")

            # Expire old records  (set to 'inactive')
            # Here, target = existing records and source = new/changed records
            delta_table.alias("target").merge(
                compare_df.alias("source"),
                "target.id = source.id AND target.cur_status = 'active'"
            ).whenMatchedUpdate(condition="""
                    target.department != source.department OR 
                    target.salary != source.salary
                """, set={
                    "end_time": "current_timestamp()",
                    "cur_status": "'inactive'"
                }
            ).whenNotMatchedInsertAll().execute()
            logger.debug("------ Expired old records and inserted new records")
        else:
            logger.debug("------ No changes detected. Skipping merge.")
    else:
        # First-time write if no existing table found 
        incoming_scd2_df.write.format("delta").mode("overwrite").save(delta_table_path)
        logger.debug("---- Existing table not found. Created and written new table.")

except Exception as e:
    print(f" Exception in SCD2 merge: {e}")

2025-10-06 17:14:07,950 - notebook_logger - DEBUG - ------ Changes found. Proceeding with SCD2 merge.
2025-10-06 17:14:14,915 - notebook_logger - INFO - Changes detected for IDs: [9, 10, 11, 12, 13, 14, 15, 17, 16, 18, 19, 20]
2025-10-06 17:14:26,581 - notebook_logger - DEBUG - ------ Expired old records and inserted new records


In [18]:
df = spark.read.format("delta").load("data/employee")

df.show()

+---+-----------+---+-------+------+--------------------+--------------------+----------+
|age| department| id|   name|salary|          start_time|            end_time|cur_status|
+---+-----------+---+-------+------+--------------------+--------------------+----------+
| 30|  Marketing|  9|    Ivy| 58000|2025-10-06 17:13:...|2025-10-06 17:14:...|  inactive|
| 38|      Sales| 10|   Jack| 49000|2025-10-06 17:13:...|2025-10-06 17:14:...|  inactive|
| 27|         HR| 11|  Karen| 51000|2025-10-06 17:14:...| 9999-12-31 00:00:00|    active|
| 41|Engineering| 12|    Leo| 99000|2025-10-06 17:14:...| 9999-12-31 00:00:00|    active|
| 34|      Sales| 13|   Maya| 61000|2025-10-06 17:14:...| 9999-12-31 00:00:00|    active|
| 29|      Sales| 14|   Nina| 54000|2025-10-06 17:14:...| 9999-12-31 00:00:00|    active|
| 32|         HR| 15|  Oscar| 54000|2025-10-06 17:14:...| 9999-12-31 00:00:00|    active|
| 36|         IT| 16|   Paul| 87000|2025-10-06 17:14:...| 9999-12-31 00:00:00|    active|
| 25|  Mar