# Target data

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_date

# Obtain the Spark session for this demo (getOrCreate returns an existing one if present).
spark = (
    SparkSession.builder
    .appName("SCD2 Example")
    .getOrCreate()
)

# Column layout of the dimension table: business attributes followed by the
# SCD2 bookkeeping fields (is_current, start_date, end_date).
target_columns = [
    "customer_id", "name", "city", "email",
    "is_current", "start_date", "end_date",
]

# Seed rows standing in for the existing history. Each customer has exactly one
# current version ("Y"); end_date holds the literal string "None" (not a SQL NULL)
# and start_date is a plain "YYYY-MM-DD" string.
target_data = [
    (1, "Alice", "New York", "alice@email.com", "Y", "2024-01-01", "None"),
    (2, "Bob", "Chicago", "bob@email.com", "Y", "2024-01-01", "None"),
    (3, "Charlie", "Houston", "charlie@email.com", "Y", "2024-01-01", "None"),
]

target_df = spark.createDataFrame(target_data, schema=target_columns)
target_df.show()
target_df.count()

+-----------+-------+--------+-----------------+----------+----------+--------+
|customer_id|   name|    city|            email|is_current|start_date|end_date|
+-----------+-------+--------+-----------------+----------+----------+--------+
|          1|  Alice|New York|  alice@email.com|         Y|2024-01-01|    None|
|          2|    Bob| Chicago|    bob@email.com|         Y|2024-01-01|    None|
|          3|Charlie| Houston|charlie@email.com|         Y|2024-01-01|    None|
+-----------+-------+--------+-----------------+----------+----------+--------+



3

# Source: incoming data

In [0]:
# The incoming snapshot carries business attributes only; it has none of the
# SCD2 bookkeeping columns (is_current, start_date, end_date).
source_columns = ["customer_id", "name", "city", "email"]

source_data = [
    (1, "Alice", "Boston", "alice@email.com"),        # city differs from the target row
    (2, "Bob", "Chicago", "bob_new@email.com"),       # email differs from the target row
    (3, "Charlie", "Houston", "charlie@email.com"),   # identical to the target row
    (4, "David", "Miami", "david@email.com"),         # customer_id not present in the target
]

source_df = spark.createDataFrame(source_data, schema=source_columns)
source_df.show()

+-----------+-------+-------+-----------------+
|customer_id|   name|   city|            email|
+-----------+-------+-------+-----------------+
|          1|  Alice| Boston|  alice@email.com|
|          2|    Bob|Chicago|bob_new@email.com|
|          3|Charlie|Houston|charlie@email.com|
|          4|  David|  Miami|  david@email.com|
+-----------+-------+-------+-----------------+



# Save the target data into a Delta table

In [0]:
# Persist the seed rows as a Delta table; 'overwrite' replaces whatever is
# already stored at this path, so re-running this cell resets the table.
(
    target_df.write
    .format('delta')
    .mode('overwrite')
    .save('/Volumes/workspace/raw_schema/raw_volume/target_table')
)

# Display the in-memory frame that was just written.
target_df.show()

+-----------+-------+--------+-----------------+----------+----------+--------+
|customer_id|   name|    city|            email|is_current|start_date|end_date|
+-----------+-------+--------+-----------------+----------+----------+--------+
|          1|  Alice|New York|  alice@email.com|         Y|2024-01-01|    None|
|          2|    Bob| Chicago|    bob@email.com|         Y|2024-01-01|    None|
|          3|Charlie| Houston|charlie@email.com|         Y|2024-01-01|    None|
+-----------+-------+--------+-----------------+----------+----------+--------+



# SCD Type 2 merge logic

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import *

# SCD Type 2 merge, completed in a single run.
#
# A MERGE applies at most one action per source row, so a changed customer
# needs TWO staged rows:
#   * merge_key = customer_id -> matches the current target row and expires it;
#   * merge_key = NULL        -> matches nothing and is inserted as the new
#                                current version.
# Brand-new and unchanged customers need only the keyed row. Merging source_df
# directly would expire changed rows but insert their replacements only on a
# second execution of the cell.
delta_target = DeltaTable.forPath(spark, '/Volumes/workspace/raw_schema/raw_volume/target_table')

# Null-safe "any tracked attribute differs" predicate. Plain `<>` evaluates to
# NULL (treated as "no change") when either side is NULL; `<=>` compares NULLs
# as ordinary values, so a change to or from NULL is detected.
tracked_columns = ['name', 'city', 'email']
change_condition = ' or '.join(
    f'not (target.{column} <=> source.{column})' for column in tracked_columns
)

# Source rows whose customer already has a current version with different attributes.
current_rows_df = delta_target.toDF().where("is_current = 'Y'")
changed_rows_df = (
    source_df.alias('source')
    .join(
        current_rows_df.alias('target'),
        col('source.customer_id') == col('target.customer_id'),
        'inner',
    )
    .where(change_condition)
    .select('source.*')
)

# Stage the rows: changed rows once with a NULL key (forces the insert branch),
# plus every source row keyed by its customer_id (drives expire / new-customer insert).
key_type = source_df.schema['customer_id'].dataType
staged_updates_df = (
    changed_rows_df.withColumn('merge_key', lit(None).cast(key_type))
    .unionByName(source_df.withColumn('merge_key', col('customer_id')))
)

# Kept as the final expression so the cell still displays the merge result.
(
    delta_target.alias('target')
    .merge(
        staged_updates_df.alias('source'),
        # Single-quoted SQL literal: a double-quoted "Y" is parsed as an
        # identifier when ANSI double-quoted identifiers are enabled.
        "target.customer_id = source.merge_key and target.is_current = 'Y'",
    )
    .whenMatchedUpdate(
        # Expire the current version only when a tracked attribute changed.
        condition=change_condition,
        set={
            'end_date': current_date(),
            'is_current': lit('N'),
        },
    )
    .whenNotMatchedInsert(
        values={
            'customer_id': 'source.customer_id',
            'name': 'source.name',
            'city': 'source.city',
            'email': 'source.email',
            'is_current': lit('Y'),
            'start_date': current_date(),
            # The string 'None' is the open-ended marker already used by the
            # seed rows of this table; kept so old and new rows stay consistent.
            'end_date': lit('None'),
        }
    )
    .execute()
)

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

# Check the result

In [0]:
# Reload the Delta table from storage to inspect what the merge produced.
# NOTE: this rebinds `target_df`, which until now referred to the in-memory seed rows.
target_df = (
    spark.read
    .format('delta')
    .load('/Volumes/workspace/raw_schema/raw_volume/target_table')
)
target_df.show()

+-----------+-------+--------+-----------------+----------+----------+----------+
|customer_id|   name|    city|            email|is_current|start_date|  end_date|
+-----------+-------+--------+-----------------+----------+----------+----------+
|          3|Charlie| Houston|charlie@email.com|         Y|2024-01-01|      None|
|          1|  Alice|New York|  alice@email.com|         N|2024-01-01|2025-10-27|
|          2|    Bob| Chicago|    bob@email.com|         N|2024-01-01|2025-10-27|
|          4|  David|   Miami|  david@email.com|         Y|2025-10-27|      None|
|          2|    Bob| Chicago|bob_new@email.com|         Y|2025-10-27|      None|
|          1|  Alice|  Boston|  alice@email.com|         Y|2025-10-27|      None|
+-----------+-------+--------+-----------------+----------+----------+----------+

