**⭐ 1. What This Pattern Solves**

MERGE handles upserts and slowly changing dimensions in Delta tables.

SCD1 (Overwrite updates): Update existing records; no history.

SCD2 (Versioned updates): Keep historical versions by tracking start/end dates or flags.

Used for:

Incrementally updating Silver or Gold tables

Avoiding duplicates in streaming or batch pipelines

Maintaining audit/history for analytics

**⭐ 2. SQL Equivalent**

In [0]:
%sql
-- SCD1: overwrite existing rows
MERGE INTO silver AS target
USING updates AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

-- SCD2: track history
MERGE INTO silver AS target
USING updates AS source
ON target.id = source.id AND target.is_current = true
WHEN MATCHED THEN 
    UPDATE SET target.is_current = false, target.end_date = source.update_date
WHEN NOT MATCHED THEN 
    INSERT (id, col1, col2, start_date, end_date, is_current)
    VALUES (source.id, source.col1, source.col2, source.update_date, NULL, true);


**⭐ 3. Core Idea**

MERGE = UPSERT: combine INSERT + UPDATE + DELETE in a single operation

SCD1 = simple update

SCD2 = maintain history with versioning columns

Reusability: Any Delta table can be incrementally updated using MERGE for batch or streaming ingestion.

**⭐ 4. Template Code (MEMORIZE THIS)**

In [0]:
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/delta/silver")

# SCD1: overwrite existing rows
delta_table.alias("target").merge(
    source_updates.alias("source"),
    "target.id = source.id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

# SCD2: track history
from pyspark.sql.functions import current_date, lit

updates = source_updates.withColumn("start_date", current_date()) \
                        .withColumn("end_date", lit(None)) \
                        .withColumn("is_current", lit(True))

delta_table.alias("target").merge(
    updates.alias("source"),
    "target.id = source.id AND target.is_current = true"
).whenMatchedUpdate(
    set = {
        "is_current": lit(False),
        "end_date": updates.start_date
    }
).whenNotMatchedInsertAll() \
 .execute()


**⭐ 5. Detailed Example**

In [0]:
# Initial Silver table
silver_data = [("A", 100), ("B", 50)]
df_silver = spark.createDataFrame(silver_data, ["id", "amount"])
df_silver.write.format("delta").mode("overwrite").save("/delta/silver")

# Updates
updates_data = [("A", 200), ("C", 300)]
source_updates = spark.createDataFrame(updates_data, ["id", "amount"])

# MERGE (SCD1)
delta_table = DeltaTable.forPath(spark, "/delta/silver")
delta_table.alias("target").merge(
    source_updates.alias("source"),
    "target.id = source.id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

spark.read.format("delta").load("/delta/silver").show()

**Step-by-step:**

Target = existing Silver table

Source = incoming updates

MERGE updates matched rows, inserts new rows

For SCD2, additional columns track history

**⭐ 6. Mini Practice Problems**

Perform SCD1 MERGE for product prices; update existing, insert new.

Implement SCD2 MERGE for customer records with is_current and end_date.

Merge a streaming dataset of transactions into a Silver table using Delta MERGE.

**⭐ 7. Full Data Engineering Problem**

Scenario: A bank ingests daily customer account updates.

Silver table tracks current balances (SCD1).

Gold table keeps historical account snapshots for analytics (SCD2).

Implement MERGE with incremental batch updates while keeping Delta ACID guarantees.

**⭐ 8. Time & Space Complexity**

Time: O(n) scan + O(m) merge; scales linearly with source + target rows

Space: Extra space if SCD2, since historical rows are preserved

**⭐ 9. Common Pitfalls**

Forgetting is_current filter → duplicate historical versions

Using overwrite instead of MERGE → loses existing data

Not handling schema evolution → merge failure

Large tables without partitioning → MERGE is slow