Step 1: Assume the source is a daily CDC (change data capture) extract that must be merged into the existing target dimension table.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

from pyspark.sql.functions import col, to_date

# One Spark session for the whole notebook.
spark = (
    SparkSession.builder
    .appName("SCD Type 2 Pipeline")
    .getOrCreate()
)

# Incoming (source) records: one row per employee, stamped with the date the
# row was extracted / became effective.
source_columns = ["emp_id", "name", "sal", "dept", "city", "valid_from"]

source_data = [
    (101, "Rama", 80000, "IT", "Hyd", "2025-07-19"),
    (102, "Pavan", 55000, "Fin", "Blr", "2025-07-19"),
    (103, "Pavani", 48000, "HR", "Mumbai", "2025-07-19"),
    (104, "Mike", 65000, "Dev", "New York", "2025-07-19")
]

# Existing target (dimension) rows: full history per employee, where the open
# version carries valid_to = 9999-12-31 and is_active = "Y".
target_columns = ["emp_id", "name", "sal", "dept", "city", "valid_from", "valid_to", "is_active"]

target_data = [
    (101, "Rama", 70000, "IT", "Hyd", "2025-07-01", "9999-12-31", "Y"),
    (101, "Rama", 60000, "IT", "Hyd", "2025-02-26", "2025-07-01", "N"),
    (102, "Pavan", 55000, "Fin", "Blr", "2025-07-01", "9999-12-31", "Y"),
    (103, "Pavani", 48000, "HR", "Pune", "2025-07-01", "9999-12-31", "Y")
]

# Build each DataFrame and convert its string dates to real dates in one go.
source_df = (
    spark.createDataFrame(source_data, source_columns)
    .withColumn("valid_from", to_date(col("valid_from"), "yyyy-MM-dd"))
)

target_df = (
    spark.createDataFrame(target_data, target_columns)
    .withColumn("valid_from", to_date(col("valid_from"), "yyyy-MM-dd"))
    .withColumn("valid_to", to_date(col("valid_to"), "yyyy-MM-dd"))
)

# Show the starting point of the merge.
print("Source Data:")
source_df.show()

print("Target Dimension Table:")
target_df.show()


Source Data:
+------+------+-----+----+--------+----------+
|emp_id|  name|  sal|dept|    city|valid_from|
+------+------+-----+----+--------+----------+
|   101|  Rama|80000|  IT|     Hyd|2025-07-19|
|   102| Pavan|55000| Fin|     Blr|2025-07-19|
|   103|Pavani|48000|  HR|  Mumbai|2025-07-19|
|   104|  Mike|65000| Dev|New York|2025-07-19|
+------+------+-----+----+--------+----------+

Target Dimension Table:
+------+------+-----+----+----+----------+----------+---------+
|emp_id|  name|  sal|dept|city|valid_from|  valid_to|is_active|
+------+------+-----+----+----+----------+----------+---------+
|   101|  Rama|70000|  IT| Hyd|2025-07-01|9999-12-31|        Y|
|   101|  Rama|60000|  IT| Hyd|2025-02-26|2025-07-01|        N|
|   102| Pavan|55000| Fin| Blr|2025-07-01|9999-12-31|        Y|
|   103|Pavani|48000|  HR|Pune|2025-07-01|9999-12-31|        Y|
+------+------+-----+----+----+----------+----------+---------+



Step 2: Apply xxhash64 to the business columns of both source and target so that records can be compared by a single hash value.

In [0]:
from pyspark.sql.functions import xxhash64, col

# Business columns that define "the same record version". The validity dates
# and the active flag are deliberately left out of the fingerprint.
hash_columns = ["emp_id", "name", "sal", "dept", "city"]

# One shared expression so source and target are always hashed identically.
# NOTE(review): Spark's hash functions appear to skip NULL inputs, so two rows
# that differ only in *which* column is NULL may produce the same hash -
# confirm before relying on this with nullable columns.
row_hash = xxhash64(*[col(name) for name in hash_columns])

df_target = target_df.withColumn("hash64", row_hash)
df_source = source_df.withColumn("hash64", row_hash)

print("************************ Target Table Data ************************")
df_target.display(truncate=False)

print("************************ Source DF Data ************************")
df_source.display(truncate=False)


************************ Target Table Data ************************


emp_id,name,sal,dept,city,valid_from,valid_to,is_active,hash64
101,Rama,70000,IT,Hyd,2025-07-01,9999-12-31,Y,-7669449300241596311
101,Rama,60000,IT,Hyd,2025-02-26,2025-07-01,N,6784066679327434019
102,Pavan,55000,Fin,Blr,2025-07-01,9999-12-31,Y,-5263860670282463898
103,Pavani,48000,HR,Pune,2025-07-01,9999-12-31,Y,7801829772592953525


************************ Source DF Data ************************


emp_id,name,sal,dept,city,valid_from,hash64
101,Rama,80000,IT,Hyd,2025-07-19,8589350289873110920
102,Pavan,55000,Fin,Blr,2025-07-19,-5263860670282463898
103,Pavani,48000,HR,Mumbai,2025-07-19,-4972189013777351868
104,Mike,65000,Dev,New York,2025-07-19,-331763272281981946


Step 3: Perform a left anti join of the source against the active target rows on the hash key. This drops every source row that already matches an active target row, leaving only the new or changed records.

In [0]:
# Compare against the currently open versions only; closed history rows must
# not hide a change back to an older value.
df_target_active = df_target.where(col("is_active") == "Y")

# Keep the source rows whose hash has no match among the active target rows,
# i.e. brand-new employees and employees whose attributes changed. The hash
# was only needed for the comparison, so it is removed afterwards.
df_result = (
    df_source
    .join(df_target_active, on="hash64", how="leftanti")
    .drop("hash64")
)

# Display the result
df_result.display()


emp_id,name,sal,dept,city,valid_from
101,Rama,80000,IT,Hyd,2025-07-19
103,Pavani,48000,HR,Mumbai,2025-07-19
104,Mike,65000,Dev,New York,2025-07-19


Step 4: Add the `is_active`, `valid_from`, and `valid_to` columns so that the new and changed records align with the target table's schema.

In [0]:
from pyspark.sql.functions import coalesce, col, current_date, lit, current_timestamp, to_date

# Shape the new/changed records like rows of the target table:
#   * is_active  - every incoming version starts out as the open version.
#   * valid_from - keep the effective date delivered by the source. Stamping
#                  the wall-clock date here would discard that date and make
#                  the result depend on the day the notebook is run (re-runs
#                  and backfills would produce different history). The load
#                  date is used only as a fallback when the source date is NULL.
#   * valid_to   - the open-ended sentinel date used by the target table.
df_result = (
    df_result
    .withColumn("is_active", lit("Y"))
    .withColumn("valid_from", coalesce(col("valid_from"), current_date()))
    .withColumn("valid_to", to_date(lit("9999-12-31")))
)

df_result.display()

emp_id,name,sal,dept,city,valid_from,is_active,valid_to
101,Rama,80000,IT,Hyd,2025-07-19,Y,9999-12-31
103,Pavani,48000,HR,Mumbai,2025-07-19,Y,9999-12-31
104,Mike,65000,Dev,New York,2025-07-19,Y,9999-12-31


Step 5: Append these records to target

In [0]:
# Stack the existing dimension rows and the new/changed versions into a single
# DataFrame. Columns are matched by name, so the differing column order of the
# two inputs does not matter.
df_combined = (
    target_df
    .unionByName(df_result)
)

print("Final SCD Type 2 Output:")
df_combined.display()


Final SCD Type 2 Output:


emp_id,name,sal,dept,city,valid_from,valid_to,is_active
101,Rama,80000,IT,Hyd,2025-07-19,9999-12-31,Y
103,Pavani,48000,HR,Mumbai,2025-07-19,9999-12-31,Y
104,Mike,65000,Dev,New York,2025-07-19,9999-12-31,Y
101,Rama,70000,IT,Hyd,2025-07-01,9999-12-31,Y
101,Rama,60000,IT,Hyd,2025-02-26,2025-07-01,N
102,Pavan,55000,Fin,Blr,2025-07-01,9999-12-31,Y
103,Pavani,48000,HR,Pune,2025-07-01,9999-12-31,Y


Step 6: Apply the ROW_NUMBER() function to the dataset, partitioned by emp_id and ordered by valid_from in descending order.

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Within each employee, newest version first.
window_spec = (
    Window
    .partitionBy("emp_id")
    .orderBy(col("valid_from").desc())
)

# row_number == 1 marks the most recent version of each employee; higher
# numbers are progressively older versions.
df_with_rownum = df_combined.withColumn(
    "row_number",
    row_number().over(window_spec),
)

# Display the result
df_with_rownum.display()

emp_id,name,sal,dept,city,valid_from,valid_to,is_active,row_number
101,Rama,80000,IT,Hyd,2025-07-19,9999-12-31,Y,1
101,Rama,70000,IT,Hyd,2025-07-01,9999-12-31,Y,2
101,Rama,60000,IT,Hyd,2025-02-26,2025-07-01,N,3
102,Pavan,55000,Fin,Blr,2025-07-01,9999-12-31,Y,1
103,Pavani,48000,HR,Mumbai,2025-07-19,9999-12-31,Y,1
103,Pavani,48000,HR,Pune,2025-07-01,9999-12-31,Y,2
104,Mike,65000,Dev,New York,2025-07-19,9999-12-31,Y,1


Step 7: Update is_active and valid_to

In [0]:
from pyspark.sql.functions import col, lag, when

# Same ordering as the row_number step: per employee, newest version first.
# Defined on the column name (not on a specific DataFrame) so it can be applied
# to df_with_rownum directly.
window_spec = Window.partitionBy("emp_id").orderBy(col("valid_from").desc())

# With descending order, lag() looks at the row *before* the current one in the
# window, i.e. the next newer version; its valid_from is the date on which the
# current row stopped being valid. It is NULL for the newest row.
df_final = df_with_rownum.withColumn(
    "next_valid_from", lag("valid_from").over(window_spec)
)

# A row has to be expired only if it is still flagged active although a newer
# version of the same employee exists. Rows that are already closed keep their
# recorded valid_to / is_active, and a closed latest version is not silently
# re-activated just because it happens to be the newest row.
df_final = df_final.withColumn(
    "is_superseded", (col("row_number") > 1) & (col("is_active") == "Y")
)

# Close superseded rows at the start date of the version that replaced them.
df_final = (
    df_final
    .withColumn(
        "valid_to",
        when(col("is_superseded"), col("next_valid_from")).otherwise(col("valid_to")),
    )
    .withColumn(
        "is_active",
        when(col("is_superseded"), "N").otherwise(col("is_active")),
    )
)

# Drop the helper columns so the schema matches the target table again.
df_final = df_final.drop("row_number", "next_valid_from", "is_superseded")

# Show final SCD2 table
display(df_final)

emp_id,name,sal,dept,city,valid_from,valid_to,is_active
101,Rama,80000,IT,Hyd,2025-07-19,9999-12-31,Y
101,Rama,70000,IT,Hyd,2025-07-01,2025-07-19,N
101,Rama,60000,IT,Hyd,2025-02-26,2025-07-01,N
102,Pavan,55000,Fin,Blr,2025-07-01,9999-12-31,Y
103,Pavani,48000,HR,Mumbai,2025-07-19,9999-12-31,Y
103,Pavani,48000,HR,Pune,2025-07-01,2025-07-19,N
104,Mike,65000,Dev,New York,2025-07-19,9999-12-31,Y


Step 8: Write the final DataFrame to the target table, overwriting the previous Delta table.