In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import  *
from pyspark.sql.window import Window

In [0]:
# Widget-driven switch for the load mode:
#   1 -> initial load (no table in the gold schema yet)
#   0 -> incremental load (set once the data has been loaded into the gold table)
_init_load_flag_text = dbutils.widgets.get("init_load_flag")
init_load_flag = int(_init_load_flag_text)

## **Data Reading From Source**

In [0]:
# Read every column of the silver-layer customers table; this is the source of the load.
silver_customers_query = "select * from dec25etlproject.silver.customers_silver"
df = spark.sql(silver_customers_query)

In [0]:
# Preview the source rows.
display(df)

## **Removing Duplicates**

In [0]:
# Row count of the source frame, taken before duplicates are dropped in the next cell.
customer_id_column = df.select('customer_id')
df_count = customer_id_column.count()
display(df_count)

In [0]:
# Keep a single row per customer_id.
# NOTE(review): dropDuplicates does not define which of the duplicate rows survives —
# confirm that any of them is acceptable for this dimension.
df = df.dropDuplicates(['customer_id'])
display(df.limit(10))

In [0]:
# The rows are not in customer_id order (the author suspects newer data was appended on top),
# so sort them by customer_id.
df = df.sort('customer_id')
display(df)

## **Dividing New Vs Old Records**

In [0]:
# Fetch the key and audit columns of the customers already stored in the gold dimension,
# so the incoming rows can be split into new vs. existing records.
if init_load_flag == 0:
    # Incremental load: the gold dimension exists, read its current keys.
    df_old = spark.sql("select DimCustomerkey, customer_id, create_date, update_date from dec25etlproject.gold.DimCustomers")
else:
    # Initial load: gold.DimCustomers is not created yet, so build an empty frame that only
    # carries the column names. The literal 0s supply the columns, `where 1=0` returns no
    # rows, and the silver table is used purely as something to select from.
    # The key alias must be exactly `DimCustomerkey`, matching the incremental branch:
    # later cells rename it to `old_DimCustomerkey` and filter on that name.
    df_old = spark.sql("select 0 DimCustomerkey, 0 customer_id, 0 create_date, 0 update_date from dec25etlproject.silver.customers_silver where 1=0")

In [0]:
# Inspect the existing-customer lookup frame.
display(df_old)

## **Renaming the columns of df_old**

In [0]:
# Prefix the gold-side columns with "old_" so they stay distinguishable from the incoming
# columns once both frames are joined.
old_column_names = {
    'DimCustomerkey': 'old_DimCustomerkey',
    'customer_id': 'old_customer_id',
    'create_date': 'old_create_date',
    'update_date': 'old_update_date',
}
for current_name, prefixed_name in old_column_names.items():
    df_old = df_old.withColumnRenamed(current_name, prefixed_name)

## **Applying the Join with the old Records**

In [0]:
# Left join keeps every incoming customer; customers without a match in df_old get nulls
# in the old_* columns.
customer_match = df["customer_id"] == df_old["old_customer_id"]
df_join = df.join(df_old, on=customer_match, how='left')

In [0]:
# Peek at the first ten joined rows.
display(df_join.limit(10))

## **Separating New vs Old Records**

In [0]:
# New customers: no surrogate key came back from the join.
df_new = df_join.where(df_join["old_DimCustomerkey"].isNull())

In [0]:
# Existing customers: the join returned a surrogate key for them.
df_old = df_join.where(df_join["old_DimCustomerkey"].isNotNull())

## **Preparing df_old**

In [0]:
# Shape the existing customers for the load: keep their surrogate key and create date from
# the join, and stamp a fresh update_date.
df_old = (
    df_old
    .drop('old_customer_id', 'old_update_date')                   # columns that are not required
    .withColumnRenamed('old_DimCustomerkey', 'DimCustomerkey')    # restore the key name
    .withColumnRenamed('old_create_date', 'create_date')          # restore the create date name
    .withColumn("create_date", to_timestamp(col("create_date")))  # convert create_date to a timestamp
    .withColumn('update_date', current_timestamp())               # recreate update_date as "now"
)



In [0]:
# Inspect the prepared existing-customer rows.
display(df_old)

## **Preparing df_new**

In [0]:
# Inspect the new-customer rows before they are prepared.
display(df_new)

In [0]:
# Shape the new customers for the load: remove every old_* column brought in by the join,
# then set both audit columns to the current timestamp.
df_new = (
    df_new
    .drop('old_DimCustomerkey', 'old_customer_id', 'old_update_date', 'old_create_date')
    .withColumn('update_date', current_timestamp())
    .withColumn('create_date', current_timestamp())
)

In [0]:
# Inspect the prepared new-customer rows.
display(df_new)

## **Surrogate key - From 1**

In [0]:
# Number the new customers 1..N in customer_id order.
# row_number() yields consecutive keys that are repeatable for the same input, whereas
# monotonically_increasing_id() is unique but not consecutive and depends on partitioning,
# so its values can differ each time the frame is re-evaluated.
# The cast keeps the key a 64-bit integer, the type monotonically_increasing_id() produced.
# NOTE(review): a window without partitionBy moves all new rows to a single partition —
# presumably fine for dimension-sized data; revisit if the volume grows.
surrogate_key_window = Window.orderBy("customer_id")
df_new = df_new.withColumn("DimCustomerkey", row_number().over(surrogate_key_window).cast("long"))

In [0]:
# Peek at the first ten rows with their generated keys.
display(df_new.limit(10))

## **Adding Max Surrogate Key**

In [0]:
# Offset for the new surrogate keys: the highest key already stored in the gold dimension.
if init_load_flag == 1:
    # Initial load: nothing has been written yet, so the offset is zero.
    max_surrogate_key = 0
else:
    df_maxsur = spark.sql("select max(DimCustomerkey) as max_surrogate_key from dec25etlproject.gold.DimCustomers")
    # Convert the one-row result into a plain Python value.
    # max() is NULL when the table has no rows; fall back to 0 so that adding the offset
    # does not turn every new key into NULL.
    max_surrogate_key = df_maxsur.collect()[0]['max_surrogate_key'] or 0

In [0]:
# Shift the freshly generated keys past the highest key already in the dimension.
df_new = df_new.withColumn("DimCustomerkey", col("DimCustomerkey") + lit(max_surrogate_key))

In [0]:
# Inspect the new customers with their final surrogate keys.
display(df_new)

## **Union of df_old and df_new**

In [0]:
# Stack new and existing customers; columns are matched by name, and both frames must
# expose the same set of columns.
df_final = df_new.unionByName(df_old, allowMissingColumns=False)

In [0]:
# Inspect the combined frame that will be written to gold.
display(df_final)

## **SCD TYPE - 1**

In [0]:
# Alternative to the flag: ask the catalog directly whether the gold table exists.
table_status = "hello" if spark.catalog.tableExists("dec25etlproject.gold.DimCustomers") else "yes"
print(table_status)

In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type 1 load: create the gold dimension when it is missing, otherwise upsert into it.
gold_table_name = "dec25etlproject.gold.DimCustomers"
gold_table_path = "abfss://gold@sadec25etlproject.dfs.core.windows.net/DimCustomers"

if not spark.catalog.tableExists(gold_table_name):
    # Table not registered yet: write df_final as a Delta table stored at the gold path.
    (
        df_final.write
        .mode("overwrite")
        .format("delta")
        .option("path", gold_table_path)
        .saveAsTable(gold_table_name)
    )
else:
    # Table exists: rows whose surrogate key matches are overwritten with the incoming
    # values, rows with an unseen key are inserted.
    dlt_obj = DeltaTable.forPath(spark, gold_table_path)
    merge_builder = dlt_obj.alias("trg").merge(df_final.alias("src"), "trg.DimCustomerkey = src.DimCustomerkey")
    (
        merge_builder
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )





In [0]:
%sql
-- Inspect the gold customer dimension after the load.
SELECT *
FROM dec25etlproject.gold.dimcustomers;