In [0]:
 from pyspark.sql.functions import *
 from pyspark.sql.types import *

# CREATE FLAG PARAMETER

In [0]:
dbutils.widgets.text("incremental_flag",'0')

In [0]:
incremental_flag = dbutils.widgets.get("incremental_flag")


# CREATING DIMENSION MODEL


# Fetch Relative Columns

In [0]:
df_src = spark.sql('''select DISTINCT(Model_ID) as Model_ID,model_category FROM parquet.`abfss://silver@sumitdatalake.dfs.core.windows.net/carsales`
        ''')

In [0]:
df_src.display()

Model_ID,model_category
Mah-M167,Mah
Che-M47,Che
Toy-M205,Toy
BMW-M249,BMW
Mer-M122,Mer
Hon-M215,Hon
Nis-M82,Nis
Toy-M206,Toy
Mar-M139,Mar
Ren-M207,Ren


### Now our source is ready now we need to create a surrogate key as well. So for initial load, we are initially only loading schema. 


In [0]:
# This is for initial load and incremental loads
if spark.catalog.tableExists('cars_catalog.gold.dim_model') :
    df_sink = spark.sql('''
    SELECT dim_model_key, Model_ID, model_category 
    FROM cars_catalog.gold.dim_model
    ''')

else :    
    df_sink = spark.sql('''
    SELECT 1 AS dim_model_key, Model_ID, model_category FROM parquet.`abfss://silver@sumitdatalake.dfs.core.windows.net/carsales` WHERE 1 = 0
    ''')


### Filtering new records and old records 

In [0]:
df_filter = df_src.join(df_sink, df_src['Model_ID'] == df_sink['Model_ID'], 'left').select(df_src['Model_ID'], df_src['model_category'], df_sink['dim_model_key'])

In [0]:
df_filter.display()

Model_ID,model_category,dim_model_key
Mah-M167,Mah,
Che-M47,Che,
Toy-M205,Toy,
BMW-M249,BMW,
Mer-M122,Mer,
Hon-M215,Hon,
Nis-M82,Nis,
Toy-M206,Toy,
Mar-M139,Mar,
Ren-M207,Ren,


### df_filter_old


In [0]:
df_filter_old = df_filter.filter(col('dim_model_key').isNotNull())

In [0]:
df_filter_old.display()

Model_ID,model_category,dim_model_key


### df_filter_new


In [0]:
df_filter_new = df_filter.filter(col('dim_model_key').isNull()).select(df_src['Model_ID'], df_src['model_category'])

In [0]:
df_filter_new.display()

Model_ID,model_category
Mah-M167,Mah
Che-M47,Che
Toy-M205,Toy
BMW-M249,BMW
Mer-M122,Mer
Hon-M215,Hon
Nis-M82,Nis
Toy-M206,Toy
Mar-M139,Mar
Ren-M207,Ren


## Create Surrogate Key

**Fetch the max surrogate key from existing table**

In [0]:
if(incremental_flag == '0'):
    max_value = 1 
else:
    max_value_df = spark.sql("select max(dim_model_key) from cars_catalog.gold.dim_model")
    max_value = max_value_df.collect()[0][0]+1


### Create Surrogate key column and ADD the max surrogate key

In [0]:
df_filter_new = df_filter_new.withColumn('dim_model_key',max_value+monotonically_increasing_id())

In [0]:
df_filter_new.display()

Model_ID,model_category,dim_model_key
Mah-M167,Mah,1
Che-M47,Che,2
Toy-M205,Toy,3
BMW-M249,BMW,4
Mer-M122,Mer,5
Hon-M215,Hon,6
Nis-M82,Nis,7
Toy-M206,Toy,8
Mar-M139,Mar,9
Ren-M207,Ren,10


### Create Final DF - df_filter_old + df_filter_new

In [0]:
df_final = df_filter_old.union(df_filter_new)
df_final.display()

Model_ID,model_category,dim_model_key
Mah-M167,Mah,1
Che-M47,Che,2
Toy-M205,Toy,3
BMW-M249,BMW,4
Mer-M122,Mer,5
Hon-M215,Hon,6
Nis-M82,Nis,7
Toy-M206,Toy,8
Mar-M139,Mar,9
Ren-M207,Ren,10


## SCD TYPE -1(UPSERT)

In [0]:
from delta.tables import DeltaTable

In [0]:
#Incremental Run
if spark.catalog.tableExists('cars_catalog.gold.dim_model'):
    delta_tbl = DeltaTable.forPath(spark,"abfss://gold@sumitdatalake.dfs.core.windows.net/dim_model")

    delta_tbl.alias("trg").merge(df_final.alias("src"), "trg.dim_model_key = src.dim_model_key")\
        .whenMatchedUpdateAll()\
        .whenNotMatchedInsertAll()\
        .execute()


#Initial RUN
else:
    df_final.write.format('delta')\
    .mode('overwrite')\
    .option("path","abfss://gold@sumitdatalake.dfs.core.windows.net/dim_model")\
    .saveAsTable('cars_catalog.gold.dim_model')

In [0]:
%sql
select * from cars_catalog.gold.dim_model

Model_ID,model_category,dim_model_key
Mah-M167,Mah,1
Che-M47,Che,2
Toy-M205,Toy,3
BMW-M249,BMW,4
Mer-M122,Mer,5
Hon-M215,Hon,6
Nis-M82,Nis,7
Toy-M206,Toy,8
Mar-M139,Mar,9
Ren-M207,Ren,10
