In [0]:
from pyspark.sql import functions as F
from datetime import datetime
from delta.tables import DeltaTable
from pyspark.sql.functions import col, lit, current_date

In [0]:
def func_scd2(target_delta, source, key_cols):
    """Apply an SCD Type 2 upsert of ``source`` into the Delta table ``target_delta``.

    - Source rows whose business key has no current row in the target are
      inserted as new current versions.
    - Source rows whose key has a current target row, but whose non-key
      attributes differ (compared via a SHA-256 hash), do two things: the current
      row is expired (``is_current = False``, ``end_date = today``) and a new
      current version is inserted.
    - Unchanged rows are ignored.

    Expiring old versions and inserting new ones is done in a single Delta MERGE,
    so the whole change is one commit on ``target_delta``. The original code
    committed twice, and appended to a hard-coded table name.

    Args:
        target_delta: ``DeltaTable`` holding the SCD2 history. Assumed to contain
            the bookkeeping columns ``start_date``, ``end_date``, ``is_current``
            (compared with ``True``), ``hash`` and ``key`` next to the business
            columns.
        source: Spark DataFrame with the incoming records. Must contain
            ``key_cols`` and the target's business (hashed) columns.
        key_cols: Names of the columns that form the business key.
    """
    scd_cols = ["start_date", "end_date", "is_current", "hash", "key"]

    target = target_delta.toDF()
    target_schema = target.schema

    # Business columns = everything that is neither a key nor an SCD column.
    # Keep the target's column order. The former list(set(...)) order depended on
    # Python's per-process string-hash randomization, so identical rows could hash
    # differently between runs and be re-versioned for no reason.
    excluded = set(key_cols) | set(scd_cols)
    hash_columns = [c for c in target.columns if c not in excluded]
    print(hash_columns)

    # NOTE(review): concat_ws skips NULLs, and the key uses an empty separator.
    # So ("ab", "c") and ("a", "bc") give the same key, and NULLs in different
    # positions can hash identically. Left unchanged because the existing target
    # rows were keyed and hashed with exactly these expressions.
    source = (
        source.withColumn("key", F.concat_ws("", *key_cols))
        .withColumn("hash", F.sha2(F.concat_ws("||", *hash_columns), 256))
    )

    current = target.where("is_current = True").select("key", "hash")

    # Keys that have no current version in the target -> brand-new records.
    inserts = source.join(current, on="key", how="left_anti")

    # Keys that have a current version whose attributes changed.
    updates = (
        source.alias("src")
        .join(current.alias("trg"), F.col("src.key") == F.col("trg.key"), "inner")
        .where(F.col("src.hash") != F.col("trg.hash"))
        .select("src.*")
    )

    # Staged rows for one MERGE, keyed on "mergekey":
    #   - NULL never matches, so the row is inserted as a new current version
    #     (new keys, plus the new version of every changed key).
    #   - key matches that key's existing rows, so the current one gets expired.
    staged = (
        inserts.unionByName(updates)
        .withColumn("mergekey", lit(None).cast("string"))
        .unionByName(updates.withColumn("mergekey", F.col("key")))
    )

    # Single timestamp for both the expiry and the new version, so they agree
    # even if the job runs across midnight.
    today = datetime.now().strftime("%Y-%m-%d")

    # Cast to the target's declared types so that booleans stay booleans,
    # NULLs are typed, and dates match whatever type the table uses.
    def _typed(value, column):
        return lit(value).cast(target_schema[column].dataType)

    scd_managed = {"is_current", "start_date", "end_date"}
    new_version_values = {
        c: F.col(f"src.`{c}`")
        for c in target.columns
        if c in source.columns and c not in scd_managed
    }
    new_version_values.update(
        {
            "is_current": _typed(True, "is_current"),
            "start_date": _typed(today, "start_date"),
            "end_date": _typed(None, "end_date"),
        }
    )

    (
        target_delta.alias("trg")
        .merge(staged.alias("src"), "trg.key = src.mergekey")
        # Expire only the current row, and only when its attributes changed.
        .whenMatchedUpdate(
            condition="trg.is_current = True AND trg.hash <> src.hash",
            set={
                "is_current": _typed(False, "is_current"),
                "end_date": _typed(today, "end_date"),
            },
        )
        # Only the NULL-mergekey staged rows are meant to become new versions.
        .whenNotMatchedInsert(
            condition="src.mergekey IS NULL",
            values=new_version_values,
        )
        .execute()
    )
    