Define the Table List

In [0]:
# List of tables and their primary key columns
tables = [
    {"name": "media_customer_reviews", "key": "franchiseID", "path": "/FileStore/tables/media_customer_reviews.parquet"},
    {"name": "media_gold_reviews_chunked", "key": "franchiseID", "path": "/FileStore/tables/media_gold_reviews_chunked.parquet"},
    {"name": "sales_customers", "key": "customerID", "path": "/FileStore/tables/sales_customers.parquet"},
    {"name": "sales_franchises", "key": "franchiseID", "path": "/FileStore/tables/sales_franchises.parquet"},
    {"name": "sales_suppliers", "key": "supplierID", "path": "/FileStore/tables/sales_suppliers.parquet"},
    {"name": "sales_transactions", "key": "transactionID", "path": "/FileStore/tables/sales_transactions.parquet"}
]

Create Schemas - Bronze, Silver & Gold

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS bronze;
CREATE SCHEMA IF NOT EXISTS silver;
CREATE SCHEMA IF NOT EXISTS gold;

Dynamic Bronze -> Silver -> Gold ETL Function

In [0]:
def run_etl_pipeline(table_name: str, key: str, file_path: str):
    bronze = f"bronze.{table_name}"
    silver = f"silver.{table_name}"
    
    # Step 1: Read raw file
    df = spark.read.parquet(file_path)
    
    # Step 2: Write as bronze table
    df.write.format("delta").mode("overwrite").saveAsTable(bronze)
    
    # Step 3: Create/overwrite silver table
    df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(silver)
    
    # Step 4: Generate SCD Type 1 MERGE SQL
    columns = df.columns
    update_expr = ", ".join([f"target.{c} = source.{c}" for c in columns])
    insert_cols = ", ".join(columns)
    insert_vals = ", ".join([f"source.{c}" for c in columns])
    
    merge_sql = f"""
    MERGE INTO {silver} AS target
    USING {bronze} AS source
    ON target.{key} = source.{key}
    WHEN MATCHED THEN UPDATE SET {update_expr}
    WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})
    """
    
    # Step 5: Execute merge
    spark.sql(merge_sql)
    
    # Step 6: Create a gold table with sample aggregation
    if "name" in columns:
        gold_sql = f"""
        CREATE OR REPLACE TABLE gold.{table_name}_record_count AS
        SELECT name, COUNT(*) AS total_records
        FROM {silver}
        GROUP BY name
        """
        spark.sql(gold_sql)

Loop through All Tables

In [0]:
for t in tables:
    run_etl_pipeline(t["name"], t["key"], t["path"])

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-4013706027158700>:2[0m
[1;32m      1[0m [38;5;28;01mfor[39;00m t [38;5;129;01min[39;00m tables:
[0;32m----> 2[0m     [43mrun_etl_pipeline[49m[43m([49m[43mt[49m[43m[[49m[38;5;124;43m"[39;49m[38;5;124;43mname[39;49m[38;5;124;43m"[39;49m[43m][49m[43m,[49m[43m [49m[43mt[49m[43m[[49m[38;5;124;43m"[39;49m[38;5;124;43mkey[39;49m[38;5;124;43m"[39;49m[43m][49m[43m,[49m[43m [49m[43mt[49m[43m[[49m[38;5;124;43m"[39;49m[38;5;124;43mpath[39;49m[38;5;124;43m"[39;49m[43m][49m[43m)[49m

File [0;32m<command-4013706027158699>:29[0m, in [0;36mrun_etl_pipeline[0;34m(table_name, key, file_path)[0m
[1;32m     20[0m merge_sql [38;5;241m=[39m [38;5;124mf[39m[38;5;124m"""[39m
[1;32m     21[0m [38;5;124mMERGE INTO [39m[38;5;132;01m{[39;00msilve

Query the Gold Layer

In [0]:
%sql
SELECT * FROM gold.media_customer_reviews;

