In this notebook we show basic implementation of SCD Type 2.

### Init Spark session

In [None]:
from delta.tables import DeltaTable

from src.helpers import init_spark
from src.schemas import incoming_updates_schema, target_table_schema

from pyspark.sql.functions import lit


spark = init_spark()

### Initial target table (historical)


In [17]:
initial_data = [
    (1, "Alice", "Kyiv", "2025-01-01", None,  True),
    (2, "Charlie", "Lviv", "2025-01-01", None, True),
]

target_df = spark.createDataFrame(initial_data, target_table_schema)

# Save as Delta table
target_path = "/tmp/delta-table"
target_df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").save(target_path)

target_table = DeltaTable.forPath(spark, target_path)
print("Initial target table")
target_table.toDF().show()

Initial target table
+---+-------+-------+----------+--------+----------+
| id|   name|address|start_date|end_date|is_current|
+---+-------+-------+----------+--------+----------+
|  2|Charlie|   Lviv|2025-01-01|    null|      true|
|  1|  Alice|   Kyiv|2025-01-01|    null|      true|
+---+-------+-------+----------+--------+----------+



### Incoming updates

In [18]:
incoming_data = [
    (1, "Alice", "Odesa", "2025-03-01"),  # Alice moved
    (3, "Advik", "Dnipro", "2025-03-01"),  # New friend
]
incoming_df = spark.createDataFrame(incoming_data, incoming_updates_schema)

print("Incoming updates:")
incoming_df.show()

Incoming updates:
+---+-----+-------+----------+
| id| name|address|start_date|
+---+-----+-------+----------+
|  1|Alice|  Odesa|2025-03-01|
|  3|Advik| Dnipro|2025-03-01|
+---+-----+-------+----------+



### SCD Type 2 basic implementation


In [19]:
incoming_rows = incoming_df.selectExpr("id as merge_key", "*")

incoming_target_overlap = (
    incoming_df.alias("incoming")
        .join(target_table.toDF().alias("target"), "id")
        .where("target.is_current = true AND incoming.address <> target.address")
        .selectExpr("NULL as merge_key", "incoming.*")
)

staged_df = incoming_rows.union(incoming_target_overlap)
staged_df.show()

+---------+---+-----+-------+----------+
|merge_key| id| name|address|start_date|
+---------+---+-----+-------+----------+
|        1|  1|Alice|  Odesa|2025-03-01|
|        3|  3|Advik| Dnipro|2025-03-01|
|     null|  1|Alice|  Odesa|2025-03-01|
+---------+---+-----+-------+----------+



In [20]:
(
    target_table.alias("target")
    .merge(
        staged_df.alias("staged"),
        "target.id = merge_key"
    )
    .whenMatchedUpdate(
        condition="target.is_current = true AND target.address <> staged.address",
        set={
            "is_current": "false",
            "end_date": "staged.start_date"
        }
    )
    .whenNotMatchedInsert(
        values={
            "id": "staged.id",
            "name": "staged.name",
            "address": "staged.address",
            "start_date": "staged.start_date",
            "end_date": lit(None),
            "is_current": lit(True)
        }
    )
    .execute()
)

target_table = DeltaTable.forPath(spark, target_path)

print("Target table result:")
target_table.toDF().orderBy(["id", "is_current"]).show()

Target table result:
+---+-------+-------+----------+----------+----------+
| id|   name|address|start_date|  end_date|is_current|
+---+-------+-------+----------+----------+----------+
|  1|  Alice|   Kyiv|2025-01-01|2025-03-01|     false|
|  1|  Alice|  Odesa|2025-03-01|      null|      true|
|  2|Charlie|   Lviv|2025-01-01|      null|      true|
|  3|  Advik| Dnipro|2025-03-01|      null|      true|
+---+-------+-------+----------+----------+----------+



# Caveats

In [23]:
# This solution doesn't work when two incoming records in the same batch try to change same row in the target table.
# For example, if Alice moves from Kyiv to Paris and then to Odesa inside one batch, the process will fail.

incoming_data = [
    (1, "Alice", "Paris", "2025-02-01"),  # Alice moved 1st time
    (1, "Alice", "Odesa", "2025-03-01")  # Alice moved 2nd time
]

incoming_df = spark.createDataFrame(incoming_data, incoming_updates_schema)

print("incoming updates:")
incoming_df.show()

incoming_rows = incoming_df.selectExpr("id as merge_key", "*")

incoming_target_overlap = (
    incoming_df.alias("incoming")
        .join(target_table.toDF().alias("target"), "id")
        .where("target.is_current = true AND incoming.address <> target.address")
        .selectExpr("NULL as merge_key", "incoming.*")
)

staged_df = incoming_rows.union(incoming_target_overlap)
print("staged dataframe:")
staged_df.show()

(
    target_table.alias("target")
    .merge(
        staged_df.alias("staged"),
        "target.id = merge_key"
    )
    .whenMatchedUpdate(
        condition="target.is_current = true AND target.address <> staged.address",
        set={
            "is_current": "false",
            "end_date": "staged.start_date"
        }
    )
    .whenNotMatchedInsert(
        values={
            "id": "staged.id",
            "name": "staged.name",
            "address": "staged.address",
            "start_date": "staged.start_date",
            "end_date": lit(None),
            "is_current": lit(True)
        }
    )
    .execute()
)

# Cannot perform Merge as multiple source rows matched and attempted to modify the same
# target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge,
# when multiple source rows match on the same target row, the result may be ambiguous
# as it is unclear which source row should be used to update or delete the matching
# target row. You can preprocess the source table to eliminate the possibility of
# multiple matches.

incoming updates:
+---+-----+-------+----------+
| id| name|address|start_date|
+---+-----+-------+----------+
|  1|Alice|  Paris|2025-02-01|
|  1|Alice|  Odesa|2025-03-01|
+---+-----+-------+----------+

staged dataframe:
+---------+---+-----+-------+----------+
|merge_key| id| name|address|start_date|
+---------+---+-----+-------+----------+
|        1|  1|Alice|  Paris|2025-02-01|
|        1|  1|Alice|  Odesa|2025-03-01|
|     null|  1|Alice|  Paris|2025-02-01|
+---------+---+-----+-------+----------+



25/09/16 13:35:47 ERROR MergeIntoCommand: Fatal error in MERGE with materialized source in attempt 1.
org.apache.spark.sql.delta.DeltaUnsupportedOperationException: Cannot perform Merge as multiple source rows matched and attempted to modify the same
target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge,
when multiple source rows match on the same target row, the result may be ambiguous
as it is unclear which source row should be used to update or delete the matching
target row. You can preprocess the source table to eliminate the possibility of
multiple matches. Please refer to
https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge
	at org.apache.spark.sql.delta.DeltaErrorsBase.multipleSourceRowMatchingTargetRowInMergeException(DeltaErrors.scala:1102)
	at org.apache.spark.sql.delta.DeltaErrorsBase.multipleSourceRowMatchingTargetRowInMergeException$(DeltaErrors.scala:1099)
	at org.apache.spark.sql.delta.DeltaErrors$.multipleSource

Py4JJavaError: An error occurred while calling o697.execute.
: org.apache.spark.sql.delta.DeltaUnsupportedOperationException: Cannot perform Merge as multiple source rows matched and attempted to modify the same
target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge,
when multiple source rows match on the same target row, the result may be ambiguous
as it is unclear which source row should be used to update or delete the matching
target row. You can preprocess the source table to eliminate the possibility of
multiple matches. Please refer to
https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge
	at org.apache.spark.sql.delta.DeltaErrorsBase.multipleSourceRowMatchingTargetRowInMergeException(DeltaErrors.scala:1102)
	at org.apache.spark.sql.delta.DeltaErrorsBase.multipleSourceRowMatchingTargetRowInMergeException$(DeltaErrors.scala:1099)
	at org.apache.spark.sql.delta.DeltaErrors$.multipleSourceRowMatchingTargetRowInMergeException(DeltaErrors.scala:2794)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$findTouchedFiles$1(MergeIntoCommand.scala:369)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordMergeOperation(MergeIntoCommand.scala:906)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.findTouchedFiles(MergeIntoCommand.scala:297)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$2(MergeIntoCommand.scala:235)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$2$adapted(MergeIntoCommand.scala:204)
	at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:229)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$1(MergeIntoCommand.scala:204)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordFrameProfile(MergeIntoCommand.scala:75)
	at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
	at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
	at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordOperation(MergeIntoCommand.scala:75)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordDeltaOperation(MergeIntoCommand.scala:75)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.runMerge(MergeIntoCommand.scala:202)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$run$1(MergeIntoCommand.scala:197)
	at org.apache.spark.sql.delta.commands.merge.MergeIntoMaterializeSource.runWithMaterializedSourceLostRetries(MergeIntoMaterializeSource.scala:103)
	at org.apache.spark.sql.delta.commands.merge.MergeIntoMaterializeSource.runWithMaterializedSourceLostRetries$(MergeIntoMaterializeSource.scala:91)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.runWithMaterializedSourceLostRetries(MergeIntoCommand.scala:75)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.run(MergeIntoCommand.scala:197)
	at io.delta.tables.DeltaMergeBuilder.$anonfun$execute$1(DeltaMergeBuilder.scala:290)
	at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:105)
	at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:91)
	at io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError(DeltaMergeBuilder.scala:148)
	at io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:266)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
