In [0]:
%sql
CREATE DATABASE IF NOT EXISTS training;

In [0]:
%sql
--DROP TABLE training.EMP_SCD2_PYSPARK;
CREATE OR REPLACE TABLE training.EMP_SCD2_PYSPARK
(
  EMPNO INT, 
	ENAME STRING, 
	JOB STRING, 
	MGR INT, 
	HIREDATE DATE, 
	SAL INT, 
	COMM INT, 
	DEPTNO INT,
	sk_EMPNO INT,
	EFFECTIVE_DATE DATE,
	EXPIRATION_DATE DATE,
	CURRENT_FLAG STRING,
	ETL_CHECKSUM STRING
)
USING DELTA
LOCATION '/FileStore/tables/delta-table-merge/EMP_SCD2_PYSPARK';

In [0]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from os import path, listdir
from delta.tables import DeltaTable


In [0]:
#Letâ€™s start with defining some variables. You can also have a config file for this. 
# It has a SOURCE_PATH, DEST_PATH, and key_list holds the natural key column in the table. 
# type2_cols holds a list of columns for which type 2 methodology will be implemented. 
# scd2_cols are columns used for SCD type 2 handling.
EOW_DATE = "9999-12-31"
#SOURCE_PATH = "dbfs:/FileStore/tables/EMP.csv"
#DEST_PATH = "dbfs:/FileStore/tables/EMP_SCD2_NOMERGE.csv"
KEY_LIST = ["EMPNO"]
type2_cols = ["JOB","MGR","HIREDATE",	"DEPTNO", "SAL", "COMM"]
SCD2_COLUMNS =["EFFECTIVE_DATE","EXPIRATION_DATE","CURRENT_FLAG"]
DATE_FORMAT = "yyyy-MM-dd"



In [0]:
EMPDATA = [[7839,'KING','PRESIDENT','','1981-11-17',5000,'',10],
[7698,'BLAKE','MANAGER',7839,'1981-05-01',2850,'',30],
[7782,'CLARK','MANAGER',7839,'1981-06-09',2450,'',10],
[7566,'JONES','MANAGER',7839,'1981-04-02',2975,'',20],
[7788,'SCOTT','ANALYST',7566,'1987-04-19',3000,'',20],
[7902,'FORD','ANALYST',7566,'1981-12-03',3000,'',20],
[7369,'SMITH','CLERK',7902,'1980-12-17',800,'',20],
[7499,'ALLEN','SALESMAN',7698,'1981-02-20',1600,300,30],
[7521,'WARD','SALESMAN',7698,'1981-02-22',1250,500,30],
[7654,'MARTIN','SALESMAN',7698,'1981-09-28',1250,1400,30],
[7844,'TURNER','SALESMAN',7698,'1981-09-08',1500,0,30],
[7876,'ADAMS','CLERK',7788,'1987-05-23',1100,'',20],
[7900,'JAMES','CLERK',7698,'1981-12-03',950,'',30],
[7934,'MILLER','ANALYST',7782,'1982-01-23',1300,'',40],
[7935,'MILLER_T','ANALYST',7782,'1982-01-23',1300,'',40]
]

df_source = spark.createDataFrame(EMPDATA, ['EMPNO','ENAME','JOB','MGR','HIREDATE','SAL','COMM','DEPTNO'])
df_source.printSchema()
display(df_source)

root
 |-- EMPNO: long (nullable = true)
 |-- ENAME: string (nullable = true)
 |-- JOB: string (nullable = true)
 |-- MGR: string (nullable = true)
 |-- HIREDATE: string (nullable = true)
 |-- SAL: long (nullable = true)
 |-- COMM: string (nullable = true)
 |-- DEPTNO: long (nullable = true)



EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO
7839,KING,PRESIDENT,,1981-11-17,5000,,10
7698,BLAKE,MANAGER,7839.0,1981-05-01,2850,,30
7782,CLARK,MANAGER,7839.0,1981-06-09,2450,,10
7566,JONES,MANAGER,7839.0,1981-04-02,2975,,20
7788,SCOTT,ANALYST,7566.0,1987-04-19,3000,,20
7902,FORD,ANALYST,7566.0,1981-12-03,3000,,20
7369,SMITH,CLERK,7902.0,1980-12-17,800,,20
7499,ALLEN,SALESMAN,7698.0,1981-02-20,1600,300.0,30
7521,WARD,SALESMAN,7698.0,1981-02-22,1250,500.0,30
7654,MARTIN,SALESMAN,7698.0,1981-09-28,1250,1400.0,30


In [0]:
type2_cols = ["JOB","MGR","HIREDATE",	"DEPTNO", "SAL", "COMM"]
# Convert list of column names into a list of column objects using 'col'
column_objects = [col(col_name) for col_name in type2_cols]
print(column_objects)
df_source = df_source.withColumn("ETL_CHECKSUM", md5(concat_ws('|',*column_objects)))
display(df_source)

[Column<'JOB'>, Column<'MGR'>, Column<'HIREDATE'>, Column<'DEPTNO'>, Column<'SAL'>, Column<'COMM'>]


EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO,ETL_CHECKSUM
7839,KING,PRESIDENT,,1981-11-17,5000,,10,bc1cf418724ab56313dc6c3f2e7eb34c
7698,BLAKE,MANAGER,7839.0,1981-05-01,2850,,30,3e9e79400e00fd2374eb64cbf34da1c0
7782,CLARK,MANAGER,7839.0,1981-06-09,2450,,10,34d454b81056c09de467ed30077da5c9
7566,JONES,MANAGER,7839.0,1981-04-02,2975,,20,f696067cd364d153e154204d62598445
7788,SCOTT,ANALYST,7566.0,1987-04-19,3000,,20,10d0a7d4ac6c6558cc7d7210761ab590
7902,FORD,ANALYST,7566.0,1981-12-03,3000,,20,3d31c3715fb51805c4b71b1f5cf456d5
7369,SMITH,CLERK,7902.0,1980-12-17,800,,20,f5fc6c3de19d491f867d94856fb98f87
7499,ALLEN,SALESMAN,7698.0,1981-02-20,1600,300.0,30,40fa7d7eb2d3994624740f143adb097e
7521,WARD,SALESMAN,7698.0,1981-02-22,1250,500.0,30,ba596d8b173e58e4b05fab66694ec819
7654,MARTIN,SALESMAN,7698.0,1981-09-28,1250,1400.0,30,be5f02c7512f42be780879e2946687e9


In [0]:
#Create a Temporary view for DataSource
df_source.createOrReplaceTempView('EMP_SOURCE_PYSPARK')

# Create a DataFrame for the Target DataSource
from delta.tables import *
EMPSCD2MergeTable = DeltaTable.forPath(spark, "/FileStore/tables/delta-table-merge/EMP_SCD2_PYSPARK")
EMPSCD2MergeTable = EMPSCD2MergeTable.toDF()

# Create the source DataFrame with JOIN_KEY column and all source columns
stg_emp_vw1 = spark.table("EMP_SOURCE_PYSPARK")
stg_emp_vw1 = stg_emp_vw1.selectExpr("EMPNO AS JOIN_KEY", "*")
display(stg_emp_vw1)

# Join Target Data Frame with Source Dataframe for the new records
stg_emp_vw2 = spark.table("EMP_SOURCE_PYSPARK")
#display(stg_emp_vw2)
stg_emp_vw2 = stg_emp_vw2.selectExpr("NULL AS JOIN_KEY","*")\
.join(EMPSCD2MergeTable, (stg_emp_vw2["EMPNO"] == EMPSCD2MergeTable["EMPNO"]), "inner")\
.filter((EMPSCD2MergeTable["ETL_CHECKSUM"] != stg_emp_vw2["ETL_CHECKSUM"]) & (EMPSCD2MergeTable["CURRENT_FLAG"] == "Y"))\
.select("JOIN_KEY",stg_emp_vw2["EMPNO"], stg_emp_vw2["ENAME"], stg_emp_vw2["JOB"], stg_emp_vw2["MGR"],stg_emp_vw2["HIREDATE"], stg_emp_vw2["SAL"], stg_emp_vw2["COMM"], stg_emp_vw2["DEPTNO"],stg_emp_vw2["ETL_CHECKSUM"])
#.select("JOIN_KEY", *)

display(stg_emp_vw2)

#Union the both Dataframes
stg_emp_vw = stg_emp_vw1.unionAll(stg_emp_vw2)
display(stg_emp_vw)



JOIN_KEY,EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO,ETL_CHECKSUM
7839,7839,KING,PRESIDENT,,1981-11-17,5000,,10,bc1cf418724ab56313dc6c3f2e7eb34c
7698,7698,BLAKE,MANAGER,7839.0,1981-05-01,2850,,30,3e9e79400e00fd2374eb64cbf34da1c0
7782,7782,CLARK,MANAGER,7839.0,1981-06-09,2450,,10,34d454b81056c09de467ed30077da5c9
7566,7566,JONES,MANAGER,7839.0,1981-04-02,2975,,20,f696067cd364d153e154204d62598445
7788,7788,SCOTT,ANALYST,7566.0,1987-04-19,3000,,20,10d0a7d4ac6c6558cc7d7210761ab590
7902,7902,FORD,ANALYST,7566.0,1981-12-03,3000,,20,3d31c3715fb51805c4b71b1f5cf456d5
7369,7369,SMITH,CLERK,7902.0,1980-12-17,800,,20,f5fc6c3de19d491f867d94856fb98f87
7499,7499,ALLEN,SALESMAN,7698.0,1981-02-20,1600,300.0,30,40fa7d7eb2d3994624740f143adb097e
7521,7521,WARD,SALESMAN,7698.0,1981-02-22,1250,500.0,30,ba596d8b173e58e4b05fab66694ec819
7654,7654,MARTIN,SALESMAN,7698.0,1981-09-28,1250,1400.0,30,be5f02c7512f42be780879e2946687e9


JOIN_KEY,EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO,ETL_CHECKSUM
,7934,MILLER,ANALYST,7782,1982-01-23,1300,,40,4178f1da3b8586de4de7d5205d54eaf2


JOIN_KEY,EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO,ETL_CHECKSUM
7839.0,7839,KING,PRESIDENT,,1981-11-17,5000,,10,bc1cf418724ab56313dc6c3f2e7eb34c
7698.0,7698,BLAKE,MANAGER,7839.0,1981-05-01,2850,,30,3e9e79400e00fd2374eb64cbf34da1c0
7782.0,7782,CLARK,MANAGER,7839.0,1981-06-09,2450,,10,34d454b81056c09de467ed30077da5c9
7566.0,7566,JONES,MANAGER,7839.0,1981-04-02,2975,,20,f696067cd364d153e154204d62598445
7788.0,7788,SCOTT,ANALYST,7566.0,1987-04-19,3000,,20,10d0a7d4ac6c6558cc7d7210761ab590
7902.0,7902,FORD,ANALYST,7566.0,1981-12-03,3000,,20,3d31c3715fb51805c4b71b1f5cf456d5
7369.0,7369,SMITH,CLERK,7902.0,1980-12-17,800,,20,f5fc6c3de19d491f867d94856fb98f87
7499.0,7499,ALLEN,SALESMAN,7698.0,1981-02-20,1600,300.0,30,40fa7d7eb2d3994624740f143adb097e
7521.0,7521,WARD,SALESMAN,7698.0,1981-02-22,1250,500.0,30,ba596d8b173e58e4b05fab66694ec819
7654.0,7654,MARTIN,SALESMAN,7698.0,1981-09-28,1250,1400.0,30,be5f02c7512f42be780879e2946687e9


In [0]:
# Determine the maximum surrogate key value in the target table
emp_scd2_pyspark_df = spark.table("training.EMP_SCD2_PYSPARK")
max_sk = emp_scd2_pyspark_df.select(max(emp_scd2_pyspark_df["SK_EMPNO"]).alias("MAX_SK")).na.fill(0).collect()[0][0]
display(max_sk)
# Add row_number to new rows for surrogate key generation
window_spec = Window.orderBy("EMPNO")
stg_emp_vw = stg_emp_vw.withColumn("row_num", row_number().over(window_spec) + max_sk)
                                        
display(stg_emp_vw)


30

JOIN_KEY,EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO,ETL_CHECKSUM,row_num
7369,7369,SMITH,CLERK,7902.0,1980-12-17,800,,20,f5fc6c3de19d491f867d94856fb98f87,31
7499,7499,ALLEN,SALESMAN,7698.0,1981-02-20,1600,300.0,30,40fa7d7eb2d3994624740f143adb097e,32
7521,7521,WARD,SALESMAN,7698.0,1981-02-22,1250,500.0,30,ba596d8b173e58e4b05fab66694ec819,33
7566,7566,JONES,MANAGER,7839.0,1981-04-02,2975,,20,f696067cd364d153e154204d62598445,34
7654,7654,MARTIN,SALESMAN,7698.0,1981-09-28,1250,1400.0,30,be5f02c7512f42be780879e2946687e9,35
7698,7698,BLAKE,MANAGER,7839.0,1981-05-01,2850,,30,3e9e79400e00fd2374eb64cbf34da1c0,36
7782,7782,CLARK,MANAGER,7839.0,1981-06-09,2450,,10,34d454b81056c09de467ed30077da5c9,37
7788,7788,SCOTT,ANALYST,7566.0,1987-04-19,3000,,20,10d0a7d4ac6c6558cc7d7210761ab590,38
7839,7839,KING,PRESIDENT,,1981-11-17,5000,,10,bc1cf418724ab56313dc6c3f2e7eb34c,39
7844,7844,TURNER,SALESMAN,7698.0,1981-09-08,1500,0.0,30,28ade27449ded787abacf9f17381c58e,40


In [0]:
# Create or load the Delta table
delta_table = DeltaTable.forPath(spark,"/FileStore/tables/delta-table-merge/EMP_SCD2_PYSPARK")

delta_table.alias("Target")\
    .merge(
            source = stg_emp_vw.alias("Source"), condition = "Target.EMPNO = Source.JOIN_KEY"
          )\
    .whenMatchedUpdate(
                       condition="TARGET.ETL_CHECKSUM != SOURCE.ETL_CHECKSUM",
                        set =
                        {
                          "Target.CURRENT_FLAG" : "'N'",
                          "Target.EXPIRATION_DATE" : lit(current_timestamp())
                        }
                      )\
    .whenNotMatchedInsert(
                            values =
                            {
                            "Target.EMPNO": "Source.EMPNO",
                            "Target.ENAME": "Source.ENAME",
                            "Target.JOB": "Source.JOB",
                            "Target.MGR": "Source.MGR",
                            "Target.HIREDATE": "Source.HIREDATE",
                            "Target.SAL": "Source.SAL",
                            "Target.COMM": "Source.COMM",
                            "Target.DEPTNO": "Source.DEPTNO",
                            #"Target.SK_EMPNO": "''",
                            "Target.SK_EMPNO": "Source.row_num",  # Increment surrogate key
                            "Target.EFFECTIVE_DATE": lit(current_timestamp()),
                            "Target.EXPIRATION_DATE": "'9999-12-31'",
                            "Target.CURRENT_FLAG": "'Y'",
                            "Target.ETL_CHECKSUM": "Source.ETL_CHECKSUM",
                            }
                        )\
    .execute()


In [0]:
%sql
select * from training.EMP_SCD2_PYSPARK

EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO,sk_EMPNO,EFFECTIVE_DATE,EXPIRATION_DATE,CURRENT_FLAG,ETL_CHECKSUM
7369,SMITH,CLERK,7902.0,1980-12-17,800,,20,1,2024-05-18,9999-12-31,Y,f5fc6c3de19d491f867d94856fb98f87
7499,ALLEN,SALESMAN,7698.0,1981-02-20,1600,300.0,30,2,2024-05-18,9999-12-31,Y,40fa7d7eb2d3994624740f143adb097e
7521,WARD,SALESMAN,7698.0,1981-02-22,1250,500.0,30,3,2024-05-18,9999-12-31,Y,ba596d8b173e58e4b05fab66694ec819
7566,JONES,MANAGER,7839.0,1981-04-02,2975,,20,4,2024-05-18,9999-12-31,Y,f696067cd364d153e154204d62598445
7654,MARTIN,SALESMAN,7698.0,1981-09-28,1250,1400.0,30,5,2024-05-18,9999-12-31,Y,be5f02c7512f42be780879e2946687e9
7698,BLAKE,MANAGER,7839.0,1981-05-01,2850,,30,6,2024-05-18,9999-12-31,Y,3e9e79400e00fd2374eb64cbf34da1c0
7782,CLARK,MANAGER,7839.0,1981-06-09,2450,,10,7,2024-05-18,9999-12-31,Y,34d454b81056c09de467ed30077da5c9
7788,SCOTT,ANALYST,7566.0,1987-04-19,3000,,20,8,2024-05-18,9999-12-31,Y,10d0a7d4ac6c6558cc7d7210761ab590
7839,KING,PRESIDENT,,1981-11-17,5000,,10,9,2024-05-18,9999-12-31,Y,bc1cf418724ab56313dc6c3f2e7eb34c
7844,TURNER,SALESMAN,7698.0,1981-09-08,1500,0.0,30,10,2024-05-18,9999-12-31,Y,28ade27449ded787abacf9f17381c58e
