In [30]:
import pyspark, os, sys
from pyspark.sql import *
from pyspark import SparkConf,SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import types
from pyspark import StorageLevel
from pyspark.sql.functions import broadcast
from pyspark.sql.functions import hash

# Snapshot comparison for a slowly-changing dimension.
#
# Reads the incoming snapshot (new.csv) and the stored rows (old.csv), full-joins
# them on the id, and classifies every joined row as:
#   * unchanged - present on both sides with identical dim values, or present
#                 only in the old data (the old values are kept);
#   * new       - present only in the new data;
#   * updated   - present on both sides with at least one differing dim value
#                 (the new values are kept).
# The three sets are unioned by position into final_df.
spark = SparkSession.builder.getOrCreate()

df_new = spark.read.format("csv").option("header", True).load("new.csv")
df_old = spark.read.format("csv").option("header", True).load("old.csv")

# Without schema inference every CSV column is read as a string, so cast explicitly.
df_old = df_old.withColumn("st_date", col("st_date").cast("date"))\
               .withColumn("end_date", col("end_date").cast("date"))

new_cols = ["id", "dim1", "dim2", "dim3", "dim4"]
old_cols = ["id_old", "dim1_old", "dim2_old", "dim3_old", "dim4_old"]

# withColumn on an existing column replaces it in place, so column order is kept.
for c_name in new_cols:
    df_new = df_new.withColumn(c_name, col(c_name).cast("int"))
for c_name in old_cols:
    df_old = df_old.withColumn(c_name, col(c_name).cast("int"))

# The hash columns are kept for display and as "row exists on this side" markers:
# after the full join they are NULL only on the side that had no matching row.
# They are deliberately NOT used to decide whether a row changed (see below).
df_new = df_new.withColumn("new_hash", hash(*new_cols))
df_old = df_old.withColumn("old_hash", hash(*old_cols))

join_df = df_new.join(df_old, df_new.id == df_old.id_old, "full")
join_df.show()

in_new = col("new_hash").isNotNull()
in_old = col("old_hash").isNotNull()

# Compare the attributes directly with NULL-safe equality instead of comparing
# hashes. Spark's hash() skips NULL inputs, so e.g. (dim1=NULL, dim2=5) and
# (dim1=5, dim2=NULL) produce the same hash and a real update would be reported
# as "unchanged"; a 32-bit hash can also collide for unrelated values.
# The id itself needs no comparison: the join condition already guarantees
# id == id_old whenever both sides are present.
attrs_match = (
    col("dim1").eqNullSafe(col("dim1_old"))
    & col("dim2").eqNullSafe(col("dim2_old"))
    & col("dim3").eqNullSafe(col("dim3_old"))
    & col("dim4").eqNullSafe(col("dim4_old"))
)

# Rows with no changes, plus rows that exist only in the old data.
unchanged_df = join_df.filter((in_new & in_old & attrs_match) | ~in_new)\
                      .select(*old_cols)
unchanged_df.show()

# Rows that exist only in the new data.
new_changes_df = join_df.filter(~in_old).select(*new_cols)
new_changes_df.show()

# Rows present on both sides whose attributes differ: take the new values.
updated_df = join_df.filter(in_new & in_old & ~attrs_match).select(*new_cols)
updated_df.show()

print("final SCD result:")
# union() matches columns by position, so the result carries unchanged_df's
# column names (id_old, dim1_old, ...).
final_df = unchanged_df.union(new_changes_df).union(updated_df)
final_df.show()


+----+----+----+----+----+----------+------+--------+--------+--------+--------+----------+----------+-----------+
|  id|dim1|dim2|dim3|dim4|  new_hash|id_old|dim1_old|dim2_old|dim3_old|dim4_old|   st_date|  end_date|   old_hash|
+----+----+----+----+----+----------+------+--------+--------+--------+--------+----------+----------+-----------+
| 111| 200| 500| 800| 400|-842040841|   111|     200|     500|     800|     400|2024-12-01|2999-12-31| -842040841|
| 222| 800|1300| 800| 500| 307381462|   222|     900|    NULL|     700|     100|2024-12-01|2999-12-31|-1540894174|
|NULL|NULL|NULL|NULL|NULL|      NULL|   333|     300|     900|     250|     650|2024-12-01|2999-12-31|-1529515503|
| 444| 100|NULL| 700| 300|-805886361|  NULL|    NULL|    NULL|    NULL|    NULL|      NULL|      NULL|       NULL|
+----+----+----+----+----+----------+------+--------+--------+--------+--------+----------+----------+-----------+

+------+--------+--------+--------+--------+
|id_old|dim1_old|dim2_old|dim3_old