# Creating flag parameter

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from delta.tables import DeltaTable

In [0]:
dbutils.widgets.text('incremental_flag', '0')

In [0]:
incremental_flag = dbutils.widgets.get('incremental_flag')

# Parameter lists for all dimensions

In [0]:
# Define dimension metadata
dimensions = [
    {
        "name": "dim_model",
        "distinct_cols": "DISTINCT(Model_ID) as Model_ID, model_category",
        "sink_cols": "Model_ID, model_category",
        "struct": [
            ("dim_model_key", IntegerType(), False),
            ("Model_ID", StringType(), True),
            ("model_category", StringType(), True),
        ],
    },
    {
        "name": "dim_branch",
        "distinct_cols": "DISTINCT(Branch_ID) as Branch_ID, BranchName",
        "sink_cols": "Branch_ID, BranchName",
        "struct": [
            ("dim_branch_key", IntegerType(), False),
            ("Branch_ID", StringType(), True),
            ("BranchName", StringType(), True),
        ],
    },
    {
        "name": "dim_date",
        "distinct_cols": "DISTINCT(Date_ID) as Date_ID",
        "sink_cols": "Date_ID",
        "struct": [
            ("dim_date_key", IntegerType(), False),
            ("Date_ID", StringType(), True),
        ],
    },
    {
        "name": "dim_dealer",
        "distinct_cols": "DISTINCT(Dealer_ID) as Dealer_ID, DealerName",
        "sink_cols": "Dealer_ID, DealerName",
        "struct": [
            ("dim_dealer_key", IntegerType(), False),
            ("Dealer_ID", StringType(), True),
            ("DealerName", StringType(), True),
        ],
    },
]

# Extract elements using comprehensions
dim_names = [d["name"] for d in dimensions]

initial_dimensions_model_sql = [f"SELECT {d['distinct_cols']} FROM parquet.`abfss://silver@carsalesdatalake.dfs.core.windows.net/carsales`" for d in dimensions]

struct_list = [StructType([StructField(*field) for field in d["struct"]]) for d in dimensions]

sink_sql = [f"SELECT {d['sink_cols']} FROM parquet.`abfss://silver@carsalesdatalake.dfs.core.windows.net/{d['name']}`" for d in dimensions]

## dimension loops

In [0]:
# # Define dimension metadata
# dimensions = [
#     {
#         "name": "dim_model",
#         "distinct_cols": "DISTINCT(Model_ID) as Model_ID, model_category",
#         "sink_cols": "Model_ID, model_category",
#         "struct": [
#             ("dim_model_key", IntegerType(), False),
#             ("Model_ID", StringType(), True),
#             ("model_category", StringType(), True),
#         ],
#     }
# ]

# # Extract elements using comprehensions
# dim_names = [d["name"] for d in dimensions]

# initial_dimensions_model_sql = [f"SELECT {d['distinct_cols']} FROM parquet.`abfss://silver@carsalesdatalake.dfs.core.windows.net/carsales`" for d in dimensions]

# struct_list = [StructType([StructField(*field) for field in d["struct"]]) for d in dimensions]

# sink_sql = [f"SELECT {d['sink_cols']} FROM parquet.`abfss://silver@carsalesdatalake.dfs.core.windows.net/{d['name']}`" for d in dimensions]

In [0]:
def process_dimension(dimension: dict, incremental_flag: str):
    dim_name = dimension["name"]
    schema = StructType([StructField(*field) for field in dimension["struct"]])
    
    # Source data query
    src_query = f"""
        SELECT {dimension['distinct_cols']}
        FROM parquet.`abfss://silver@carsalesdatalake.dfs.core.windows.net/carsales`
    """
    df_src = spark.sql(src_query)
    
    # Check if sink table exists
    table_exists = spark.catalog.tableExists(f"cars_catalog.gold.{dim_name}")
    
    if not table_exists:
        df_sink = spark.createDataFrame([], schema)
    else:
        sink_path = f"abfss://gold@carsalesdatalake.dfs.core.windows.net/{dim_name}"
        sink_query = f"""
            SELECT {dimension['sink_cols']}, {dimension['struct'][0][0]} 
            FROM delta.`{sink_path}`
        """
        df_sink = spark.sql(sink_query)
    
    # Join source and sink on natural key columns
    # join on the first ID column
    key_column = dimension["struct"][1][0]  # e.g. "Model_ID"
    df_filter = df_src.join(df_sink, df_src[key_column] == df_sink[key_column], "left")\
                      .select(df_src["*"], df_sink[dimension["struct"][0][0]])  # select all source + surrogate key
    
    # Split old and new records
    surrogate_key_col = dimension["struct"][0][0]
    df_filter_old = df_filter.filter(col(surrogate_key_col).isNotNull())
    df_filter_new = df_filter.filter(col(surrogate_key_col).isNull())
    
    # Get max surrogate key or start at 1
    if incremental_flag == '0' or not table_exists:
        max_value = 1
    else:
        max_value_df = spark.sql(f"SELECT max({surrogate_key_col}) FROM cars_catalog.gold.{dim_name}")
        max_value = max_value_df.collect()[0][0] or 0
        max_value += 1
    
    # Assign surrogate key to new records
    df_filter_new = df_filter_new.withColumn(surrogate_key_col, max_value + monotonically_increasing_id())
    
    # Combine old and new
    df_final = df_filter_old.unionByName(df_filter_new)
    
    # Write to Delta table
    if table_exists:
        delta_tbl = DeltaTable.forPath(spark, f"abfss://gold@carsalesdatalake.dfs.core.windows.net/{dim_name}")
        delta_tbl.alias("tgt")\
            .merge(df_final.alias("src"), f"tgt.{surrogate_key_col} = src.{surrogate_key_col}")\
            .whenMatchedUpdateAll()\
            .whenNotMatchedInsertAll()\
            .execute()
    else:
        df_final.write.format("delta")\
            .mode("overwrite")\
            .option("path", f"abfss://gold@carsalesdatalake.dfs.core.windows.net/{dim_name}")\
            .saveAsTable(f"cars_catalog.gold.{dim_name}")

# loop through dimensions:
for dim in dimensions:
    process_dimension(dim, incremental_flag)

In [0]:
# %sql
# SELECT * FROM cars_catalog.gold.dim_model

Model_ID,model_category,dim_model_key
Hon-M220,Hon,1
Hon-M215,Hon,2
Vol-M110,Vol,3
Vol-M260,Vol,4
BMW-M2,BMW,5
Acu-M60,Acu,6
Bui-M31,Bui,7
Hyu-M157,Hyu,8
Tat-M179,Tat,9
Lin-M29,Lin,10
