# Choose 0 For Initial Load and 1 for Incremental load

In [0]:
dbutils.widgets.text("incremental_flag", "0")

# Dimensional Model Creation for Model Table

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
source_df = spark.sql('''select distinct(Model_ID),model_category from parquet.`abfss://silver@project3sd.dfs.core.windows.net` ''')

In [0]:
source_df.display()

Model_ID,model_category
Mah-M167,Mah
Che-M47,Che
Toy-M205,Toy
BMW-M249,BMW
Mer-M122,Mer
Hon-M215,Hon
Nis-M82,Nis
Toy-M206,Toy
Mar-M139,Mar
Ren-M207,Ren


importing schema for initial laod and dimension table for incremental load

In [0]:
if spark.catalog.tableExists("sales.salesschema.dim_model"):
    df_sink = spark.sql('''select * from sales.salesschema.dim_model''')
else:
    df_sink = spark.sql('''select 1 as dim_model_key,Model_ID,model_category from parquet.`abfss://silver@project3sd.dfs.core.windows.net` where 1=0''')

In [0]:
df_sink.display()

Model_Id,model_category,dim_model_key
Hon-M220,Hon,1
Hon-M215,Hon,2
Vol-M110,Vol,3
Vol-M260,Vol,4
BMW-M2,BMW,5
Acu-M60,Acu,6
Bui-M31,Bui,7
Hyu-M157,Hyu,8
Ren-M130,Ren,9
Tat-M179,Tat,10


seperating new and old ones

In [0]:
df_filter = source_df.join(df_sink,source_df.Model_ID == df_sink.Model_Id,'left').select(source_df['Model_ID'],source_df['model_category'],df_sink['dim_model_key'])

In [0]:
df_old = df_filter.filter(df_filter.dim_model_key.isNotNull())
df_old.display()

Model_ID,model_category,dim_model_key
Mah-M167,Mah,192
Che-M47,Che,139
Toy-M205,Toy,206
BMW-M249,BMW,194
Mer-M122,Mer,40
Hon-M215,Hon,2
Nis-M82,Nis,210
Toy-M206,Toy,47
Mar-M139,Mar,268
Ren-M207,Ren,246


In [0]:
df_new = df_filter.filter(df_filter.dim_model_key.isNull())
df_new.display()

Model_ID,model_category,dim_model_key


surrogate key / dimensional key value setting

In [0]:
if (dbutils.widgets.get('incremental_flag') == '0'):
    max_value = 1
else:
    max_value = spark.sql('''select max(dim_model_key) from sales.salesschema.dim_model''').collect()[0][0] + 1

In [0]:
max_value

1

In [0]:
df_new = df_new.withColumn('dim_model_key',max_value + monotonically_increasing_id())
df_new.display()

Model_ID,model_category,dim_model_key


combining old and new dataframes of dimension tables

In [0]:
df_final = df_new.union(df_old)
df_final.display()


Model_ID,model_category,dim_model_key
Mah-M167,Mah,192
Che-M47,Che,139
Toy-M205,Toy,206
BMW-M249,BMW,194
Mer-M122,Mer,40
Hon-M215,Hon,2
Nis-M82,Nis,210
Toy-M206,Toy,47
Mar-M139,Mar,268
Ren-M207,Ren,246


## slowly changing dimensions - type 1 (update + insert)

In [0]:
from delta.tables import DeltaTable

In [0]:
if spark.catalog.tableExists("sales.salesschema.dim_model"):
    delta_table = DeltaTable.forName(spark, "sales.salesschema.dim_model")
    delta_table.alias("maindeltatable")\
        .merge(df_final.alias("finaldf"),"maindeltatable.dim_model_key=finaldf.dim_model_key")\
        .whenNotMatchedInsertAll()\
            .whenMatchedUpdateAll()\
                .execute()
else:
    df_final.write.format("delta")\
    .mode("overwrite")\
    .option("path","abfss://gold@project3sd.dfs.core.windows.net/dim_model")\
    .saveAsTable("sales.salesschema.dim_model")

In [0]:
spark.sql("select * from sales.salesschema.dim_model").display()


Model_Id,model_category,dim_model_key
Mah-M167,Mah,192
Che-M47,Che,139
Toy-M205,Toy,206
BMW-M249,BMW,194
Mer-M122,Mer,40
Hon-M215,Hon,2
Nis-M82,Nis,210
Toy-M206,Toy,47
Mar-M139,Mar,268
Ren-M207,Ren,246
