### What is SCD?
Slowly Changing Dimensions (SCD) is a data-warehousing technique for tracking and managing changes to dimension-table data over time.

**SCD1** : Overwrites old value with new value. No history maintained.

**SCD2** : Maintains full history by inserting a new row for every change; row versions are tracked with the `is_active`, `start_date`, and `end_date` columns.

### PySpark implementation in Databricks

In [0]:
# Existing customer dimension rows: (customer_id, customer_name, city).
column = ['customer_id', 'customer_name', 'city']
old_data = [
    ('101', 'Manali', 'Banglore'),
    ('102', 'Jignesh', 'Gurugram'),
]
old_df = spark.createDataFrame(old_data, schema=column)
display(old_df)

In [0]:
# Incoming snapshot: 101 has a different city, 102 is unchanged, 103 is a new customer.
new_data = [
    ('101', 'Manali', 'Gurugram'),
    ('102', 'Jignesh', 'Gurugram'),
    ('103', 'Aditya', 'Mumbai'),
]
new_df = spark.createDataFrame(new_data, schema=column)
display(new_df)

### SCD Type1: Overwrite(No history)

In [0]:
# Save the existing records in Delta format so they can be the target of a MERGE.
# Import only the class that is used rather than everything from delta.tables.
from delta.tables import DeltaTable

# Single definition of the table location used by the write and the handle below.
type1_path = '/Volumes/workspace/default/tmp/dim_customers_type1'

old_df.write.format('delta').mode('overwrite').save(type1_path)

# DeltaTable handle (not a DataFrame) that the Type 1 merge updates in place.
df_customers = DeltaTable.forPath(spark, type1_path)

In [0]:
# SCD Type 1 upsert: rows matched on customer_id are overwritten with the
# incoming values; incoming rows without a match are inserted.
type1_merge = (
    df_customers.alias('target')
    .merge(new_df.alias('source'), 'target.customer_id=source.customer_id')
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
)
type1_merge.execute()

In [0]:
# Read the Delta table back to inspect the outcome of the Type 1 merge.
type1_result = spark.read.format("delta").load("/Volumes/workspace/default/tmp/dim_customers_type1")
display(type1_result)

### SCD Type2 : Keep History

In [0]:
# Add the SCD2 audit columns to the existing (old) records:
# active flag, start date of today, and an open (NULL) end date.

from pyspark.sql.functions import lit, current_date

df_old_scd2 = old_df.select(
    '*',
    lit(True).alias('is_Active'),
    current_date().alias('start_date'),
    lit(None).cast('Date').alias('end_date'),
)


In [0]:
# Give the incoming records the same audit columns: active, starting today, open-ended.
df_new_scd2 = new_df.select(
    '*',
    lit(True).alias('is_Active'),
    current_date().alias('start_date'),
    lit(None).cast('Date').alias('end_date'),
)

In [0]:
# Preview the incoming records together with their SCD2 audit columns.
display(df_new_scd2)

In [0]:
# Old versions of customers whose city differs in the incoming data.
# They are kept for history but flagged inactive and closed as of today.
df_expired = (
    df_old_scd2.alias('old')
    .join(df_new_scd2.alias('new'), 'customer_id')
    .filter("old.city != new.city")
    .select(
        "customer_id",
        # The source column is `customer_name`; `old.name` does not exist and
        # made this cell fail with an AnalysisException.
        "old.customer_name",
        "old.city",
        lit(False).alias("is_Active"),
        # Keep the row's real start date instead of a hard-coded one.
        "old.start_date",
        current_date().alias("end_date"),
    )
)



In [0]:
# Unchanged records: customers whose city is identical in the old and new data.
# Only the old side's columns are kept; without the select the join result
# carries customer_name/city/is_Active/start_date/end_date twice (once per side),
# which is ambiguous to read and cannot be unioned with the other SCD2 frames.
df_unchanged = (
    df_old_scd2.alias('old')
    .join(df_new_scd2.alias('new'), 'customer_id')
    .filter("old.city = new.city")
    .select(
        "customer_id",
        "old.customer_name",
        "old.city",
        "old.is_Active",
        "old.start_date",
        "old.end_date",
    )
)

display(df_unchanged)


In [0]:
# Customers present only in the incoming data (anti join against the old data),
# stamped as active from today with an open end date.
df_new_only = (
    new_df.alias('new')
    .join(old_df.alias('old'), on='customer_id', how='leftanti')
    .select(
        '*',
        lit(True).alias('is_Active'),
        current_date().alias('start_date'),
        lit(None).cast('Date').alias('end_date'),
    )
)

display(df_new_only)


In [0]:
# Assemble the complete SCD Type 2 dimension.
# `df_new_only.union(df_new_scd2)` listed brand-new customers twice (df_new_scd2
# already contains them) and dropped every historical row, so the result is
# rebuilt here from the old and new SCD2 frames.

# Pair every existing row with the city reported for that customer in the new data.
incoming_city = new_df.selectExpr('customer_id', 'city AS incoming_city')
old_with_incoming = df_old_scd2.join(incoming_city, 'customer_id', 'left')

# An existing row must be closed when it is active and the incoming city differs.
city_changed = 'is_Active AND incoming_city IS NOT NULL AND incoming_city != city'

# Existing rows: unchanged ones pass through, changed ones are closed as of today.
df_history = old_with_incoming.selectExpr(
    'customer_id',
    'customer_name',
    'city',
    f'CASE WHEN {city_changed} THEN false ELSE is_Active END AS is_Active',
    'start_date',
    f'CASE WHEN {city_changed} THEN current_date() ELSE end_date END AS end_date',
)

# New active versions for the customers whose city changed.
changed_ids = old_with_incoming.filter(city_changed).select('customer_id')
df_changed_new = df_new_scd2.join(changed_ids, 'customer_id', 'leftsemi')

# History + new versions of changed customers + brand-new customers.
final_df = (
    df_history
    .unionByName(df_changed_new)
    .unionByName(df_new_only)
)
display(final_df)

### Databricks SQL Implementation

In [0]:
%sql
-- (Re)create the customer dimension used for the Type 1 example.
CREATE OR REPLACE TABLE dim_customers (
  customer_id INT,
  name        STRING,
  city        STRING
) USING DELTA;

-- Load the starting rows with one multi-row INSERT: a single statement and
-- table write instead of seven separate single-row inserts.
INSERT INTO dim_customers VALUES
  (1, 'John Doe', 'New York'),
  (2, 'Jane Smith', 'Los Angeles'),
  (3, 'Mike Johnson', 'Chicago'),
  (4, 'Emily Davis', 'Houston'),
  (5, 'Daniel Brown', 'Phoenix'),
  (6, 'Olivia Wilson', 'Philadelphia'),
  (7, 'Sophia Taylor', 'San Antonio');

In [0]:
%sql
-- Inspect the dimension rows as loaded.
SELECT customer_id, name, city
FROM dim_customers;

In [0]:
%sql
-- Step 2: staging view that holds the incoming customer snapshot.
CREATE OR REPLACE VIEW staging_view AS
SELECT *
FROM VALUES
  (1, 'John Doe', 'New York'),
  (2, 'Jane Smith', 'Los Angeles'),
  (3, 'Mike Johnson', 'Chicago'),
  (4, 'Emily Davis', 'Houston'),
  (5, 'Daniel Brown', 'Phoenix'),
  (6, 'Olivia Wilson', 'New Jersy'),
  (7, 'Sophia Taylor', 'San Antonio'),
  (8, 'Emma Anderson', 'Dallas'),
  (9, 'Ava Martinez', 'San Diego'),
  (10, 'Isabella Garcia', 'Austin')
  AS t(customer_id, name, city);


In [0]:
%sql
-- Preview the staged rows.
SELECT customer_id, name, city
FROM staging_view

#### SCD Type 1

In [0]:
%sql
-- SCD Type 1: overwrite customers that already exist with the staged values
-- and insert staged customers that are not in the dimension yet.
MERGE INTO dim_customers AS tgt
USING staging_view AS src
  ON tgt.customer_id = src.customer_id
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *;

-- Show the dimension after the merge.
SELECT * FROM dim_customers

#### SCD Type 2

In [0]:
%sql
-- Recreate dim_customers with the audit columns needed for SCD Type 2.
-- CREATE OR REPLACE discards the rows currently stored in the table.
CREATE OR REPLACE TABLE dim_customers (
  customer_id INT,
  name        STRING,
  city        STRING,
  is_active   BOOLEAN,  -- flag for the current version of a customer
  start_date  DATE,     -- first day the row version is valid
  end_date    DATE      -- NULL while the row version is still open
) USING DELTA;

In [0]:
%sql
-- Load the starting SCD2 rows: every customer is active with an open end_date.
-- One multi-row INSERT replaces five single-row statements; typed DATE
-- literals and a typed NULL avoid relying on implicit string-to-date casts.
INSERT INTO dim_customers VALUES
  (1, 'John Doe', 'New York', true, DATE'2022-01-01', CAST(NULL AS DATE)),
  (2, 'Jane Smith', 'Los Angeles', true, DATE'2022-01-01', CAST(NULL AS DATE)),
  (3, 'Mike Johnson', 'New Jersy', true, DATE'2022-01-01', CAST(NULL AS DATE)),
  (4, 'Emily Davis', 'Houston', true, DATE'2022-01-01', CAST(NULL AS DATE)),
  (5, 'Daniel Brown', 'Phoenix', true, DATE'2022-01-01', CAST(NULL AS DATE));



In [0]:
%sql
-- Step 2: expire the active row of every customer whose city differs in staging.
UPDATE dim_customers
SET is_active = false,
    end_date  = current_date()
-- Restrict the update to the current version. Without this filter every row of
-- a matching customer_id is touched, so rows expired earlier would have their
-- end_date overwritten with today's date.
WHERE is_active = true
  AND customer_id IN (
    SELECT d.customer_id
    FROM staging_view s
    JOIN dim_customers d
      ON s.customer_id = d.customer_id
    WHERE s.city != d.city
      AND d.is_active = true
  );

In [0]:
%sql
-- Inspect the dimension after the expire step.
SELECT customer_id, name, city, is_active, start_date, end_date
FROM dim_customers
    

In [0]:
%sql
-- Step 3: insert an active row, starting today, for every staged customer that
-- has no active row in the dimension or whose active row has a different city.
INSERT INTO dim_customers
SELECT s.customer_id, s.name, s.city, true, current_date(), CAST(NULL AS DATE)
FROM staging_view s
LEFT JOIN dim_customers d
  ON s.customer_id = d.customer_id
 AND d.is_active = true
-- Detect "no active row" through the join key. Testing d.city would treat an
-- active row whose city is NULL as missing and insert a second active row.
WHERE d.customer_id IS NULL
   OR s.city != d.city;

-- Show the dimension with history and current rows.
SELECT * FROM dim_customers