In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create Flag Parameter

In [0]:
dbutils.widgets.text('incremental_flag','0')

In [0]:
incremental_flag=dbutils.widgets.get('incremental_flag')

# Create Dimensional Model

### Fetch Relative Columns

In [0]:
df_src=spark.sql('''
      SELECT DISTINCT(Model_ID) as Model_ID, model_category
      FROM parquet.`abfss://silver@mayankdl.dfs.core.windows.net/carsales`
      ''')

In [0]:
display(df_src)

Model_ID,model_category
Mah-M167,Mah
Che-M47,Che
Toy-M205,Toy
BMW-M249,BMW
Mer-M122,Mer
Hon-M215,Hon
Nis-M82,Nis
Toy-M206,Toy
Mar-M139,Mar
Ren-M207,Ren


### dim_model_sink- Initial and Incremental

In [0]:
if spark.catalog.tableExists('cars_catalog.gold.dim_model'):

        df_sink=spark.sql('''
            SELECT dim_model_key, Model_ID, model_category
            FROM cars_catalog.gold.dim_model
        ''')

else:

        df_sink=spark.sql('''
            SELECT 1 AS dim_model_key, Model_ID, model_category
            FROM parquet.`abfss://silver@mayankdl.dfs.core.windows.net/carsales`
            WHERE 1=0
        ''')

### Filtering Records(Old Records- updated, New Records- inserted)

In [0]:
df_filter=df_src.join(df_sink, df_src.Model_ID==df_sink.Model_ID, 'left').select(df_src.Model_ID,df_src.model_category,df_sink.dim_model_key)

### df_filter_old

In [0]:
df_filter_old=df_filter.filter(col('dim_model_key').isNotNull())

### df_filter_new

In [0]:
df_filter_new=df_filter.filter(col('dim_model_key').isNull()).select(df_src['Model_ID'],df_src['model_category'])

In [0]:
display(df_filter_new)

Model_ID,model_category
Mah-M167,Mah
Che-M47,Che
Toy-M205,Toy
BMW-M249,BMW
Mer-M122,Mer
Hon-M215,Hon
Nis-M82,Nis
Toy-M206,Toy
Mar-M139,Mar
Ren-M207,Ren


# Create Surrogate Key

### Fetching Max Surrogate Key from existing table

In [0]:
if (incremental_flag=='0'):
    max_value=1
else:
    max_value_df = spark.sql("SELECT MAX(dim_model_key) FROM cars_catalog.gold.dim_model")    
    max_value=max_value_df.collect()[0][0] + 1

### Create surrogate key and add the max surrogate

In [0]:
df_filter_new=df_filter_new.withColumn('dim_model_key',max_value + monotonically_increasing_id())
display(df_filter_new)

Model_ID,model_category,dim_model_key
Mah-M167,Mah,1
Che-M47,Che,2
Toy-M205,Toy,3
BMW-M249,BMW,4
Mer-M122,Mer,5
Hon-M215,Hon,6
Nis-M82,Nis,7
Toy-M206,Toy,8
Mar-M139,Mar,9
Ren-M207,Ren,10


### Combining both Old and New

In [0]:
df_final= df_filter_new.union(df_filter_old)
display(df_final)

Model_ID,model_category,dim_model_key
Mah-M167,Mah,1
Che-M47,Che,2
Toy-M205,Toy,3
BMW-M249,BMW,4
Mer-M122,Mer,5
Hon-M215,Hon,6
Nis-M82,Nis,7
Toy-M206,Toy,8
Mar-M139,Mar,9
Ren-M207,Ren,10


# SCD Type-1 (UPSERT)

In [0]:
from delta.tables import DeltaTable

In [0]:
#Incremental RUN
if spark.catalog.tableExists('cars_catalog.gold.dim_model'):
    delta_tbl=DeltaTable.forPath(spark, "abfss://gold@mayankdl.dfs.core.windows.net/dim_model")

    delta_tbl.alias("trg").merge(df_final.alias("src"), "trg.dim_model_key = src.dim_model_key")\
                          .whenMatchedUpdateAll()\
                          .whenNotMatchedInsertAll()\
                          .execute()

else: #For INITIAL RUN
    df_final.write.format("delta")\
        .mode('overwrite')\
        .option("path","abfss://gold@mayankdl.dfs.core.windows.net/dim_model")\
        .saveAsTable("cars_catalog.gold.dim_model")

In [0]:
%sql
SELECT * FROM cars_catalog.gold.dim_model

Model_ID,model_category,dim_model_key
Mah-M167,Mah,1
Che-M47,Che,2
Toy-M205,Toy,3
BMW-M249,BMW,4
Mer-M122,Mer,5
Hon-M215,Hon,6
Nis-M82,Nis,7
Toy-M206,Toy,8
Mar-M139,Mar,9
Ren-M207,Ren,10
