# Create flag parameter
For incremental load

In [0]:
dbutils.widgets.text("incremental_flag",'0')
incremental_flag=dbutils.widgets.get("incremental_flag")
print(incremental_flag)

0


# Creating 'branch' dimension
This includes Branch_ID and BranchName from parquet.`/mnt/silver`

**Selecting the source columns**
Select
The spark.sql is used for making it a dataframe 

In [0]:
%sql
Select * from parquet.`/mnt/silver`

Branch_ID,Dealer_ID,Model_ID,Revenue,Units_Sold,Date_ID,Day,Month,Year,BranchName,DealerName,Product_Name,model_category,RevenurperUnit
BR9546,DLR0060,Jee-M10,7223451,1,DT01246,28,5,2020,Premier Motors,"Fisker, Karma Motors",Jeep,Jee,7223451.0
BR9666,DLR0062,Jee-M12,22093020,3,DT01246,30,5,2020,Puma Motors,Ford Australia Motors,Jeep,Jee,7364340.0
BR9726,DLR0063,Jee-M13,22372413,3,DT01247,31,5,2020,Power Ranger Motors,Ford do Brasil Motors,Jeep,Jee,7457471.0
XYZ9726,XYZ0063,ZYXM13,22372413,3,DT01247,31,5,2020,DataFam Motors,Datafam Dealers,Surprise,ZYXM13,7457471.0


In [0]:
df_src = spark.sql('''
Select distinct Branch_ID, BranchName
from parquet.`/mnt/silver`
''')

df_src.display()

Branch_ID,BranchName
BR9666,Puma Motors
BR9726,Power Ranger Motors
BR9546,Premier Motors
XYZ9726,DataFam Motors


**Creating surrogate key**

Get only the schema for the first run as the first run will not have any data

### dim_branch sink initial and incremental

In [0]:
#spark.sql("DROP TABLE IF EXISTS spark_catalog.gold.dim_branch")

In [0]:
# this drop table command was added alter because we were getting error that there is no delta table existing in the name of gold.dim_model. This command resolved the issue 
# Also it weas not letting the sql command read the dim_branch_key (new column to be added) in the command

# spark.sql("DROP TABLE IF EXISTS spark_catalog.gold.dim_branch")
#dbutils.fs.rm("/mnt/gold/dim_branch", True)  # Delete corrupt files
# Createing gold schema in the catalog
# spark.sql("CREATE SCHEMA IF NOT EXISTS spark_catalog.gold")


DataFrame[]

In [0]:
%sql
-- describe extended spark_catalog.gold.dim_branch

In [0]:
dbutils.fs.ls("/mnt/gold/dim_branch")

[]

In [0]:
# This condition is only for initial load 
# IF condition is used to check if the model dimension table (dim_model)exists or not. If it does not exist, it will create the table with the schema.

if spark.catalog.tableExists('spark_catalog.gold.dim_branch'):
    df_sink = spark.sql('''
    Select dim_branch_key, Branch_ID, BranchName 
    from spark_catalog.gold.dim_branch

    ''')

else: 
    df_sink = spark.sql('''
    Select 1 as dim_branch_key, Branch_ID, BranchName
    from parquet.`/mnt/silver`
    where 1=0 -- this condition is always false that means it will only bring the column names or schema and not any records
    ''')
    
df_sink.display()

dim_branch_key,Branch_ID,BranchName


### Filtering new and old records for updating the old records and insering the new records




In [0]:
df_filter=df_src.join(df_sink, df_src['Branch_ID'] == df_sink['Branch_ID'], 'left').select(df_src['Branch_ID'], df_src['BranchName'], df_sink['dim_branch_key'])

df_filter.display()

Branch_ID,BranchName,dim_branch_key
BR9666,Puma Motors,
BR9726,Power Ranger Motors,
BR9546,Premier Motors,
XYZ9726,DataFam Motors,


Since all the dim_key values are NULL therefore all the records are new and we need to insert the data. Now we will make 2 more datasets df_filter_new and df_filter_old. The new one will have dim_key values starting from the existing dim_key max value and then added 1 to the value (to get the next value), for this we will sue monotonically add function.

**df_filter_old** 

Where the dim_key value is not null 

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

df_filter_old=df_filter.filter(col('dim_branch_key').isNotNull())

In [0]:
df_filter_old.display()

Branch_ID,BranchName,dim_branch_key


It is empty as this is the initial run and we do not have any old records

**df_filter_new**

Here the dim_branch_key is null

In [0]:
df_filter_new=df_filter.filter(col('dim_branch_key').isNull())

# Since all the dim_model_key values are NULL, therefore we do not need to display it as we will be createing it in fututure 

In [0]:
df_filter_new.display()

Branch_ID,BranchName,dim_branch_key
BR9666,Puma Motors,
BR9726,Power Ranger Motors,
BR9546,Premier Motors,
XYZ9726,DataFam Motors,


In [0]:
df_filter_new=df_filter.filter(col('dim_branch_key').isNull()).select(df_src['Branch_ID'], df_src['BranchName'])

# Since all the dim_model_key values are NULL, therefore we do not need to display it as we will be createing it in fututure 

### Create surrogate key

A surrogate key is a unique identifier artificially generated by a database system to identify a record within a table, used when a natural key (derived from existing data) is not suitable or reliable for uniquely identifying an entity, typically employed in data warehousing scenarios to simplify relationships and improve performance; essentially, it's a system-created key with no inherent meaning related to the data itself. 

**Fetching the max surrogate key from existing table**

In [0]:
if (incremental_flag=='0'):
    # Setting max_value as 1 so that the surrogate key starts from 1. This is not necessary 
    max_value=1
else:
    # Getting the maximum value of dim_branchl_key from the dim_branch table
    #the collect function is used to convert the result of the query to a list of tuples. otherwise it will return a dataframe 
    max_value_df=spark.sql("select max(dim_branch_key) from spark_catalog.gold.dim_branch")
    max_value=max_value_df.collect()[0][0]+1

In [0]:
max_value

1

**Create surrogate key column and ADD max surrogate key

In [0]:
df_filter_new=df_filter_new.withColumn('dim_branch_key', max_value+monotonically_increasing_id())
df_filter_new.display()

Branch_ID,BranchName,dim_branch_key
BR9666,Puma Motors,1
BR9726,Power Ranger Motors,2
BR9546,Premier Motors,3
XYZ9726,DataFam Motors,4


## Create final_df=df_filter_old + df_filter_new

In [0]:
df_final=df_filter_old.union(df_filter_new)
df_final.display()

Branch_ID,BranchName,dim_branch_key
BR9666,Puma Motors,1
BR9726,Power Ranger Motors,2
BR9546,Premier Motors,3
XYZ9726,DataFam Motors,4


# SCD Type 1 - Upsert 

Update+Insert

we use merge statement

In [0]:
#There will be two types of run - incremental and initial. The initial run will create the dim_model table and the incremental run will update the table

from delta.tables import DeltaTable



#For incremental load
if spark.catalog.tableExists("spark_catalog.gold.dim_branch"):

    # we will create source (df_final) and destination (dim_branch). The source will bring the data and compare with the destination. We will create an object (delta table) on top of dim_branch. Apply merge on delta table. 
    delta_table=DeltaTable.forPath(spark, "/mnt/gold/dim_branch") 
    delta_table.alias('target').merge(df_final.alias('source'), 'target.dim_branch_key=source.dim_branch_key')\
                        .whenMatchedUpdateAll()\
                        .whenNotMatchedInsertAll()\
                        .execute()




#For initial load
else:
    df_final.write\
        .format('delta')\
        .mode('overwrite')\
        .option("path","/mnt/gold/dim_branch")\
        .saveAsTable("spark_catalog.gold.dim_branch") 
        #we need different location for every dimension we make
        # we can use append mode also as it will not affect as it is first run  



To check if the above code did what was expected, we can re run it as the first run is for the intial load. Since we do not have incremental load as of now, therefore for the second (incremental run ) in this case should return the same table

In [0]:
df_final.display()

Branch_ID,BranchName,dim_branch_key
BR9666,Puma Motors,1
BR9726,Power Ranger Motors,2
BR9546,Premier Motors,3
XYZ9726,DataFam Motors,4


In [0]:
spark.sql("DESCRIBE EXTENDED spark_catalog.gold.dim_branch")

DataFrame[col_name: string, data_type: string, comment: string]