# Create Flag Parameter

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Register the notebook text widget 'incremental_flag' with a default of '0'.
dbutils.widgets.text(
    'incremental_flag',
    '0',
)

In [0]:

# Read the widget value (always a string) and echo it for the run log.
incremental_flag = dbutils.widgets.get('incremental_flag')
print(incremental_flag)

# Creating Dimension Model

### Fetch Relevant Columns

In [0]:
# Read the model columns from the silver-layer parquet data.
# NOTE: DISTINCT applies to the whole select list, so this returns distinct
# (Model_ID, model_Category) pairs; the parentheses around MODEL_ID do not
# restrict de-duplication to that column alone.
src_query = '''
select DISTINCT(MODEL_ID) as Model_ID, model_Category
from parquet.`abfss://silver@cartejdatalake.dfs.core.windows.net/carsales`
'''
df_src = spark.sql(src_query)

In [0]:
# Preview the model rows read from the silver layer.
df_src.display()

### dim_model Sink - Initial and Incremental

In [0]:
# Build df_sink: the current contents of the gold dimension when it exists,
# otherwise an empty frame with the same three columns (the `where 1=0`
# predicate keeps the schema but returns no rows).
dim_model_exists = spark.catalog.tableExists('cars_catalog.gold.dim_model')

if not dim_model_exists:
    # Initial load: no dimension table yet, so derive an empty sink from silver.
    sink_query = '''
    SELECT 1 as dim_model_key,Model_ID, model_Category
    from parquet.`abfss://silver@cartejdatalake.dfs.core.windows.net/carsales`
    where 1=0
    '''
else:
    # Incremental load: read the keys already assigned in the gold table.
    sink_query = '''
    SELECT dim_model_key,Model_ID, model_Category
    from cars_catalog.gold.dim_model
    '''

df_sink = spark.sql(sink_query)

### Filtering new records and old records

In [0]:
# Left-join source models to the sink on Model_ID. Rows with no match in the
# sink come back with a null dim_model_key, which marks them as new.
key_match = df_src['Model_ID'] == df_sink['Model_ID']
df_filter = (
    df_src
    .join(df_sink, key_match, 'left')
    .select(
        df_src['Model_ID'],
        df_src['model_Category'],
        df_sink['dim_model_key'],
    )
)

In [0]:
# Preview the join result; dim_model_key is null for models not found in the sink.
df_filter.display()

### df_filter_old

In [0]:
# Existing records: rows that already carry a surrogate key from the sink.
has_key = col('dim_model_key').isNotNull()
df_filter_old = df_filter.where(has_key)

In [0]:
# Preview the records that already have a surrogate key.
df_filter_old.display()

In [0]:
# New records: rows with no surrogate key yet. The null key column is dropped
# here by selecting only the two source columns.
missing_key = col('dim_model_key').isNull()
df_filter_new = (
    df_filter
    .where(missing_key)
    .select(
        df_src['Model_Id'],
        df_src['model_Category'],
    )
)

In [0]:
# Preview the records that do not yet have a surrogate key.
df_filter_new.display()

### Create Surrogate Key

**Fetch the max surrogate key from the existing table**

In [0]:
# Decide the first surrogate key to hand out in this run (stored in max_value).
#
# The decision is driven by whether the gold table exists rather than by the
# incremental_flag widget alone, so it always agrees with the sink lookup and
# the upsert, which both branch on tableExists. Trusting the flag had two
# failure modes:
#   * flag '0' while the table exists -> keys restart at 1 and the merge on
#     dim_model_key overwrites existing dimension rows;
#   * flag != '0' while the table is missing -> the max() query fails.
if spark.catalog.tableExists('cars_catalog.gold.dim_model'):
    max_value_df = spark.sql("select max(dim_model_key) from cars_catalog.gold.dim_model")
    current_max_key = max_value_df.collect()[0][0]
    # max() returns NULL (None) for an empty table; fall back to 1 instead of
    # raising TypeError on `None + 1`.
    max_value = 1 if current_max_key is None else current_max_key + 1
else:
    # Initial run: no table yet, so key numbering starts at 1.
    max_value = 1

**Create Surrogate key column and add the max**

In [0]:
# Assign surrogate keys max_value, max_value + 1, ... to the new records.
#
# row_number() over an ordered window is used instead of
# monotonically_increasing_id(): the latter is unique but not consecutive (its
# values depend on how the data is partitioned), which leaves very large gaps
# in the key sequence. Ordering by the record's own columns keeps the
# assignment reproducible when the DataFrame is re-evaluated.
# NOTE(review): a window without partitionBy processes all rows in a single
# partition; this assumes the set of new models is small - confirm.
model_key_window = Window.orderBy('Model_ID', 'model_Category')
df_filter_new = df_filter_new.withColumn(
    'dim_model_key',
    # row_number() starts at 1, so subtract 1 to make the first key max_value.
    # Cast to long to keep the same column type monotonically_increasing_id() gave.
    (lit(max_value) + row_number().over(model_key_window) - lit(1)).cast('long'),
)

In [0]:
# Preview the new records together with their assigned dim_model_key.
df_filter_new.display()

### Create Final DF - df_filter_old + df_filter_new

In [0]:
# Combine new and existing records into the frame that will be upserted.
# unionByName matches columns by name; plain union() is positional and would
# silently mix values if the two frames' column orders ever diverged (they are
# built by different code paths: withColumn vs. an explicit select).
df_final = df_filter_new.unionByName(df_filter_old)

In [0]:
# Preview the combined new + existing records before writing.
df_final.display()

### SCD Type-1 (UPSERT)

In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type-1 load of the gold dimension:
#   * table missing -> write df_final as a new external Delta table;
#   * table present -> merge df_final into it on dim_model_key, updating
#     matched rows and inserting unmatched ones.
gold_path = "abfss://gold@cartejdatalake.dfs.core.windows.net/dim_model"

if not spark.catalog.tableExists('cars_catalog.gold.dim_model'):
    # Initial run
    (
        df_final.write
        .format("delta")
        .mode("overwrite")
        .option("path", gold_path)
        .saveAsTable("cars_catalog.gold.dim_model")
    )
else:
    delta_tbl = DeltaTable.forPath(spark, gold_path)
    (
        delta_tbl.alias("trg")
        .merge(df_final.alias("src"), "trg.dim_model_key = src.dim_model_key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
-- Inspect the contents of the gold dimension table after the load.
select * from cars_catalog.gold.dim_model