In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

### Create the incremental-load flag parameter ('0' = initial load, otherwise incremental)

In [0]:
# Notebook text widget holding the run mode; defaults to '0' (initial load) when no value is supplied.
dbutils.widgets.text('incremental_flg','0')

In [0]:
# Read the run mode from the widget: '0' = initial load, '1' = incremental load.
# Whitespace is stripped and any other value is rejected, because downstream cells
# treat every value other than exactly '0' as incremental.
incremental_flg = dbutils.widgets.get('incremental_flg').strip()
if incremental_flg not in ('0', '1'):
    raise ValueError(f"incremental_flg must be '0' or '1', got {incremental_flg!r}")
print(incremental_flg)

## Creating the Dimension Model

### Fetch relevant columns

In [0]:
# Distinct Date_ID values from the silver sales data: the natural keys for dim_date.
# The explicit alias pins the output column name to 'Date_ID' regardless of source casing.
df_src = (
    spark.read
    .parquet('abfss://silver@carsanjeevdatalake.dfs.core.windows.net/carsales')
    .select(col('Date_ID').alias('Date_ID'))
    .distinct()
)

In [0]:
# Preview the distinct Date_ID values read from silver.
df_src.display()

### Dimension sink – initial and incremental (bring only the schema if the table does not exist)

In [0]:
# Existing rows of the date dimension (the "sink"), as (dim_date_key, Date_ID).
# On the first run the table does not exist yet, so build an empty frame with the
# same column names instead.
if spark.catalog.tableExists('cars_catalog.gold.dim_date'):
    # Read the current keys from the gold dimension table itself. dim_date_key is
    # written to gold by the load step; the silver source only carries Date_ID.
    df_sink = spark.sql('''
select dim_date_key, Date_ID
from cars_catalog.gold.dim_date
''')
else:
    # `where 1=0` returns no rows but keeps the projected columns, so later joins work.
    df_sink = spark.sql('''
select 1 as dim_date_key, Date_ID from parquet.`abfss://silver@carsanjeevdatalake.dfs.core.windows.net/carsales` where 1=0
''')

### Filter new records and old records

In [0]:
# Left-join source dates to the existing dimension; dim_date_key stays NULL for dates not yet present.
df_filter = (
    df_src
    .join(df_sink, on=df_src['Date_ID'] == df_sink['Date_ID'], how='left')
    .select(df_src['Date_ID'], df_sink['dim_date_key'])
)

**df_filter_old** – dates that already exist in the dimension (they keep their surrogate key)

In [0]:
# Dates that matched an existing dimension row, i.e. already have a surrogate key.
df_filter_old = df_filter.where(df_filter['dim_date_key'].isNotNull())

**df_filter_new** – dates not yet in the dimension (they need a new surrogate key)

In [0]:
# Dates with no matching dimension row; only Date_ID is kept, the key is assigned later.
df_filter_new = df_filter.where(df_filter['dim_date_key'].isNull()).select(df_src['Date_ID'])

In [0]:
# Preview the dates that will receive a new surrogate key.
df_filter_new.display()

**Create Surrogate key**

**Fetch the max surrogate key from the existing table**

In [0]:
# First surrogate key to hand out to new dates. The key column is built as
# max_value + a 0-based row offset, so max_value must be the first key that is
# NOT already used in the dimension.
if incremental_flg == '0':
    # Initial load: keys start at 1.
    max_value = 1
else:
    # Incremental load: continue after the largest existing key. Reusing the max
    # itself would give a new date the same key as an existing row, and the MERGE
    # on dim_date_key would then overwrite that row.
    current_max = spark.sql("select max(dim_date_key) from cars_catalog.gold.dim_date").collect()[0][0]
    # max() yields NULL (None) on an empty table: start from 1 in that case.
    max_value = 1 if current_max is None else current_max + 1

**Create the surrogate key column, offset from the max surrogate key**

In [0]:
# Assign surrogate keys to the new dates: the first row gets max_value, then +1 per row.
# row_number() over an ordered window gives dense, reproducible keys, unlike
# monotonically_increasing_id(), whose values are unique but jump between partitions.
# The un-partitioned window moves rows to a single partition; presumably acceptable
# for a small dimension such as dates (confirm if the input grows large).
from pyspark.sql.window import Window

df_filter_new = df_filter_new.withColumn(
    'dim_date_key',
    (max_value + row_number().over(Window.orderBy('Date_ID')) - 1).cast('long'),
)

In [0]:
# Preview the new dates together with their assigned surrogate keys.
df_filter_new.display()

### Create the final df = df_filter_old + df_filter_new

In [0]:
# Rows to load: new dates (fresh surrogate keys) plus existing dates (keys unchanged).
# On the initial run df_filter_old is empty, so this is just the new dates.
# unionByName matches columns by name; positional union() would silently swap
# Date_ID and dim_date_key if either frame's column order ever changed.
df_final = df_filter_new.unionByName(df_filter_old)

In [0]:
# Preview the rows that will be written to the date dimension.
df_final.display()  

# SCD TYPE - 1 (UPSERT)

Upsert means update + insert: rows whose key already exists in the target are updated, and all other rows are inserted.

In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type 1 load of the date dimension.
#   - first run (table not registered): create the Delta table at the external gold location
#   - later runs: MERGE on dim_date_key, updating matching keys and inserting the rest
dim_date_table_name = 'cars_catalog.gold.dim_date'
dim_date_location = 'abfss://gold@carsanjeevdatalake.dfs.core.windows.net/dim_date'

if not spark.catalog.tableExists(dim_date_table_name):
    (
        df_final.write
        .format('delta')
        .mode('overwrite')
        .option('path', dim_date_location)
        .saveAsTable(dim_date_table_name)
    )
else:
    delta_tbl = DeltaTable.forPath(spark, dim_date_location)
    merge_builder = delta_tbl.alias('trg').merge(
        df_final.alias('src'),
        'trg.dim_date_key = src.dim_date_key',
    )
    merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

In [0]:
%sql
-- Inspect the contents of the date dimension after the load.
select * from cars_catalog.gold.dim_date