In [0]:
#SCD Type 1 — When an existing value is updated, the record is overwritten with the new value. If it’s a new entry, it’s inserted. This approach is also known as an "upsert" (update + insert).

#🔹 Initial Load:
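#During the initial load (incremental_flag = '0', dimension table not created yet) the sink is an empty frame with the dimension's schema and surrogate keys start at 1. The source (silver) data is the left table of a left join with the dimension, so every source record is retained. If a matching dimension record is found, the record already exists and must be updated.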
#
#🔹 Incremental Load:
#On later (incremental) loads, a null dimension key after the join means the record does not exist in the target yet, so it is inserted as a new row.


# Create the incremental-load flag parameter
 

In [0]:
%python
from pyspark.sql.functions import *
dbutils.widgets.text('incremental_flag','0')

In [0]:
# Current value of the 'incremental_flag' widget (widget values are strings; default '0').
flag_widget_name = 'incremental_flag'
incremental_flag = dbutils.widgets.get(flag_widget_name)


# Create the dimension model

## Fetch the relevant columns

In [0]:

# Distinct (Branch_ID, BranchName) rows from the silver layer.
# DISTINCT applies to the whole row and does NOT drop nulls, so rows without a
# Branch_ID are filtered explicitly: a null natural key never matches the dimension
# in the join and would otherwise get a fresh surrogate key on every run.
SILVER_CARSALES_PATH = 'abfss://silver@cartejdatalake7022.dfs.core.windows.net/carsales'

df_src = spark.sql(f'''
select distinct Branch_ID, BranchName
from parquet.`{SILVER_CARSALES_PATH}`
where Branch_ID is not null''')

### Dimension sink: initial and incremental load

In [0]:
# Existing dimension rows (with their surrogate keys).
# - Table exists: read them from the gold dimension itself. Silver has no dim_branch_key column.
# - First run: the table does not exist yet, so build an empty frame with the same
#   columns; "where 1=0" returns the schema only.
if spark.catalog.tableExists('cars_catalog.gold.dim_branch'):
    df_sink = spark.sql('''
    select dim_branch_key, Branch_ID, BranchName
    from cars_catalog.gold.dim_branch''')
else:
    df_sink = spark.sql('''
    select 1 as dim_branch_key, Branch_ID, BranchName
    from parquet.`abfss://silver@cartejdatalake7022.dfs.core.windows.net/carsales`
    where 1=0''')

### Split the source into new and existing records

In [0]:
# Left join keeps every source branch; dim_branch_key stays null for branches
# that have no row in the dimension yet.
join_condition = df_src['Branch_ID'] == df_sink['Branch_ID']
df_filter = (
    df_src
    .join(df_sink, join_condition, how='left')
    .select(df_src.Branch_ID, df_src.BranchName, df_sink.dim_branch_key)
)

In [0]:
# Rows that matched a dimension record (non-null key) already exist and will be updated.
has_existing_key = df_filter['dim_branch_key'].isNotNull()
df_filter_old = df_filter.where(has_existing_key)

In [0]:
# Rows without a dimension key are new branches; drop the (null) key column so a
# fresh surrogate key can be attached later.
df_filter_new = (
    df_filter
    .where(df_filter['dim_branch_key'].isNull())
    .select('Branch_ID', 'BranchName')
)

## Create the surrogate key

In [0]:
# max_value = first surrogate key to hand out to new rows (they get max_value + offset,
# with offsets starting at 0).
# This is decided by whether the dimension table actually exists (the same test the sink
# and merge cells use) rather than by incremental_flag. If the flag and the table state
# disagree, keys restarting at 1 would collide with existing rows and the merge would
# overwrite them.
if spark.catalog.tableExists('cars_catalog.gold.dim_branch'):
    current_max_key = spark.sql("select max(dim_branch_key) from cars_catalog.gold.dim_branch").collect()[0][0]
    # max() is NULL for an empty table. Otherwise start one past the current maximum
    # so the first new row (offset 0) does not reuse an existing key.
    max_value = (0 if current_max_key is None else current_max_key) + 1
else:
    max_value = 1

In [0]:
# monotonically_increasing_id() is unique but not consecutive (it can jump between
# partitions); shifting it by max_value gives every new branch a key >= max_value.
new_branch_key = monotonically_increasing_id() + max_value
df_filter_new = df_filter_new.withColumn('dim_branch_key', new_branch_key)

In [0]:
# Preview the new branches together with their freshly assigned surrogate keys.
df_filter_new.display()

In [0]:
# Combine the new rows (fresh keys) with the existing rows (keys taken from the dimension).
# unionByName aligns columns by name; plain union is positional and would silently
# mix up columns if either frame's column order ever changed.
df_final = df_filter_new.unionByName(df_filter_old)

# SCD Type 1 (upsert)

In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type 1 upsert into the gold branch dimension.
# - Incremental run (table exists): merge on the surrogate key. Matching keys are
#   overwritten with the latest source values; unmatched keys are inserted.
# - Initial run: create the Delta table at the external gold location.
DIM_BRANCH_TABLE = 'cars_catalog.gold.dim_branch'
DIM_BRANCH_PATH = 'abfss://gold@cartejdatalake7022.dfs.core.windows.net/dim_branch'

if spark.catalog.tableExists(DIM_BRANCH_TABLE):
    # Resolve the target by name so the merge writes to the same table whose
    # existence was just checked, instead of a separately hard-coded path.
    delta_table = DeltaTable.forName(spark, DIM_BRANCH_TABLE)

    (
        delta_table.alias("trg")
        .merge(df_final.alias("src"), "trg.dim_branch_key == src.dim_branch_key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    (
        df_final.write.format('delta')
        .mode('overwrite')
        .option("path", DIM_BRANCH_PATH)
        .saveAsTable(DIM_BRANCH_TABLE)
    )


In [0]:
%sql
-- Inspect the resulting branch dimension.
SELECT *
FROM cars_catalog.gold.dim_branch