In [0]:
import os
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta.tables import DeltaTable
from pyspark.sql.functions import current_date,lit

In [0]:
#Creating a customer_history table 

spark.sql("""
CREATE TABLE customer_history (
  cust_id INTEGER,
  first_name STRING,
  last_name STRING,
  state STRING,
  city STRING,
  address STRING,
  eff_start_date TIMESTAMP,
  eff_end_date TIMESTAMP,
  is_active string
)
USING DELTA
LOCATION '/FileStore/tables/customer_history/'""")



Out[104]: DataFrame[]

In [0]:
spark = SparkSession.builder \
    .appName("SCD2 for Customer Table with Delta Lake") \
    .getOrCreate()

In [0]:
# Read the source data
source_data = spark.read.csv("/FileStore/tables/customer_source_data-2.csv", header=True,inferSchema=True)

source_data.display()

cust_id,first_name,last_name,state,city,address
1001,John,Doe,New York,New York,123 Main St
1002,Jane,Smith,Maxico,San Francisco,456 Market St
1004,Jim,Brown,California,Los Angeles,786 Sunset Blvd


In [0]:
# Read the target data (Delta Lake table)
target_data_path = "/FileStore/tables/customer_history/"
target_delta_table = DeltaTable.forPath(spark, target_data_path)

In [0]:
# Define high_date and is_active flag
high_date = "9999-12-31"
is_active = "Y"


In [0]:
source_data_alias = source_data.alias("src")
target_delta_table_alias = target_delta_table.toDF().alias("tgt")

joined_data = source_data_alias \
    .join(target_delta_table_alias, col("src.cust_id") == col("tgt.cust_id"), "full_outer") \
    .select("src.*", 
            *[col("tgt." + c).alias("tgt_" + c) for c in target_delta_table_alias.columns])


In [0]:
joined_data.display()

cust_id,first_name,last_name,state,city,address,tgt_cust_id,tgt_first_name,tgt_last_name,tgt_state,tgt_city,tgt_address,tgt_eff_start_date,tgt_eff_end_date,tgt_is_active
1001.0,John,Doe,New York,New York,123 Main St,1001.0,John,Doe,New York,New York,123 Main St,2023-04-15T00:00:00.000+0000,9999-12-31 00:00:00,Y
1002.0,Jane,Smith,Maxico,San Francisco,456 Market St,1002.0,Jane,Smith,California,San Francisco,456 Market St,2023-04-15T00:00:00.000+0000,9999-12-31 00:00:00,Y
,,,,,,1003.0,Jim,Brown,California,Los Angeles,789 Sunset Blvd,2023-04-15T00:00:00.000+0000,9999-12-31 00:00:00,Y
1004.0,Jim,Brown,California,Los Angeles,786 Sunset Blvd,,,,,,,,,


In [0]:
# Determine rows to insert, update, and delete
insert_data = joined_data.filter(col("tgt_cust_id").isNull())\
              .withColumn("eff_start_date",current_date())\
              .withColumn("eff_end_date",lit(high_date))\
              .withColumn("is_active",lit("Y"))

update_data = joined_data.filter(
    col("src.cust_id").isNotNull() & col("tgt_cust_id").isNotNull() &
    (
        (col("src.first_name") != col("tgt_first_name")) |
        (col("src.last_name") != col("tgt_last_name")) |
        (col("src.state") != col("tgt_state")) |
        (col("src.city") != col("tgt_city")) |
        (col("src.address") != col("tgt_address"))
    )
)

In [0]:
update_data.display()

cust_id,first_name,last_name,state,city,address,tgt_cust_id,tgt_first_name,tgt_last_name,tgt_state,tgt_city,tgt_address,tgt_eff_start_date,tgt_eff_end_date,tgt_is_active
1002,Jane,Smith,Maxico,San Francisco,456 Market St,1002,Jane,Smith,California,San Francisco,456 Market St,2023-04-15T00:00:00.000+0000,9999-12-31 00:00:00,Y


In [0]:
insert_data.display()

cust_id,first_name,last_name,state,city,address,tgt_cust_id,tgt_first_name,tgt_last_name,tgt_state,tgt_city,tgt_address,tgt_eff_start_date,tgt_eff_end_date,tgt_is_active,eff_start_date,eff_end_date,is_active
1004,Jim,Brown,California,Los Angeles,786 Sunset Blvd,,,,,,,,,,2023-04-15,9999-12-31,Y


In [0]:
# Apply the changes to the target Delta Lake table
if insert_data.count() > 0:
    target_delta_table.alias("tgt") \
        .merge(insert_data.alias("src"), "tgt.cust_id = src.cust_id") \
        .whenNotMatchedInsertAll() \
        .execute()

In [0]:
%sql

select * from customer_history

cust_id,first_name,last_name,state,city,address,eff_start_date,eff_end_date,is_active
1001,John,Doe,New York,New York,123 Main St,2023-04-15T00:00:00.000+0000,9999-12-31 00:00:00,Y
1003,Jim,Brown,California,Los Angeles,789 Sunset Blvd,2023-04-15T00:00:00.000+0000,9999-12-31 00:00:00,Y
1002,Jane,Smith,California,San Francisco,456 Market St,2023-04-15T00:00:00.000+0000,2023-04-15,N
1004,Jim,Brown,California,Los Angeles,786 Sunset Blvd,2023-04-15T00:00:00.000+0000,9999-12-31,Y


In [0]:
if update_data.count() > 0:
    # Close old records by setting eff_end_date and is_active
    target_delta_table.alias("tgt") \
        .merge(update_data.alias("src"), "tgt.cust_id = src.cust_id") \
        .whenMatchedUpdate(
            condition="tgt.is_active = 'Y'",
            set={
                "eff_end_date": lit(current_date()),
                "tgt.is_active": lit("N")
            }
        ) \
        .execute()

In [0]:
    new_records = update_data.select(
        col("src.cust_id"),
        col("src.first_name"),
        col("src.last_name"),
        col("src.state"),
        col("src.city"),
        col("src.address"),
        current_date().alias("eff_start_date"),
        lit(high_date).alias("eff_end_date"),
        lit(is_active).alias("is_active")
    )

In [0]:
    target_delta_table \
        .toDF() \
        .union(new_records) \
        .write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .save(target_data_path)

In [0]:
%sql

select * from customer_history

cust_id,first_name,last_name,state,city,address,eff_start_date,eff_end_date,is_active
1001,John,Doe,New York,New York,123 Main St,2023-04-15T00:00:00.000+0000,9999-12-31 00:00:00,Y
1003,Jim,Brown,California,Los Angeles,789 Sunset Blvd,2023-04-15T00:00:00.000+0000,9999-12-31 00:00:00,Y
1002,Jane,Smith,California,San Francisco,456 Market St,2023-04-15T00:00:00.000+0000,2023-04-15,N
1004,Jim,Brown,California,Los Angeles,786 Sunset Blvd,2023-04-15T00:00:00.000+0000,9999-12-31,Y
1002,Jane,Smith,Maxico,San Francisco,456 Market St,2023-04-15T00:00:00.000+0000,9999-12-31,Y
