# Import libraries and set up storage paths

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# Storage layers; each name doubles as the container part of its abfss URI.
layers = ['bronze', 'silver', 'gold']

# Notebook parameters read from Databricks widgets.
storage_account = dbutils.widgets.get('storage_account')
table_name = dbutils.widgets.get('table_name')
date_id = dbutils.widgets.get('date_id')

# Root URI for every layer, keyed by layer name.
adls = dict(
    (layer, f'abfss://{layer}@{storage_account}.dfs.core.windows.net')
    for layer in layers
)

# Convenience aliases, unpacked in the same order as `layers`.
adls_bronze, adls_silver, adls_gold = (adls[layer] for layer in layers)

# Create incremental-load flag parameter

In [0]:
# Register a text widget named 'incremental_flag' whose default value is '0'.
dbutils.widgets.text('incremental_flag','0')

In [0]:
# Read the flag from the widget; later compared against the string '0'
# when the starting surrogate key is chosen.
incremental_flag = dbutils.widgets.get('incremental_flag')

# Creating dimensional model

### Fetch Relevant Columns

In [0]:

# One row per Branch_ID from the silver car_sales data for this date_id.
# row_number() ranks each branch's rows by Date_ID (newest first) and only the
# top-ranked row is kept. The final projection lists Branch_ID and BranchName
# explicitly so the helper column row_num does not leak into df_source.
# NOTE(review): if several rows of a branch share the same Date_ID, which one
# gets row_num = 1 is not determined by this ordering - confirm that is acceptable.
df_source = spark.sql(f'''
    with CTE as(
        select
            row_number() over (partition by Branch_ID order by Date_ID desc) as row_num,
            Branch_ID,
            BranchName
        from parquet.`{adls_silver}/car_sales/{date_id}`
    )
    select Branch_ID, BranchName
    from CTE
    where row_num = 1
''')

row_num,Branch_ID,BranchName
1,BR9726,Power Ranger Motors
1,XYZ9727,nhi


In [0]:
# Preview at most 10 source rows.
display(df_source.limit(10))

Branch_ID,BranchName
BR0131,Audi Motors
BR0760,Healey Motors
BR0789,Hillman Motors
BR0938,Isotta Fraschini Motors
BR1040,Lada Motors
BR1693,Saleen Motors
BR1792,Simca do Brasil Motors
BR1799,Simca do Brasil Motors
BR1955,Toyota Motors
BR1978,Turner Motors


### Dim_branch sink - Initial and incremental

In [0]:
# Load the rows already present in the dimension (the "sink") so incoming
# branches can be matched against them. The existence check and the read both
# target cars_catalog.gold.<table_name>, so they can never point at different tables.
sink_table = f'cars_catalog.gold.{table_name}'

if spark.catalog.tableExists(sink_table):
    # Table already exists: read its current contents.
    df_sink = spark.sql(f'select * from {sink_table}')
else:
    # No table yet: build an empty frame with the dimension's three columns.
    # `where 1=0` keeps the schema but returns no rows.
    df_sink = spark.sql(f'''
        select 1 as dim_branch_key, Branch_ID, BranchName
        from parquet.`{adls_silver}/car_sales/{date_id}`
        where 1=0
    ''')

### Filtering new records and old records

In [0]:
# Left-join the incoming branches to the sink on Branch_ID. Branches that are
# not in the sink yet come out with a NULL dim_branch_key.
branch_id_match = df_source['Branch_ID'] == df_sink['Branch_ID']

df_filter = (
    df_source
    .join(df_sink, on=branch_id_match, how='left')
    .select(
        df_source['Branch_ID'],
        df_source['BranchName'],
        df_sink['dim_branch_key'],
    )
)

**df_filter_old**

In [0]:
# Branches that matched a sink row, i.e. already carry a surrogate key.
df_filter_old = df_filter.where(col('dim_branch_key').isNotNull())

**df_filter_new**

In [0]:
# Branches with no match in the sink; only the business columns are kept
# because the surrogate key is assigned in a later step.
df_filter_new = (
    df_filter
    .where(col('dim_branch_key').isNull())
    .select('Branch_ID', 'BranchName')
)

### Create surrogate key

In [0]:
# Starting offset for the surrogate keys of new branches.
# With incremental_flag '0' numbering starts from 0; otherwise it continues
# after the largest key currently stored in the gold table.
# NOTE(review): if the flag is '0' while the table already holds rows, new keys
# would restart at 1 and could collide with stored ones - confirm callers never do that.
if incremental_flag == '0':
    max_dim_key = 0
else:
    max_key_row = spark.sql(f'select max(dim_branch_key) from cars_catalog.gold.{table_name}').collect()[0]
    # max() over an empty table yields NULL (None in Python); fall back to 0 so
    # the key arithmetic that follows does not fail on None.
    max_dim_key = max_key_row[0] if max_key_row[0] is not None else 0


**Create surrogate key column and add the max surrogate key**

In [0]:
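# Alternative kept for reference (not executed): surrogate key via Spark SQL row_number().
# NOTE: the query below still uses the dim_model columns (Model_ID, model_category).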

# spark.sql(f'''
# select row_number() over (order by 1) as dim_key, Model_ID, model_category  
# from parquet.`{adls_silver}/car_sales`
# ''').display()

In [0]:
# Assign surrogate keys to the new branches, continuing after max_dim_key.
#
# row_number() is used instead of monotonically_increasing_id(): the latter only
# guarantees unique, increasing values, not consecutive ones (the partition id
# is encoded in the upper bits), so keys could jump by billions as soon as the
# data spans more than one partition.
#
# Ordering by Branch_ID makes the numbering deterministic across re-runs.
# A window without partitionBy processes all rows in a single partition;
# presumably fine for a dimension of this size - TODO confirm.
branch_key_window = Window.orderBy('Branch_ID')

df_filter_new = df_filter_new.withColumn(
    'dim_branch_key',
    # First new key is max_dim_key + 1, as before; cast keeps the column a long.
    (lit(max_dim_key) + row_number().over(branch_key_window)).cast('long'),
)

In [0]:
# Preview at most 10 of the newly keyed branches.
display(df_filter_new.limit(10))

Branch_ID,BranchName,dim_branch_key
BR0131,Audi Motors,1
BR0760,Healey Motors,2
BR0789,Hillman Motors,3
BR0938,Isotta Fraschini Motors,4
BR1040,Lada Motors,5
BR1693,Saleen Motors,6
BR1792,Simca do Brasil Motors,7
BR1799,Simca do Brasil Motors,8
BR1955,Toyota Motors,9
BR1978,Turner Motors,10


### Merge df_filter_old + df_filter_new

In [0]:
# Combine newly keyed branches with the ones that already had a key.
# unionByName matches columns by name, whereas union matches by position and
# would silently mix up values if the two frames ever listed their columns in
# a different order.
df_final = df_filter_new.unionByName(df_filter_old)

In [0]:
# Preview at most 10 rows of the combined result.
display(df_final.limit(10))

Branch_ID,BranchName,dim_branch_key
BR0131,Audi Motors,1
BR0760,Healey Motors,2
BR0789,Hillman Motors,3
BR0938,Isotta Fraschini Motors,4
BR1040,Lada Motors,5
BR1693,Saleen Motors,6
BR1792,Simca do Brasil Motors,7
BR1799,Simca do Brasil Motors,8
BR1955,Toyota Motors,9
BR1978,Turner Motors,10


# SCD Type 1 (Upsert)

In [0]:
# SCD Type 1 upsert of df_final into the gold dimension table.
# The table name and storage path are both derived from `table_name`, so the
# existence check, the merge target and the initial write all refer to the
# same table.
target_table = f'cars_catalog.gold.{table_name}'
target_path = f'{adls_gold}/{table_name}'

if spark.catalog.tableExists(target_table):
    # Table exists: update rows whose surrogate key matches, insert the rest.
    delta_tbl = DeltaTable.forName(spark, target_table)
    (
        delta_tbl.alias('target')
        .merge(df_final.alias('source'), 'target.dim_branch_key = source.dim_branch_key')
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First load: create the Delta table at the gold path and register it.
    (
        df_final.write.format('delta')
        .mode('overwrite')
        .option('path', target_path)
        .saveAsTable(target_table)
    )

In [0]:
%sql
-- Preview the first 10 rows of the branch dimension table.
SELECT *
FROM cars_catalog.gold.dim_branch
LIMIT 10

Branch_ID,BranchName,dim_branch_key
BR0131,Audi Motors,1
BR0760,Healey Motors,2
BR0789,Hillman Motors,3
BR0938,Isotta Fraschini Motors,4
BR1040,Lada Motors,5
BR1693,Saleen Motors,6
BR1792,Simca do Brasil Motors,7
BR1799,Simca do Brasil Motors,8
BR1955,Toyota Motors,9
BR1978,Turner Motors,10
