Lets see how to impliment SCD type 2 in Spark. This is a basic tutorial to learn the basics. 

In [0]:
from pyspark.sql.functions import 
from pyspark.sql.window import Window

In [0]:
source = "dbfs:/FileStore/source/"
DWH = "dbfs:/FileStore/DWH/"
end_date = "8888-01-01"
DATE_FORMAT = "yyyy-MM-dd"
primary_key = "cutomer_id"
SCD_cols = ["email", "address","city","state","zipcode"]
new_cols = ["eff_date", "end-date", "row_status"]

let's first import the initial load or the first dataset. then we will do some changes to the dimensions and import the updated data and do the changes in the dimensions.

In [0]:
%fs
ls dbfs:/FileStore/source/

path,name,size,modificationTime
dbfs:/FileStore/source/customers_initial.csv,customers_initial.csv,741,1712473728000
dbfs:/FileStore/source/new_customers.csv,new_customers.csv,741,1712473738000


In [0]:
source_schema = "customer_id long,firstname string, lastname string, email string, address string, city string, state string, zipcode long"

In [0]:
source_df = spark.read.schema(source_schema)\
    .option("header", "True")\
    .csv("dbfs:/FileStore/source/customers_initial.csv")
source_df.show()

+-----------+---------+--------+--------------------+-------------+--------+-----+-------+
|customer_id|firstname|lastname|               email|      address|    city|state|zipcode|
+-----------+---------+--------+--------------------+-------------+--------+-----+-------+
|          1|   Rakesh|  Sharma|RakeshSharma@gmai...|  123 Main St|   Hubly|   WB| 123450|
|          2|     John|   verma| Johnverma@gmail.com|  456 Oak Ave|   Delhi|Delhi| 678900|
|          3|    Sneha|   Yadav|SnehaYadav@gmail.com|   123 Elm Ln|   Delhi|Delhi| 876540|
|          4|    Mohit|   Mehra|MohitMehra@gmail.com| 234 Cedar Dr|   Delhi|Delhi| 890120|
|          5|    Mohit|   Yadav|MohitYadav@gmail.com| 567 Elm Blvd|  Mumbai|   MH| 456780|
|          6|   Rajesh|   Tilak|RajeshTilak@gmail...| 890 Birch Rd|  Mumbai|   MH| 234560|
|          7|    Vikas|   Kohli|VikasKohli@gmail.com|678 Maple Ave|New york|   NY| 789010|
|          8|   Vishal|  Khatri|VishalKhatri@gmai...|  901 Pine St|  Queens|   NY| 567890|

below we create the schema for our data warehouse, which has 4 additional columns, surrogate key, eff_date, end_date and row_status(A--> active row, I --> Inactive Row.)

In [0]:
DWH_schema = "customer_id long,firstname string, lastname string, email string, address string, city string, state string, zipcode long, cust_surrogate_key long, eff_date date, end_date date, row_status string"

Now lets create the new columns in our source data.
All the date is fresh as this is inital load, we will keep row_status as A(Active) for all the rows for now, until we get updated(incremental data).

In [0]:
enhanced_source_df = source_df \
        .withColumn("cust_surrogate_key",row_number().over(Window.orderBy(col("customer_id")))) \
        .withColumn("eff_date",date_format(current_date(), DATE_FORMAT)) \
        .withColumn("end_date",date_format(lit(end_date), DATE_FORMAT)) \
        .withColumn("row_status", lit("A"))
display(enhanced_source_df)

customer_id,firstname,lastname,email,address,city,state,zipcode,cust_surrogate_key,eff_date,end_date,row_status
1,Rakesh,Sharma,RakeshSharma@gmail.com,123 Main St,Hubly,WB,123450,1,2024-04-07,8888-01-01,A
2,John,verma,Johnverma@gmail.com,456 Oak Ave,Delhi,Delhi,678900,2,2024-04-07,8888-01-01,A
3,Sneha,Yadav,SnehaYadav@gmail.com,123 Elm Ln,Delhi,Delhi,876540,3,2024-04-07,8888-01-01,A
4,Mohit,Mehra,MohitMehra@gmail.com,234 Cedar Dr,Delhi,Delhi,890120,4,2024-04-07,8888-01-01,A
5,Mohit,Yadav,MohitYadav@gmail.com,567 Elm Blvd,Mumbai,MH,456780,5,2024-04-07,8888-01-01,A
6,Rajesh,Tilak,RajeshTilak@gmail.com,890 Birch Rd,Mumbai,MH,234560,6,2024-04-07,8888-01-01,A
7,Vikas,Kohli,VikasKohli@gmail.com,678 Maple Ave,New york,NY,789010,7,2024-04-07,8888-01-01,A
8,Vishal,Khatri,VishalKhatri@gmail.com,901 Pine St,Queens,NY,567890,8,2024-04-07,8888-01-01,A
9,Rahul,H,RahulH@gmail.com,234 Oak Ln,Jaipur,RJ,12345,9,2024-04-07,8888-01-01,A
10,Yash,Bhati,YashBhati@gmail.com,567 Oak St,Trichy,TN,987650,10,2024-04-07,8888-01-01,A


In [0]:
# Lets make sure we have the latest surrogate key so that we can add to it when we do the incremental load, we would need to keep track of latest surrogate key each time we add new data to data warehouse.
max_sk = enhanced_source_df.agg(max(col("cust_surrogate_key"))).collect()[0][0]
max_sk

Out[39]: 10

Out[32]: 10

Now lets write this data to our warehouse. Again this is just a basic implimentation, this writing could be done in a gold layer of the data lake using delta format for optimization and other benifits. to keep it simple for now i am writing to DWH folder in csv format.


In [0]:
enhanced_source_df.write.mode('overwrite') \
        .option("header",True) \
        .option("delimiter",",") \
        .csv(DWH)

So now our initial load is successful. Now we import the incremental data and add those changes to the dwh. Once inital load is completed, we do not need to run the above code again, but whenever we get new data, we would just need to run the below code.

let's first import the new data. Following changes have been done in the old data to get the new data

1. 3 new inputs 
2. 3 Updates \
i. customer_id 6 email and address change \
ii. Customer_id 9 state change \
iii. customer_id 1 city change 
3. 1  Deletes \
i. Customer_id 10 deleted

In [0]:
new_source_df = spark.read.schema(source_schema)\
    .option("header", "True")\
    .csv("dbfs:/FileStore/source/new_customers-1.csv")\
    .withColumn("address", trim(col("address")))
new_source_df.show()

+-----------+---------+--------+--------------------+---------------+--------+-----+-------+
|customer_id|firstname|lastname|               email|        address|    city|state|zipcode|
+-----------+---------+--------+--------------------+---------------+--------+-----+-------+
|          1|   Rakesh|  Sharma|RakeshSharma@gmai...|    123 Main St| Kolkata|   WB| 123450|
|          2|     John|   verma| Johnverma@gmail.com|    456 Oak Ave|   Delhi|Delhi| 678900|
|          3|    Sneha|   Yadav|SnehaYadav@gmail.com|     123 Elm Ln|   Delhi|Delhi| 876540|
|          4|    Mohit|   Mehra|MohitMehra@gmail.com|   234 Cedar Dr|   Delhi|Delhi| 890120|
|          5|    Mohit|   Yadav|MohitYadav@gmail.com|   567 Elm Blvd|  Mumbai|   MH| 456780|
|          6|   Rajesh|   Tilak|RajeshTilak@yahoo...|Mira road,Delhi|  Mumbai|   MH| 234560|
|          7|    Vikas|   Kohli|VikasKohli@gmail.com|  678 Maple Ave|New york|   NY| 789010|
|          8|   Vishal|  Khatri|VishalKhatri@gmai...|    901 Pine St| 

lets import the DWH and do the changes. Note that only active rows need changes so we will only import the active rows.

In [0]:
DWH_active_df = spark.read.schema(DWH_schema)\
    .option("header", "True")\
    .csv(DWH) \
    .where(col("row_status") == 'A') \
    .withColumn("address", trim(col("address")))
DWH_df.show()

+-----------+---------+--------+--------------------+-------------+--------+-----+-------+------------------+----------+----------+----------+
|customer_id|firstname|lastname|               email|      address|    city|state|zipcode|cust_surrogate_key|  eff_date|  end_date|row_status|
+-----------+---------+--------+--------------------+-------------+--------+-----+-------+------------------+----------+----------+----------+
|          1|   Rakesh|  Sharma|RakeshSharma@gmai...|  123 Main St|   Hubly|   WB| 123450|                 1|2024-04-07|8888-01-01|         A|
|          2|     John|   verma| Johnverma@gmail.com|  456 Oak Ave|   Delhi|Delhi| 678900|                 2|2024-04-07|8888-01-01|         A|
|          3|    Sneha|   Yadav|SnehaYadav@gmail.com|   123 Elm Ln|   Delhi|Delhi| 876540|                 3|2024-04-07|8888-01-01|         A|
|          4|    Mohit|   Mehra|MohitMehra@gmail.com| 234 Cedar Dr|   Delhi|Delhi| 890120|                 4|2024-04-07|8888-01-01|         A|

Now the task is to do all the new changes in the data warehouse. Adding new rows, reflecting the updates, and deleting the deleted rows. For that we can do a Full outer join, and based the results we can understand which row needs update, and which rows needs to be deleted/added.

In [0]:
merged_df = DWH_active_df.join(broadcast(new_source_df), "customer_id", "full_outer")
display(merged_df)

customer_id,firstname,lastname,email,address,city,state,zipcode,cust_surrogate_key,eff_date,end_date,row_status,firstname.1,lastname.1,email.1,address.1,city.1,state.1,zipcode.1
1,Rakesh,Sharma,RakeshSharma@gmail.com,123 Main St,Hubly,WB,123450.0,1.0,2024-04-07,8888-01-01,A,Rakesh,Sharma,RakeshSharma@gmail.com,123 Main St,Kolkata,WB,123450.0
2,John,verma,Johnverma@gmail.com,456 Oak Ave,Delhi,Delhi,678900.0,2.0,2024-04-07,8888-01-01,A,John,verma,Johnverma@gmail.com,456 Oak Ave,Delhi,Delhi,678900.0
3,Sneha,Yadav,SnehaYadav@gmail.com,123 Elm Ln,Delhi,Delhi,876540.0,3.0,2024-04-07,8888-01-01,A,Sneha,Yadav,SnehaYadav@gmail.com,123 Elm Ln,Delhi,Delhi,876540.0
4,Mohit,Mehra,MohitMehra@gmail.com,234 Cedar Dr,Delhi,Delhi,890120.0,4.0,2024-04-07,8888-01-01,A,Mohit,Mehra,MohitMehra@gmail.com,234 Cedar Dr,Delhi,Delhi,890120.0
5,Mohit,Yadav,MohitYadav@gmail.com,567 Elm Blvd,Mumbai,MH,456780.0,5.0,2024-04-07,8888-01-01,A,Mohit,Yadav,MohitYadav@gmail.com,567 Elm Blvd,Mumbai,MH,456780.0
6,Rajesh,Tilak,RajeshTilak@gmail.com,890 Birch Rd,Mumbai,MH,234560.0,6.0,2024-04-07,8888-01-01,A,Rajesh,Tilak,RajeshTilak@yahoo.com,"Mira road,Delhi",Mumbai,MH,234560.0
7,Vikas,Kohli,VikasKohli@gmail.com,678 Maple Ave,New york,NY,789010.0,7.0,2024-04-07,8888-01-01,A,Vikas,Kohli,VikasKohli@gmail.com,678 Maple Ave,New york,NY,789010.0
8,Vishal,Khatri,VishalKhatri@gmail.com,901 Pine St,Queens,NY,567890.0,8.0,2024-04-07,8888-01-01,A,Vishal,Khatri,VishalKhatri@gmail.com,901 Pine St,Queens,NY,567890.0
9,Rahul,H,RahulH@gmail.com,234 Oak Ln,Jaipur,RJ,12345.0,9.0,2024-04-07,8888-01-01,A,Rahul,H,RahulH@gmail.com,234 Oak Ln,Jaipur,TN,12345.0
10,Yash,Bhati,YashBhati@gmail.com,567 Oak St,Trichy,TN,987650.0,10.0,2024-04-07,8888-01-01,A,,,,,,,


Now if we see merged data, if the left side values(DWH) are null, this means this is a new row, and needs to be added to the DWH.
But if it not null in DWH but null in the new data that means that row has been deleted, so we would need to delete that row frm the DWH. (As this is SCD type 2, we will not delete it, we will change the end_date and make the row_status = 'I')
In case of updates, we would need to check of the columns for each of the row. To optimize that we will create a hash of the values of those changing dimensions, if the hash is changes then we will change the row, change the row_status and add new row with new data.



Also if you see the merged data, it is difficult to check which column is from which dataframe, so lets create a function to rename the columns with DF name suffix. Also created a function for hashing the values. I am using md5 function to hash the values.

In [0]:
def column_renamer(df, suffix, append):
    if append:
        new_column_names = list(map(lambda x: x+suffix, df.columns))
    else:
        new_column_names = list(map(lambda x: x.replace(suffix,""), df.columns))
    return df.toDF(*new_column_names)

def get_hash(df, keys_list):
    columns = [col(column) for column in keys_list]
    if columns:
        return df.withColumn("hash_md5", md5(concat_ws("", *columns)))
    else:
        return df.withColumn("hash_md5", md5(lit(1)))

In [0]:
DWH_active_df_hash = column_renamer(get_hash(DWH_active_df, SCD_cols), suffix="_DWH", append=True)
new_source_df_hash = column_renamer(get_hash(new_source_df, SCD_cols), suffix="_source", append=True)


lets merge the hashed dataframes.

In [0]:
merged_df_hash = DWH_active_df_hash.\
    join(broadcast(new_source_df_hash), \
    col("customer_id_DWH") == col("customer_id_source"), "full_outer")
display(merged_df_hash)

customer_id_DWH,firstname_DWH,lastname_DWH,email_DWH,address_DWH,city_DWH,state_DWH,zipcode_DWH,cust_surrogate_key_DWH,eff_date_DWH,end_date_DWH,row_status_DWH,hash_md5_DWH,customer_id_source,firstname_source,lastname_source,email_source,address_source,city_source,state_source,zipcode_source,hash_md5_source
1.0,Rakesh,Sharma,RakeshSharma@gmail.com,123 Main St,Hubly,WB,123450.0,1.0,2024-04-07,8888-01-01,A,4980302c046734a079406a1288d2eb81,1.0,Rakesh,Sharma,RakeshSharma@gmail.com,123 Main St,Kolkata,WB,123450.0,8883097f9658b1634048b8abb8150b45
2.0,John,verma,Johnverma@gmail.com,456 Oak Ave,Delhi,Delhi,678900.0,2.0,2024-04-07,8888-01-01,A,8579bac7e87e66ff956d1a5d045dcb92,2.0,John,verma,Johnverma@gmail.com,456 Oak Ave,Delhi,Delhi,678900.0,8579bac7e87e66ff956d1a5d045dcb92
3.0,Sneha,Yadav,SnehaYadav@gmail.com,123 Elm Ln,Delhi,Delhi,876540.0,3.0,2024-04-07,8888-01-01,A,c361796d67b7aef6daadbbaecb3cc185,3.0,Sneha,Yadav,SnehaYadav@gmail.com,123 Elm Ln,Delhi,Delhi,876540.0,c361796d67b7aef6daadbbaecb3cc185
4.0,Mohit,Mehra,MohitMehra@gmail.com,234 Cedar Dr,Delhi,Delhi,890120.0,4.0,2024-04-07,8888-01-01,A,76ddef8e2fb6e7087562bcaf5e376c5b,4.0,Mohit,Mehra,MohitMehra@gmail.com,234 Cedar Dr,Delhi,Delhi,890120.0,76ddef8e2fb6e7087562bcaf5e376c5b
5.0,Mohit,Yadav,MohitYadav@gmail.com,567 Elm Blvd,Mumbai,MH,456780.0,5.0,2024-04-07,8888-01-01,A,e2199a2a5bcd9d7dbf8578791c66111d,5.0,Mohit,Yadav,MohitYadav@gmail.com,567 Elm Blvd,Mumbai,MH,456780.0,e2199a2a5bcd9d7dbf8578791c66111d
6.0,Rajesh,Tilak,RajeshTilak@gmail.com,890 Birch Rd,Mumbai,MH,234560.0,6.0,2024-04-07,8888-01-01,A,e7034386d74e58c3827e513d1349034d,6.0,Rajesh,Tilak,RajeshTilak@yahoo.com,"Mira road,Delhi",Mumbai,MH,234560.0,e8ed82675ff505c13f815f8a07b4c997
7.0,Vikas,Kohli,VikasKohli@gmail.com,678 Maple Ave,New york,NY,789010.0,7.0,2024-04-07,8888-01-01,A,e685a1a248e75bf2c35505d7955be227,7.0,Vikas,Kohli,VikasKohli@gmail.com,678 Maple Ave,New york,NY,789010.0,e685a1a248e75bf2c35505d7955be227
8.0,Vishal,Khatri,VishalKhatri@gmail.com,901 Pine St,Queens,NY,567890.0,8.0,2024-04-07,8888-01-01,A,6ab95f174ea9137803a18007ab3652a2,8.0,Vishal,Khatri,VishalKhatri@gmail.com,901 Pine St,Queens,NY,567890.0,6ab95f174ea9137803a18007ab3652a2
9.0,Rahul,H,RahulH@gmail.com,234 Oak Ln,Jaipur,RJ,12345.0,9.0,2024-04-07,8888-01-01,A,110cef0483bbdc653662bd31217a7861,9.0,Rahul,H,RahulH@gmail.com,234 Oak Ln,Jaipur,TN,12345.0,7668848d5116daf75c0dfd718cc40ef8
10.0,Yash,Bhati,YashBhati@gmail.com,567 Oak St,Trichy,TN,987650.0,10.0,2024-04-07,8888-01-01,A,813fce128b66ceb80657c30e0f61c576,,,,,,,,,


Again, Now if we see merged data, if the left side values(DWH) are null, this means this is a new row, and needs to be added to the DWH. But if it not null in DWH but null in the new data that means that row has been deleted, so we would need to delete that row frm the DWH. (As this is SCD type 2, we will not delete it, we will change the end_date and make the row_status = 'I') In case of updates, we would need to check of the columns for each of the row. To optimize that we will create a hash of the values of those changing dimensions, if the hash is changes then we will change the row, change the row_status and add new row with new data.


In [0]:
merged_df_hash = merged_df_hash.withColumn("Action", when(col("hash_md5_source") == col("hash_md5_DWH")  , 'NOCHANGE')\
            .when(col("customer_id_source").isNull(), 'DELETE')\
            .when(col("customer_id_DWH").isNull(), 'INSERT')\
            .otherwise('UPDATE'))

display(merged_df_hash)

customer_id_DWH,firstname_DWH,lastname_DWH,email_DWH,address_DWH,city_DWH,state_DWH,zipcode_DWH,cust_surrogate_key_DWH,eff_date_DWH,end_date_DWH,row_status_DWH,hash_md5_DWH,customer_id_source,firstname_source,lastname_source,email_source,address_source,city_source,state_source,zipcode_source,hash_md5_source,Action
1.0,Rakesh,Sharma,RakeshSharma@gmail.com,123 Main St,Hubly,WB,123450.0,1.0,2024-04-07,8888-01-01,A,4980302c046734a079406a1288d2eb81,1.0,Rakesh,Sharma,RakeshSharma@gmail.com,123 Main St,Kolkata,WB,123450.0,8883097f9658b1634048b8abb8150b45,UPDATE
2.0,John,verma,Johnverma@gmail.com,456 Oak Ave,Delhi,Delhi,678900.0,2.0,2024-04-07,8888-01-01,A,8579bac7e87e66ff956d1a5d045dcb92,2.0,John,verma,Johnverma@gmail.com,456 Oak Ave,Delhi,Delhi,678900.0,8579bac7e87e66ff956d1a5d045dcb92,NOCHANGE
3.0,Sneha,Yadav,SnehaYadav@gmail.com,123 Elm Ln,Delhi,Delhi,876540.0,3.0,2024-04-07,8888-01-01,A,c361796d67b7aef6daadbbaecb3cc185,3.0,Sneha,Yadav,SnehaYadav@gmail.com,123 Elm Ln,Delhi,Delhi,876540.0,c361796d67b7aef6daadbbaecb3cc185,NOCHANGE
4.0,Mohit,Mehra,MohitMehra@gmail.com,234 Cedar Dr,Delhi,Delhi,890120.0,4.0,2024-04-07,8888-01-01,A,76ddef8e2fb6e7087562bcaf5e376c5b,4.0,Mohit,Mehra,MohitMehra@gmail.com,234 Cedar Dr,Delhi,Delhi,890120.0,76ddef8e2fb6e7087562bcaf5e376c5b,NOCHANGE
5.0,Mohit,Yadav,MohitYadav@gmail.com,567 Elm Blvd,Mumbai,MH,456780.0,5.0,2024-04-07,8888-01-01,A,e2199a2a5bcd9d7dbf8578791c66111d,5.0,Mohit,Yadav,MohitYadav@gmail.com,567 Elm Blvd,Mumbai,MH,456780.0,e2199a2a5bcd9d7dbf8578791c66111d,NOCHANGE
6.0,Rajesh,Tilak,RajeshTilak@gmail.com,890 Birch Rd,Mumbai,MH,234560.0,6.0,2024-04-07,8888-01-01,A,e7034386d74e58c3827e513d1349034d,6.0,Rajesh,Tilak,RajeshTilak@yahoo.com,"Mira road,Delhi",Mumbai,MH,234560.0,e8ed82675ff505c13f815f8a07b4c997,UPDATE
7.0,Vikas,Kohli,VikasKohli@gmail.com,678 Maple Ave,New york,NY,789010.0,7.0,2024-04-07,8888-01-01,A,e685a1a248e75bf2c35505d7955be227,7.0,Vikas,Kohli,VikasKohli@gmail.com,678 Maple Ave,New york,NY,789010.0,e685a1a248e75bf2c35505d7955be227,NOCHANGE
8.0,Vishal,Khatri,VishalKhatri@gmail.com,901 Pine St,Queens,NY,567890.0,8.0,2024-04-07,8888-01-01,A,6ab95f174ea9137803a18007ab3652a2,8.0,Vishal,Khatri,VishalKhatri@gmail.com,901 Pine St,Queens,NY,567890.0,6ab95f174ea9137803a18007ab3652a2,NOCHANGE
9.0,Rahul,H,RahulH@gmail.com,234 Oak Ln,Jaipur,RJ,12345.0,9.0,2024-04-07,8888-01-01,A,110cef0483bbdc653662bd31217a7861,9.0,Rahul,H,RahulH@gmail.com,234 Oak Ln,Jaipur,TN,12345.0,7668848d5116daf75c0dfd718cc40ef8,UPDATE
10.0,Yash,Bhati,YashBhati@gmail.com,567 Oak St,Trichy,TN,987650.0,10.0,2024-04-07,8888-01-01,A,813fce128b66ceb80657c30e0f61c576,,,,,,,,,,DELETE


So as stated above while importing the new data, we has done below changes, and do reflect in the action column above.
1. 3 new inputs 
2. 3 Updates \
i. customer_id 6 email and address change \
ii. Customer_id 9 state change \
iii. customer_id 1 city change 
3. 1  Deletes \
i. Customer_id 10 deleted

Now filter for the Action column where we are required to do the changes.


In [0]:
unchanged_records = column_renamer(merged_df_hash.filter(col("action") == 'NOCHANGE'), suffix="_DWH", append=False).\
    select(DWH_active_df.columns)
unchanged_records.show()

+-----------+---------+--------+--------------------+-------------+--------+-----+-------+------------------+----------+----------+----------+
|customer_id|firstname|lastname|               email|      address|    city|state|zipcode|cust_surrogate_key|  eff_date|  end_date|row_status|
+-----------+---------+--------+--------------------+-------------+--------+-----+-------+------------------+----------+----------+----------+
|          2|     John|   verma| Johnverma@gmail.com|  456 Oak Ave|   Delhi|Delhi| 678900|                 2|2024-04-07|8888-01-01|         A|
|          3|    Sneha|   Yadav|SnehaYadav@gmail.com|   123 Elm Ln|   Delhi|Delhi| 876540|                 3|2024-04-07|8888-01-01|         A|
|          4|    Mohit|   Mehra|MohitMehra@gmail.com| 234 Cedar Dr|   Delhi|Delhi| 890120|                 4|2024-04-07|8888-01-01|         A|
|          5|    Mohit|   Yadav|MohitYadav@gmail.com| 567 Elm Blvd|  Mumbai|   MH| 456780|                 5|2024-04-07|8888-01-01|         A|

lets get the new records, that are to be inserted and add the eff_date, end_date_ row_status columns to these and then we will append these rows to our DWH, and make srue we have updated the max_sk.

In [0]:
insert_records = column_renamer(merged_df_hash.filter(col("action") == 'INSERT'), suffix="_source", append=False) \
                .select(new_source_df.columns)\
                .withColumn("row_number",row_number().over(Window.orderBy(col("customer_id"))))\
                .withColumn("cust_surrogate_key",col("row_number")+ max_sk)\
                .withColumn("eff_date",date_format(current_date(),DATE_FORMAT))\
                .withColumn("end_date",date_format(lit(end_date),DATE_FORMAT))\
                .withColumn("row_status", lit("A"))\
                .drop("row_number")

insert_records.show()

+-----------+---------+--------+----------------+-----------+------+-----+-------+------------------+----------+----------+----------+
|customer_id|firstname|lastname|           email|    address|  city|state|zipcode|cust_surrogate_key|  eff_date|  end_date|row_status|
+-----------+---------+--------+----------------+-----------+------+-----+-------+------------------+----------+----------+----------+
|         11|   Mikkka|   Bhati|mbhati@yahoo.com|901 Pine St|Queens|   NY| 567890|                14|2024-04-07|8888-01-01|         A|
|         12|    Bhaji|   Tilak|   BYT.gmail.com| 234 Oak Ln|Jaipur|   RJ|  12345|                15|2024-04-07|8888-01-01|         A|
|         13|    Tikka|   Singh|Tsingh@gmail.com| 567 Oak St|Trichy|   TN| 987650|                16|2024-04-07|8888-01-01|         A|
+-----------+---------+--------+----------------+-----------+------+-----+-------+------------------+----------+----------+----------+



In [0]:
max_sk = insert_records.agg({"cust_surrogate_key": "max"}).collect()[0][0]
max_sk

Out[80]: 16

Now lets update the rows. Make sure you understand that we are updating old rows in DWH to 'I' and changein the end date to current date then adding the new rows in the DWH.

In [0]:
update_records = column_renamer(merged_df_hash.filter(col("action") == 'UPDATE'), suffix="_DWH", append=False)\
                .select(DWH_active_df.columns)\
                .withColumn("end_date", date_format(current_date(),DATE_FORMAT))\
                .withColumn("row_status", lit("I"))\
            .unionByName(
            column_renamer(merged_df_hash.filter(col("action") == 'UPDATE'), suffix="_source", append=False)\
                .select(new_source_df.columns)\
                .withColumn("eff_date",date_format(current_date(),DATE_FORMAT))\
                .withColumn("end_date",date_format(lit(end_date),DATE_FORMAT))\
                .withColumn("row_number",row_number().over(Window.orderBy(col("customer_id"))))\
                .withColumn("cust_surrogate_key",col("row_number")+ max_sk)\
                .withColumn("row_status", lit("A"))\
                .drop("row_number")
                )

So basically update becomes two step proces, update end_date and row_status in old records and append the new rows. Like we see in the below dataframe. And as we are adding new rows, we need to get the latest max_sk

In [0]:
display(update_records)

customer_id,firstname,lastname,email,address,city,state,zipcode,cust_surrogate_key,eff_date,end_date,row_status
1,Rakesh,Sharma,RakeshSharma@gmail.com,123 Main St,Hubly,WB,123450,1,2024-04-07,2024-04-07,I
6,Rajesh,Tilak,RajeshTilak@gmail.com,890 Birch Rd,Mumbai,MH,234560,6,2024-04-07,2024-04-07,I
9,Rahul,H,RahulH@gmail.com,234 Oak Ln,Jaipur,RJ,12345,9,2024-04-07,2024-04-07,I
1,Rakesh,Sharma,RakeshSharma@gmail.com,123 Main St,Kolkata,WB,123450,17,2024-04-07,8888-01-01,A
6,Rajesh,Tilak,RajeshTilak@yahoo.com,"Mira road,Delhi",Mumbai,MH,234560,18,2024-04-07,8888-01-01,A
9,Rahul,H,RahulH@gmail.com,234 Oak Ln,Jaipur,TN,12345,19,2024-04-07,8888-01-01,A


In [0]:
max_sk = update_records.agg(max(col("cust_surrogate_key"))).collect()[0][0]
max_sk

Out[86]: 19

Now lets handle the deleted records, which are there in DWH but not in the new source data. So we mark row_status to I(Inactive)
and change the end_date to current date.

In [0]:
delete_records = column_renamer(merged_df_hash.filter(col("action") == 'DELETE'), suffix="_DWH", append=False)\
                .select(DWH_active_df.columns)\
                .withColumn("end_date", date_format(current_date(),DATE_FORMAT))\
                .withColumn("row_status", lit("I"))

delete_records.show()

+-----------+---------+--------+-------------------+----------+------+-----+-------+------------------+----------+----------+----------+
|customer_id|firstname|lastname|              email|   address|  city|state|zipcode|cust_surrogate_key|  eff_date|  end_date|row_status|
+-----------+---------+--------+-------------------+----------+------+-----+-------+------------------+----------+----------+----------+
|         10|     Yash|   Bhati|YashBhati@gmail.com|567 Oak St|Trichy|   TN| 987650|                10|2024-04-07|2024-04-07|         I|
+-----------+---------+--------+-------------------+----------+------+-----+-------+------------------+----------+----------+----------+



so the final data warehouse in the all the old inactive values from DWH, the unchanges records, new inserted records, updated records and the deleted records.

In [0]:
final_DWH = spark.read.schema(DWH_schema)\
            .option("header", "True")\
            .csv(DWH) \
            .where(col("row_status") == 'I')\
            .unionByName(unchanged_records)\
            .unionByName(insert_records)\
            .unionByName(update_records)\
            .unionByName(delete_records)

display(final_DWH)

customer_id,firstname,lastname,email,address,city,state,zipcode,cust_surrogate_key,eff_date,end_date,row_status
2,John,verma,Johnverma@gmail.com,456 Oak Ave,Delhi,Delhi,678900,2,2024-04-07,8888-01-01,A
3,Sneha,Yadav,SnehaYadav@gmail.com,123 Elm Ln,Delhi,Delhi,876540,3,2024-04-07,8888-01-01,A
4,Mohit,Mehra,MohitMehra@gmail.com,234 Cedar Dr,Delhi,Delhi,890120,4,2024-04-07,8888-01-01,A
5,Mohit,Yadav,MohitYadav@gmail.com,567 Elm Blvd,Mumbai,MH,456780,5,2024-04-07,8888-01-01,A
7,Vikas,Kohli,VikasKohli@gmail.com,678 Maple Ave,New york,NY,789010,7,2024-04-07,8888-01-01,A
8,Vishal,Khatri,VishalKhatri@gmail.com,901 Pine St,Queens,NY,567890,8,2024-04-07,8888-01-01,A
11,Mikkka,Bhati,mbhati@yahoo.com,901 Pine St,Queens,NY,567890,14,2024-04-07,8888-01-01,A
12,Bhaji,Tilak,BYT.gmail.com,234 Oak Ln,Jaipur,RJ,12345,15,2024-04-07,8888-01-01,A
13,Tikka,Singh,Tsingh@gmail.com,567 Oak St,Trichy,TN,987650,16,2024-04-07,8888-01-01,A
1,Rakesh,Sharma,RakeshSharma@gmail.com,123 Main St,Hubly,WB,123450,1,2024-04-07,2024-04-07,I


and finally we update the dataware house folder.

In [0]:
final_DWH.write.mode('overwrite') \
        .option("header",True) \
        .option("delimiter",",") \
        .csv(DWH)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-3897401979865207>:1[0m
[0;32m----> 1[0m [43mfinal_DWH[49m[38;5;241;43m.[39;49m[43mwrite[49m[38;5;241;43m.[39;49m[43mmode[49m[43m([49m[38;5;124;43m'[39;49m[38;5;124;43moverwrite[39;49m[38;5;124;43m'[39;49m[43m)[49m[43m [49m[43m\[49m
[1;32m      2[0m [43m        [49m[38;5;241;43m.[39;49m[43moption[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mheader[39;49m[38;5;124;43m"[39;49m[43m,[49m[38;5;28;43;01mTrue[39;49;00m[43m)[49m[43m [49m[43m\[49m
[1;32m      3[0m [43m        [49m[38;5;241;43m.[39;49m[43moption[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mdelimiter[39;49m[38;5;124;43m"[39;49m[43m,[49m[38;5;124;43m"[39;49m[38;5;124;43m,[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m [49m[43m\[49m
[1;32m      4[0m [43m       