In [0]:
%run /Workspace/carsdataflow_databricks_notebooks/setup

In [0]:
# ADLS Gen2 locations of the medallion layers used by this notebook (same storage account).
_storage_account = "adlsforcarsdataflow"
silver_path = f"abfss://silver@{_storage_account}.dfs.core.windows.net/Carsdata/"
gold_path = f"abfss://gold@{_storage_account}.dfs.core.windows.net/Carsdata/"

### Creating the databases that store the dimension and fact tables

In [0]:
# In the Hive metastore, tables live inside databases. Create one database for the
# dimension tables and one for the fact table; IF NOT EXISTS keeps the cell re-runnable.
for database_name in (
    "carsdataflow_silver",  # To store different car dimension tables
    "carsdataflow_gold",  # To store the one big Fact Table
):
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")

### Reading the Data from the Silver Layer


In [0]:
# Load the cleaned Silver-layer data and keep only the columns the branch dimension needs.
# Projecting before displaying avoids dumping the whole Silver table into the notebook output,
# and keeping the raw frame under its own name avoids rebinding df_src in place.
df_silver = spark.read.format("delta").load(silver_path)
df_src = df_silver.select("Branch_ID", "BranchName")
df_src.display()

## SCD Type 1 Implementation

### Creating Branch Dimension

In [0]:
%python

# To run sql queries on top of the df I am creating a temp view as Model_view
df_src.createOrReplaceTempView("branch_view")

Branch_df = spark.sql(
    "SELECT DISTINCT Branch_ID, BranchName FROM branch_view"
)
display(Branch_df)

In [0]:
# Point df_src at the de-duplicated branches; the join cell below uses df_src as its left side.
df_src = Branch_df

In [0]:
%sql
-- Make carsdataflow_silver the current database for this session (USE SCHEMA is the
-- same command as USE DATABASE).
USE SCHEMA carsdataflow_silver;

In [0]:
# On the first run there is no dim_branch table yet, so there is no existing schema or data
# to merge into. The "incremental_status" widget tells the notebook which case applies:
#   "false" -> initial load: build the dimension from scratch (default)
#   "true"  -> incremental load: read the existing dim_branch table and merge into it
dbutils.widgets.text("incremental_status", "false")

# Normalise the raw widget text. Later cells compare against the lowercase literal "false",
# so values like "False" or " false " would otherwise silently take the incremental path.
incremental_status = dbutils.widgets.get("incremental_status").strip().lower()
if incremental_status not in ("true", "false"):
    raise ValueError(
        f"incremental_status must be 'true' or 'false', got {incremental_status!r}"
    )
print(incremental_status)

In [0]:
# Existing dimension rows to compare incoming branches against. On the initial load there is
# no dim_branch table yet, so use an empty frame with the same columns instead.
if incremental_status != "false":
    sink_query = "select Branch_Key, Branch_ID, BranchName from carsdataflow_silver.dim_branch"
else:
    sink_query = "select 1 as Branch_Key, Branch_ID, BranchName from branch_view where 1 = 0"

df_sink = spark.sql(sink_query)

### Left-joining the source (df_src) to the existing dimension (df_sink)

In [0]:
%python
df_filter = df_src.join(
    df_sink, 
    df_src.Branch_ID == df_sink.Branch_ID, 
    "left"
).select(
    df_sink["Branch_Key"], 
    df_src["Branch_ID"], 
    df_src["BranchName"]
)
display(df_filter)

### Finding out which records are new and which already exist

In [0]:
# Rows that matched an existing dimension entry carry its Branch_Key.
df_filter_old = df_filter.where(df_filter["Branch_Key"].isNotNull())
df_filter_old.display()

# Unmatched rows are new: their Branch_Key is always NULL, so keep only Branch_ID and
# BranchName; a surrogate key is assigned to them afterwards.
df_filter_new = (
    df_filter
    .where(df_filter["Branch_Key"].isNull())
    .select("Branch_ID", "BranchName")
)
df_filter_new.display()

### Creating Surrogate Key

In [0]:
%python
from pyspark.sql.functions import monotonically_increasing_id

if (incremental_status == "false"):
    max_surrogate_key = 1
else:
    max_surrogate_key = spark.sql(
        "select max(Model_Key) as max_surrogate_key from carsdataflow_silver.dim_branch"
    ).collect()[0][0]

print(max_surrogate_key)

# We are creating a new surrogate key for the new records and appending it to the new records
df_filter_new = df_filter_new.withColumn(
    "Branch_Key", max_surrogate_key + monotonically_increasing_id()
)

display(df_filter_new)

### Combining the new and existing records into a single DataFrame

In [0]:
# Stack the newly keyed rows and the already-known rows into one frame.
# unionByName matches columns by name. df_filter_new has Branch_Key appended last by
# withColumn, while df_filter_old starts with Branch_Key. A positional union() would
# therefore shift values into the wrong columns.
final_df = df_filter_new.unionByName(df_filter_old)
display(final_df)

In [0]:
%python
from delta.tables import *

# Define the path to the Delta table
delta_table_path = f"{gold_path}/dim_branch"

# Deduplicate the source DataFrame
deduplicated_final_df = final_df.dropDuplicates(["Branch_Key"])

if spark.catalog.tableExists("carsdataflow_silver.dim_branch"):
    delta_table = DeltaTable.forPath(spark, delta_table_path)
    delta_table.alias("t").merge(
    deduplicated_final_df.alias("s"),
    "t.Branch_ID = s.Branch_ID" ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
else:
    deduplicated_final_df.write.format("delta").mode("overwrite").option(
        "path", delta_table_path
    ).saveAsTable("carsdataflow_silver.dim_branch")

In [0]:
%sql
-- Inspect the branch dimension after the load / merge.
SELECT * FROM carsdataflow_silver.dim_branch;