### Reading data from the source

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Read the "init_load_flag" notebook parameter and convert it to an int.
# Later cells treat 1 as an initial load (no existing dimension rows, key offset 0)
# and 0 as an incremental load against the existing gold dimension.
# dbutils.widgets.get is used because getArgument is deprecated.
init_load_flag = int(dbutils.widgets.get("init_load_flag"))

In [0]:
# Show the parsed flag so the run mode is visible in the notebook output.
display(init_load_flag)

In [0]:
# Load every column of the silver-layer customers table.
customers_query = "select * from databricks_cata.silver.customers"
df = spark.sql(customers_query)

In [0]:
# Preview the source customers as loaded.
display(df)

### Removing duplicates

In [0]:
# Keep a single row per customer_id; which duplicate survives is not controlled here.
df = df.dropDuplicates(['customer_id'])


In [0]:
# Preview the de-duplicated customers.
display(df)

### Dividing new vs. old records

In [0]:
# Pick the query that supplies the already-loaded dimension rows:
#  - flag 0: read keys and audit dates from the existing gold dimension;
#  - otherwise: an empty frame with the same four column names (the "where 1=0" filter returns no rows).
if init_load_flag == 0:
  old_records_query = "select DimCustomerkey, customer_id, create_date, update_date from databricks_cata.gold.dimcustomer"
else:
  old_records_query = "select 0 DimCustomerkey,0 customer_id,0 create_date,0 update_date from databricks_cata.silver.customers where 1=0"

df_old = spark.sql(old_records_query)



In [0]:
# Preview the existing dimension rows (empty unless init_load_flag is 0).
display(df_old)

### Renaming the columns of df_old

In [0]:
# Prefix every df_old column with "old_" so the names stay distinguishable
# from the source columns once the two frames are joined.
for column_name in ("DimCustomerkey", "customer_id", "create_date", "update_date"):
  df_old = df_old.withColumnRenamed(column_name, "old_" + column_name)

In [0]:
# Preview the existing rows with their "old_"-prefixed column names.
display(df_old)

### Applying the join with old records 

In [0]:
# Left join keeps every source customer; customers without a match in the
# existing dimension get NULLs in all "old_" columns.
join_condition = df['customer_id'] == df_old['old_customer_id']
df_join = df.join(df_old, on=join_condition, how='left')


In [0]:
# Preview the join result ordered by the existing surrogate key.
# DataFrame.sort takes the keyword "ascending"; the previous "asc" keyword is
# not a recognised option and was silently ignored.
df_join.sort("old_DimCustomerkey", ascending=True).display()

### Separating new vs. old records

In [0]:
# New customers: no surrogate key came back from the existing dimension.
has_no_existing_key = df_join['old_DimCustomerkey'].isNull()
df_new = df_join.where(has_no_existing_key)



In [0]:
# Existing customers: the join found a surrogate key in the dimension.
# Note that this rebinds df_old from the key lookup frame to the matched source rows.
has_existing_key = df_join['old_DimCustomerkey'].isNotNull()
df_old = df_join.where(has_existing_key)

### Preparing df_old

In [0]:
# Shape the matched (existing) customers into the dimension layout:
#  1. drop the helper columns that are no longer needed after the join;
#  2. restore the original surrogate key and creation date under their final names;
#  3. make sure create_date is a timestamp;
#  4. stamp update_date with the time of this run.
df_old = (
  df_old
  .drop('old_customer_id', 'old_update_date')
  .withColumnRenamed("old_DimCustomerkey", "DimCustomerkey")
  .withColumnRenamed("old_create_date", "create_date")
  .withColumn("create_date", to_timestamp(col("create_date")))
  .withColumn("update_date", current_timestamp())
)





In [0]:
# Preview the existing customers in their final dimension layout.
display(df_old)

### Preparing df_new

In [0]:
# Preview the new customers before the helper columns are removed.
display(df_new)


In [0]:
# Shape the new customers into the dimension layout:
#  - remove the "old_" helper columns (all NULL for new rows) and 'current_date';
#  - stamp both create_date and update_date with the time of this run.
columns_to_remove = ('old_DimCustomerkey', 'old_customer_id', 'old_update_date', 'old_create_date', 'current_date')
df_new = (
  df_new
  .drop(*columns_to_remove)
  .withColumn("create_date", current_timestamp())
  .withColumn("update_date", current_timestamp())
)

 





In [0]:
# Preview the new customers with their audit columns.
display(df_new)


### Create a surrogate key starting from 1


In [0]:
from pyspark.sql.functions import lit, row_number
from pyspark.sql.window import Window

# Assign surrogate keys 1..N to the new customers.
# monotonically_increasing_id() only guarantees unique, increasing values: they are
# not consecutive and depend on how the data is partitioned, so keys could contain
# large gaps and change between evaluations of this lazy frame.
# row_number() over a fixed ordering gives dense, repeatable keys instead;
# customer_id is unique here because duplicates were dropped on it earlier.
# The cast keeps the 64-bit integer type the key had before.
# NOTE(review): a window without partitionBy processes all rows in one partition —
# presumably fine for a batch of new customers; confirm for very large loads.
surrogate_key_window = Window.orderBy("customer_id")
df_new = df_new.withColumn("DimCustomerkey", row_number().over(surrogate_key_window).cast("long"))

In [0]:
# Preview the new customers with their batch-local surrogate keys.
display(df_new)

### Adding max surrogate key

In [0]:
# Offset added to the batch-local keys of new customers:
# 0 on an initial load, otherwise the highest key already stored in the gold dimension.
if init_load_flag == 1:
  max_surrogate_key = 0
else:
  df_maxsur = spark.sql("select max(DimCustomerkey) as max_surrogate_key from databricks_cata.gold.DimCustomer")
  # max() over an empty table returns NULL (None in Python); fall back to 0 so the
  # offset never turns every new surrogate key into NULL.
  max_surrogate_key = df_maxsur.first()['max_surrogate_key'] or 0
  

In [0]:
# Shift the batch-local keys past the highest key already in the dimension.
# Re-running this cell alone would apply the offset a second time.
shifted_key = col("DimCustomerkey") + lit(max_surrogate_key)
df_new = df_new.withColumn("DimCustomerkey", shifted_key)


In [0]:
# Preview the new customers with their final surrogate keys.
display(df_new)

In [0]:
from pyspark.sql.functions import max

# Sanity check: highest surrogate key assigned to the new customers.
max_key_expr = max("DimCustomerkey").alias("max_DimCustomerkey")
df_max = df_new.agg(max_key_expr)
display(df_max)

### Union of df_old & df_new

In [0]:

# Stack new and existing customers into one frame; unionByName matches columns
# by name rather than by position, so the differing column order of the two
# frames does not matter.
df_final = df_new.unionByName(df_old)



In [0]:
# Show both inputs of the union: existing customers first, then new ones.
display(df_old)
display(df_new)

In [0]:
# Inspect the unioned frame that is written to the gold dimension.
# df_new was already displayed in the previous cell, while df_final was never shown.
display(df_final)

### SCD Type 1


In [0]:
# Report whether the gold customer dimension is already registered in the catalog.
dim_table_exists = spark.catalog.tableExists("databricks_cata.gold.DimCustomer")
print("Table exists" if dim_table_exists else "Table does not exist")


In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type 1 load of the gold customer dimension.
gold_table_name = "databricks_cata.gold.DimCustomer"
gold_table_path = "abfss://gold@rtgpadlsgen2.dfs.core.windows.net/Dimcustomers"

if not spark.catalog.tableExists(gold_table_name):
  # First load: create the Delta table at the gold path from the full frame.
  (
    df_final.write
    .mode("overwrite")
    .format("delta")
    .option("path", gold_table_path)
    .saveAsTable(gold_table_name)
  )
else:
  # Later loads: upsert on the surrogate key — matched rows are overwritten
  # in place, unmatched rows are inserted.
  # NOTE(review): the merge matches on DimCustomerkey, so it relies on existing
  # customers having been given back their stored key upstream — confirm that
  # init_load_flag is never 1 while the table already exists.
  dlt_obj = DeltaTable.forPath(spark, gold_table_path)
  (
    dlt_obj.alias("target")
    .merge(df_final.alias("Source"), "target.DimCustomerkey = Source.DimCustomerkey")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )
 


In [0]:
%sql
-- Highest surrogate key stored in the gold customer dimension after the load.
select max(DimCustomerkey) from databricks_cata.gold.dimcustomer


In [0]:
%sql
-- Full contents of the gold customer dimension after the load.
select * from databricks_cata.gold.dimcustomer