In [48]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from datetime import datetime, date
import pyspark.sql.functions as F

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("SCD_Type_2_Implementation") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

ConnectionRefusedError: [Errno 61] Connection refused

In [21]:
print("=== Step 1: Creating Initial Sample Data ===")

# Define schema for customer data
customer_schema = StructType([
    StructField("customer_id", IntegerType(), False),
    StructField("customer_name", StringType(), False),
    StructField("email", StringType(), False),
    StructField("city", StringType(), False),
    StructField("state", StringType(), False),
    StructField("phone", StringType(), False)
])

# Create initial customer data
initial_data = [
    (1, "John Smith", "john@email.com", "New York", "NY", "555-1234"),
    (2, "Jane Doe", "jane@email.com", "Los Angeles", "CA", "555-5678"),
    (3, "Bob Johnson", "bob@email.com", "Chicago", "IL", "555-9012"),
    (4, "Alice Brown", "alice@email.com", "Houston", "TX", "555-3456"),
    (5, "Charlie Wilson", "charlie@email.com", "Phoenix", "AZ", "555-7890")
]

df_initial = spark.createDataFrame(initial_data, customer_schema)
print("Initial Customer Data:")
df_initial.show()

=== Step 1: Creating Initial Sample Data ===
Initial Customer Data:


                                                                                

+-----------+--------------+-----------------+-----------+-----+--------+
|customer_id| customer_name|            email|       city|state|   phone|
+-----------+--------------+-----------------+-----------+-----+--------+
|          1|    John Smith|   john@email.com|   New York|   NY|555-1234|
|          2|      Jane Doe|   jane@email.com|Los Angeles|   CA|555-5678|
|          3|   Bob Johnson|    bob@email.com|    Chicago|   IL|555-9012|
|          4|   Alice Brown|  alice@email.com|    Houston|   TX|555-3456|
|          5|Charlie Wilson|charlie@email.com|    Phoenix|   AZ|555-7890|
+-----------+--------------+-----------------+-----------+-----+--------+



In [23]:
# Step 2: Create SCD Type 2 Dimension Table Structure
print("=== Step 2: Creating SCD Type 2 Dimension Table ===")

def create_scd2_table(source_df, business_key="customer_id"):
    """
    Convert source data to SCD Type 2 format with surrogate keys and date columns
    """
    current_date = date.today()
    current_timestamp = datetime.now()
    
    scd2_df = source_df.withColumn("surrogate_key", monotonically_increasing_id() + 1) \
                      .withColumn("effective_date", lit(current_date)) \
                      .withColumn("end_date", lit(None).cast(DateType())) \
                      .withColumn("is_current", lit(True)) \
                      .withColumn("created_date", lit(current_timestamp)) \
                      .select("surrogate_key", "*")
    
    return scd2_df

# Create initial dimension table
dim_customer = create_scd2_table(df_initial)
print("Initial Dimension Table (SCD Type 2):")
dim_customer.show(truncate=False)

=== Step 2: Creating SCD Type 2 Dimension Table ===
Initial Dimension Table (SCD Type 2):
+-------------+-----------+--------------+-----------------+-----------+-----+--------+-------------+--------------+--------+----------+--------------------------+
|surrogate_key|customer_id|customer_name |email            |city       |state|phone   |surrogate_key|effective_date|end_date|is_current|created_date              |
+-------------+-----------+--------------+-----------------+-----------+-----+--------+-------------+--------------+--------+----------+--------------------------+
|1            |1          |John Smith    |john@email.com   |New York   |NY   |555-1234|1            |2025-06-15    |NULL    |true      |2025-06-15 13:38:25.509564|
|2            |2          |Jane Doe      |jane@email.com   |Los Angeles|CA   |555-5678|2            |2025-06-15    |NULL    |true      |2025-06-15 13:38:25.509564|
|3            |3          |Bob Johnson   |bob@email.com    |Chicago    |IL   |555-9012|3  

In [25]:
# Step 3: Create New Data with Changes
print("=== Step 3: Creating New Data with Changes ===")

new_data = [
    (1, "John Smith", "john.smith@newemail.com", "Boston", "MA", "555-1111"),  # Email and city changed
    (2, "Jane Doe-Miller", "jane.miller@email.com", "Los Angeles", "CA", "555-5555"),  # Name and email changed
    (3, "Bob Johnson", "bob@email.com", "Chicago", "IL", "555-9012"),  # No changes
    (4, "Alice Brown-Davis", "alice.davis@email.com", "Dallas", "TX", "555-3333"),  # Name, email, city changed
    (5, "Charlie Wilson", "charlie@email.com", "Phoenix", "AZ", "555-7890"),  # No changes
    (6, "David Lee", "david@email.com", "Seattle", "WA", "555-2222")  # New customer
]

df_new = spark.createDataFrame(new_data, customer_schema)
print("New Source Data:")
df_new.show(truncate=False)

=== Step 3: Creating New Data with Changes ===
New Source Data:
+-----------+-----------------+-----------------------+-----------+-----+--------+
|customer_id|customer_name    |email                  |city       |state|phone   |
+-----------+-----------------+-----------------------+-----------+-----+--------+
|1          |John Smith       |john.smith@newemail.com|Boston     |MA   |555-1111|
|2          |Jane Doe-Miller  |jane.miller@email.com  |Los Angeles|CA   |555-5555|
|3          |Bob Johnson      |bob@email.com          |Chicago    |IL   |555-9012|
|4          |Alice Brown-Davis|alice.davis@email.com  |Dallas     |TX   |555-3333|
|5          |Charlie Wilson   |charlie@email.com      |Phoenix    |AZ   |555-7890|
|6          |David Lee        |david@email.com        |Seattle    |WA   |555-2222|
+-----------+-----------------+-----------------------+-----------+-----+--------+



In [30]:
# Step 4: Implement SCD Type 2 Logic
print("=== Step 4: Implementing SCD Type 2 Logic ===")

existing_dim = dim_customer
new_source_data = df_new

business_key="customer_id"

current_date = date.today()
current_timestamp = datetime.now()

=== Step 4: Implementing SCD Type 2 Logic ===


In [39]:

# Get current records only
current_records = existing_dim.filter(col("is_current") == True)

# Get historical records (already expired)
historical_records = existing_dim.filter(col("is_current") == False)

# Get the maximum surrogate key for new records
max_surrogate_key = existing_dim.agg(max("surrogate_key")).collect()[0][0]
print("Max Surrogate Key:", max_surrogate_key)

# Join new data with current records to identify changes
data_columns = [c for c in new_source_data.columns if c != business_key]
print("Data Columns:", data_columns)

  # Create comparison expressions for each data column
comparison_exprs = []
for col_name in data_columns:
    comparison_exprs.append(
        coalesce(current_records[col_name], lit("")) != coalesce(new_source_data[col_name], lit(""))
        )

Max Surrogate Key: 5
Data Columns: ['customer_name', 'email', 'city', 'state', 'phone']


In [40]:
comparison_exprs

[Column<'(NOT (coalesce(customer_name, ) = coalesce(customer_name, )))'>,
 Column<'(NOT (coalesce(email, ) = coalesce(email, )))'>,
 Column<'(NOT (coalesce(city, ) = coalesce(city, )))'>,
 Column<'(NOT (coalesce(state, ) = coalesce(state, )))'>,
 Column<'(NOT (coalesce(phone, ) = coalesce(phone, )))'>]

In [41]:
 # Join and identify record types
joined_df = new_source_data.alias("new") \
    .join(current_records.alias("curr"), 
          col(f"new.{business_key}") == col(f"curr.{business_key}"), "left_outer")

In [42]:
joined_df.show(truncate=False)

+-----------+-----------------+-----------------------+-----------+-----+--------+-------------+-----------+--------------+-----------------+-----------+-----+--------+-------------+--------------+--------+----------+--------------------------+
|customer_id|customer_name    |email                  |city       |state|phone   |surrogate_key|customer_id|customer_name |email            |city       |state|phone   |surrogate_key|effective_date|end_date|is_current|created_date              |
+-----------+-----------------+-----------------------+-----------+-----+--------+-------------+-----------+--------------+-----------------+-----------+-----+--------+-------------+--------------+--------+----------+--------------------------+
|1          |John Smith       |john.smith@newemail.com|Boston     |MA   |555-1111|1            |1          |John Smith    |john@email.com   |New York   |NY   |555-1234|1            |2025-06-15    |NULL    |true      |2025-06-15 13:38:25.509564|
|6          |David L

In [None]:
from functools import reduce

# Add change detection flag
change_condition = reduce(lambda x, y: x | y, comparison_exprs) if comparison_exprs else lit(False)

records_with_flags = joined_df.withColumn("is_new_record", col("curr.customer_id").isNull()) \
                                  .withColumn("has_changes", 
                                            when(col("curr.customer_id").isNull(), lit(False))
                                            .otherwise(change_condition)) \
                                  .withColumn("is_unchanged", 
                                            col("curr.customer_id").isNotNull() & ~change_condition)

In [47]:
records_with_flags.show(truncate=False)

+-----------+-----------------+-----------------------+-----------+-----+--------+-------------+-----------+--------------+-----------------+-----------+-----+--------+-------------+--------------+--------+----------+--------------------------+-------------+-----------+------------+
|customer_id|customer_name    |email                  |city       |state|phone   |surrogate_key|customer_id|customer_name |email            |city       |state|phone   |surrogate_key|effective_date|end_date|is_current|created_date              |is_new_record|has_changes|is_unchanged|
+-----------+-----------------+-----------------------+-----------+-----+--------+-------------+-----------+--------------+-----------------+-----------+-----+--------+-------------+--------------+--------+----------+--------------------------+-------------+-----------+------------+
|1          |John Smith       |john.smith@newemail.com|Boston     |MA   |555-1111|1            |1          |John Smith    |john@email.com   |New Yor

25/06/15 14:31:33 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 926374 ms exceeds timeout 120000 ms
25/06/15 14:31:33 WARN SparkContext: Killing executors is not supported by current scheduler.
25/06/15 14:31:36 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$