In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

## scd type1 upsert

## create source table

In [0]:
%sql
create table if not exists scd.source.scd1table
(
  customer_id int,
  customer_name string,
  customer_city string,
  customer_state string,
  processed_date date
)

## insert values in source table

In [0]:
%sql
insert into scd.source.scd1table
values

(1,"suresh","chennai","TN",current_date()),
(2,"kumar","covai","TN",current_date()),
(3,"vel","ooty","TN",current_date()),
(4,"murugan","salem","TN",current_date())

num_affected_rows,num_inserted_rows
4,4


In [0]:
%sql
select * from scd.source.scd1table

customer_id,customer_name,customer_city,customer_state,processed_date
1,suresh,chennai,TN,2025-07-19
2,kumar,covai,TN,2025-07-19
3,vel,ooty,TN,2025-07-19
4,murugan,salem,TN,2025-07-19


## target table creation

In [0]:
%sql
create table scd.target.scd1table
(
  customer_id int,
  customer_name string,
  customer_city string,
  customer_state string,
  processed_date date
)

## create view

In [0]:
spark.sql("""select * from scd.source.scd1table""").createOrReplaceTempView("src")

In [0]:
%sql
select * from src

customer_id,customer_name,customer_city,customer_state,processed_date
1,suresh,chennai,TN,2025-07-19
2,kumar,covai,TN,2025-07-19
3,vel,ooty,TN,2025-07-19
4,murugan,salem,TN,2025-07-19


## merge(upsert)

In [0]:
%sql
merge into scd.target.scd1table as trg
using src
on src.customer_id = trg.customer_id
when matched and src.processed_date >= trg.processed_date then update set *
when not matched then insert *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
5,5,0,0


In [0]:
%sql
select * from scd.target.scd1table

customer_id,customer_name,customer_city,customer_state,processed_date
1,suresh,chennai,TN,2025-07-19
4,murugan,salem,TN,2025-07-19
3,vel,ooty,TN,2025-07-19
2,kumar,covai,TN,2025-07-19


## update

In [0]:
%sql
update scd.source.scd1table
set customer_name = "updated suresh kumar"
where customer_id = 1

num_affected_rows
1


In [0]:
%sql
select * from scd.target.scd1table

customer_id,customer_name,customer_city,customer_state,processed_date
1,updated suresh kumar,chennai,TN,2025-07-19
4,murugan,salem,TN,2025-07-19
3,vel,ooty,TN,2025-07-19
2,kumar,covai,TN,2025-07-19


## insert

In [0]:
%sql
insert into scd.source.scd1table
values
(5,"durai","kallakurichi","TN",current_date())

num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql
select * from scd.target.scd1table

customer_id,customer_name,customer_city,customer_state,processed_date
1,updated suresh kumar,chennai,TN,2025-07-19
4,murugan,salem,TN,2025-07-19
3,vel,ooty,TN,2025-07-19
2,kumar,covai,TN,2025-07-19
5,durai,kallakurichi,TN,2025-07-19


## SCD type 2 keep history of data

## source table creation

In [0]:
%sql
create table if not exists scd.source.scd2table
(
  customer_id int,
  customer_name string,
  customer_city string,
  customer_state string,
  processed_date date
)

## insert data in source table

In [0]:
%sql
insert into scd.source.scd2table
values
(1,"suresh kumar","chennai","TN",current_date()),
(2,"ramesh","madurai","TN",current_date()),
(3,"rajesh","coimbatore","TN",current_date()),
(4,"kumar","tiruppur","TN",current_date())

num_affected_rows,num_inserted_rows
4,4


In [0]:
%sql
select * from scd.source.scd2table

customer_id,customer_name,customer_city,customer_state,processed_date
1,suresh kumar,chennai,TN,2025-07-19
2,ramesh,madurai,TN,2025-07-19
3,rajesh,coimbatore,TN,2025-07-19
4,kumar,tiruppur,TN,2025-07-19


## target table creation 

In [0]:
%sql
create table if not exists scd.target.scd2table
(
  customer_id int,
  customer_name string,
  customer_city string,
  customer_state string,
  processed_date date,
  start_date date,
  end_date date,
  is_used string
)

- **Adding additional 3 columns for SCD Type - 2**

In [0]:
spark.sql("""select * ,
          current_timestamp() as start_date,
          cast('2030-12-01' as timestamp) as end_date,
          'Yes' as is_used
          from scd.source.scd2table""").createOrReplaceTempView("src")

In [0]:
%sql
select * from src

customer_id,customer_name,customer_city,customer_state,processed_date,start_date,end_date,is_used
1,suresh kumar,chennai,TN,2025-07-19,2025-07-19T08:34:07.716Z,2030-12-01T00:00:00.000Z,Yes
2,ramesh,madurai,TN,2025-07-19,2025-07-19T08:34:07.716Z,2030-12-01T00:00:00.000Z,Yes
3,rajesh,coimbatore,TN,2025-07-19,2025-07-19T08:34:07.716Z,2030-12-01T00:00:00.000Z,Yes
4,kumar,tiruppur,TN,2025-07-19,2025-07-19T08:34:07.716Z,2030-12-01T00:00:00.000Z,Yes


%md
### **SCD TYPE 2 MEGE CONDISTIONS**

**MERGE-1** : This command will check if we have any data in the target table that is updated in the source, and will mark it as expired. 

In [0]:
%sql
merge into scd.target.scd2table as trg
using src
on src.customer_id = trg.customer_id and
trg.is_used = "yes"

when matched and (
src.customer_name <> trg.customer_name or
src.customer_city <> trg.customer_city or
src.customer_state <> trg.customer_state or
src.processed_date <> trg.processed_date
)then
update set 
trg.end_date = current_timestamp(),
trg.is_used = "no"

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


**MERGE-2** : This command will bring all the non-expired commands bcz we have filter of "is_current = 'Y'". So, this will not bring the updated records as well bcz previous MERGE command marked it as expired. So all the new records [including updated] will be inserted in this MERGE.

In [0]:
%sql
merge into scd.target.scd2table as trg
using src
on trg.customer_id = src.customer_id and
trg.is_used = "yes"


when not matched then insert
(
  customer_id,
  customer_name,
  customer_city,
  customer_state,
  processed_date,
  start_date,
  end_date,
  is_used

)
values
(
  src.customer_id,
  src.customer_name,
  src.customer_city,
  src.customer_state,
  src.processed_date,
  src.start_date,
  src.end_date,
  src.is_used
)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
4,0,0,4


In [0]:
%sql
select * from scd.target.scd2table

customer_id,customer_name,customer_city,customer_state,processed_date,start_date,end_date,is_used
1,suresh kumar,chennai,TN,2025-07-19,2025-07-19,2030-12-01,Yes
2,ramesh,madurai,TN,2025-07-19,2025-07-19,2030-12-01,Yes
3,rajesh,coimbatore,TN,2025-07-19,2025-07-19,2030-12-01,Yes
4,kumar,tiruppur,TN,2025-07-19,2025-07-19,2030-12-01,Yes
1,updated name,chennai,TN,2025-07-19,2025-07-19,2030-12-01,Yes
2,ramesh,madurai,TN,2025-07-19,2025-07-19,2030-12-01,Yes
3,rajesh,coimbatore,TN,2025-07-19,2025-07-19,2030-12-01,Yes
4,kumar,tiruppur,TN,2025-07-19,2025-07-19,2030-12-01,Yes


## update

In [0]:
%sql
update scd.source.scd2table
set customer_name = "updated name"
where customer_id = 1

num_affected_rows
1
