In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Load mode from the job/notebook widget (widgets return strings):
#   1 -> initial load: gold table is not read, surrogate keys start at 1
#   0 -> incremental load: existing gold rows and max key are read
# Any other value would put the df_old cell (tests == 0) and the max-key cell
# (tests == 1) into different modes, so reject it up front.
init_load_flag = int(dbutils.widgets.get("init_load_flag"))
if init_load_flag not in (0, 1):
    raise ValueError(f"init_load_flag must be 0 or 1, got {init_load_flag}")

### **Reading Data From Source**

In [0]:
# Full snapshot of the silver customers table; the source for this dimension load.
df = spark.sql(
    '''select * from databricks_ete_cata.silver.customers_silver'''
)

In [0]:
display(df)  # raw silver rows

### **Surrogate Key For All Source Rows**

In [0]:
# Number every source row 1..n.
# monotonically_increasing_id() is unique but not consecutive, so it is only used
# as an ordering value; row_number() over that order then re-sequences the rows.
# NOTE(review): the window has no partitionBy, so Spark moves all rows into a
# single partition for this step.
windowSpec = Window.orderBy("DimCustomerKey")
df = (
    df.withColumn("DimCustomerKey", monotonically_increasing_id() + 1)
      .withColumn("DimCustomerKey", row_number().over(windowSpec))
)


In [0]:
display(df)  # source rows with DimCustomerKey

### **Remove Duplicates**

In [0]:
# Keep one row per customer_id. Spark does not guarantee which duplicate survives,
# and keys were assigned before this step, so DimCustomerKey may now have gaps.
df = df.dropDuplicates(["customer_id"])

In [0]:
display(df)  # one row per customer_id

# **Dividing New vs. Old Records**

In [0]:
# Existing gold rows that incoming customers are matched against.
#   init_load_flag == 0 -> incremental run: read key and audit dates from gold.
#   otherwise           -> initial run: gold may not exist yet, so build an
#                          empty frame with the same four column names.
if init_load_flag==0:
    df_old=spark.sql('''select DimCustomerKey,customer_id,create_date,update_date 
                     from databricks_ete_cata.gold.DimCustomers''')
else:
    # Typed placeholders instead of literal 0s: customer_id comes from silver so
    # the join key has the same type as df.customer_id (an int 0 would force an
    # implicit cast if customer_id is not an integer), and the date columns are
    # timestamps like the current_timestamp() values written for new rows.
    df_old=spark.sql('''select cast(null as bigint) DimCustomerKey,
                               customer_id,
                               cast(null as timestamp) create_date,
                               cast(null as timestamp) update_date
                        from databricks_ete_cata.silver.customers_silver where 1=0''')

In [0]:
display(df_old)  # existing gold rows (empty on initial load)

### **Renaming Columns**

In [0]:
# Prefix the gold-side columns so they do not collide with the source columns
# once both frames are joined.
df_old = (
    df_old
    .withColumnRenamed("DimCustomerKey", "Old_DimCustomerKey")
    .withColumnRenamed("customer_id", "old_customer_id")
    .withColumnRenamed("create_date", "old_create_date")
    .withColumnRenamed("update_date", "old_update_date")
)

In [0]:
display(df_old)  # gold rows with old_* column names

### **Applying Join With Old Records**

In [0]:
# Left join keeps every source row; the old_* columns are NULL when the
# customer is not yet present in gold.
df_join = df.join(
    df_old,
    on=df.customer_id == df_old.old_customer_id,
    how="left",
)

In [0]:
df_join.display()  # source rows joined to any existing gold row

### **Separating Old vs New Values**

In [0]:
# No matching gold key -> customer is new.
df_new = df_join.where(df_join["Old_DimCustomerKey"].isNull())

In [0]:
display(df_new)  # customers not yet in gold

In [0]:
# Matching gold key found -> customer already exists (reuses the df_old name).
df_old = df_join.where(df_join["Old_DimCustomerKey"].isNotNull())

In [0]:
display(df_old)  # customers already in gold

### **Preparing df_old**

In [0]:
# Existing customers: keep the surrogate key and create_date already stored in
# gold, refresh update_date.

# Drop the DimCustomerKey generated from the source rows earlier in this
# notebook: it is a fresh 1..n sequence and does not match the key stored in
# gold. Also drop the old-side columns that are not carried forward.
df_old = df_old.drop("DimCustomerKey", "old_customer_id", "old_update_date")

# Restore the gold surrogate key; the MERGE below matches on DimCustomerKey, so
# keeping the regenerated key would overwrite a different customer's row.
df_old = df_old.withColumnRenamed("Old_DimCustomerKey", "DimCustomerKey")

# Keep the original creation time from gold.
df_old = df_old.withColumnRenamed("old_create_date", "create_date")
df_old = df_old.withColumn("create_date", to_timestamp("create_date"))

# Stamp this run's update time.
df_old = df_old.withColumn("update_date", current_timestamp())


In [0]:
display(df_old)  # existing customers ready for the merge

### **Preparing df_new**

In [0]:
# New customers: the old_* columns from the left join are all NULL, so drop
# them, then stamp both create_date and update_date with this run's timestamp.
df_new = (
    df_new
    .drop('old_customer_id', 'old_create_date', 'old_update_date', 'Old_DimCustomerKey')
    .withColumn("create_date", current_timestamp())
    .withColumn("update_date", current_timestamp())
)

In [0]:
display(df_new)  # new customers with audit columns

### **Surrogate Key Starting From 1**

In [0]:
# Number the new customers 1..n (offset by the gold maximum in a later cell).
# row_number() over customer_id (unique after the dedup step) gives consecutive,
# repeatable values. monotonically_increasing_id() is neither consecutive
# (partition id in the upper bits) nor deterministic, and this lazy frame is
# evaluated several times (display, union, merge).
# NOTE(review): no partitionBy -> single partition; acceptable while each batch
# of new customers stays small.
df_new = df_new.withColumn(
    "DimCustomerKey",
    row_number().over(Window.orderBy("customer_id")),
)

In [0]:
display(df_new)  # new customers with 1-based keys

### **Adding Max Surrogate Key**


In [0]:
# Offset for the new surrogate keys: the highest key already in gold, or 0 when
# there is nothing to continue from. The branch test mirrors the df_old cell
# (0 = incremental, anything else = initial load) so both cells agree on the mode.
if init_load_flag == 0:
    df_maxsur = spark.sql("select max(DimCustomerKey) as max_surrogate_key from databricks_ete_cata.gold.DimCustomers")
    # Convert df_maxsur to a Python value. max() over an empty table is NULL,
    # which would make every new key NULL, so fall back to 0.
    max_surrogate_key = df_maxsur.collect()[0]['max_surrogate_key'] or 0
else:
    max_surrogate_key = 0

In [0]:
# Shift the new rows' 1-based keys past the current maximum key.
df_new = df_new.withColumn("DimCustomerKey", col("DimCustomerKey") + lit(max_surrogate_key))

In [0]:
display(df_new)  # new customers with final keys

### **Union of df_old and df_new**

In [0]:
# Stack new and existing customers, matching columns by name (column order is
# taken from df_new); both sides must have exactly the same columns.
df_final = df_new.unionByName(df_old, allowMissingColumns=False)

In [0]:
display(df_final)  # rows to upsert into gold

### **SCD Type-1**

In [0]:
# Report which write path the SCD Type-1 load will take. Checking the catalog
# directly (rather than init_load_flag) means the flag does not have to be
# flipped between runs.
if spark.catalog.tableExists("databricks_ete_cata.gold.DimCustomers"):
    print("databricks_ete_cata.gold.DimCustomers exists -> MERGE (upsert) into it")
else:
    print("databricks_ete_cata.gold.DimCustomers not found -> create it with an overwrite write")

In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type-1 load: upsert into the existing gold dimension (matched rows are
# overwritten, unmatched rows inserted), or create the table on the first run.
gold_table = "databricks_ete_cata.gold.DimCustomers"
gold_path = "abfss://gold@databricksadlsete.dfs.core.windows.net/DimCustomers"

if spark.catalog.tableExists(gold_table):
    # Resolve the Delta table through the same catalog name that was just checked,
    # instead of a hard-coded path that could point at different files.
    dlt_obj = DeltaTable.forName(spark, gold_table)
    (
        dlt_obj.alias("trg")
        .merge(df_final.alias("src"), "trg.DimCustomerKey = src.DimCustomerKey")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First run: register an external Delta table at gold_path.
    (
        df_final.write.mode("overwrite")
        .format("delta")
        .option("path", gold_path)
        .saveAsTable(gold_table)
    )

In [0]:
%sql
-- Inspect the gold customer dimension after the SCD Type-1 load above.
select * from databricks_ete_cata.gold.dimcustomers

In [0]:
# Read the gold dimension back to verify the load. Use a separate name so the
# source frame `df` is not overwritten; re-running an earlier cell after this
# one would otherwise silently operate on the gold table instead of silver.
# (spark.table reads the catalog table directly; a .format() on the reader
# is not needed for .table().)
df_gold = spark.table("databricks_ete_cata.gold.DimCustomers")
df_gold.display()
df_gold.count()