In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# CREATE FLAG PARAMETER

In [0]:
# Register the notebook parameter "incremental_flag" as a text widget, defaulting to "0".
dbutils.widgets.text("incremental_flag", "0")

In [0]:
# Read the current value of the "incremental_flag" widget.
# It is compared against the string '0' further down, so it is handled as text, not as an int.
incremental_flag = dbutils.widgets.get("incremental_flag")

# CREATING DIMENSIONS MODEL

In [0]:
# Distinct (Branch_ID, Branch_Name) pairs read from the silver-layer parquet data;
# these are the business rows that feed the branch dimension.
#
# DISTINCT is a row-level modifier, not a function: it de-duplicates the whole
# selected row. It is therefore written without the parentheses that made it look
# as if only Branch_ID were being de-duplicated. The result set is unchanged.
#
# NOTE(review): a Branch_ID that appears with two different Branch_Name values
# still produces two rows here - confirm the silver data cannot contain that.
df_source = spark.sql("""
    SELECT DISTINCT Branch_ID, Branch_Name
    FROM parquet.`abfss://silver@storageadlsgen3.dfs.core.windows.net/carsales`
""")

display(df_source)

In [0]:
# df_sink holds the rows currently in the gold dimension.
# When the gold table exists, read its three columns; otherwise build an empty
# frame with the same three column names (the `where 1 = 0` predicate returns no
# rows) so the join below works on a first run as well.
if spark.catalog.tableExists("project01.gold.dim_branch"):
    df_sink = spark.sql("""
                        select dim_branch_key, Branch_ID, Branch_Name
                        from project01.gold.dim_branch
                        """)
else:
    df_sink = spark.sql("""
    select 1 as dim_branch_key, Branch_ID, Branch_Name
    from parquet.`abfss://silver@storageadlsgen3.dfs.core.windows.net/carsales`
    where 1 = 0
    """)
    
    

In [0]:
# Preview the rows currently in the dimension (empty on a first run).
df_sink.display()

### Filtering new records and old ones

In [0]:
# Left-join the source branches to the existing dimension rows on Branch_ID.
# Source rows without a match come back with a NULL dim_branch_key, which is what
# the two filters below use to separate existing rows from new ones.
df_join = (
    df_source
    .join(df_sink, on=df_source['Branch_ID'] == df_sink['Branch_ID'], how='left')
    .select(
        df_source['Branch_ID'],
        df_source['Branch_Name'],
        df_sink['dim_branch_key'],
    )
)

**df_filter_old**

In [0]:
# Rows that matched the dimension: they already carry a surrogate key.
df_filter_old = df_join.where(col('dim_branch_key').isNotNull())

**df_filter_new**

In [0]:
# Rows with no match in the dimension. The all-NULL key column is dropped here by
# selecting only the two source columns; a fresh key is assigned in a later cell.
df_filter_new = (
    df_join
    .where(col('dim_branch_key').isNull())
    .select(df_source['Branch_ID'], df_source['Branch_Name'])
)

In [0]:
# Preview the new branches that still need a surrogate key.
display(df_filter_new)

# Create Surrogate Key

**Fetch the max Surrogate Key from the existing table**

In [0]:
# Highest surrogate key already present in the dimension; new keys continue after it.
#
# The value is read from df_sink instead of being chosen by the incremental_flag
# widget. df_sink is already built from the gold table when it exists and is an
# empty frame when it does not, so:
#   * a flag of '0' against an existing table can no longer restart the sequence
#     and hand out keys that are already assigned (which would collide in the merge);
#   * a flag of '1' against a missing table can no longer query a non-existent table.
current_max_key = df_sink.agg({"dim_branch_key": "max"}).collect()[0][0]

# max() over zero rows comes back as None; fall back to 0 so that the first key
# generated downstream (max_value + 1) is 1.
max_value = 0 if current_max_key is None else current_max_key

**Create Surrogate Key column and ADD the max surrogate key**

In [0]:
# Assign surrogate keys max_value + 1, max_value + 2, ... to the new rows.
#
# row_number() over an ordered window yields consecutive values.
# monotonically_increasing_id() is only guaranteed to be unique and increasing,
# not consecutive, so it leaves large gaps once the frame spans more than one
# partition. Ordering by Branch_ID also makes the assignment repeatable each time
# this lazy plan is re-evaluated (display, union, merge).
#
# The window has no partitionBy, so Spark moves the new rows into a single
# partition for numbering; that is acceptable for a small dimension.
# The cast keeps the key a long, the type monotonically_increasing_id() produced.
key_order = Window.orderBy("Branch_ID")
df_filter_new = df_filter_new.withColumn(
    "dim_branch_key",
    (row_number().over(key_order) + max_value).cast("long"),
)

In [0]:
# Preview the new branches together with their freshly assigned surrogate keys.
df_filter_new.display()

### Create Final DF - df_filter_old + df_filter_new

In [0]:
# Combine the rows that already had a key with the newly keyed rows.
# unionByName matches columns by name rather than by position, so the result stays
# correct even if one side's column order changes; both frames expose
# Branch_ID, Branch_Name and dim_branch_key.
df_final = df_filter_old.unionByName(df_filter_new)

In [0]:
# Preview the full set of rows that will be written to the dimension.
df_final.display()

# SCD TYPE - 1 (UPSERT)

In [0]:
from delta.tables import DeltaTable

In [0]:
# Write df_final to the gold dimension.
# If the table already exists, merge on the surrogate key: matched rows are
# overwritten with the source values and unmatched rows are inserted.
# Otherwise create the table as Delta at the gold storage path.
if spark.catalog.tableExists('project01.gold.dim_branch'):
    delta_tb = DeltaTable.forPath(
        spark,
        'abfss://gold@storageadlsgen3.dfs.core.windows.net/project01/gold/dim_branch',
    )
    (
        delta_tb.alias('target')
        .merge(df_final.alias('source'), 'target.dim_branch_key = source.dim_branch_key')
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    (
        df_final.write
        .format('delta')
        .mode('overwrite')
        .option('path', 'abfss://gold@storageadlsgen3.dfs.core.windows.net/project01/gold/dim_branch')
        .saveAsTable('project01.gold.dim_branch')
    )

In [0]:
%sql
-- Inspect the branch dimension after the write above.
SELECT *
FROM project01.gold.dim_branch