***Slowing Changing Dimension***
-- Change of attributes/values of entities over a period of time.

SCD - Type I   : Keeping only latest information,
SCD - Type II  : Kepping all the history, creating new column to track the history and active status.
SCD - Type III : Keeping latest informationa and one lastest history information, creating new column to flag as old

In [0]:
%sql
--Create Delta table
CREATE OR REPLACE TABLE scd2Demo(
pk1 int,
pk2 string,
dim1 INT,
dim2 INT,
dim3 INT,
dim4 INT,
active_status STRING,
start_date Timestamp,
end_date Timestamp
) USING DELTA
LOCATION '/FileStore/tables/scd2Demo'

In [0]:
%sql
insert into scd2Demo Values (111, 'Unit1', 200, 500, 800, 400, 'Y', current_timestamp(), '9999-12-31');
insert into scd2Demo Values (222, 'Unit2', 900, Null, 700, 100, 'Y', current_timestamp(), '9999-12-31');
insert into scd2Demo Values (333, 'Unit3', 300, 900, 250, 650, 'Y', current_timestamp(), '9999-12-31');

num_affected_rows,num_inserted_rows
1,1


In [0]:
from delta import *
targetTable = DeltaTable.forPath(spark, "/FileStore/tables/scd2Demo")
targetDF = targetTable.toDF()
display(targetDF)

pk1,pk2,dim1,dim2,dim3,dim4,active_status,start_date,end_date
111,Unit1,200,500.0,800,400,Y,2022-09-15T15:35:30.014+0000,9999-12-31T00:00:00.000+0000
333,Unit3,300,900.0,250,650,Y,2022-09-15T15:35:37.140+0000,9999-12-31T00:00:00.000+0000
222,Unit2,900,,700,100,Y,2022-09-15T15:35:33.756+0000,9999-12-31T00:00:00.000+0000


In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = StructType([StructField('pk1', StringType(), True), 
                     StructField('pk2', StringType(), True), 
                     StructField('dim1', IntegerType(), True),
                     StructField('dim2', IntegerType(), True),
                     StructField('dim3', IntegerType(), True),
                     StructField('dim4', IntegerType(), True)])

In [0]:
data = [(111, 'Unit1', 200, 500, 800, 400),
      (222, 'Unit2', 800, 1300, 800, 500),
      (444, 'Unit4', 100, None, 700, 300)]
sourceDF = spark.createDataFrame(data = data, schema = schema)
display(sourceDF)

pk1,pk2,dim1,dim2,dim3,dim4
111,Unit1,200,500.0,800,400
222,Unit2,800,1300.0,800,500
444,Unit4,100,,700,300


In [0]:
## leftouter Join sourceDF and TargetDF to get all in one table with Null values for Insert records.
joinDF = sourceDF.join(targetDF, (sourceDF.pk1==targetDF.pk1) & (sourceDF.pk2==targetDF.pk2) & \
                      (targetDF.active_status=="Y"), "leftouter")\
                .select(sourceDF["*"], \
                       targetDF.pk1.alias("target_pk1"), \
                       targetDF.pk2.alias("target_pk2"), \
                       targetDF.dim1.alias("target_dim1"), \
                       targetDF.dim2.alias("target_dim2"), \
                       targetDF.dim3.alias("target_dim3"), \
                       targetDF.dim4.alias("target_dim4"))
display(joinDF)

pk1,pk2,dim1,dim2,dim3,dim4,target_pk1,target_pk2,target_dim1,target_dim2,target_dim3,target_dim4
111,Unit1,200,500.0,800,400,111.0,Unit1,200.0,500.0,800.0,400.0
222,Unit2,800,1300.0,800,500,222.0,Unit2,900.0,,700.0,100.0
444,Unit4,100,,700,300,,,,,,


In [0]:
# checking the joinDF attributes if there is any changes in target values, these will be retrieved.
# concatination is done using xxhash64 because it contains null values.

filterDF = joinDF.filter(xxhash64(joinDF.dim1,joinDF.dim2,joinDF.dim3,joinDF.dim4) != xxhash64(joinDF.target_dim1,joinDF.target_dim2,joinDF.target_dim3,joinDF.target_dim4))
display(filterDF)

pk1,pk2,dim1,dim2,dim3,dim4,target_pk1,target_pk2,target_dim1,target_dim2,target_dim3,target_dim4
222,Unit2,800,1300.0,800,500,222.0,Unit2,900.0,,700.0,100.0
444,Unit4,100,,700,300,,,,,,


In [0]:
# Merge key creation on filterDF
mergeDF = filterDF.withColumn("MERGEKEY", concat(filterDF.pk1,filterDF.pk2))
display(mergeDF)

pk1,pk2,dim1,dim2,dim3,dim4,target_pk1,target_pk2,target_dim1,target_dim2,target_dim3,target_dim4,MERGEKEY
222,Unit2,800,1300.0,800,500,222.0,Unit2,900.0,,700.0,100.0,222Unit2
444,Unit4,100,,700,300,,,,,,,444Unit4


In [0]:
#dummy df only for primary key is not null pk.
dummyDF = filterDF.filter("target_pk1 is not null").withColumn("MERGEKEY", lit(None))
display(dummyDF)

pk1,pk2,dim1,dim2,dim3,dim4,target_pk1,target_pk2,target_dim1,target_dim2,target_dim3,target_dim4,MERGEKEY
222,Unit2,800,1300,800,500,222,Unit2,900,,700,100,


In [0]:
# union dummyDF and mergeDF to get full records for further processing.
scdDF = mergeDF.union(dummyDF)
display(scdDF)


pk1,pk2,dim1,dim2,dim3,dim4,target_pk1,target_pk2,target_dim1,target_dim2,target_dim3,target_dim4,MERGEKEY
222,Unit2,800,1300.0,800,500,222.0,Unit2,900.0,,700.0,100.0,222Unit2
444,Unit4,100,,700,300,,,,,,,444Unit4
222,Unit2,800,1300.0,800,500,222.0,Unit2,900.0,,700.0,100.0,


In [0]:
#Merge (update or insert) based on above scdDF's MERGEKEY 
# if MERGEKEY is null, then, it is inserted with active_status as 'Y' and other one will be 'N'
targetTable.alias('target').merge(
    source = scdDF.alias("source"),
    condition = "concat(target.pk1,target.pk2) = source.MERGEKEY and target.active_status = 'Y'"
    ).whenMatchedUpdate(set={ 
        "active_status": "'N'",
        "end_date": "current_date"
    }).whenNotMatchedInsert(values={
        "pk1": "source.pk1",
         "pk2": "source.pk2",
         "dim1": "source.dim1",
        "dim2": "source.dim2",
        "dim3": "source.dim3",
        "dim4": "source.dim4",
        "active_status":"'Y'",
        "start_date": "current_date",
        "end_date": """to_date('9999-12-31', 'yyyy-mm-dd')"""
}).execute()

In [0]:
%sql
-- Final Output for SCD Type II.
select * from scd2Demo

pk1,pk2,dim1,dim2,dim3,dim4,active_status,start_date,end_date
111,Unit1,200,500.0,800,400,Y,2022-09-15T15:35:30.014+0000,9999-12-31T00:00:00.000+0000
333,Unit3,300,900.0,250,650,Y,2022-09-15T15:35:37.140+0000,9999-12-31T00:00:00.000+0000
222,Unit2,800,1300.0,800,500,Y,2022-09-15T00:00:00.000+0000,9999-01-31T00:00:00.000+0000
444,Unit4,100,,700,300,Y,2022-09-15T00:00:00.000+0000,9999-01-31T00:00:00.000+0000
222,Unit2,900,,700,100,N,2022-09-15T15:35:33.756+0000,2022-09-15T00:00:00.000+0000
