In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
init_load_flag=int(dbutils.widgets.get("init_load_flag"))

#Data Reading

In [0]:
df =spark.sql("select * from databricks_cat.silver.customers_silver")
df.display()


customer_id,email,city,state,domains,full_name
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers


## Dividing new vs old records

In [0]:
if init_load_flag==0:
    df_old=spark.sql("""select DimCustomerKey,customer_id,create_date,update_date from databricks_cat.gold.DimCustomers""")
    
else:
    df_old=spark.sql('''select 0 DimCustomerKey, 0 customer_id,0 create_date,0 update_date from databricks_cat.silver.customers_silver where 1=0 ''')

In [0]:
df_old.display()

DimCustomerKey,customer_id,create_date,update_date
1,C00001,2025-06-04T07:48:40.304Z,2025-06-04T07:48:40.304Z
2,C00002,2025-06-04T07:48:40.304Z,2025-06-04T07:48:40.304Z
3,C00003,2025-06-04T07:48:40.304Z,2025-06-04T07:48:40.304Z
4,C00004,2025-06-04T07:48:40.304Z,2025-06-04T07:48:40.304Z
5,C00005,2025-06-04T07:48:40.304Z,2025-06-04T07:48:40.304Z
6,C00006,2025-06-04T07:48:40.304Z,2025-06-04T07:48:40.304Z
7,C00007,2025-06-04T07:48:40.304Z,2025-06-04T07:48:40.304Z
8,C00008,2025-06-04T07:48:40.304Z,2025-06-04T07:48:40.304Z
9,C00009,2025-06-04T07:48:40.304Z,2025-06-04T07:48:40.304Z
10,C00010,2025-06-04T07:48:40.304Z,2025-06-04T07:48:40.304Z


### Renaming Columns of df_old

In [0]:
df_old=df_old.withColumnRenamed("DimCustomerKey","old_DimCustomerKey")\
    .withColumnRenamed("customer_id","old_customer_id")\
    .withColumnRenamed("create_date","old_create_date")\
    .withColumnRenamed("update_date","old_update_date")

###  Applying Join with the Old Records

In [0]:
df_join=df.join(df_old,df.customer_id==df_old.old_customer_id,"left")

# Removing Duplicates


In [0]:
df=df.dropDuplicates(subset=["customer_id"])

**Dividing Old vs New Records**

###Seperating New Vs Old Records

In [0]:
df_new=df_join.filter(df_join['old_customer_id'].isNull())
df_old=df_join.filter(df_join['old_DimCustomerKey'].isNotNull())
 

### Preparing df_old

In [0]:
%python
from pyspark.sql.functions import current_timestamp,to_timestamp,col

# Dropping all the columns which are not required
df_old = df_old.drop( "old_customer_id", "old_update_date")

# Renaming "old_DimCustomerKey" to "DimCustomerKey"
df_old = df_old.withColumnRenamed("old_DimCustomerKey", "DimCustomerKey")
# Adding update date column to the dataframe
df_old = df_old.withColumnRenamed("old_create_date", "create_date")
df_old=df_old.withColumn("create_date",to_timestamp(col("create_date")))

# Recreating "update_date" column with current timestamp
df_old = df_old.withColumn("update_date", current_timestamp())

display(df_old)

customer_id,email,city,state,domains,full_name,DimCustomerKey,create_date,update_date


## Surrogate Keys -From 1

In [0]:
df_new=df_new.withColumn("DimCustomerKey",monotonically_increasing_id()+lit(1))
df_new.display()

customer_id,email,city,state,domains,full_name,old_DimCustomerKey,old_customer_id,old_create_date,old_update_date,DimCustomerKey
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,,,,,1
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,,,,,2
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,,,,,3
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,,,,,4
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,,,,,5
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,,,,,6
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,,,,,7
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,,,,,8
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,,,,,9
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,,,,,10


### Preparing df_new

In [0]:
# Dropping all the columns which are not required
df_new=df_new.drop('old_DimCustomerKey','old_customer_id','old_update_date','old_create_date')

# Recreating 'update_date','current_date' columns with current timestamp

df_new=df_new.withColumn("update_date",current_timestamp())
df_new=df_new=df_new.withColumn("create_date",current_timestamp())
print(df_new.columns)
display(df_new)


['customer_id', 'email', 'city', 'state', 'domains', 'full_name', 'DimCustomerKey', 'update_date', 'create_date']


customer_id,email,city,state,domains,full_name,DimCustomerKey,update_date,create_date
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,1,2025-06-04T07:27:46.120Z,2025-06-04T07:27:46.120Z
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,2,2025-06-04T07:27:46.120Z,2025-06-04T07:27:46.120Z
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,3,2025-06-04T07:27:46.120Z,2025-06-04T07:27:46.120Z
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,4,2025-06-04T07:27:46.120Z,2025-06-04T07:27:46.120Z
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,5,2025-06-04T07:27:46.120Z,2025-06-04T07:27:46.120Z
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,6,2025-06-04T07:27:46.120Z,2025-06-04T07:27:46.120Z
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,7,2025-06-04T07:27:46.120Z,2025-06-04T07:27:46.120Z
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,8,2025-06-04T07:27:46.120Z,2025-06-04T07:27:46.120Z
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,9,2025-06-04T07:27:46.120Z,2025-06-04T07:27:46.120Z
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,10,2025-06-04T07:27:46.120Z,2025-06-04T07:27:46.120Z


**Adding Max Surrogate Key**

In [0]:
if init_load_flag==1:
  max_surrogate_key=0
else:
  df_maxsur=spark.sql("select max(DimCustomerKey) as max_surrogate_key from databricks_cat.gold.DimCustomers")
  #Converting df_maxsur into max_surrogate_key variable
  max_surrogate_key=df_maxsur.collect()[0]['max_surrogate_key']


In [0]:
df_new=df_new.withColumn("DimCustomerKey",lit(max_surrogate_key)+col("DimCustomerKey"))
df_new.display()

customer_id,email,city,state,domains,full_name,old_DimCustomerKey,DimCustomerKey,update_date,create_date
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,,1,2025-06-04T07:26:17.059Z,2025-06-04T07:26:17.059Z
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,,2,2025-06-04T07:26:17.059Z,2025-06-04T07:26:17.059Z
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,,3,2025-06-04T07:26:17.059Z,2025-06-04T07:26:17.059Z
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,,4,2025-06-04T07:26:17.059Z,2025-06-04T07:26:17.059Z
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,,5,2025-06-04T07:26:17.059Z,2025-06-04T07:26:17.059Z
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,,6,2025-06-04T07:26:17.059Z,2025-06-04T07:26:17.059Z
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,,7,2025-06-04T07:26:17.059Z,2025-06-04T07:26:17.059Z
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,,8,2025-06-04T07:26:17.059Z,2025-06-04T07:26:17.059Z
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,,9,2025-06-04T07:26:17.059Z,2025-06-04T07:26:17.059Z
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,,10,2025-06-04T07:26:17.059Z,2025-06-04T07:26:17.059Z


**Union of df_old and df_new**

In [0]:
%python

df_final = df_new.unionByName(df_old)
display(df_final)

customer_id,email,city,state,domains,full_name,DimCustomerKey,update_date,create_date
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,1,2025-06-04T07:27:58.300Z,2025-06-04T07:27:58.300Z
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,2,2025-06-04T07:27:58.300Z,2025-06-04T07:27:58.300Z
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,3,2025-06-04T07:27:58.300Z,2025-06-04T07:27:58.300Z
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,4,2025-06-04T07:27:58.300Z,2025-06-04T07:27:58.300Z
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,5,2025-06-04T07:27:58.300Z,2025-06-04T07:27:58.300Z
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,6,2025-06-04T07:27:58.300Z,2025-06-04T07:27:58.300Z
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,7,2025-06-04T07:27:58.300Z,2025-06-04T07:27:58.300Z
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,8,2025-06-04T07:27:58.300Z,2025-06-04T07:27:58.300Z
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,9,2025-06-04T07:27:58.300Z,2025-06-04T07:27:58.300Z
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,10,2025-06-04T07:27:58.300Z,2025-06-04T07:27:58.300Z


## **SCD Type-1**

In [0]:
from delta.tables import  DeltaTable

In [0]:
%python
from delta.tables import DeltaTable

if spark.catalog.tableExists("databricks_cat.gold.DimCustomers"):
     # Create the Delta table if it does not exist
    df_final.write.format("delta").mode("overwrite").save("abfss://gold@databricksetlss.dfs.core.windows.net/DimCustomers")
    dlt_obj = DeltaTable.forPath(spark, "abfss://gold@databricksetlss.dfs.core.windows.net/DimCustomers")
    dlt_obj.alias("trg").merge(df_final.alias("src"), "trg.DimCustomerKey = src.DimCustomerKey") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()
    
else:
    df_final.write.mode("overwrite").option("path", "abfss://gold@databricksetlss.dfs.core.windows.net/DimCustomers").saveAsTable("databricks_cat.gold.DimCustomers")
   

In [0]:
if init_load_flag==1:
  df_final.write.mode("overwrite").saveAsTable("databricks_cat.gold.DimCustomers")
else:
  df_final.write.mode("append")

In [0]:
from pyspark.sql.functions import monotonically_increasing_id, lit

df = df.withColumn("DimCustomerKey", monotonically_increasing_id() + lit(1))
display(df)

customer_id,email,city,state,domains,full_name,DimCustomerKey
C00001,rushjeff@ryan.org,Johnsonmouth,MS,ryan.org,Emily Mooney,1150
C00002,mccoykiara@kelly.com,Stephenfort,WY,kelly.com,Andrea Sellers,916
C00003,rebeccamiller@yahoo.com,South Stephenshire,LA,yahoo.com,Craig Hayes,1777
C00004,lawrence05@campbell.info,Chrisland,ND,campbell.info,Bryan Scott,435
C00005,carrie45@yahoo.com,East Dennistown,RI,yahoo.com,Sean Vasquez,511
C00006,traceyramos@gmail.com,North Matthew,IN,gmail.com,Kevin Mccarthy,454
C00007,scottallen@gmail.com,Joneshaven,VA,gmail.com,Amanda Doyle,166
C00008,sullivanjeremy@horton-adams.com,South Nathanfurt,CT,horton-adams.com,Paul Campos,861
C00009,dennis03@yahoo.com,Kimberlyview,MD,yahoo.com,Mary Green,1051
C00010,charles58@murillo.net,West Hector,OK,murillo.net,James Myers,1561
