In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Read the "init_load_flag" notebook widget and convert it to an int so the
# cells below can compare it numerically (1 = initial load, 0 = incremental).
init_load_flag_raw = dbutils.widgets.get("init_load_flag")
init_load_flag = int(init_load_flag_raw)

In [0]:
# Load every column of the silver-layer customers table; this is the source
# for the customer dimension built in the rest of the notebook.
silver_customers_query = "select * from databricks_av_cata.silver.customers"
df = spark.sql(silver_customers_query)
df.display()

### **Removing duplicates**

In [0]:
# Keep a single row per customer_id so the join against the existing
# dimension cannot fan out on duplicated source rows.
df = df.dropDuplicates(subset=["customer_id"])

In [0]:
# Pick the query that yields the previously loaded keys:
#   * any flag other than 0 (initial load) -> an empty result with the same
#     four column names, so the downstream join/filters still work;
#   * flag 0 (incremental load)            -> the key and audit columns of the
#     existing gold dimension.
if init_load_flag != 0:
    old_keys_query = '''select 0 dimCustomer_key,0 customer_id,0 create_date,0 update_date 
                       from databricks_av_cata.silver.customers where 1 = 0'''
else:
    old_keys_query = '''select dimCustomer_key,customer_id,create_date,update_date 
                       from databricks_av_cata.gold.DimCustomers'''

df_old = spark.sql(old_keys_query)


In [0]:
# Preview the key/audit columns of the existing dimension rows
# (this is an empty result when init_load_flag is not 0).
df_old.display()

In [0]:

# Prefix the lookup columns with "old_" so they cannot collide with the
# identically named columns of the silver data once the two are joined.
for column_name in ("dimCustomer_key", "customer_id", "create_date", "update_date"):
    df_old = df_old.withColumnRenamed(column_name, "old_" + column_name)

#### **Applying join with the old records**

In [0]:
# Left join keeps every silver customer; the old_* columns stay NULL for
# customers that have no row in the existing dimension yet.
join_condition = df.customer_id == df_old.old_customer_id
df_join = df.join(df_old, on=join_condition, how='Left')
df_join.display()

**Separating old vs new records**

In [0]:
# A NULL old surrogate key means the left join found no match, i.e. the
# customer is not in the dimension yet.
has_no_existing_key = df_join["old_dimCustomer_key"].isNull()
df_new = df_join.where(has_no_existing_key)
df_new.display()

In [0]:
# Customers that matched an existing dimension row (old surrogate key present).
has_existing_key = df_join["old_dimCustomer_key"].isNotNull()
df_old = df_join.where(has_existing_key)

**Preparing df_old**

In [0]:
# Existing customers keep their surrogate key and original create_date;
# only update_date is refreshed for this run.
df_old = (
    df_old
    # Helper columns from the lookup side that are no longer needed.
    .drop("old_customer_id", "old_update_date")
    # Renaming "old_dimCustomer_key" back to "dimCustomer_key".
    .withColumnRenamed("old_dimCustomer_key", "dimCustomer_key")
    # Renaming "old_create_date" back to "create_date" and converting it to a timestamp.
    .withColumnRenamed("old_create_date", "create_date")
    .withColumn("create_date", to_timestamp(col("create_date")))
    # Recreating update_date with the current timestamp.
    .withColumn("update_date", current_timestamp())
)


**Preparing df_new**

In [0]:
# New customers have no history: discard the (all-NULL) old_* lookup columns
# and stamp both audit columns with the current timestamp.
df_new = (
    df_new
    .drop("old_dimCustomer_key", "old_customer_id", "old_create_date", "old_update_date")
    # Recreating create_date and update_date with the current timestamp.
    .withColumn("create_date", current_timestamp())
    .withColumn("update_date", current_timestamp())
)


In [0]:
# Inspect the new-customer rows after the audit timestamps have been added.
df_new.display()

In [0]:
# Assign a provisional surrogate key 1..N to the new customers.
#
# row_number() over a total ordering is used instead of
# monotonically_increasing_id(): the latter is non-deterministic and only
# guarantees uniqueness, not consecutive values. Because df_new is lazy and is
# re-evaluated by every later action (display, and the Delta merge, which may
# read its source more than once), the same customer could receive a different
# key on each evaluation. customer_id is unique after the de-duplication step,
# so ordering by it yields a stable, gap-free numbering.
#
# NOTE: a window without partitionBy moves all new rows into one partition;
# this is acceptable for a dimension-sized batch of new customers.
from pyspark.sql.window import Window

df_new = df_new.withColumn(
    "dimCustomer_key",
    # Cast to long so the key column keeps the same type the previous
    # monotonically_increasing_id()-based expression produced.
    row_number().over(Window.orderBy("customer_id")).cast("long"),
)
df_new.display(limit = 10)

**Adding Max surrogate key**

In [0]:
# Determine the offset that is added to the provisional keys of the new rows
# so they continue after the highest key already present in the dimension.
if init_load_flag == 1:
  # Initial load: there are no existing keys, so numbering starts after 0.
  max_surrogate_key = 0
else:
  df_maxsur = spark.sql("select max(dimCustomer_key) as max_surrogate_key from databricks_av_cata.gold.DimCustomers")
  # Converting df_maxsur to the max surrogate key (the aggregate always returns exactly one row)
  max_surrogate_key = df_maxsur.collect()[0]['max_surrogate_key']
  # max() over an empty table yields NULL (None). Without this fallback the
  # later lit(max_surrogate_key) + key expression would turn every new
  # surrogate key into NULL.
  if max_surrogate_key is None:
    max_surrogate_key = 0

In [0]:
# Shift the provisional keys past the highest existing key and project the
# columns in the order used for the dimension.
# NOTE: this cell is not idempotent - re-running it without re-running the
# provisional-key cell adds the offset a second time.
dim_columns = [
    "customer_id",
    "email",
    "city",
    "state",
    "domains",
    "full_name",
    "dimCustomer_key",
    "create_date",
    "update_date",
]
shifted_key = lit(max_surrogate_key) + col("dimCustomer_key")
df_new = df_new.withColumn("dimCustomer_key", shifted_key).select(*dim_columns)
df_new.printSchema()

**Union of the old and new DataFrames**

In [0]:
# Combine new and existing customers into one DataFrame. unionByName aligns
# columns by name rather than by position, so both inputs must expose the
# same set of column names.
df_final = df_new.unionByName(df_old)

In [0]:
# Preview the combined rows that the SCD Type 1 step below writes to the gold dimension.
df_final.display()

## **SCD Type1**

In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type 1 load of the customer dimension:
#   * table missing -> create it as an external Delta table from df_final;
#   * table present -> upsert df_final on the surrogate key, overwriting the
#     attributes of matched rows and inserting unmatched rows.
gold_table_name = "databricks_av_cata.gold.DimCustomers"
gold_table_path = "abfss://gold@datalakete.dfs.core.windows.net/DimCustomers"

if not spark.catalog.tableExists(gold_table_name):
    (
        df_final.write.format("delta")
        .mode("overwrite")
        .option("path", gold_table_path)
        .saveAsTable(gold_table_name)
    )
else:
    dlt_obj = DeltaTable.forPath(spark, gold_table_path)
    (
        dlt_obj.alias("trg")
        .merge(df_final.alias("src"), "trg.dimCustomer_key = src.dimCustomer_key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
