In [0]:
# Databricks notebook source
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType, DoubleType
from delta.tables import DeltaTable

In [0]:
# COMMAND ----------

# Define the schema of the employee dimension table.
# `phone` is LongType: the sample phone values below are Python ints, which PySpark's
# createDataFrame schema verification rejects for DoubleType, and 10-digit numbers
# do not fit in IntegerType.
# NOTE(review): phone numbers are identifiers rather than quantities; StringType would
# preserve leading zeros / formatting, but would require quoting the literals here and
# in the change-set cell. Kept numeric for now.
schema = StructType([
    StructField("emp_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("department", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("phone", LongType(), True),
    StructField("email", StringType(), True)
])

# Initial snapshot of the dimension table: one row per employee, keyed by emp_id.
data = [
    (1, "Alice", "HR", 60000, "New York", 1234567890, "alice@abc.com"),
    (2, "Bob", "IT", 80000, "San Francisco", 2345678901, "bob@abc.com"),
    (3, "Charlie", "Finance", 75000, "Chicago", 3456789012, "charlie@abc.com"),
    (4, "David", "Marketing", 65000, "Boston", 4567890123, "david@abc.com")
]

# Create DataFrame
df = spark.createDataFrame(data, schema)

# Write to Delta Table. The table is overwritten on every run so the notebook can be
# re-run from the top; overwriteSchema lets the overwrite succeed even if an earlier
# run created the table with a different schema (e.g. the old DoubleType phone column).
(
    df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("dim_employee_scd1")
)


In [0]:
%sql
-- Snapshot of the dimension table before the merge.
-- ORDER BY makes the row order deterministic and comparable with the post-merge display.
SELECT * FROM dim_employee_scd1 ORDER BY emp_id

In [0]:
# Incoming change set for the SCD Type 1 merge.
# Despite its name, `Target_table` is the merge *source*; the merge target is dim_employee_scd1.
#   - emp_id 2 (Bob):     salary and city change
#   - emp_id 3 (Charlie): email changes
#   - emp_id 4 (David):   department and salary change
#   - emp_id 5 (Emily):   not in the dimension yet -> will be inserted
# emp_id 1 (Alice) is absent from this set, so the merge leaves her row unchanged.
data2 = [
    # NOTE(review): 8500 is a 10x drop from Bob's previous 80000 -- possibly a typo for 85000; confirm.
    (2, "Bob", "IT", 8500, "Los Angeles", 2345678901, "bob@abc.com"),
    (3, "Charlie", "Finance", 75000, "Chicago", 3456789012, "charlie_new@abc.com"),
    (4, "David", "Sales", 62000, "Boston", 4567890123, "david@abc.com"),
    (5, "Emily", "Engineering", 90000, "Seattle", 5678901234, "emily@abc.com"),
]

Target_table = spark.createDataFrame(data2, schema)

# Persist the change set so it can be inspected with SQL. overwriteSchema keeps the
# overwrite re-runnable if the shared `schema` changes between runs.
(
    Target_table.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("Target_table_scd1")
)


In [0]:
%sql
-- Incoming change set (merge source); ordered by key for a deterministic display.
SELECT * FROM Target_table_scd1 ORDER BY emp_id;

In [0]:
# Merge inputs. `source` is the in-memory change-set DataFrame built earlier;
# `target` is a handle to the existing Delta dimension table, looked up by name.
source = Target_table
target = DeltaTable.forName(spark, "dim_employee_scd1")

In [0]:


# Perform the SCD Type 1 merge: employees already in the dimension have their
# attributes overwritten in place (no history is kept); employees that exist only
# in the source are inserted. Rows present only in the target are left as they are.
#
# NOTE: Delta MERGE raises an error when several source rows match the same target
# row, so `source` is expected to have unique emp_id values.

# Non-key columns that SCD1 overwrites; emp_id is the business key used to match.
scd1_attributes = ["name", "department", "salary", "city", "phone", "email"]

# Only rewrite a matched row when at least one attribute actually differs, so
# unchanged employees are not rewritten. `<=>` is Spark SQL's null-safe equality,
# so a NULL <-> value change is still detected.
attributes_changed = " OR ".join(
    f"NOT (target.{column_name} <=> source.{column_name})"
    for column_name in scd1_attributes
)

(
    target.alias("target")
    .merge(
        source.alias("source"),
        condition="target.emp_id = source.emp_id"
    )
    .whenMatchedUpdate(
        condition=attributes_changed,
        set={column_name: f"source.{column_name}" for column_name in scd1_attributes},
    )
    .whenNotMatchedInsert(
        values={
            column_name: f"source.{column_name}"
            for column_name in ["emp_id", *scd1_attributes]
        },
    )
    .execute()
)



In [0]:
# Post-merge state of the dimension table, ordered by key (DataFrame API instead of a SQL string).
display(spark.table("dim_employee_scd1").orderBy("emp_id"))