# CREATE FLAG PARAMETER

In [0]:
# Declare the notebook text widget "incremental_flag" with a default value of "0".
dbutils.widgets.text("incremental_flag", "0")

In [0]:
# Read the current widget value and show its Python type
# (the printed output below shows it comes back as a string).
incremental_flag = dbutils.widgets.get("incremental_flag")
flag_type = type(incremental_flag)
print(flag_type)

<class 'str'>


# CREATING DIMENSION MODEL

### Filtering new records and old records

### Fetch Relevant Columns

In [0]:
# Distinct (Dealer_ID, DealerName) pairs read from the carsales parquet data
# in the silver container.
dealer_src_query = '''
    SELECT DISTINCT Dealer_ID, DealerName
    FROM parquet.`abfss://silver@datalakecarsrj.dfs.core.windows.net/carsales`
'''
df_src_dealer = spark.sql(dealer_src_query)
df_src_dealer.display()

Dealer_ID,DealerName
DLR0095,Isuzu Motors
DLR0124,McLaren Motors
DLR0245,Jennings Ford Automobile Dealership
DLR0171,Samsung Motors
DLR0052,e.GO Mobile Motors
DLR0175,Saturn Motors
DLR0080,Hindustan Motors
DLR0263,Auto-Union Motors
DLR0066,Freightliner Motors
DLR0258,


### Create Surrogate Key 

In [0]:
# Build df_existing_dealer: the rows already present in the dimension table.
# When the table does not exist yet, select from the silver source with an
# always-false predicate (WHERE 1=0) so the result is an empty DataFrame that
# still has the dealer_key / Dealer_ID / DealerName columns.
if not spark.catalog.tableExists("cars_catalog.gold.dim_dealer"):
    existing_dealer_query = '''
        SELECT 1 as dealer_key, Dealer_ID, DealerName
        FROM parquet.`abfss://silver@datalakecarsrj.dfs.core.windows.net/carsales`
        WHERE 1=0
    '''
else:
    existing_dealer_query = '''
        SELECT dealer_key, Dealer_ID, DealerName
        FROM cars_catalog.gold.dim_dealer
    '''

df_existing_dealer = spark.sql(existing_dealer_query)


**Create surrogate key column and ADD the max surrogate key** 

In [0]:
# Left-join the source dealers to the existing dimension rows on Dealer_ID.
# Dealer_ID and DealerName are taken from the source side; dealer_key comes
# from the existing side and is NULL for dealers with no match.
df_joined_dealer = (
    df_src_dealer
    .join(df_existing_dealer, on="Dealer_ID", how="left")
    .select(
        df_src_dealer["Dealer_ID"],
        df_src_dealer["DealerName"],
        df_existing_dealer["dealer_key"],
    )
)

# Dealers that already have a surrogate key.
df_existing_only = df_joined_dealer.where(df_joined_dealer["dealer_key"].isNotNull())

# Dealers without a surrogate key yet; the empty dealer_key column is dropped.
df_new_only = (
    df_joined_dealer
    .where(df_joined_dealer["dealer_key"].isNull())
    .select("Dealer_ID", "DealerName")
)

### dim_dealer sink - initial and incremental

**Fetch the max surrogate key from the existing table**

In [0]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Re-declare and parse the run flag so this cell also works when run on its own.
# The value is kept in `incremental_flag` for any other cell that reads it.
dbutils.widgets.text("incremental_flag", "0")
incremental_flag = int(dbutils.widgets.get("incremental_flag"))

# New surrogate keys must continue after the highest key already stored.
# The offset is driven by whether the dimension table exists, not by the flag:
# rows that already have a key are carried forward with that key, so starting
# again from 0 while the table exists would hand out keys that are already in
# use and make the merge on dealer_key ambiguous.
if spark.catalog.tableExists("cars_catalog.gold.dim_dealer"):
    max_df = spark.sql("SELECT MAX(dealer_key) as max_key FROM cars_catalog.gold.dim_dealer")
    # MAX() over an empty table yields NULL (None in Python); treat that as 0.
    max_dealer_key = max_df.collect()[0]["max_key"] or 0
else:
    # First load: no keys have been issued yet.
    max_dealer_key = 0

# Assign new surrogate keys: 1..n in Dealer_ID order, shifted by the current max.
# NOTE: a window without partitionBy is evaluated on a single partition; this
# assumes the set of new dealers is small - TODO confirm for larger loads.
window_spec = Window.orderBy("Dealer_ID")
df_new_with_keys = df_new_only.withColumn(
    "dealer_key", row_number().over(window_spec) + max_dealer_key
).select("dealer_key", "Dealer_ID", "DealerName")




In [0]:
# Put the already-keyed dealers into the same column order as the newly keyed
# ones, then stack both sets (union matches columns by position).
dealer_dim_columns = ["dealer_key", "Dealer_ID", "DealerName"]
df_existing_clean = df_existing_only.select(*dealer_dim_columns)

df_final_dealer = df_existing_clean.union(df_new_with_keys)
df_final_dealer.display()




dealer_key,Dealer_ID,DealerName
1,DLR0001,AC Cars Motors
2,DLR0002,Acura Motors
3,DLR0003,Aixam-Mega (including Arola) Motors
4,DLR0004,Alfa Romeo Motors
5,DLR0005,Alpine Motors
6,DLR0006,Alvis Motors
7,DLR0007,"AMC, Eagle Motors"
8,DLR0008,Anadol Motors
9,DLR0009,Ariel Motors
10,DLR0010,ARO Motors


# SCD TYPE 1 - UPSERT 

In [0]:
from delta.tables import DeltaTable

# Storage location backing the dim_dealer table in the gold container.
gold_path_dealer = "abfss://gold@datalakecarsrj.dfs.core.windows.net/dim_dealer"

if not spark.catalog.tableExists("cars_catalog.gold.dim_dealer"):
    # Table not registered yet: write the full result as a Delta table at the
    # gold path and register it under cars_catalog.gold.dim_dealer.
    (
        df_final_dealer.write
        .format("delta")
        .mode("overwrite")
        .option("path", gold_path_dealer)
        .saveAsTable("cars_catalog.gold.dim_dealer")
    )
else:
    # Table exists: upsert keyed on dealer_key - matched rows are overwritten
    # with the incoming values, unmatched rows are inserted.
    delta_tbl = DeltaTable.forPath(spark, gold_path_dealer)
    (
        delta_tbl.alias("trg")
        .merge(df_final_dealer.alias("src"), "trg.dealer_key = src.dealer_key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )




In [0]:
%sql
-- Show the contents of the dealer dimension table.
SELECT *
FROM cars_catalog.gold.dim_dealer;

dealer_key,Dealer_ID,DealerName
1,DLR0001,AC Cars Motors
2,DLR0002,Acura Motors
3,DLR0003,Aixam-Mega (including Arola) Motors
4,DLR0004,Alfa Romeo Motors
5,DLR0005,Alpine Motors
6,DLR0006,Alvis Motors
7,DLR0007,"AMC, Eagle Motors"
8,DLR0008,Anadol Motors
9,DLR0009,Ariel Motors
10,DLR0010,ARO Motors
