# Load and Perform SCD-1 on Customers Data

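This section reads the `customers_silver` table from the silver layer in Azure Databricks, displays the data, and de-duplicates it on `customer_id`. It then assigns a `DimCustomerKey` surrogate key and applies Slowly Changing Dimension Type 1 (SCD-1) logic: customers already present in the gold `DimCustomers` table are updated in place with the latest information, matched on the `customer_id` column, and customers not yet present are added as new rows. The result is written to `databricks_catalog.gold.DimCustomers`.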

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Source of the dimension: the cleaned customers table from the silver layer.
df = (
    spark.read
    .table("databricks_catalog.silver.customers_silver")
)

# Render the loaded rows in the notebook for a visual sanity check.
df.display()

In [0]:
# Keep one row per customer_id so the join and merge further down see a
# single source row per customer. No ordering is applied here, so which of
# several duplicate rows survives is not controlled by this notebook.
df = df.dropDuplicates(["customer_id"])

In [0]:
# Provisional surrogate key, shifted by one so it starts at 1 rather than 0.
# monotonically_increasing_id() yields unique, increasing 64-bit values that
# are NOT guaranteed to be consecutive, so expect gaps in DimCustomerKey.
df = df.withColumn(
    "DimCustomerKey",
    monotonically_increasing_id() + 1,
)

# Preview a handful of rows with the new key column.
df.limit(10).display()

In [0]:
# Run mode comes from the "init_load_flag" notebook widget and is converted
# to int. Later cells treat 1 as "initial load" (gold table not read) and
# 0 as "incremental load" (gold table is read).
init_load_flag = int(
    dbutils.widgets.get("init_load_flag")
)

In [0]:
# Existing dimension rows to compare against.
#   flag == 0 -> read the key/audit columns from the gold dimension.
#   otherwise -> the gold table is not read; an empty frame with the same four
#                column names is produced instead (the "where 1=0" predicate
#                filters out every row).
df_old = spark.sql(
    '''select DimCustomerKey, customer_id, create_date, update_date from databricks_catalog.gold.DimCustomers'''
    if init_load_flag == 0
    else '''select 0 DimCustomerKey, 0 customer_id, 0 create_date, 0 update_date from databricks_catalog.silver.customers_silver where 1=0'''
)


In [0]:
# Inspect the existing-dimension rows that incoming customers will be compared
# against (empty when the "where 1=0" placeholder query was used).
df_old.display()

In [0]:


# Suffix every column of the existing-dimension frame with "_old" so nothing
# clashes with the incoming columns after the join. df_old is selected with
# exactly four columns (DimCustomerKey, customer_id, create_date, update_date),
# so renaming all of them positionally covers the same set.
df_old = df_old.toDF(*[f"{name}_old" for name in df_old.columns])

# Left join keeps every incoming customer; the *_old columns stay NULL for
# customers that have no match in the existing dimension.
df_join = df.join(
    df_old,
    on=df["customer_id"] == df_old["customer_id_old"],
    how="left",
)
df_join.display()

In [0]:
# A NULL DimCustomerKey_old means the left join found no existing row:
#   df_new  -> customers not yet in the dimension
#   df_prev -> customers already in the dimension
df_new = df_join.where(col("DimCustomerKey_old").isNull())
df_prev = df_join.where(col("DimCustomerKey_old").isNotNull())

In [0]:
# Existing customers (SCD-1 update path).
#
# Keep the surrogate key that is already stored in the dimension. The
# DimCustomerKey column carried in from the incoming frame is a freshly
# generated provisional value; keeping it would give existing customers a
# different key on every run and could collide with keys already in use.
# It is therefore dropped and DimCustomerKey_old takes its place.
df_prev = (
    df_prev
    .drop("DimCustomerKey", "customer_id_old", "update_date_old")
    .withColumnRenamed("DimCustomerKey_old", "DimCustomerKey")
    .withColumnRenamed("create_date_old", "create_date")
    # Cast so the key has the same type as the generated (long) keys of the
    # new rows it is later unioned with.
    .withColumn("DimCustomerKey", col("DimCustomerKey").cast("long"))
    # Preserve the original creation time; only the update time moves.
    .withColumn("create_date", to_timestamp(col("create_date")))
    .withColumn("update_date", current_timestamp())
)
df_prev.display()

In [0]:
# New customers (insert path): discard the all-NULL *_old helper columns from
# the join and stamp both audit columns with the current timestamp.
df_new = (
    df_new
    .drop("DimCustomerKey_old", "customer_id_old", "update_date_old", "create_date_old")
    .withColumn("create_date", current_timestamp())
    .withColumn("update_date", current_timestamp())
)
df_new.display()

In [0]:
# Offset for newly generated surrogate keys.
#   initial load     -> start from 0
#   incremental load -> continue after the highest key already in gold
if init_load_flag == 1:
    max_surrogate_key = 0
else:
    df_maxsur = spark.sql("select max(DimCustomerKey) as max_surrogate_key from databricks_catalog.gold.DimCustomers")
    # max() over an empty table returns NULL, which arrives here as None.
    # Falling back to 0 prevents the later "key + offset" arithmetic from
    # turning every new DimCustomerKey into NULL.
    max_surrogate_key = df_maxsur.collect()[0]['max_surrogate_key'] or 0


In [0]:
# Shift the provisional keys of the new rows past the highest key already
# used, so they cannot reuse a key value that exists in the dimension.
df_new = df_new.withColumn(
    "DimCustomerKey",
    col("DimCustomerKey") + max_surrogate_key,
)

In [0]:
# Recombine the insert and update sets into one frame for the write step.
# unionByName aligns columns by name, so the two frames only need to share
# the same column names, not the same column order.
df_final = df_new.unionByName(df_prev)

In [0]:
# Final visual check of the combined new + existing rows before writing.
df_final.display()

In [0]:
from delta.tables import DeltaTable

# Persist the dimension.
#
# Table already exists -> SCD-1 upsert: overwrite matching customers in place
#                         and insert the ones that are missing.
# Table does not exist -> first run: create it from df_final.
#
# (The branches used to be the other way round, which tried to MERGE into a
# table that did not exist yet and replaced the table wholesale once it did.)
if spark.catalog.tableExists("databricks_catalog.gold.DimCustomers"):
    # Resolve the target by the same catalog name that was just checked, so
    # the merge hits the table regardless of where its files are stored.
    dlt_obj = DeltaTable.forName(spark, "databricks_catalog.gold.DimCustomers")

    # Match on the business key: customer_id is unique in df_final (duplicates
    # were dropped earlier) and identifies the customer independently of how
    # the surrogate key was generated in this run.
    (
        dlt_obj.alias("trg")
        .merge(df_final.alias("src"), "trg.customer_id = src.customer_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    df_final.write.mode("overwrite").format("delta").saveAsTable("databricks_catalog.gold.DimCustomers")