In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [0]:
# Load mode from the job widget: 1 = initial load (no gold table yet, keys start at 0),
# 0 = incremental load against the existing gold dimension.
# Later cells branch on `== 0` in one place and `== 1` in another, so any other value
# would mix initial- and incremental-load logic; reject it up front.
init_load_flag = int(dbutils.widgets.get("init_load_flag"))
if init_load_flag not in (0, 1):
    raise ValueError(f"init_load_flag must be 0 or 1, got {init_load_flag}")

### Read source data

In [0]:
# Full snapshot of the silver customers table; this is the source for the dimension load.
df = spark.sql("select * from databricks_cata.silver.customers")
display(df)

In [0]:
# Keep one row per customer_id (the business key), so each key appears at most
# once in the join and in the MERGE source further down.
# NOTE(review): dropDuplicates keeps an arbitrary row per key; if silver can hold
# several versions of a customer, order by a recency column first — TODO confirm.
df = df.dropDuplicates(["customer_id"])

### **Separating new vs. existing records**

In [0]:
if init_load_flag != 0:
    # Initial load: there is no gold dimension to compare against, so build an
    # empty frame (WHERE 1=0) that only carries the four expected column names.
    df_old = spark.sql('''select 0 DimCustomerKey, 0 customer_id, 0 create_date,0 update_date 
                       from databricks_cata.gold.dimcustomers'''.replace('gold.dimcustomers', 'silver.customers where 1=0')) if False else spark.sql('''select 0 DimCustomerKey, 0 customer_id, 0 create_date,0 update_date 
                       from databricks_cata.silver.customers where 1=0''')
else:
    # Incremental load: fetch the key columns and audit dates of customers already
    # present in the gold dimension.
    df_old = spark.sql('''select DimCustomerKey, customer_id,create_date, update_date 
                       from databricks_cata.gold.dimcustomers''')

In [0]:
display(df_old)  # existing-dimension rows (empty on the initial load)

In [0]:
# Prefix the existing-dimension columns with "old_" so they cannot collide with the
# identically named columns of the incoming silver data after the join.
_old_column_names = {
    "DimCustomerKey": "old_DimCustomerKey",
    "customer_id": "old_customer_id",
    "create_date": "old_create_date",
    "update_date": "old_update_date",
}
for _current_name, _prefixed_name in _old_column_names.items():
    df_old = df_old.withColumnRenamed(_current_name, _prefixed_name)

**Join incoming records with existing records**

In [0]:
# Left join: every deduplicated source customer is kept. The old_* columns are
# populated only when that customer already exists in the gold dimension.
df_join = df.join(
    df_old,
    on=df["customer_id"] == df_old["old_customer_id"],
    how="left",
)

In [0]:
df_join.display()  # joined source + existing-dimension columns

In [0]:
# Rows with no matching dimension key are customers not yet in the gold table.
df_new = df_join.where(df_join["old_DimCustomerKey"].isNull())

In [0]:
# Rows that matched an existing dimension key are customers to update in place (SCD Type 1).
df_old = df_join.where(df_join["old_DimCustomerKey"].isNotNull())

**Preparing df_old**

In [0]:
# Reshape existing customers into the dimension's column layout:
#   - keep the existing surrogate key and the original create_date,
#   - drop the helper join columns that are no longer needed,
#   - stamp update_date with the current run time (SCD Type 1 overwrite).
df_old = (
    df_old
    .drop("old_customer_id", "old_update_date")
    .withColumnRenamed("old_create_date", "create_date")
    .withColumnRenamed("old_DimCustomerKey", "DimCustomerKey")
    # Cast create_date to TIMESTAMP (to_timestamp yields a timestamp, not a date).
    .withColumn("create_date", to_timestamp(col("create_date")))
    .withColumn("update_date", current_timestamp())
)


In [0]:
display(df_old)  # existing customers prepared for the upsert


**Preparing df_new**

In [0]:
# New customers: discard the old_* join columns (null for unmatched rows) and
# stamp both audit columns with the current run time.
df_new = (
    df_new
    .drop("old_DimCustomerKey", "old_customer_id", "old_create_date", "old_update_date")
    .withColumn("create_date", current_timestamp())
    .withColumn("update_date", current_timestamp())
)



In [0]:
display(df_new)  # new customers before surrogate keys are assigned


### **Surrogate key**

In [0]:
# Provisional surrogate keys: monotonically_increasing_id() is unique per row but not
# consecutive; adding 1 makes the smallest possible value 1 instead of 0.
df_new = df_new.withColumn("DimCustomerKey", lit(1) + monotonically_increasing_id())
display(df_new)

In [0]:
# Offset for new surrogate keys: the current maximum DimCustomerKey in the gold
# dimension on an incremental load (flag == 0, matching the earlier branch on the
# same flag), or 0 on the initial load when no gold table exists yet.
if init_load_flag == 0:
    # coalesce(..., 0): MAX over an empty table is NULL, and adding NULL to the
    # provisional keys would make every new DimCustomerKey NULL.
    max_surrogate_key = spark.sql('''select coalesce(max(DimCustomerKey), 0) as max_surrogate_key
                                from databricks_cata.gold.DimCustomers''').collect()[0]['max_surrogate_key']
else:
    max_surrogate_key = 0

In [0]:
# Shift the provisional keys (all >= 1) past the largest existing key so new keys
# cannot collide with keys already in the dimension.
df_new = df_new.withColumn("DimCustomerKey", col("DimCustomerKey") + lit(max_surrogate_key))

In [0]:
# Stack new and existing customers. unionByName matches columns by name, so the two
# frames' column orders may differ; the result keeps df_new's column order.
df_final = df_new.unionByName(other=df_old)


In [0]:
display(df_final)  # combined frame that will be upserted into gold

### **SCD Type 1**

In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type 1 upsert into the gold customer dimension. Table existence (rather than
# init_load_flag) decides between MERGE into the existing table and the first write.
_gold_table = "databricks_cata.gold.DimCustomers"
_gold_path = "abfss://gold@databrickproject1.dfs.core.windows.net/DimCustomers"

if spark.catalog.tableExists(_gold_table):
    # Resolve the Delta table by the same name that was just checked, instead of a
    # separately hard-coded storage path that can drift from the write location.
    dlt_obj = DeltaTable.forName(spark, _gold_table)

    # Matched keys: overwrite every column (SCD Type 1). Unmatched keys: insert.
    (
        dlt_obj.alias("old")
        .merge(df_final.alias("new"), "old.DimCustomerKey = new.DimCustomerKey")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First run: create the external table at the gold location. The format is set
    # explicitly because the MERGE branch above requires a Delta table.
    (
        df_final.write.format("delta")
        .mode("overwrite")
        .option("path", _gold_path)
        .saveAsTable(_gold_table)
    )

 

In [0]:
# Read back the gold dimension to inspect the result of this run.
df = spark.sql('''select * from databricks_cata.gold.DimCustomers''')
display(df)