In [0]:
from pyspark.sql.functions import col, monotonically_increasing_id
from delta.tables import DeltaTable

# CREATE FLAG PARAMETER

In [0]:
dbutils.widgets.text('incremental_flag','0')

In [0]:
incremental_flag = dbutils.widgets.get('incremental_flag')

# CREATING DIMENSION MODEL

### Fetch Relative Column

In [0]:
df_scr = spark.sql('''
SELECT DISTINCT(Date_ID) as Date_ID
FROM PARQUET.`abfss://silver@storageact04.dfs.core.windows.net/carsales`
'''
)

In [0]:
df_scr.display()

Date_ID
DT00029
DT00140
DT00192
DT00444
DT00475
DT00947
DT00976
DT01028
DT01099
DT00657


### dim_model Sink - Initial and Incremental(just bring the schema if table NOT EXISTS)

In [0]:
if spark.catalog.tableExists('car_catalog.gold.dim_date'):
  df_sink = spark.sql('''
    SELECT Dim_date_key, Date_ID
    FROM car_catalog.gold.dim_date
    '''
    )  
else:
    df_sink = spark.sql('''
    SELECT 1 AS Dim_date_key, Date_ID
    FROM PARQUET.`abfss://silver@storageact04.dfs.core.windows.net/carsales`
    WHERE 1=0
    '''
    )

### Filtering new records and Updating old records

In [0]:
df_filter = df_scr.join(df_sink, df_scr['Date_ID'] == df_sink['Date_ID'], 'left').select(df_scr['Date_ID'],df_sink['Dim_date_key'])

In [0]:
df_filter.display()

Date_ID,Dim_date_key
DT00029,
DT00140,
DT00192,
DT00444,
DT00475,
DT00947,
DT00976,
DT01028,
DT01099,
DT00657,


**DF Filter Old**

In [0]:
df_filter_old = df_filter.filter(col('dim_date_key').isNotNull())

In [0]:
df_filter_old.display()

Date_ID,Dim_date_key


**DF_Filter_New**

In [0]:
df_filter_new = df_filter.filter(col('dim_date_key').isNull()).select(df_filter['Date_ID'])

In [0]:
df_filter_new.display()

Date_ID
DT00029
DT00140
DT00192
DT00444
DT00475
DT00947
DT00976
DT01028
DT01099
DT00657


### Create Surrogate Key 

**Fetch the max Surrogate Key**

In [0]:
if incremental_flag ==  '0' :
    max_value = 1
else:
    max_value_df = spark.sql("SELECT max(Dim_date_key) FROM cars_catalog.gold.dim_date")
    max_value = max_value_df.collect()[0][0]+1

**Create Surrogate Key column and Add the max Surrogate key**

In [0]:
max_value

1

In [0]:
df_filter_new = df_filter_new.withColumn('Dim_date_key', max_value + monotonically_increasing_id())

In [0]:
df_filter_new.display()

Date_ID,Dim_date_key
DT00029,1
DT00140,2
DT00192,3
DT00444,4
DT00475,5
DT00947,6
DT00976,7
DT01028,8
DT01099,9
DT00657,10


### Create Final Filter - df_filter_old + df_filter_new

In [0]:
df_final = df_filter_new.union(df_filter_old)

In [0]:
df_final.display()

Date_ID,Dim_date_key
DT00029,1
DT00140,2
DT00192,3
DT00444,4
DT00475,5
DT00947,6
DT00976,7
DT01028,8
DT01099,9
DT00657,10


# SCD TYPE - 1 (UPSERT)

In [0]:
#Incremental Run 
if spark.catalog.tableExists('car_catalog.gold.dim_date'):
    delta_tbl = DeltaTable.forPath(spark, 'abfss://gold@storageact04.dfs.core.windows.net/dim_date')
    delta_tbl.alias('trg').merge(df_final.alias('src'), 'trg.dim_date_key = src.Dim_date_key')\
                            .whenMatchedUpdateAll()\
                            .whenNotMatchedInsertAll()\
                            .execute()
    
#Initial run
else:
    df_final.write.format('delta')\
    .mode('overwrite')\
    .option('path','abfss://gold@storageact04.dfs.core.windows.net/dim_date')\
    .saveAsTable('cars_catalog.gold.dim_date')

In [0]:
%sql
SELECT * FROM cars_catalog.gold.dim_date

Date_ID,Dim_date_key
DT00029,1
DT00140,2
DT00192,3
DT00444,4
DT00475,5
DT00947,6
DT00976,7
DT01028,8
DT01099,9
DT00657,10
