In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Create Flag Parameter

In [None]:
# Register a text widget named 'incremental_flag' with default value '0' so the
# value can be supplied as a notebook parameter; it is read back in the next cell.
dbutils.widgets.text('incremental_flag','0')

In [None]:
# Read the current value of the 'incremental_flag' widget and print it so the
# value used for this run is visible in the cell output.
incremental_flag = dbutils.widgets.get('incremental_flag')
print(incremental_flag)

# Creating Dimensions

### Fetch Relevant Columns for dim_date

In [None]:
# Source side of dim_date: the distinct Date_ID values present in the silver
# carsales parquet data.
df_src = spark.sql(
    '''select distinct(Date_ID) as Date_ID from parquet.`abfss://silver@cardatalake00.dfs.core.windows.net/carsales`'''
)

In [None]:
display(df_src)

### dim_date Sink – Initial and Incremental Runs (returns only the schema if the table doesn't exist yet)

In [None]:
# Target side of dim_date. If the table already exists, read its current keys so
# source dates can be matched against them. Otherwise build an empty frame
# (WHERE 1=0) from the silver data that only carries the expected schema.
if spark.catalog.tableExists('cars_catalog.gold.dim_date'):
    df_sink = spark.sql('''
    SELECT dim_date_key, Date_ID FROM cars_catalog.gold.dim_date''')
else:
    df_sink = spark.sql('''
    SELECT 1 as dim_date_key, Date_ID FROM PARQUET.`abfss://silver@cardatalake00.dfs.core.windows.net/carsales`
    WHERE 1=0''')


### Filtering new records and old records

In [None]:
# Left-join every source date to the existing dimension. dim_date_key remains
# NULL for dates that are not in dim_date yet.
df_filter = (
    df_src
    .join(df_sink, on=df_src.Date_ID == df_sink.Date_ID, how='left')
    .select(df_src.Date_ID, df_sink.dim_date_key)
)

**df_filter_old** — source dates already present in dim_date (they keep their existing key)

In [None]:
# Dates the left join matched to an existing key; they keep that key.
df_filter_old = df_filter.where(col("dim_date_key").isNotNull())

**df_filter_new** — source dates not yet in dim_date (they need a new surrogate key)

In [None]:
# Dates with no matching key; only Date_ID is kept because a fresh surrogate key
# is assigned to them further down.
df_filter_new = (
    df_filter
    .where(col("dim_date_key").isNull())
    .select(df_src.Date_ID)
)

In [None]:
display(df_filter_new)

### Create Surrogate Key

**Fetch max surrogate key**

In [None]:
# First surrogate key to hand out to new dates.
#
# The decision is based on whether the target table exists, the same test the
# sink cell and the write cell use, rather than on the widget flag. With a
# flag-based decision, flag '1' on a first run would query a missing table, and
# flag '0' against an existing table would restart keys at 1 and collide with
# keys already stored in dim_date during the MERGE.
if spark.catalog.tableExists('cars_catalog.gold.dim_date'):
    # COALESCE handles an existing-but-empty table: MAX() returns NULL there,
    # which would otherwise come back as None and make `None + 1` raise.
    max_value = spark.sql(
        "select coalesce(max(dim_date_key), 0) + 1 from cars_catalog.gold.dim_date"
    ).collect()[0][0]
else:
    max_value = 1

**Add surrogate key column**

In [None]:
# Assign contiguous surrogate keys max_value, max_value + 1, ... to the new dates.
#
# row_number() over a window ordered by Date_ID is deterministic. By contrast,
# monotonically_increasing_id() encodes the partition id in its upper bits, so
# keys jump by ~2^33 between partitions. Its values are also non-deterministic,
# so they may change when this frame is recomputed (e.g. between display() and
# the MERGE).
# The un-partitioned window moves all new rows to a single partition, which is
# acceptable for the small number of new dates per run.
# The cast keeps the column a bigint, matching keys written by earlier runs.
df_filter_new = df_filter_new.withColumn(
    "dim_date_key",
    (max_value + row_number().over(Window.orderBy("Date_ID")) - 1).cast("bigint"),
)

In [None]:
display(df_filter_new)

### Create final df - df_filter_old + df_filter_new

In [None]:
# Combine the newly keyed dates with the dates that already had a key.
# unionByName matches columns by name rather than by position, so a change in
# column order in either frame cannot silently swap Date_ID and dim_date_key.
df_final = df_filter_new.unionByName(df_filter_old)

# SCD Type 1 (Upsert)

In [None]:
from delta.tables import DeltaTable

In [None]:
# SCD Type 1 load of dim_date: update rows whose key matches, insert the rest.
if spark.catalog.tableExists('cars_catalog.gold.dim_date'):
    # Incremental run: merge into the same table whose existence was just
    # checked. forName (rather than a separately hard-coded storage path) keeps
    # the existence check and the merge target in sync.
    delta_tbl = DeltaTable.forName(spark, 'cars_catalog.gold.dim_date')
    (
        delta_tbl.alias('trg')
        .merge(df_final.alias('src'), "trg.dim_date_key = src.dim_date_key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # Initial run: create the table as a Delta table stored at the gold path.
    df_final.write.format('delta').mode('overwrite').option('path','abfss://gold@cardatalake00.dfs.core.windows.net/dim_date').saveAsTable('cars_catalog.gold.dim_date')

In [None]:
%sql
-- Inspect the dim_date table after the load.
SELECT * FROM cars_catalog.gold.dim_date