In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

Use a storage credential to access the data.

# Create flag parameter

In [0]:
# Register the notebook's text widget "Incremental_flag" with a default of "0".
dbutils.widgets.text("Incremental_flag", "0")

In [0]:
# Read the widget's current value; it is compared against the string '0' later on.
Incremental_flag = dbutils.widgets.get("Incremental_flag")


## Creating dimensions

### Fetching relevant columns

In [0]:
# Source frame: the distinct Date_ID values found in the silver carsales parquet data.
src_query = '''select distinct(Date_ID) as Date_ID from parquet.`abfss://silver@carsathvikdatalake.dfs.core.windows.net/carsales`'''
df_src = spark.sql(src_query)

### dim_date sink: initial and incremental load

In [0]:
# Build df_sink, the (dim_date_key, Date_ID) pairs already present in the gold
# dimension. When the table does not exist yet, fall back to a query whose
# `where 1=0` predicate returns no rows but still exposes the same two columns.
if not spark.catalog.tableExists('hive_metastore.gold.dim_date'):
    df_sink = spark.sql('''
                    select 1 as dim_date_key,Date_ID 
                    from parquet.`abfss://silver@carsathvikdatalake.dfs.core.windows.net/carsales` 
                    where 1=0''')
else:
    df_sink = spark.sql('''
                    select dim_date_key,Date_ID 
                    from hive_metastore.gold.dim_date
                    ''')
     


In [0]:
%python
# Left-join the source Date_IDs onto the sink so every source row is kept;
# dim_date_key is null for Date_IDs that have no match in the sink.
date_id_matches = df_src.Date_ID == df_sink.Date_ID
df_joined = df_src.join(df_sink, on=date_id_matches, how='left')
df_filter = df_joined.select(df_src['Date_ID'], df_sink['dim_date_key'])


In [0]:
# Rows that matched the sink, i.e. already carry a dim_date_key.
has_key = col('dim_date_key').isNotNull()
df_filter_old = df_filter.where(has_key)

In [0]:
# Rows with no match in the sink; keep only Date_ID because the key is assigned later.
lacks_key = col('dim_date_key').isNull()
df_filter_new = df_filter.where(lacks_key).select(df_src['Date_ID'])

## Adding max surrogate key

In [0]:
# Compute max_value, the surrogate key given to the FIRST new row.
# The key-assignment cell adds monotonically_increasing_id() to max_value, and
# that id starts at 0, so max_value must be the first *unused* key:
#   - initial load (flag '0'): keys start at 1;
#   - incremental load: keys start one past the current maximum. Using the
#     maximum itself would hand the first new row an existing dim_date_key, and
#     the merge on dim_date_key would then overwrite that existing row.
if Incremental_flag == '0':
    max_value = 1
else:
    max_value_df = spark.sql("select max(dim_date_key) from hive_metastore.gold.dim_date")
    current_max_key = max_value_df.collect()[0][0]
    # max() yields NULL (None) when the table has no rows; start at 1 in that case.
    max_value = 1 if current_max_key is None else current_max_key + 1

# Creating surrogate key

In [0]:
# Assign each new row a dim_date_key: the generated row id offset by max_value.
generated_row_id = monotonically_increasing_id()
df_filter_new = df_filter_new.withColumn('dim_date_key', max_value + generated_row_id)

In [0]:
# Combine already-keyed rows with newly-keyed rows. Columns are matched by name
# rather than by position, so Date_ID and dim_date_key cannot be silently
# swapped if either frame's column order ever changes.
df_final = df_filter_old.unionByName(df_filter_new)

# SCD type 1

In [0]:
from delta.tables import DeltaTable

In [0]:

# Persist df_final into the gold dim_date table.
#   - Table missing: write df_final as a Delta table at the gold path and
#     register it as hive_metastore.gold.dim_date.
#   - Table present: merge on dim_date_key, updating all columns of matching
#     rows and inserting the rows that have no match.
if not spark.catalog.tableExists('hive_metastore.gold.dim_date'):
    (
        df_final.write.format("delta")
        .option('path', "abfss://gold@carsathvikdatalake.dfs.core.windows.net/dim_date")
        .mode('overwrite')
        .saveAsTable('hive_metastore.gold.dim_date')
    )
else:
    delta_tbl = DeltaTable.forPath(spark, 'abfss://gold@carsathvikdatalake.dfs.core.windows.net/dim_date')
    (
        delta_tbl.alias('trg')
        .merge(df_final.alias('src'), 'trg.dim_date_key = src.dim_date_key')
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
            

In [0]:
%sql
-- Show every row of the gold date dimension.
SELECT *
FROM hive_metastore.gold.dim_date

Date_ID,dim_date_key
DT00029,1
DT00140,2
DT00192,3
DT00444,4
DT00475,5
DT00947,6
DT00976,7
DT01028,8
DT01099,9
DT00657,10
