In [0]:
from pyspark.sql.functions import *

## Create Flag Param to identify Initial/INCR

In [0]:
dbutils.widgets.text('incr_flag','0')

In [0]:
incr_flag = dbutils.widgets.get('incr_flag')

### Getting Watermark values

In [0]:
load_start_dt = spark.table("sales_catalog.gold.gold_watermark") \
          .filter("table_name = 'dim_model'") \
          .select("last_watermark") \
          .collect()[0][0]
load_end_dt = spark.sql('''select max(Date_ID)
                        from parquet.`abfss://silver@dlsaleslakehouse.dfs.core.windows.net/sales`
                     ''')\
                    .collect()[0][0]
print(f"Loading data from {load_start_dt} to {load_end_dt}")


Loading data from DT00000 to DT01245


## Creating Dimension Model

### Creating dim_model
**Fetching relative columns from Silver layer**

In [0]:
df_model_src = spark.sql(f'''select distinct Model_ID , Model_category, Model_Number
                        from parquet.`abfss://silver@dlsaleslakehouse.dfs.core.windows.net/sales`
                        WHERE Date_ID > '{load_start_dt}' AND Date_ID <= '{load_end_dt}'
                     ''')
display(df_model_src)


Model_ID,Model_category,Model_Number
Aud-M237,Aud,M237
For-M25,For,M25
Cad-M37,Cad,M37
Mah-M175,Mah,M175
Mah-M176,Mah,M176
For-M224,For,M224
Che-M44,Che,M44
Mah-M172,Mah,M172
Mar-M141,Mar,M141
Nis-M266,Nis,M266


**Creating Schema for Initial run**

In [0]:
if spark.catalog.tableExists('sales_catalog.gold.dim_model'):
    df_model_sink = spark.sql('''
                                SELECT Dim_model_key, Model_ID, Model_Category, Model_Number
                                FROM sales_catalog.gold.dim_model
                            ''')
else:
    df_model_sink = spark.sql('''
                          SELECT 1 AS Dim_model_key, Model_ID, Model_Category, Model_Number
                          FROM parquet.`abfss://silver@dlsaleslakehouse.dfs.core.windows.net/sales`
                          WHERE 1=0
                          ''')
display(df_model_sink)

Dim_model_key,Model_ID,Model_Category,Model_Number


**Filtering new records and updated records**

In [0]:
df_model_filter = df_model_src.join(df_model_sink, ['Model_ID'], 'left')\
                              .select(df_model_src['Model_ID'],df_model_src['Model_category'],df_model_src['Model_Number'],df_model_sink['Dim_model_key'])
display(df_model_filter)

Model_ID,Model_category,Model_Number,Dim_model_key
Aud-M237,Aud,M237,
For-M25,For,M25,
Cad-M37,Cad,M37,
Mah-M175,Mah,M175,
Mah-M176,Mah,M176,
For-M224,For,M224,
Che-M44,Che,M44,
Mah-M172,Mah,M172,
Mar-M141,Mar,M141,
Nis-M266,Nis,M266,


**new records**

In [0]:
df_model_new_rec = df_model_filter.filter(df_model_filter.Dim_model_key.isNull())\
                                .select(col('Model_ID'),col('Model_category'),col("Model_Number"))
display(df_model_new_rec)

Model_ID,Model_category,Model_Number
Aud-M237,Aud,M237
For-M25,For,M25
Cad-M37,Cad,M37
Mah-M175,Mah,M175
Mah-M176,Mah,M176
For-M224,For,M224
Che-M44,Che,M44
Mah-M172,Mah,M172
Mar-M141,Mar,M141
Nis-M266,Nis,M266


**updated records**

In [0]:
df_model_updated_rec = df_model_filter.filter(df_model_filter.Dim_model_key.isNotNull())
display(df_model_updated_rec)

Model_ID,Model_category,Model_Number,Dim_model_key


**Create Surrogate Keys**

In [0]:
if incr_flag == '0':
  max_val=0
else:
  max_val_df = spark.sql('''
                        SELECT max(Dim_model_key) as max_val
                        FROM sales_catalog.gold.dim_model
                      ''')
  max_val = max_val_df.collect()[0]['max_val']


In [0]:
df_model_new_rec = df_model_new_rec.withColumn('Dim_model_key', max_val + monotonically_increasing_id() + 1)
display(df_model_new_rec)

Model_ID,Model_category,Model_Number,Dim_model_key
Aud-M237,Aud,M237,1
For-M25,For,M25,2
Cad-M37,Cad,M37,3
Mah-M175,Mah,M175,4
Mah-M176,Mah,M176,5
For-M224,For,M224,6
Che-M44,Che,M44,7
Mah-M172,Mah,M172,8
Mar-M141,Mar,M141,9
Nis-M266,Nis,M266,10


**Combining new and updated records**

In [0]:
df_model = df_model_new_rec.unionByName(df_model_updated_rec)
display(df_model)

Model_ID,Model_category,Model_Number,Dim_model_key
Aud-M237,Aud,M237,1
For-M25,For,M25,2
Cad-M37,Cad,M37,3
Mah-M175,Mah,M175,4
Mah-M176,Mah,M176,5
For-M224,For,M224,6
Che-M44,Che,M44,7
Mah-M172,Mah,M172,8
Mar-M141,Mar,M141,9
Nis-M266,Nis,M266,10


**SCD Type-1 (UPSERT)**

In [0]:
from delta.tables import DeltaTable

In [0]:
# INCR run
if spark.catalog.tableExists('sales_catalog.gold.dim_model'):
    delta_tbl = DeltaTable.forPath(spark, 'abfss://gold@dlsaleslakehouse.dfs.core.windows.net/dim_model')
    delta_tbl.alias('trg').merge(df_model.alias('src'), 'trg.Dim_model_key = src.Dim_model_key')\
        .whenMatchedUpdateAll()\
        .whenNotMatchedInsertAll()\
        .execute()

# initial run
else:
    df_model.write.format('delta')\
        .mode('overwrite')\
        .option('path','abfss://gold@dlsaleslakehouse.dfs.core.windows.net/dim_model')\
        .saveAsTable('sales_catalog.gold.dim_model')

In [0]:
%sql
select * from sales_catalog.gold.dim_model

Model_ID,Model_category,Model_Number,Dim_model_key
Aud-M237,Aud,M237,1
For-M25,For,M25,2
Cad-M37,Cad,M37,3
Mah-M175,Mah,M175,4
Mah-M176,Mah,M176,5
For-M224,For,M224,6
Che-M44,Che,M44,7
Mah-M172,Mah,M172,8
Mar-M141,Mar,M141,9
Nis-M266,Nis,M266,10


### Updating watermark table values

In [0]:
spark.sql(f"""
    UPDATE sales_catalog.gold.gold_watermark
    SET last_watermark = '{load_end_dt}',
        updated_at = current_timestamp()
    WHERE table_name = 'dim_model'
""")


DataFrame[num_affected_rows: bigint]