In [57]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, current_date, expr, row_number
from pyspark.sql.window import Window

In [47]:
# Single local Spark session shared by every cell below; getOrCreate() reuses
# an already-running session if the cell is executed again.
spark = (
    SparkSession.builder
    .master("local")
    .appName("SCD Type-2 Version Example")
    .getOrCreate()
)

In [48]:
# Load the incoming snapshot from the source system (one row per id).
# The dataset directory can be overridden with the SCD_DATASET_DIR environment
# variable; it defaults to the original author's local path so the notebook
# still runs unchanged on that machine.
source_path = os.path.join(
    os.environ.get(
        "SCD_DATASET_DIR",
        "/Users/sahilnagpal/Desktop/wordsToSpeak/SlowlyChangingDimension/dataset",
    ),
    "source-file-version.csv",
)

source_df = spark\
    .read\
    .format("csv")\
    .option("header",True)\
    .option("inferSchema",True)\
    .load(source_path)

source_df.show()

+---+-------+----------+
| id|   name|department|
+---+-------+----------+
|  1|  Alice|        HR|
|  2|    Bob|   Finance|
|  3|Charlie|        IT|
|  4|  Diana|        HR|
|  5|    Eve|        IT|
+---+-------+----------+



In [49]:
# Load the current dimension table (id, attributes, is_active flag, version).
# The dataset directory can be overridden with the SCD_DATASET_DIR environment
# variable; it defaults to the original author's local path.
dimension_path = os.path.join(
    os.environ.get(
        "SCD_DATASET_DIR",
        "/Users/sahilnagpal/Desktop/wordsToSpeak/SlowlyChangingDimension/dataset",
    ),
    "dimension-file-version.csv",
)

existing_df = spark\
    .read\
    .format("csv")\
    .option("header",True)\
    .option("inferSchema",True)\
    .load(dimension_path)

existing_df.show()

+---+-------+----------+---------+-------+
| id|   name|department|is_active|version|
+---+-------+----------+---------+-------+
|  1|  Alice|        HR|     true|    1.0|
|  2|    Bob|   Finance|     true|    1.0|
|  3|Charlie|        IT|     true|    1.0|
|  4|  Diana| Marketing|     true|    1.0|
+---+-------+----------+---------+-------+



In [50]:
# Left-outer join on id: every source row survives, and ids missing from the
# dimension come back with NULLs in all dimension-side columns. Both sides'
# columns are kept (names collide), so later cells disambiguate them via
# source_df.<col> / existing_df.<col>.
id_match = source_df["id"] == existing_df["id"]
joined_df = source_df.join(existing_df, on=id_match, how="left_outer")

joined_df.show()

+---+-------+----------+----+-------+----------+---------+-------+
| id|   name|department|  id|   name|department|is_active|version|
+---+-------+----------+----+-------+----------+---------+-------+
|  1|  Alice|        HR|   1|  Alice|        HR|     true|    1.0|
|  2|    Bob|   Finance|   2|    Bob|   Finance|     true|    1.0|
|  3|Charlie|        IT|   3|Charlie|        IT|     true|    1.0|
|  4|  Diana|        HR|   4|  Diana| Marketing|     true|    1.0|
|  5|    Eve|        IT|NULL|   NULL|      NULL|     NULL|   NULL|
+---+-------+----------+----+-------+----------+---------+-------+



In [51]:
# Source rows with no dimension match (dimension-side id is NULL after the
# left join) become brand-new records: active, at version 1.
# The version literal is cast to the dimension's own `version` type (as
# inferred from its CSV) so every input to the later unions shares one schema
# instead of relying on implicit int -> double widening.
version_type = existing_df.schema["version"].dataType

new_records = joined_df\
    .filter(existing_df.id.isNull())\
    .select(
        source_df.id,
        source_df.name,
        source_df.department,
        lit(True).alias("is_active"),
        lit(1).cast(version_type).alias("version"),
    )

new_records.show()

+---+----+----------+---------+-------+
| id|name|department|is_active|version|
+---+----+----------+---------+-------+
|  5| Eve|        IT|     true|      1|
+---+----+----------+---------+-------+



In [52]:
# A source row counts as "changed" when it matches the currently ACTIVE
# dimension row for its id but differs in a tracked attribute (name/department).
#   * eqNullSafe: a plain `a != b` evaluates to NULL when either side is NULL,
#     and filter() drops NULL, so a value changing to/from NULL would be missed.
#   * is_active: once the dimension holds history, the left join also matches
#     expired versions of an id; comparing against those would re-version rows
#     that are unchanged relative to the active version.
changed_records = joined_df\
    .filter(
        existing_df.id.isNotNull()
        & (existing_df.is_active == lit(True))
        & (
            ~existing_df.name.eqNullSafe(source_df.name)
            | ~existing_df.department.eqNullSafe(source_df.department)
        )
    )

changed_records.show()

+---+-----+----------+---+-----+----------+---------+-------+
| id| name|department| id| name|department|is_active|version|
+---+-----+----------+---+-----+----------+---------+-------+
|  4|Diana|        HR|  4|Diana| Marketing|     true|    1.0|
+---+-----+----------+---+-----+----------+---------+-------+



In [53]:
# New version of each changed row: source attribute values, version bumped by
# one over the matched dimension row. The new version is always active; the
# flag is set explicitly rather than copied from the matched row, which would
# propagate `false` if that row were an expired historical version.
updated_records = changed_records\
    .select(
        source_df.id,
        source_df.name,
        source_df.department,
        lit(True).alias("is_active"),
        (existing_df.version + 1).alias("version"),
    )
updated_records.show()

+---+-----+----------+---------+-------+
| id| name|department|is_active|version|
+---+-----+----------+---------+-------+
|  4|Diana|        HR|     true|    2.0|
+---+-----+----------+---------+-------+



In [54]:
# History copy of each changed row: the dimension-side values and version are
# kept as-is, but the row is flagged inactive (expired).
expired_columns = [
    existing_df.id,
    existing_df.name,
    existing_df.department,
    lit(False).alias("is_active"),
    existing_df.version,
]
expired_record = changed_records.select(*expired_columns)

expired_record.show()

+---+-----+----------+---------+-------+
| id| name|department|is_active|version|
+---+-----+----------+---------+-------+
|  4|Diana| Marketing|    false|    1.0|
+---+-----+----------+---------+-------+



In [55]:
# Stack the new versions of changed rows with the brand-new rows.
# unionByName matches columns by name rather than position, so a reordered
# select in an upstream cell cannot silently shuffle values between columns.
scd2_updates = updated_records.unionByName(new_records)

scd2_updates.show()

+---+-----+----------+---------+-------+
| id| name|department|is_active|version|
+---+-----+----------+---------+-------+
|  4|Diana|        HR|     true|    2.0|
|  5|  Eve|        IT|     true|    1.0|
+---+-----+----------+---------+-------+



In [56]:
# Append the new/updated versions and the expired history copies to the
# existing dimension. At this point the previously-active row of each changed
# id is still present alongside its expired copy; the row_number dedupe cell
# that follows removes it. unionByName aligns columns by name, not position.
final_df = existing_df\
    .unionByName(scd2_updates)\
    .unionByName(expired_record)

final_df.show()

+---+-------+----------+---------+-------+
| id|   name|department|is_active|version|
+---+-------+----------+---------+-------+
|  1|  Alice|        HR|     true|    1.0|
|  2|    Bob|   Finance|     true|    1.0|
|  3|Charlie|        IT|     true|    1.0|
|  4|  Diana| Marketing|     true|    1.0|
|  4|  Diana|        HR|     true|    2.0|
|  5|    Eve|        IT|     true|    1.0|
|  4|  Diana| Marketing|    false|    1.0|
+---+-------+----------+---------+-------+



In [64]:
# Per id, rank rows with active ones first and higher versions first, so
# row_num == 1 is the newest active version.
latest_first = Window\
    .partitionBy(col("id"))\
    .orderBy(col("is_active").desc(), col("version").desc())

df_with_row_num = final_df.withColumn("row_num", row_number().over(latest_first))

# Keep only the newest active row per id, plus every inactive (history) row.
is_latest_active = (col("is_active") == True) & (col("row_num") == 1)
is_history = col("is_active") == False

final_df = df_with_row_num\
    .filter(is_latest_active | is_history)\
    .drop("row_num")

final_df.show()

+---+-------+----------+---------+-------+
| id|   name|department|is_active|version|
+---+-------+----------+---------+-------+
|  1|  Alice|        HR|     true|    1.0|
|  2|    Bob|   Finance|     true|    1.0|
|  3|Charlie|        IT|     true|    1.0|
|  4|  Diana|        HR|     true|    2.0|
|  4|  Diana| Marketing|    false|    1.0|
|  5|    Eve|        IT|     true|    1.0|
+---+-------+----------+---------+-------+

