In [0]:
from pyspark.sql.utils import AnalysisException




In [0]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Register a free-text "catalog_name" widget (empty default) so the catalog
# used by every query in this notebook can be supplied at run time.
dbutils.widgets.text(name="catalog_name", defaultValue="")

In [0]:
# Catalog holding the silver/gold schemas; every SQL statement below
# interpolates it as `<catalog_name>.<schema>.<table>`.
catalog_name = dbutils.widgets.get("catalog_name").strip()

# Fail fast: with the widget's empty default the queries below would become
# e.g. `select ... from .gold.DimCustomers`, which only surfaces later as a
# confusing SQL parse error.
if not catalog_name:
    raise ValueError("The 'catalog_name' widget must be set to a non-empty catalog name.")

In [0]:
# Decide the load type from the current gold dimension:
#   init_load_flag = 1 -> full load (table missing or empty)
#   init_load_flag = 0 -> incremental load (table already has rows)
# NOTE(review): any AnalysisException, not only "table not found", falls back
# to a full load here — confirm that is intended.
try:
    df = spark.sql(f"select  count(*) as cnt from {catalog_name}.gold.DimCustomers")
    row_count = df.collect()[0]["cnt"]
    # An empty gold table is treated the same as a missing one.
    init_load_flag = 1 if row_count == 0 else 0
except AnalysisException:
    # The gold table could not be resolved, so start from scratch.
    init_load_flag = 1

In [0]:
# Source rows: the full silver customer table of the selected catalog.
customer_df = spark.sql(sqlQuery=f"select * from {catalog_name}.silver.customer_silver")

In [0]:
# Preview the raw silver customers.
display(customer_df)

In [0]:
# Keep a single row per customer_id.
# NOTE(review): which of several duplicate rows survives is not specified
# here — confirm that is acceptable for the silver data.
customer_df = customer_df.dropDuplicates(["customer_id"])

In [0]:
# The _rescued_data column is not part of the dimension; remove it.
rescued_column = "_rescued_data"
customer_df = customer_df.drop(rescued_column)

In [0]:
# Provisional surrogate key starting at 1. monotonically_increasing_id() is
# unique but not guaranteed to be consecutive.
# NOTE(review): existing gold customers must not end up with this value in
# place of the key already stored in gold.
customer_df = customer_df.withColumn("DimCustomerkey", lit(1) + monotonically_increasing_id())

In [0]:
# Preview the deduplicated customers with their provisional key.
display(customer_df)

## Dividing new and old records

In [0]:
# Rows already in gold (surrogate key, business key, audit dates), used to tell
# new customers from existing ones. On a full load an empty frame with the same
# four column names is built instead (`where 1=0` returns no rows), so the join
# below works either way.
if init_load_flag != 0:
    df_old = spark.sql(f"""select 0 DimCustomerkey,
                       0 customer_id,
                       0 create_date,
                       0 update_date
                       from {catalog_name}.silver.customer_silver
                       where 1=0""")
else:
    df_old = spark.sql(f"""select DimCustomerkey,
                        customer_id,
                        create_date,
                        update_date
                        from {catalog_name}.gold.DimCustomers""")

In [0]:
# Print the first 20 gold rows (truncated values).
df_old.show(n=20)

## Renaming the columns of df_old

In [0]:

# Prefix the gold-side columns with "old_" so they do not collide with the
# same-named silver columns after the join.
df_old = (
    df_old
    .withColumnRenamed("DimCustomerkey", "old_DimCustomerkey")
    .withColumnRenamed("customer_id", "old_customer_id")
    .withColumnRenamed("create_date", "old_create_date")
    .withColumnRenamed("update_date", "old_update_date")
)

## Left-joining the silver records with the existing (old) records

In [0]:
# Left join: every silver customer is kept; the old_* columns stay null when
# the customer_id has no match in gold.
df_join = customer_df.join(
    df_old,
    on=customer_df.customer_id == df_old.old_customer_id,
    how="left",
)

In [0]:
# Preview the joined silver/gold rows.
display(df_join)

## Separating old and new records

In [0]:
# A null old_DimCustomerkey means the left join found no gold match:
# those customers are new; the rest already exist in gold.
df_new = df_join.where(col("old_DimCustomerkey").isNull())
df_old = df_join.where(~col("old_DimCustomerkey").isNull())

In [0]:
# Preview the customers not yet present in gold.
display(df_new)

## Dropping the columns not required in df_old

In [0]:
# Existing customers must keep the surrogate key already stored in gold.
# The DimCustomerkey carried over from customer_df is a freshly generated
# monotonically_increasing_id() value; keeping it (and dropping
# old_DimCustomerkey) would re-key existing rows, and the MERGE on
# DimCustomerKey would then overwrite the wrong target rows.
# old_create_date is kept so the original creation date is preserved.
df_old = (
    df_old
    .drop("DimCustomerkey", "old_customer_id", "old_update_date")
    .withColumnRenamed("old_DimCustomerkey", "DimCustomerkey")
)


In [0]:
# Print the first 20 existing-customer rows (truncated values).
df_old.show(n=20)

In [0]:
# Existing customers keep their original create_date (converted to a
# timestamp) and get update_date stamped with the current time.
df_old = (
    df_old
    .withColumnRenamed("old_create_date", "create_date")
    .withColumn("create_date", to_timestamp("create_date"))
    .withColumn("update_date", current_timestamp())
)

df_old.show()

## Preparing df_new

In [0]:
# Preview the new customers before cleanup.
display(df_new)

### Dropping the columns not needed in df_new



In [0]:
# The gold-side columns are all null for new customers; remove them.
old_side_columns = ["old_DimCustomerkey", "old_customer_id", "old_create_date", "old_update_date"]
df_new = df_new.drop(*old_side_columns)

## Adding the create_date and update_date columns

In [0]:
# New customers are both created and last updated by this load.
load_ts = current_timestamp()
df_new = df_new.withColumn("create_date", load_ts).withColumn("update_date", load_ts)

In [0]:

# Preview the new customers with their audit dates.
display(df_new)

### From here on, new DimCustomerKey values must continue from the last key already stored in gold

In [0]:
# Number the new customers 1..n so that, once offset by the current max key,
# their surrogate keys continue consecutively from the last key in gold.
# monotonically_increasing_id() is unique but not consecutive across
# partitions, so row_number() over a deterministic ordering is used instead;
# the cast keeps the column a 64-bit integer.
# NOTE(review): an un-partitioned window processes these rows in a single
# partition — presumably fine for the per-load set of new customers.
df_new = df_new.withColumn(
    "DimCustomerkey",
    row_number().over(Window.orderBy("customer_id")).cast("long"),
)
df_new.display()

## Fetching the current max surrogate key

In [0]:

# Highest surrogate key already in gold; new keys are offset by this value.
if init_load_flag == 1:
    # Full load: gold is missing or empty, so numbering starts from 0.
    max_surrogate_key = 0
else:
    df_max_surr = spark.sql(f"""select max(DimCustomerkey) as max_surrogate_key from {catalog_name}.gold.DimCustomers""")
    # max() yields NULL when every stored key is NULL; fall back to 0 so the
    # offset step does not turn every new key into NULL.
    max_surrogate_key = df_max_surr.collect()[0]["max_surrogate_key"] or 0

## Offsetting the new keys by max_surrogate_key, e.g. if the max is 2000, the new records get 2000+1, 2000+2, and so on

In [0]:
# Shift the new records' keys past the highest key already in gold.
df_new = df_new.withColumn("DimCustomerKey", col("DimCustomerKey") + lit(max_surrogate_key))

In [0]:
# Echo the offset that was added to the new records' surrogate keys.
max_surrogate_key

## Union of the df_new and df_old DataFrames

In [0]:
# Preview the new customers with their final surrogate keys.
display(df_new)

In [0]:
# Preview the existing customers ready for the union.
display(df_old)

In [0]:
# Inspect df_new's schema ahead of the union with df_old.
df_new.printSchema()

In [0]:
# Stack new and existing customers, matching columns by name; df_new's column
# order determines the output order.
df_final = df_new.unionByName(df_old, allowMissingColumns=False)

In [0]:
# Inspect df_old's schema; compare with df_new's to confirm the columns line up by name.
df_old.printSchema()

In [0]:
# Print the first 20 rows of the combined dimension data (truncated values).
df_final.show(n=20)

# SCD Type 1


### The upsert (MERGE) below implements SCD Type 1: matching rows are overwritten in place and new rows are inserted


In [0]:
from delta.tables import DeltaTable

from pyspark.sql.utils import AnalysisException

In [0]:
# Gold-layer Delta location of the customer dimension.
# NOTE(review): the storage account is hardcoded, and the folder name
# (DimCustomer) differs from the table name (DimCustomers) — confirm both.
path = "abfss://gold@salesstaradlsstorageacc.dfs.core.windows.net/DimCustomer"

if not DeltaTable.isDeltaTable(spark, path):
    # No Delta table at `path` yet: write df_final there and register it as
    # the catalog table used by the load-type check above.
    df_final.write.format("delta").mode("overwrite").option("path", path).saveAsTable(f"{catalog_name}.gold.DimCustomers")
else:
    # SCD Type 1 upsert keyed on the surrogate key: matched rows are overwritten
    # with the source values and unmatched source rows are inserted.
    # NOTE(review): this is only correct when existing customers carry the
    # surrogate key already stored in gold.
    dlt_obj = DeltaTable.forPath(spark, path)
    (
        dlt_obj.alias("trg")
        .merge(df_final.alias("src"), "trg.DimCustomerKey=src.DimCustomerKey")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )