In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable

In [0]:
# Load-mode switch, read from the 'init_load_flag' notebook widget and parsed as an int.
# Later cells compare it with 0: when it equals 0 they read the existing gold
# dimension, otherwise they start from an empty set of existing keys.
init_load_flag_raw = dbutils.widgets.get('init_load_flag')
init_load_flag = int(init_load_flag_raw)

In [0]:
# Source rows for the dimension: every column of the silver customers table.
silver_customers_query = "select * from databricks_cata.silver.customers"
df = spark.sql(silver_customers_query)

In [0]:
# Choose the query that supplies the keys already stored in the gold dimension.
# Every selected column carries an 'old_' prefix so it stays distinguishable
# from the silver columns once the two sides are joined.
if init_load_flag != 0:
    # The predicate 1=0 matches no rows: the result is an empty frame that
    # still exposes the four old_* columns.
    old_keys_query = "select 0 old_DimCustomerkey, 0 old_customer_id, 0 old_create_date, 0 old_update_date from databricks_cata.silver.customers where 1=0"
else:
    # Flag is 0: read the keys and audit dates from the existing dimension table.
    old_keys_query = "select DimCustomerkey as old_DimCustomerkey, customer_id as old_customer_id, create_date as old_create_date, update_date as old_update_date from databricks_cata.gold.dimcustomers"

df_old = spark.sql(old_keys_query)

In [0]:
# Expose both frames to Spark SQL under fixed view names; a view that already
# exists under the same name is replaced.
for view_name, frame in (("mydf", df), ("myOldDf", df_old)):
    frame.createOrReplaceTempView(view_name)

In [0]:
# Left-join the silver customers (view "mydf") to the keys already present in
# the dimension (view "myOldDf"). Both views are registered by the preceding
# cell, so they are not registered a second time here.
#
# Customers without a match come back with NULL in every old_* column; the
# next cell uses that to separate new rows from existing ones.
#
# The view is referenced with the same casing it was registered under
# ("myOldDf"). The previous spelling "myOlddf" only resolved because Spark
# matches identifiers case-insensitively by default.
df_join = spark.sql('''
    select a.customer_id, a.email, a.city, a.state, a.domain, a.type,
           a.FullName,
           b.old_customer_id, b.old_DimCustomerkey,
           b.old_create_date, b.old_update_date
    from mydf a
    left join myOldDf b
      on a.customer_id = b.old_customer_id
''')

In [0]:
# Split the joined rows on whether an existing surrogate key was found.
existing_key = df_join['old_DimCustomerkey']

# No key on the right-hand side of the left join: the customer is new.
df_new = df_join.where(existing_key.isNull())

# Key present: the customer already exists in the dimension.
# NOTE: this rebinds df_old, which until now held the keys read for the join.
df_old = df_join.where(existing_key.isNotNull())

Processing Old Records

In [0]:
# Rows that already exist in the dimension: keep their surrogate key and
# original create_date, and stamp a fresh update_date.
#
# The rename source is spelled exactly like the column produced upstream
# ("old_DimCustomerkey"). withColumnRenamed does nothing when the name does
# not resolve, so a casing mismatch would silently leave the key column
# un-renamed whenever identifier matching is case-sensitive.
oldJoinedDf = (
    df_old
    .withColumn("update_date", current_timestamp())
    .withColumnRenamed("old_create_date", "create_date")
    .withColumnRenamed("old_DimCustomerkey", "DimCustomerKey")
    # Normalise create_date to a timestamp so it lines up with the
    # current_timestamp() value used for new rows.
    .withColumn("create_date", to_timestamp(col("create_date")))
    .drop("old_customer_id", "old_update_date")
)

Processing New Records

In [0]:
# Starting offset for newly generated surrogate keys.
#   - flag == 0: continue after the highest key currently in the gold dimension.
#   - otherwise: start from 0.
if init_load_flag == 0:
    max_key_row = spark.sql(
        "select max(DimCustomerKey) from databricks_cata.gold.dimcustomers"
    ).collect()[0]
    current_max_key = max_key_row[0]
    # max() over a table with no rows yields NULL, which collect() returns as
    # None. Fall back to 0 so the key arithmetic downstream always receives
    # a number.
    maxDimCustomerKeyId = current_max_key if current_max_key is not None else 0

else:
    maxDimCustomerKeyId = 0

In [0]:
# Rows that are new to the dimension: both audit dates are set to "now" and a
# surrogate key is generated above the current maximum.
# Per the PySpark documentation, monotonically_increasing_id() yields unique,
# increasing 64-bit values that are not guaranteed to be consecutive.
new_surrogate_key = monotonically_increasing_id() + maxDimCustomerKeyId + lit(1)

# The old_* columns are all NULL for unmatched rows and are no longer needed.
leftover_join_columns = ['old_DimCustomerKey', 'old_customer_id', 'old_create_date', 'old_update_date']

newJoinedDf = (
    df_new
    .withColumn('update_date', current_timestamp())
    .withColumn('create_date', current_timestamp())
    .withColumn('DimCustomerKey', new_surrogate_key)
    .drop(*leftover_join_columns)
)

In [0]:
# Combine existing and new customers into one frame; columns are matched by
# name rather than by position.
finalDf = oldJoinedDf.unionByName(newJoinedDf)

# Render the combined frame in the notebook output.
display(finalDf)

In [0]:
# Persist the dimension: create the Delta table on the first run, upsert on
# later runs.
gold_table_name = 'databricks_cata.gold.DimCustomers'
gold_table_path = 's3://goldbucketkrsna/Retail_YT_Project/DimCustomers'

if not spark.catalog.tableExists(gold_table_name):
    # Table is not registered yet: write finalDf as a Delta table at the
    # explicit storage path and register it under the catalog name.
    delta_writer = (
        finalDf.write
        .format('delta')
        .mode('overwrite')
        .option('path', gold_table_path)
    )
    delta_writer.saveAsTable(gold_table_name)

else:
    # Table exists: merge on the surrogate key. Matching rows are updated
    # with all source columns, non-matching rows are inserted.
    key_match_condition = "src.DimCustomerKey = tgt.DimCustomerKey"
    dlt_obj = DeltaTable.forPath(spark, gold_table_path)

    (
        dlt_obj.alias('tgt')
        .merge(finalDf.alias('src'), key_match_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
-- Show the current contents of the customer dimension table.
SELECT *
FROM databricks_cata.gold.dimcustomers