In [0]:
%sql
create or replace table scd2demo(
  pk1 int,
  pk2 string,
  dim1 int,
  dim2 int,
  dim3 int,
  dim4 int,
  active_status string,
  start_date timestamp,
  end_date timestamp
)
using delta
location '/FileStore/tables/scd2demo'

In [0]:
%sql
insert into scd2demo values(111,'unit1',200,500,800,400,'y',current_timestamp(),'9999-12-31');
insert into scd2demo values(222,'unit2',900,Null,700,100,'y',current_timestamp(),'9999-12-31');
insert into scd2demo values(333,'unit3',300,900,250,650,'y',current_timestamp(),'9999-12-31');



num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql
select * from scd2demo

pk1,pk2,dim1,dim2,dim3,dim4,active_status,start_date,end_date
111,unit1,200,500.0,800,400,y,2025-02-17T05:16:44.209Z,9999-12-31T00:00:00Z
333,unit3,300,900.0,250,650,y,2025-02-17T05:16:59.069Z,9999-12-31T00:00:00Z
222,unit2,900,,700,100,N,2025-02-17T05:16:54.658Z,2025-02-17T00:00:00Z
222,unit2,800,1300.0,800,500,,,
444,unit4,100,,700,300,,,


In [0]:
from delta import *
from pyspark.sql.functions import *
targertable=DeltaTable.forPath(spark,'/FileStore/tables/scd2demo')
targetdf=targertable.toDF()
display(targetdf)

pk1,pk2,dim1,dim2,dim3,dim4,active_status,start_date,end_date
111,unit1,200,500.0,800,400,y,2025-02-17T05:16:44.209Z,9999-12-31T00:00:00Z
333,unit3,300,900.0,250,650,y,2025-02-17T05:16:59.069Z,9999-12-31T00:00:00Z
222,unit2,900,,700,100,y,2025-02-17T05:16:54.658Z,9999-12-31T00:00:00Z


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
schema_scd2=StructType([StructField('pk1',StringType(),True),\
    StructField('pk2',StringType(),True),\
        StructField('dim1',IntegerType(),True),\
            StructField('dim2',IntegerType(),True),\
                StructField('dim3',IntegerType(),True),\
                    StructField('dim4',IntegerType(),True)]
                       )

In [0]:
data=[(111,'unit1',200,500,800,400),
      (222,'unit2',800,1300,800,500),
      (444,'unit4',100,None,700,300)]
sourcedf=spark.createDataFrame(data=data,schema=schema_scd2)
display(sourcedf)

pk1,pk2,dim1,dim2,dim3,dim4
111,unit1,200,500.0,800,400
222,unit2,800,1300.0,800,500
444,unit4,100,,700,300


In [0]:
joindf=sourcedf.join(targetdf,(sourcedf.pk1==targetdf.pk1) &\
    (sourcedf.pk2==targetdf.pk2) & \
        (targetdf.active_status=='y'),'leftouter'
    ).select(sourcedf['*'],\
            targetdf.pk1.alias('target_pk1'),\
                targetdf.pk2.alias('target_pk2'),\
                    targetdf.dim1.alias('target_dim1'),\
                        targetdf.dim2.alias('target_dim2'),\
                            targetdf.dim3.alias('target_dim3'),\
                                targetdf.dim4.alias('target_dim4'))
display(joindf)

pk1,pk2,dim1,dim2,dim3,dim4,target_pk1,target_pk2,target_dim1,target_dim2,target_dim3,target_dim4
111,unit1,200,500.0,800,400,111.0,unit1,200.0,500.0,800.0,400.0
222,unit2,800,1300.0,800,500,222.0,unit2,900.0,,700.0,100.0
444,unit4,100,,700,300,,,,,,


In [0]:
filterdf=joindf.filter(xxhash64(joindf.dim1,joindf.dim2,joindf.dim3,joindf.dim4)!=xxhash64(joindf.target_dim1,joindf.target_dim2,joindf.target_dim3,joindf.target_dim4))
display(filterdf)

pk1,pk2,dim1,dim2,dim3,dim4,target_pk1,target_pk2,target_dim1,target_dim2,target_dim3,target_dim4
222,unit2,800,1300.0,800,500,222.0,unit2,900.0,,700.0,100.0
444,unit4,100,,700,300,,,,,,


In [0]:
mergedf=filterdf.withColumn('merge_key',concat(filterdf.pk1,filterdf.pk2))
display(mergedf)

pk1,pk2,dim1,dim2,dim3,dim4,target_pk1,target_pk2,target_dim1,target_dim2,target_dim3,target_dim4,merge_key
222,unit2,800,1300.0,800,500,222.0,unit2,900.0,,700.0,100.0,222unit2
444,unit4,100,,700,300,,,,,,,444unit4


In [0]:
dummydf=filterdf.filter("target_pk1 is not null").withColumn('merge_key',lit(None))
display(dummydf)

pk1,pk2,dim1,dim2,dim3,dim4,target_pk1,target_pk2,target_dim1,target_dim2,target_dim3,target_dim4,merge_key
222,unit2,800,1300,800,500,222,unit2,900,,700,100,


In [0]:
scddf=mergedf.union(dummydf)
display(scddf)

pk1,pk2,dim1,dim2,dim3,dim4,target_pk1,target_pk2,target_dim1,target_dim2,target_dim3,target_dim4,merge_key
222,unit2,800,1300.0,800,500,222.0,unit2,900.0,,700.0,100.0,222unit2
444,unit4,100,,700,300,,,,,,,444unit4
222,unit2,800,1300.0,800,500,222.0,unit2,900.0,,700.0,100.0,


In [0]:
targertable.alias('target').merge(source=scddf.alias('source'),
                               condition="concat(target.pk1,target.pk2)=source.merge_key and target.active_status='y'").whenMatchedUpdate(set={
                                   "active_status":"'N'",
                                   "end_date":"current_date"
                               }).whenNotMatchedInsert(values={
                                   "pk1":"source.pk1",
                                   "pk2":"source.pk2",
                                   "dim1":"source.dim1",
                                   "dim2":"source.dim2",
                                   "dim3":"source.dim3",
                                   "dim4":"source.dim4"
                               }).execute()
