### **Customer Dimensional Modeling**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Text widget selecting the load mode; the default "1" means initial load.
dbutils.widgets.text(name="init_load_flag", defaultValue="1")

In [0]:
# Parse the widget value into an int and fail fast on unsupported modes.
# 1 = initial load (gold table empty), 0 = incremental load against the existing gold table.
# Later cells branch on `== 0` in one place and `== 1` in another, so any other value
# would make them disagree about which mode this run is in.
init_load_flag = int(dbutils.widgets.get("init_load_flag"))
if init_load_flag not in (0, 1):
    raise ValueError(f"init_load_flag must be 0 (incremental) or 1 (initial load), got {init_load_flag}")
print(init_load_flag)

### **Reading Data From Source**

In [0]:
# Full snapshot of the silver customers table.
source_query = "SELECT * FROM databricks_cata.silver.customers"
df = spark.sql(source_query)
display(df)

### **Removing Duplicates**

In [0]:
# Keep a single row per customer_id.
# NOTE(review): dropDuplicates keeps an arbitrary row among duplicates; if silver can hold
# several versions of the same customer, order by a recency column first — confirm.
df = df.dropDuplicates(['customer_id'])
display(df)

**Surrogate Key on All Records (disabled)**

In [0]:
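# Superseded: surrogate keys are assigned only to new records further below, so this
# whole-frame variant is kept commented out for reference only.
# df = df.withColumn("DimCustomerKey", monotonically_increasing_id()+lit(1))
# df.limit(10).display()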

**Splitting New vs. Old Records — Loading the Existing Dimension Keys**

In [0]:
if init_load_flag == 0:
    # Incremental run: existing surrogate keys and audit dates from the gold dimension.
    df_old = spark.sql("""SELECT dimcustomerkey
                            ,customer_id
                            ,create_date
                            ,update_date
                        FROM databricks_cata.gold.dimcustomers""")
else:
    # Initial load: an empty frame with the same column names. Typed NULLs (and the real
    # customer_id column from silver) give it proper types instead of integer `0`
    # placeholders, so the join on customer_id, to_timestamp(create_date) and the later
    # unionByName all see the column types they expect.
    df_old = spark.sql("""SELECT CAST(NULL AS BIGINT) dimcustomerkey
                            ,customer_id
                            ,CAST(NULL AS TIMESTAMP) create_date
                            ,CAST(NULL AS TIMESTAMP) update_date
                        FROM databricks_cata.silver.customers WHERE 1 = 0""")



In [0]:
display(df_old)

**Renaming Columns of df_old**

In [0]:
# Prefix the existing-dimension columns with "old_" so they don't clash with the
# silver columns of the same name after the join.
rename_map = {
    "dimcustomerkey": "old_dimcustomerkey",
    "customer_id": "old_customer_id",
    "create_date": "old_create_date",
    "update_date": "old_update_date",
}
for src_name, dst_name in rename_map.items():
    df_old = df_old.withColumnRenamed(src_name, dst_name)

In [0]:
display(df_old.limit(10))

**Left-Joining Source Records with the Old Records**

In [0]:
# Left join: every silver customer is kept; the old_* columns are NULL when the
# customer has no row in the existing dimension.
join_condition = df['customer_id'] == df_old['old_customer_id']
df_join = df.join(df_old, on=join_condition, how='left')

In [0]:
display(df_join.limit(10))

**Separating New vs. Old Records**

In [0]:
# New records: no matching surrogate key in the existing dimension.
is_new = df_join['old_dimcustomerkey'].isNull()
df_new = df_join.where(is_new)
display(df_new)

In [0]:
# Old records: the customer already has a surrogate key in the dimension.
has_key = df_join['old_dimcustomerkey'].isNotNull()
df_old = df_join.where(has_key)
display(df_old)

**Preparing df_old**

In [0]:
# Old records keep their existing surrogate key and original create_date;
# only update_date is refreshed to the current timestamp.
df_old = (
    df_old
    # Dropping the join columns that are not needed downstream
    .drop('old_customer_id', 'old_update_date')
    # Restore the dimension column names
    .withColumnRenamed("old_dimcustomerkey", "DimCustomerKey")
    .withColumnRenamed("old_create_date", "create_date")
    .withColumn("create_date", to_timestamp("create_date"))
    # Recreate update_date with the current timestamp
    .withColumn("update_date", current_timestamp())
)

In [0]:
display(df_old.limit(10))

**Preparing df_new**

In [0]:
# New records: discard the old_* join columns (all NULL for these rows) and stamp
# both audit dates with the current timestamp.
df_new = (
    df_new
    .drop('old_dimcustomerkey', 'old_customer_id', 'old_update_date', 'old_create_date')
    .withColumn("update_date", current_timestamp())
    .withColumn("create_date", current_timestamp())
)

display(df_new.limit(10))

**Surrogate Key — Starting From 1**

In [0]:
# Assign contiguous, deterministic keys 1..N to the new records.
# monotonically_increasing_id() is only guaranteed unique, not consecutive (the partition id
# is encoded in its upper bits), and it is nondeterministic, so recomputing this lazy frame
# (it is displayed several times before the merge) could yield different keys.
# row_number() ordered by customer_id (unique after dropDuplicates) avoids both problems;
# the cast keeps the LongType that monotonically_increasing_id() produced.
# Note: a window with no partitionBy pulls all rows into one partition — acceptable for a
# dimension-sized batch of new customers.
key_window = Window.orderBy("customer_id")
df_new = df_new.withColumn("DimCustomerKey", row_number().over(key_window).cast("long"))
df_new.limit(10).display()

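**Adding Max Surrogate Key**

Shift the new records' keys past the largest key already stored in the gold table, so they never collide with existing keys.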

In [0]:
# Offset added to the new records' keys: 0 on the initial load, otherwise the
# largest DimCustomerKey already present in the gold table.
if init_load_flag == 1:
  # Initial load, no data is present in gold table
  max_surrogate_key = 0

else:
  # Data is already loaded in gold table
  df_maxsur = spark.sql("select max(DimCustomerKey) as max_surrogate_key from databricks_cata.gold.DimCustomers")

  # max() over an empty table returns NULL (None in Python); fall back to 0 so that
  # lit(max_surrogate_key) + key does not turn every new key into NULL.
  max_surrogate_key = df_maxsur.collect()[0]['max_surrogate_key'] or 0

In [0]:
# Shift the new keys so they follow the current maximum key.
key_offset = lit(max_surrogate_key)
df_new = df_new.withColumn("DimCustomerKey", key_offset + col("DimCustomerKey"))

In [0]:
display(df_new.limit(10))

**Union of df_new and df_old**

In [0]:
display(df_new.limit(10))

In [0]:
display(df_old.limit(10))

In [0]:
# Stack new and old records, matching columns by name rather than by position.
df_final = df_new.unionByName(other=df_old)
display(df_final)

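### **SCD Type-1**

Upsert into the gold dimension: rows whose `DimCustomerKey` already exists are overwritten in place (no history is kept), and new keys are inserted. On the first run the table is created instead.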

In [0]:
from delta.tables import DeltaTable

In [0]:
# Target gold dimension: catalog name and external storage location, defined once.
GOLD_TABLE = "databricks_cata.gold.DimCustomers"
GOLD_PATH = "abfss://gold@databricksend2end.dfs.core.windows.net/DimCustomers"

if spark.catalog.tableExists(GOLD_TABLE):
  # Resolve the Delta table by the same name that was just checked, so the existence
  # test and the merge target cannot point at different locations.
  dlt_obj = DeltaTable.forName(spark, GOLD_TABLE)

  # SCD Type-1: overwrite matching keys in place, insert the rest.
  # NOTE(review): with init_load_flag = 1 the new keys restart at 1, so running an
  # "initial load" against an existing table would overwrite unrelated customers' rows.
  dlt_obj.alias("trg").merge(df_final.alias("src"), "trg.DimCustomerKey = src.DimCustomerKey")\
          .whenMatchedUpdateAll()\
          .whenNotMatchedInsertAll()\
          .execute()

else:
  # First run: create the external Delta table at GOLD_PATH and register it as GOLD_TABLE.
  df_final.write.mode("overwrite")\
    .format("delta")\
    .option("path", GOLD_PATH)\
    .saveAsTable(GOLD_TABLE)

In [0]:
%sql
-- Inspect the resulting gold customer dimension.
SELECT *
FROM databricks_cata.gold.dimcustomers