In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Read the "init_load_flag" notebook widget and convert its value to an int.
# Later cells branch on init_load_flag == 0 and init_load_flag == 1.
flag_text = dbutils.widgets.get("init_load_flag")
init_load_flag = int(flag_text)

### **Data Reading from source**

In [0]:
# Load every column of the silver customers table as the source frame.
silver_customers_query = "select * from databricks_cata.silver.customers_silver"
df = spark.sql(silver_customers_query)

**Removing Duplicates**

In [0]:
# Keep a single row per customer_id; rows sharing a customer_id collapse to one.
dedup_keys = ["customer_id"]
df = df.dropDuplicates(dedup_keys)

**Surrogate keys - All the Values**

In [0]:
# Attach a provisional DimCustomerKey to every source row.
# monotonically_increasing_id() starts at 0, so 1 is added to it.
provisional_key = monotonically_increasing_id() + lit(1)
df = df.withColumn("DimCustomerKey", provisional_key)

In [0]:
# Preview at most ten rows of the keyed source frame.
df_sample = df.limit(10)
df_sample.display()

customer_id,email,city,state,domains,full_name,DimCustomerKey
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,1
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,2
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,3
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,4
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,5
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,6
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,7
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,8
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,9
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,10


# **Dividing New vs Old Records**

In [0]:
# Build df_old: the key and audit columns of the customers already stored in gold.
if init_load_flag==0:
    # init_load_flag == 0: read the existing rows from the gold dimension.
    df_old=spark.sql("select DimCustomerKey, customer_id, create_date, update_date from databricks_cata.gold.DimCustomers")
else:
    # Any other flag value: the gold table is not read. Produce an EMPTY frame
    # (where 1=0) with the same column names, typed explicitly so the join on
    # customer_id and the later union see string/timestamp/bigint columns
    # instead of integer literals.
    df_old=spark.sql("""
        select cast(null as bigint)    as DimCustomerKey,
               cast(null as string)    as customer_id,
               cast(null as timestamp) as create_date,
               cast(null as timestamp) as update_date
        from databricks_cata.silver.customers_silver
        where 1=0
    """)

In [0]:
# Preview at most ten rows of the existing-customer lookup frame.
df_old_sample = df_old.limit(10)
df_old_sample.display()

DimCustomerKey,customer_id,create_date,update_date
1150,C00001,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z
916,C00002,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z
1777,C00003,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z
435,C00004,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z
511,C00005,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z
454,C00006,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z
166,C00007,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z
861,C00008,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z
1051,C00009,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z
1561,C00010,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z


**Renaming columns of df_old**

In [0]:
# Give every df_old column an "Old_" prefix so it cannot clash with the
# identically named columns of df once the two frames are joined.
old_column_names = {
    "DimCustomerKey": "Old_DimCustomerKey",
    "customer_id": "Old_customer_id",
    "create_date": "Old_create_date",
    "update_date": "Old_update_date",
}
for current_name, prefixed_name in old_column_names.items():
    df_old = df_old.withColumnRenamed(current_name, prefixed_name)

**Applying Join with old records**

In [0]:
# Left-join the source rows to the existing gold rows on the business key.
# Source rows with no gold counterpart keep NULL in every Old_* column.
# The column is referenced with its exact name ("Old_customer_id") so the
# lookup does not depend on spark.sql.caseSensitive being false.
df_join = df.join(
    df_old,
    on=df["customer_id"] == df_old["Old_customer_id"],
    how="left",
)

In [0]:
# Preview at most ten rows of the joined frame.
df_join_sample = df_join.limit(10)
df_join_sample.display()

customer_id,email,city,state,domains,full_name,DimCustomerKey,Old_DimCustomerKey,Old_customer_id,Old_create_date,Old_update_date
C01220,matthew01@yahoo.com,New Andrewhaven,WA,yahoo.com,Thomas Fitzgerald,1,1,C01220,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z
C01579,nathancastro@gmail.com,South Amanda,FL,gmail.com,Brittany Schmidt,2,2,C01579,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z
C01155,cynthia51@lewis-dixon.com,East Theresa,FL,lewis-dixon.com,Latasha Phelps,3,3,C01155,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z
C01943,rodriguezzachary@hotmail.com,East Timothy,PA,hotmail.com,David Cooper,4,4,C01943,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z
C01554,sthompson@harding.com,New Leefurt,ME,harding.com,Gavin Lindsey,5,5,C01554,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z
C00767,rsmith@pruitt-hodges.net,Heatherton,IL,pruitt-hodges.net,Pam Watts,6,6,C00767,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z
C01097,joel22@stone-holmes.com,Davidhaven,LA,stone-holmes.com,James Martinez,7,7,C01097,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z
C01674,crystalraymond@keller.com,Hoborough,ME,keller.com,Nicole Johnston,8,8,C01674,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z
C00215,petersonthomas@yahoo.com,South Andrea,OK,yahoo.com,Danielle Huerta,9,9,C00215,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z
C00587,phillipsstephanie@gmail.com,New Ronaldmouth,OR,gmail.com,Michael Bailey,10,10,C00587,2025-09-12T20:25:23.833Z,2025-09-12T20:25:23.833Z


**Separating old vs new records**

In [0]:
# New customers: the left join found no gold row, so Old_DimCustomerKey is NULL.
has_no_gold_row = df_join["Old_DimCustomerKey"].isNull()
df_new = df_join.where(has_no_gold_row)

In [0]:
# Existing customers: the left join found a gold row, so Old_DimCustomerKey is set.
# Note that df_old is rebound here from the lookup frame to the joined rows.
has_gold_row = df_join["Old_DimCustomerKey"].isNotNull()
df_old = df_join.where(has_gold_row)

**Preparing df_old**

In [0]:
# Shape the existing-customer rows for the upsert.
#
# The DimCustomerKey carried over from df was freshly generated in this run and
# is unrelated to the key the customer already has in gold. Restore the gold key
# (Old_DimCustomerKey) BEFORE dropping the helper columns; otherwise existing
# customers are re-keyed on every run and the merge on DimCustomerKey touches
# the wrong target rows.
df_old = (
    df_old
    # keep the surrogate key already assigned in gold (cast keeps it long for the union)
    .withColumn("DimCustomerKey", col("Old_DimCustomerKey").cast("long"))
    # drop the helper columns that are no longer needed
    .drop('Old_DimCustomerKey', 'Old_customer_id', 'Old_update_date')
    # carry the original creation timestamp forward as create_date
    .withColumnRenamed("Old_create_date", "create_date")
    .withColumn("create_date", to_timestamp(col("create_date")))
    # stamp the row as updated in this run
    .withColumn("update_date", current_timestamp())
)

In [0]:
# Preview at most ten prepared existing-customer rows.
df_old_prepared_sample = df_old.limit(10)
df_old_prepared_sample.display()

customer_id,email,city,state,domains,full_name,DimCustomerKey,create_date,update_date
C01220,matthew01@yahoo.com,New Andrewhaven,WA,yahoo.com,Thomas Fitzgerald,1,2025-09-12T20:25:23.833Z,2025-09-12T20:27:42.175Z
C01579,nathancastro@gmail.com,South Amanda,FL,gmail.com,Brittany Schmidt,2,2025-09-12T20:25:23.833Z,2025-09-12T20:27:42.175Z
C01155,cynthia51@lewis-dixon.com,East Theresa,FL,lewis-dixon.com,Latasha Phelps,3,2025-09-12T20:25:23.833Z,2025-09-12T20:27:42.175Z
C01943,rodriguezzachary@hotmail.com,East Timothy,PA,hotmail.com,David Cooper,4,2025-09-12T20:25:23.833Z,2025-09-12T20:27:42.175Z
C01554,sthompson@harding.com,New Leefurt,ME,harding.com,Gavin Lindsey,5,2025-09-12T20:25:23.833Z,2025-09-12T20:27:42.175Z
C00767,rsmith@pruitt-hodges.net,Heatherton,IL,pruitt-hodges.net,Pam Watts,6,2025-09-12T20:25:23.833Z,2025-09-12T20:27:42.175Z
C01097,joel22@stone-holmes.com,Davidhaven,LA,stone-holmes.com,James Martinez,7,2025-09-12T20:25:23.833Z,2025-09-12T20:27:42.175Z
C01674,crystalraymond@keller.com,Hoborough,ME,keller.com,Nicole Johnston,8,2025-09-12T20:25:23.833Z,2025-09-12T20:27:42.175Z
C00215,petersonthomas@yahoo.com,South Andrea,OK,yahoo.com,Danielle Huerta,9,2025-09-12T20:25:23.833Z,2025-09-12T20:27:42.175Z
C00587,phillipsstephanie@gmail.com,New Ronaldmouth,OR,gmail.com,Michael Bailey,10,2025-09-12T20:25:23.833Z,2025-09-12T20:27:42.175Z


**Preparing df_new**

In [0]:
# New customers have no gold counterpart, so every Old_* helper column is removed.
helper_columns = ['Old_DimCustomerKey', 'Old_customer_id', 'Old_update_date', 'Old_create_date']
df_new = (
    df_new
    .drop(*helper_columns)
    # add 'update_date' and then 'create_date', both set to the current timestamp
    .withColumn("update_date", current_timestamp())
    .withColumn("create_date", current_timestamp())
)

In [0]:
# Preview at most ten prepared new-customer rows.
df_new_sample = df_new.limit(10)
df_new_sample.display()

customer_id,email,city,state,domains,full_name,DimCustomerKey,update_date,create_date


**Surrogate key - from 1**

In [0]:
# Number the new customers 1..n. row_number() over an ordering on customer_id
# (unique after the earlier dropDuplicates) is deterministic and gap-free, unlike
# monotonically_increasing_id(), whose values are non-consecutive and can differ
# each time the lazy plan is re-evaluated.
# The window has no partitionBy, so Spark moves these rows to a single partition;
# only the new-customer rows are affected.
new_key_order = Window.orderBy("customer_id")
df_new = df_new.withColumn(
    "DimCustomerKey",
    row_number().over(new_key_order).cast("long"),  # long, matching df_old's key type
)

In [0]:
# Preview at most ten new-customer rows after re-keying.
df_new_keyed_sample = df_new.limit(10)
df_new_keyed_sample.display()

customer_id,email,city,state,domains,full_name,DimCustomerKey,update_date,create_date


**Adding max surrogate key**

In [0]:
# Determine the offset added to the keys of new customers.
if init_load_flag==1:
    # init_load_flag == 1: start numbering from zero.
    max_surrogate_key=0
else:
    # Otherwise continue after the largest key currently stored in gold.
    df_maxsur=spark.sql("select max(DimCustomerKey) as max_surrogate_key from databricks_cata.gold.DimCustomers")
    # Converting df_maxsur to the max_surrogate_key variable
    max_surrogate_key=df_maxsur.collect()[0]['max_surrogate_key']
    # max() over an empty table returns NULL (None here); adding None to the
    # key column would make every new DimCustomerKey NULL, so fall back to 0.
    if max_surrogate_key is None:
        max_surrogate_key=0


In [0]:
# Shift the keys of the new customers by max_surrogate_key.
key_offset = lit(max_surrogate_key)
df_new = df_new.withColumn("DimCustomerKey", key_offset + col("DimCustomerKey"))

**Union of df_old and df_new**

In [0]:
# Combine existing and new customer rows into one frame. unionByName matches
# columns by name, so the differing column order of df_old and df_new
# (create_date/update_date vs update_date/create_date) does not matter.
df_final=df_old.unionByName(df_new)

In [0]:
# Preview at most ten rows of the combined frame.
df_final_sample = df_final.limit(10)
df_final_sample.display()

customer_id,email,city,state,domains,full_name,DimCustomerKey,create_date,update_date
C01220,matthew01@yahoo.com,New Andrewhaven,WA,yahoo.com,Thomas Fitzgerald,1,2025-09-12T20:25:23.833Z,2025-09-12T20:27:46.445Z
C01579,nathancastro@gmail.com,South Amanda,FL,gmail.com,Brittany Schmidt,2,2025-09-12T20:25:23.833Z,2025-09-12T20:27:46.445Z
C01155,cynthia51@lewis-dixon.com,East Theresa,FL,lewis-dixon.com,Latasha Phelps,3,2025-09-12T20:25:23.833Z,2025-09-12T20:27:46.445Z
C01943,rodriguezzachary@hotmail.com,East Timothy,PA,hotmail.com,David Cooper,4,2025-09-12T20:25:23.833Z,2025-09-12T20:27:46.445Z
C01554,sthompson@harding.com,New Leefurt,ME,harding.com,Gavin Lindsey,5,2025-09-12T20:25:23.833Z,2025-09-12T20:27:46.445Z
C00767,rsmith@pruitt-hodges.net,Heatherton,IL,pruitt-hodges.net,Pam Watts,6,2025-09-12T20:25:23.833Z,2025-09-12T20:27:46.445Z
C01097,joel22@stone-holmes.com,Davidhaven,LA,stone-holmes.com,James Martinez,7,2025-09-12T20:25:23.833Z,2025-09-12T20:27:46.445Z
C01674,crystalraymond@keller.com,Hoborough,ME,keller.com,Nicole Johnston,8,2025-09-12T20:25:23.833Z,2025-09-12T20:27:46.445Z
C00215,petersonthomas@yahoo.com,South Andrea,OK,yahoo.com,Danielle Huerta,9,2025-09-12T20:25:23.833Z,2025-09-12T20:27:46.445Z
C00587,phillipsstephanie@gmail.com,New Ronaldmouth,OR,gmail.com,Michael Bailey,10,2025-09-12T20:25:23.833Z,2025-09-12T20:27:46.445Z


### **SCD Type - 1**

In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type 1 upsert of df_final into the gold customer dimension.
gold_table_name = "databricks_cata.gold.DimCustomers"
gold_table_path = "abfss://gold@databricksaniete.dfs.core.windows.net/DimCustomers"

if spark.catalog.tableExists(gold_table_name):
    dlt_obj = DeltaTable.forPath(spark, gold_table_path)

    # Match on the business key (customer_id). The surrogate key is generated
    # inside this notebook, so it is not a reliable way to find a customer's
    # existing row. On a match, overwrite every column EXCEPT DimCustomerKey so
    # a customer keeps the key it was first given.
    matched_updates = {
        column_name: f"src.{column_name}"
        for column_name in df_final.columns
        if column_name != "DimCustomerKey"
    }

    (
        dlt_obj.alias("trg")
        .merge(df_final.alias("src"), "trg.customer_id = src.customer_id")
        .whenMatchedUpdate(set=matched_updates)
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # Table does not exist yet: create it at the gold path from df_final.
    (
        df_final.write.mode("overWrite")
        .option("path", gold_table_path)
        .saveAsTable(gold_table_name)
    )
    

In [0]:
%sql
-- Preview ten rows of the gold customer dimension.
SELECT *
FROM databricks_cata.gold.dimcustomers
LIMIT 10

customer_id,email,city,state,domains,full_name,DimCustomerKey,update_date,create_date
C01220,matthew01@yahoo.com,New Andrewhaven,WA,yahoo.com,Thomas Fitzgerald,1,2025-09-12T20:27:48.350Z,2025-09-12T20:25:23.833Z
C01579,nathancastro@gmail.com,South Amanda,FL,gmail.com,Brittany Schmidt,2,2025-09-12T20:27:48.350Z,2025-09-12T20:25:23.833Z
C01155,cynthia51@lewis-dixon.com,East Theresa,FL,lewis-dixon.com,Latasha Phelps,3,2025-09-12T20:27:48.350Z,2025-09-12T20:25:23.833Z
C01943,rodriguezzachary@hotmail.com,East Timothy,PA,hotmail.com,David Cooper,4,2025-09-12T20:27:48.350Z,2025-09-12T20:25:23.833Z
C01554,sthompson@harding.com,New Leefurt,ME,harding.com,Gavin Lindsey,5,2025-09-12T20:27:48.350Z,2025-09-12T20:25:23.833Z
C00767,rsmith@pruitt-hodges.net,Heatherton,IL,pruitt-hodges.net,Pam Watts,6,2025-09-12T20:27:48.350Z,2025-09-12T20:25:23.833Z
C01097,joel22@stone-holmes.com,Davidhaven,LA,stone-holmes.com,James Martinez,7,2025-09-12T20:27:48.350Z,2025-09-12T20:25:23.833Z
C01674,crystalraymond@keller.com,Hoborough,ME,keller.com,Nicole Johnston,8,2025-09-12T20:27:48.350Z,2025-09-12T20:25:23.833Z
C00215,petersonthomas@yahoo.com,South Andrea,OK,yahoo.com,Danielle Huerta,9,2025-09-12T20:27:48.350Z,2025-09-12T20:25:23.833Z
C00587,phillipsstephanie@gmail.com,New Ronaldmouth,OR,gmail.com,Michael Bailey,10,2025-09-12T20:27:48.350Z,2025-09-12T20:25:23.833Z


In [0]:
# Print the column names, types and nullability of the combined frame.
df_final.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- email: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- domains: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- DimCustomerKey: long (nullable = false)
 |-- create_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = false)

