# Import libraries

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create the incremental-load flag parameter

In [0]:
# Text widget that holds the load-mode flag; it defaults to '0'.
dbutils.widgets.text(name='incremental_flag', defaultValue='0')

In [0]:
# Read the load-mode flag from the widget. Only '0' (initial load) and '1' (incremental load)
# are accepted; surrounding whitespace is ignored so a stray space does not change the mode.
incremental_flag = dbutils.widgets.get('incremental_flag').strip()
if incremental_flag not in ('0', '1'):
    # Fail loudly instead of letting an unexpected value silently pick a load path.
    raise ValueError(f"incremental_flag must be '0' or '1', got {incremental_flag!r}")
print(incremental_flag)

0


# Build the branch dimension model (dim_branch)

### Fetch relevant columns

In [0]:

# Distinct (Branch_ID, BranchName) pairs from the silver car-sales data.
# Note: SQL DISTINCT applies to the whole selected row, not just to Branch_ID,
# despite the parentheses around Branch_ID in the query text.
branch_src_query = '''select distinct(Branch_ID) as Branch_ID, BranchName
from parquet.`abfss://silver@carsbaggiodatalake.dfs.core.windows.net/carsales`
'''
df_src = spark.sql(branch_src_query)

### Dimension sink – initial and incremental runs (if the table does not exist yet, bring in only its schema)

In [0]:
# Load the existing gold dimension as the join target for the source branches.
# On the first run the table does not exist yet, so an empty frame with the same columns
# is built from the silver data instead: `where 1=0` returns no rows, and `1 as dim_branch_key`
# only supplies the column. The left join below then sees every source branch as new.
if spark.catalog.tableExists('cars_catalog.gold.dim_branch') :

      # Existing dimension: fetch the surrogate key alongside the business columns.
      df_sink = spark.sql('''SELECT dim_branch_key, Branch_ID, BranchName
      FROM cars_catalog.gold.dim_branch
     ''')

else:
       # No dimension yet: zero-row placeholder that carries only the schema.
       df_sink = spark.sql('''SELECT 1 as dim_branch_key, Branch_ID, BranchName 
       FROM parquet.`abfss://silver@carsbaggiodatalake.dfs.core.windows.net/carsales`
       where 1=0''')


### Split source records into new and existing ones

In [0]:
# Left-join source branches to the current dimension on Branch_ID. Branches not yet in the
# dimension come back with a NULL dim_branch_key; known branches keep their existing key.
df_filter = (
    df_src
    .join(df_sink, df_src['Branch_ID'] == df_sink['Branch_ID'], 'left')
    .select(
        df_src['Branch_ID'],
        df_src['BranchName'],
        df_sink['dim_branch_key'],
    )
)

### df_filter_old (branches already in the dimension)

In [0]:
# Branches that already exist in the dimension (they matched and carry a surrogate key).
df_filter_old = df_filter.where("dim_branch_key IS NOT NULL")

### df_filter_new (branches not yet in the dimension)

In [0]:
# Branches with no match in the dimension. The all-NULL key column is dropped here;
# a fresh surrogate key is attached further down.
df_filter_new = df_filter.where("dim_branch_key IS NULL").select('Branch_ID', 'BranchName')

In [0]:
# Preview the branches that will receive new surrogate keys.
display(df_filter_new)

Branch_ID,BranchName
BR0131,Audi Motors
BR0760,Healey Motors
BR0789,Hillman Motors
BR0938,Isotta Fraschini Motors
BR1040,Lada Motors
BR1693,Saleen Motors
BR1792,Simca do Brasil Motors
BR1799,Simca do Brasil Motors
BR1955,Toyota Motors
BR1978,Turner Motors


### Create the surrogate key

### Fetch the max surrogate key from the existing table

In [0]:
# Offset added to monotonically_increasing_id() when new branches get surrogate keys.
# monotonically_increasing_id() can yield 0 for the first row. The offset must therefore be one
# past the current maximum key. Using the maximum itself would give a new branch the same key
# as an existing row, and the key-based MERGE would then overwrite that row.
# The offset is derived from the gold table itself, the same check the sink and merge cells
# use, rather than from the widget flag. A stale or mistyped flag therefore cannot restart
# keys at 0 on top of an existing dimension.
if spark.catalog.tableExists('cars_catalog.gold.dim_branch'):
    max_value_df = spark.sql("SELECT MAX(dim_branch_key) FROM cars_catalog.gold.dim_branch")
    current_max_key = max_value_df.collect()[0][0]
    # MAX() over an empty table is NULL (None here); start from 0 as on an initial load.
    max_value = 0 if current_max_key is None else current_max_key + 1
else:
    # Initial load: keys start at 0.
    max_value = 0

### Create the surrogate key column, offset by the value computed above

In [0]:
# Surrogate key = offset (max_value) + a Spark-generated id. monotonically_increasing_id()
# produces unique, increasing 64-bit values, but they are not guaranteed to be consecutive.
surrogate_key_col = max_value + monotonically_increasing_id()
df_filter_new = df_filter_new.withColumn("dim_branch_key", surrogate_key_col)

### Create the final DataFrame: df_filter_new combined with df_filter_old

In [0]:
# Combine new branches (freshly assigned keys) with existing branches (current keys).
# unionByName matches columns by name. Plain union() is positional and would silently mix up
# values if the two frames' column orders ever diverged.
df_final = df_filter_new.unionByName(df_filter_old)

# SCD Type 1 (upsert)

In [0]:
from delta.tables import DeltaTable

In [0]:
#Incremental RUN
# SCD Type 1 load: rows whose surrogate key already exists are overwritten in place, and
# all other rows are inserted. Known branches reach this cell with their existing key, and
# new branches with a freshly assigned one.
gold_dim_branch_path = "abfss://gold@carsbaggiodatalake.dfs.core.windows.net/dim_branch"

if not spark.catalog.tableExists('cars_catalog.gold.dim_branch'):
    # Initial run: create the Delta table at the gold path and register it in the catalog.
    (
        df_final.write.format("delta")
        .mode("overwrite")
        .option("path", gold_dim_branch_path)
        .saveAsTable("cars_catalog.gold.dim_branch")
    )
else:
    # Incremental run: MERGE the batch into the existing Delta table.
    delta_tbl = DeltaTable.forPath(spark, gold_dim_branch_path)
    (
        delta_tbl.alias("trg")
        .merge(df_final.alias("src"), "trg.dim_branch_key = src.dim_branch_key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
-- Inspect the gold branch dimension after the load.
SELECT * FROM cars_catalog.gold.dim_branch

Branch_ID,BranchName,dim_branch_key
BR0131,Audi Motors,0
BR0760,Healey Motors,1
BR0789,Hillman Motors,2
BR0938,Isotta Fraschini Motors,3
BR1040,Lada Motors,4
BR1693,Saleen Motors,5
BR1792,Simca do Brasil Motors,6
BR1799,Simca do Brasil Motors,7
BR1955,Toyota Motors,8
BR1978,Turner Motors,9
