In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# create Flag Parameter

Used to tell whether this is the initial run or an incremental run.

In [0]:
# Text widget telling the notebook which kind of run this is.
# '0' (the default) marks the initial run; later cells treat any other
# value as an incremental run.
dbutils.widgets.text(name='incremental_flag', defaultValue='0')

In [0]:
incremental_flag = dbutils.widgets.get('incremental_flag')

# Echo the flag and its type. Widget values come back as str, which is why
# later cells compare against '0' rather than the integer 0.
for shown in (incremental_flag, type(incremental_flag)):
    print(shown)

0
<class 'str'>


# creating Dimension Model

### Fetch relevant columns

In [0]:
%sql
-- Preview the full silver-layer car sales dataset.
SELECT *
FROM parquet.`abfss://silver@adlscarsaless.dfs.core.windows.net/carsales`

Branch_ID,Dealer_ID,Model_ID,Revenue,Units_Sold,Date_ID,Day,Month,Year,BranchName,DealerName,Product_Name,model_category,RevPerUnit


In [0]:
%sql
-- DISTINCT applies to the whole row, so this returns one row per distinct
-- (Model_ID, model_category) pair, not necessarily one row per Model_ID.
SELECT DISTINCT Model_ID, model_category
FROM parquet.`abfss://silver@adlscarsaless.dfs.core.windows.net/carsales`

Model_ID,model_category


In [0]:
#create dataframe on above SQL Query:
df_source = spark.sql('''
select Distinct(Model_ID) as Model_ID, model_category
 from parquet.`abfss://silver@adlscarsaless.dfs.core.windows.net/carsales`
 ''')

In [0]:
display(df_source)  # render the distinct source models

Model_ID,model_category


### dim_model Sink - Initial and Incremental
If the table does not exist yet (initial run), bring in only its schema, with no rows.

In [0]:
# Sink side of dim_model. dim_model_key is the surrogate key of the dimension.
if not spark.catalog.tableExists('cars_catalog.gold.dim_model'):
    # Initial run: the gold table does not exist yet. Build an empty frame with
    # the sink's columns — the always-false predicate keeps the schema (with a
    # placeholder integer key column) but returns no rows.
    df_sink = (
        spark.read.parquet('abfss://silver@adlscarsaless.dfs.core.windows.net/carsales')
        .selectExpr('1 as dim_model_key', 'Model_ID', 'model_category')
        .where('1=0')
    )
else:
    # Incremental run: the current dimension rows, including their surrogate keys.
    df_sink = (
        spark.table('cars_catalog.gold.dim_model')
        .select('dim_model_key', 'Model_ID', 'model_category')
    )


### Filtering new records and old records

In [0]:
# Left-join every source model onto the current dimension rows (by Model_ID).
# Models already present in dim_model carry their existing dim_model_key,
# together with the category from the source; models seen for the first
# time come back with dim_model_key = NULL.
df_filter = df_source.join(
    df_sink, df_source.Model_ID == df_sink.Model_ID, 'left'
).select(df_source['Model_ID'], df_source['model_category'], df_sink['dim_model_key'])


In [0]:
display(df_filter)  # source models with their (possibly NULL) existing keys

Model_ID,model_category,dim_model_key


**df_filter_old**

In [0]:
# Models that already exist in dim_model: a sink row matched, so the key is populated.
df_filter_old = df_filter.where('dim_model_key IS NOT NULL')

**df_filter_new**

In [0]:
# Models not yet in dim_model: no sink row matched, so the key is NULL.
# Only the natural-key columns are kept; a surrogate key is assigned in a later cell.
df_filter_new = (
    df_filter
    .where('dim_model_key IS NULL')
    .select('Model_ID', 'model_category')
)

In [0]:
display(df_filter_new)  # models that still need a surrogate key

Model_ID,model_category


# create Surrogate Key

### fetch the max Surrogate key from the existing table

In [0]:
# Determine max_value: the FIRST surrogate key to hand out to new models.
# - Initial run (flag '0' and no gold table yet): the dimension is empty, so
#   keys start at 1.
# - Otherwise: continue right after the largest key already stored. New keys
#   must not reuse an existing key: the MERGE on dim_model_key would otherwise
#   overwrite an existing model's row with a different model.
# The tableExists guard keeps a re-run with flag '0' (table already present)
# from restarting keys at 1 and colliding with existing rows.
if incremental_flag == '0' and not spark.catalog.tableExists('cars_catalog.gold.dim_model'):
    max_value = 1
else:
    max_value_df = spark.sql("select Max(dim_model_key) from cars_catalog.gold.dim_model")
    current_max = max_value_df.collect()[0][0]
    # Max() returns NULL on an empty table; treat that as "no keys issued yet".
    max_value = (current_max if current_max is not None else 0) + 1

### Create the surrogate key column, offset from the current max surrogate key

In [0]:
# Assign surrogate keys max_value, max_value + 1, max_value + 2, ... to the new models.
# - row_number() yields a gap-free 1, 2, 3, ... sequence; subtracting 1 makes the
#   first new row get exactly max_value.
# - monotonically_increasing_id() was avoided: it packs the partition id into the
#   upper bits, so keys jump by huge gaps across partitions. It can also differ
#   between recomputations (e.g. this frame's display vs. the later write).
# - Ordering by the natural key keeps the assignment deterministic.
# - The result is cast to BIGINT, matching the key type the previous
#   implementation produced.
# NOTE(review): a Window without partitionBy moves all rows to one partition;
# presumably fine because the set of new models per run is small — confirm.
new_key_window = Window.orderBy('Model_ID', 'model_category')
df_filter_new = df_filter_new.withColumn(
    'dim_model_key',
    (max_value - 1 + row_number().over(new_key_window)).cast('bigint'),
)

In [0]:
display(df_filter_new)  # new models with their freshly assigned surrogate keys

Model_ID,model_category,dim_model_key


### create Final Df = df_filter_old + df_filter_new

In [0]:
# Final set of dimension rows to upsert: brand-new models (fresh keys) plus
# existing models (keys reused from dim_model). unionByName matches columns by
# name instead of position, so a column-order difference between the two frames
# cannot silently shift values into the wrong column.
df_final = df_filter_new.unionByName(df_filter_old)

In [0]:
display(df_final)  # rows that will be written/merged into dim_model

Model_ID,model_category,dim_model_key


## SCD Type-1 (UPSERT = update + insert)

In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type 1 upsert of df_final into the gold dim_model table.
if not spark.catalog.tableExists('cars_catalog.gold.dim_model'):
    # Initial run (happens only once): create the Delta table at the gold
    # ADLS path and register it in the catalog under the same name.
    (
        df_final.write
        .format('delta')
        .mode('overwrite')
        .option('path', 'abfss://gold@adlscarsaless.dfs.core.windows.net/dim_model')
        .saveAsTable('cars_catalog.gold.dim_model')
    )
else:
    # Incremental run: rows whose dim_model_key already exists are overwritten
    # in place (SCD Type 1); rows with a new key are inserted.
    delta_tbl = DeltaTable.forPath(spark, "abfss://gold@adlscarsaless.dfs.core.windows.net/dim_model")
    merge_builder = delta_tbl.alias('trgt').merge(
        df_final.alias('src'), "trgt.dim_model_key=src.dim_model_key"
    )
    merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

In [0]:
%sql
-- Inspect the resulting gold dimension table.
SELECT *
FROM cars_catalog.gold.dim_model

Model_ID,model_category,dim_model_key
