In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from configs.paths import abfss_path, checkpoint_path

In [0]:
# Widget "init_load_flag": 1 = initial load (no existing gold records are read and new
# surrogate keys start at 1), 0 = incremental load against gold DimCustomers.
init_load_flag = int(dbutils.widgets.get("init_load_flag"))
# Downstream cells only handle 0 and 1; any other value would make them take
# mismatched branches (e.g. skip reading old records yet still offset keys), so fail fast.
if init_load_flag not in (0, 1):
    raise ValueError(f"init_load_flag must be 0 or 1, got {init_load_flag}")

### **Data Reading**

In [0]:
# Full snapshot of the silver customers table (all columns)
df = spark.table("databricks_cata.silver.customers_silver")

### **Removing Duplicates**

In [0]:
# Keep a single row per customer_id. No ordering is applied, so which duplicate
# survives is arbitrary — NOTE(review): confirm upstream guarantees duplicates are identical.
df = df.dropDuplicates(["customer_id"])

### **Surrogate Key – Numbering All Incoming Records**

In [0]:
# Provisional surrogate key: a 1..N sequence over every incoming customer, ordered by
# customer_id. The window has no partitionBy, so Spark evaluates it on a single partition.
df = df.withColumn(
    "DimCustomerKey",
    row_number().over(Window.orderBy("customer_id")),
)

### **Loading Existing (Old) Records from Gold (empty on initial load)**

In [0]:
# Incremental run (flag 0): pull keys and audit dates of customers already in gold.
# Otherwise: an empty frame with the same four column names, so the join below still works.
df_old = spark.sql(
    "select DimCustomerKey, customer_id, create_date, update_date from databricks_cata.gold.DimCustomers"
    if init_load_flag == 0
    else "select 0 DimCustomerKey, 0 customer_id, 0 create_date, 0 update_date from databricks_cata.silver.customers_silver where 1=0"
)

### **Renaming Columns of df_old**

In [0]:
# Prefix existing-record columns with "old_" so they don't clash with incoming columns after the join
df_old = (
    df_old
    .withColumnRenamed("DimCustomerKey", "old_DimCustomerKey")
    .withColumnRenamed("customer_id", "old_customer_id")
    .withColumnRenamed("create_date", "old_create_date")
    .withColumnRenamed("update_date", "old_update_date")
)

### **Left-Joining Incoming Records to the Existing Records**

In [0]:
# Left join keeps every incoming customer; the old_* columns are NULL when the
# customer_id has no match among the existing records.
df_join = df.join(
    df_old,
    on=df["customer_id"] == df_old["old_customer_id"],
    how="left",
)

### **Separating New vs Old records**

In [0]:
# No matching existing key -> brand-new customer; a matching key -> existing customer
df_new = df_join.where(df_join["old_DimCustomerKey"].isNull())
df_old = df_join.where(df_join["old_DimCustomerKey"].isNotNull())

### **Preparing df_old**

In [0]:
# Existing customers must keep the surrogate key already stored in gold.
# At this point df_old carries two key columns:
#   - DimCustomerKey: a fresh row_number() over the whole incoming snapshot, computed
#     before the join; it shifts whenever new customer_ids sort before existing ones.
#   - old_DimCustomerKey: the key read from gold DimCustomers.
# Keeping the fresh number would make the MERGE on DimCustomerKey update the wrong
# gold rows and could collide with the keys assigned to new records (max + 1..k).
# So discard the fresh key and promote the gold one. (drop() ignores missing columns.)
df_old = df_old.drop("DimCustomerKey", "old_customer_id", "old_update_date")
df_old = df_old.withColumnRenamed("old_DimCustomerKey", "DimCustomerKey")

# Keep the original creation date, cast to timestamp
df_old = df_old.withColumnRenamed("old_create_date", "create_date")
df_old = df_old.withColumn("create_date", to_timestamp(col("create_date")))

# Recreate the update_date column with the current timestamp
df_old = df_old.withColumn("update_date", current_timestamp())

### **Preparing df_new**

In [0]:
# New customers: discard the (all-NULL) old_* join columns, then stamp both
# create_date and update_date with the current timestamp.
df_new = (
    df_new
    .drop("old_customer_id", "old_DimCustomerKey", "old_update_date", "old_create_date")
    .withColumn("create_date", current_timestamp())
    .withColumn("update_date", current_timestamp())
)

### **Surrogate Key for New Records (numbered from 1)**

In [0]:
# Number only the new customers 1..k (ordered by customer_id); the next cells shift
# these past the current maximum key.
df_new = df_new.withColumn(
    "DimCustomerKey",
    row_number().over(Window.orderBy("customer_id")),
)

### **Offsetting New Keys by the Current Max Surrogate Key**

In [0]:
# Offset for new surrogate keys: the highest DimCustomerKey already in gold on an
# incremental run (flag 0, the same condition used when loading existing records),
# otherwise 0 so new keys start at 1.
if init_load_flag == 0:
    df_maxsur = spark.sql("select max(DimCustomerKey) as max_surrogate_key from databricks_cata.gold.DimCustomers")
    # Converting df_maxsur to max_surrogate_key variable
    max_surrogate_key = df_maxsur.collect()[0]['max_surrogate_key']
    # max() over an empty table is NULL (None in Python); adding None to the key
    # column would null out every new DimCustomerKey, so fall back to 0.
    if max_surrogate_key is None:
        max_surrogate_key = 0
else:
    max_surrogate_key = 0


In [0]:
# Shift new keys from 1..k to (max + 1)..(max + k) so they cannot clash with existing keys
df_new = df_new.withColumn(
    "DimCustomerKey",
    col("DimCustomerKey") + lit(max_surrogate_key),
)

### **Union of df_old and df_new**

In [0]:
# Combine by column name (not position); both sides must expose the same columns.
df_final = df_new.unionByName(df_old, allowMissingColumns=False)

# Quick preview — rows may contain customer details, so clear output before sharing.
df_final.limit(2).display()

### **SCD Type 1 – Upsert into Gold DimCustomers**

In [0]:
from delta.tables import DeltaTable

dimcustomers_path = abfss_path("gold", "DimCustomers")

if not spark.catalog.tableExists("databricks_cata.gold.DimCustomers"):
    # First run: materialise the dimension as a table at the gold path.
    (
        df_final.write
        .mode("overwrite")
        .option("path", dimcustomers_path)
        .saveAsTable("databricks_cata.gold.DimCustomers")
    )
else:
    # SCD Type 1: rows whose surrogate key already exists are overwritten in place
    # (no history kept); all other rows are inserted.
    dlt_obj = DeltaTable.forPath(spark, dimcustomers_path)
    (
        dlt_obj.alias("trg")
        .merge(df_final.alias("src"), "trg.DimCustomerKey = src.DimCustomerKey")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )