### Bronze Layer structure generation

By default, Azure Databricks (ADB) Lakeflow Connect does not allow creating table aliases while preserving the original schema structure when only one ingestion pipeline is used.

To keep consumption low for this PoC, we decided to use only one ingestion pipeline, which lands every table in a single consolidated schema named `stg_allschemas`. A Lakeflow Declarative Pipelines (LDP) job then recreates the original schema structure and feeds the corresponding `stg_*` schemas.

This notebook governs the creation of the tables in the `stg_*` schemas and their schema structure.


In [0]:
import pipelines as dp
from pyspark.sql.functions import *

# 1. Configuration mapping based on the AdventureWorks OLTP schemas.
# Each key is an original source schema name; its value is the ordered list
# of tables that belong to that schema. Every list is written as one
# whitespace-separated block and split into a list of table names.
schema_groups = {
    "sales": """
        countryregioncurrency creditcard currency currencyrate
        customer personcreditcard salesorderdetail salesorderheader
        salesorderheadersalesreason salesperson salespersonquotahistory
        salesreason salestaxrate salesterritory salesterritoryhistory
        shoppingcartitem specialoffer specialofferproduct store
    """.split(),
    "humanresources": """
        department employee employeedepartmenthistory
        employeepayhistory jobcandidate shift
    """.split(),
    "purchasing": """
        productvendor purchaseorderdetail purchaseorderheader
        shipmethod vendor
    """.split(),
    "person": """
        address addresstype businessentity businessentityaddress
        businessentitycontact contacttype countryregion emailaddress
        password person personphone phonenumbertype stateprovince
    """.split(),
    "production": """
        billofmaterials culture document illustration location
        product productcategory productcosthistory productdescription
        productdocument productinventory productlistpricehistory
        productmodel productmodelillustration
        productmodelproductdescriptionculture productphoto
        productproductphoto productreview productsubcategory
        scrapreason transactionhistory transactionhistoryarchive
        unitmeasure workorder workorderrouting
    """.split(),
}

# Consolidated schema into which the single ingestion pipeline lands every
# table listed above (catalog.schema form).
source_catalog_schema = "dev_bronze.stg_allschemas"

# 2. Mapper: source table -> target staging table registration.
def create_stg_table(source_name, target_name):
    """Register a streaming staging table that reads from one source table.

    The table is registered with the pipeline by applying the ``dp.table``
    decorator to a query function that streams ``source_name``.

    Args:
        source_name: Fully qualified table to stream from
            (e.g. ``dev_bronze.stg_allschemas.customer``).
        target_name: Fully qualified name under which the staging table is
            published (e.g. ``dev_bronze.stg_sales.stg_customer``).

    Returns:
        None. Registration happens as a side effect of applying the decorator.
    """
    register_table = dp.table(
        name=target_name,
        comment=f"Streaming staging table for {source_name}",
    )

    def table_definition():
        # Streaming read of the source table. `spark` is the session supplied
        # by the notebook/pipeline runtime; it is not defined in this file.
        # NOTE(review): a plain streaming read assumes the source is
        # append-only. If the ingestion pipeline applies updates/deletes to
        # the source tables, confirm whether an option such as
        # `skipChangeCommits` is required.
        return spark.readStream.table(source_name)

    register_table(table_definition)

# 3. Fan the consolidated source schema back out: for every source schema in
# `schema_groups`, register one staging table per listed table. Targets land
# in `dev_bronze.stg_<schema>` and are named `stg_<table>`.
# The loop variables are module-level in this notebook cell, so their names
# are kept stable for any later cell that may read them.
for schema_prefix, tables in schema_groups.items():
    # Target schema for this group, e.g. dev_bronze.stg_sales.
    target_catalog_schema = f"dev_bronze.stg_{schema_prefix}"

    for table_name in tables:
        # Read from the consolidated schema; publish to the per-schema target.
        full_source, full_target = (
            f"{source_catalog_schema}.{table_name}",
            f"{target_catalog_schema}.stg_{table_name}",
        )
        create_stg_table(full_source, full_target)