# Bronze Ingestion

This notebook performs the automated ingestion from the Raw landing zone to the Bronze Delta tables.

In [0]:
from pyspark.sql.functions import current_timestamp, col

In [0]:
%run ../nb_setup

In [0]:
%run ../utilities/nb_schemas

In [0]:
# Notebook parameter: "all" (the default) or a comma-separated list of entity names.
dbutils.widgets.text("entity_filter", "all")
filter_value = dbutils.widgets.get("entity_filter").strip().lower()

# target_entities is either the sentinel string "all" or a list of trimmed,
# lower-cased names taken from the widget value.
target_entities = (
    "all"
    if filter_value == "all"
    else [token.strip() for token in filter_value.split(",")]
)


### 1. The Ingestion Map
Mapping folders to file formats and specific schemas.

In [0]:
# One entry per entity: the entity name, the file format handed to Auto Loader,
# and the explicit schema object to read with. The schema names are not defined
# in this notebook (presumably they come from ../utilities/nb_schemas — confirm).
# NOTE(review): `barnches_schema` looks like a misspelling of `branches_schema`;
# verify the name actually defined in nb_schemas before renaming it here.
ingestion_configs = [
    dict(entity="accounts", format="csv", schema=accounts_schema),
    dict(entity="audit_logs", format="avro", schema=audit_logs_schema),
    dict(entity="branches", format="csv", schema=barnches_schema),
    dict(entity="credit_scores", format="parquet", schema=credit_scores_schema),
    dict(entity="customers", format="json", schema=customers_schema),
    dict(entity="exchange_rates", format="csv", schema=exchange_rates_schema),
    dict(entity="transactions", format="csv", schema=transactions_schema),
]

### 2. AutoLoader Function
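A reusable function that streams one entity from the raw landing zone into its Bronze table with Auto Loader, adding the audit columns and the rescued-data column. Archiving of the source files is handled in the execution step below.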

In [0]:

def ingest_raw_data(entity_name, source_format, source_schema):
    """Ingest one entity's raw files into its Bronze table with Auto Loader.

    Reads every file currently available under the entity's raw folder as a
    single `availableNow` batch, appends two audit columns, writes the result
    to the Bronze table, and blocks until the streaming query has finished.

    Args:
        entity_name: Name used for the raw sub-folder, the checkpoint folder
            and the target table name.
        source_format: File format passed to `cloudFiles.format`
            (e.g. "csv", "json", "parquet", "avro").
        source_schema: Explicit schema applied to the source files.

    Raises:
        Any exception raised by the streaming query is propagated by
        `awaitTermination()`, so a failed ingestion stops the caller instead
        of passing silently.

    Note:
        `spark` and `paths` are not defined in this notebook; they are
        presumably provided by the `%run` setup notebooks — confirm.
    """
    source_data_path = f"{paths['raw_data']}/{entity_name}"
    target_table = f"{paths['bronze_db']}.{entity_name}"
    checkpoint_path = f"{paths['checkpoints']}/{entity_name}"

    query = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", source_format)
        .option("cloudFiles.schemaLocation", f"{checkpoint_path}_schema")
        # Data that does not fit the supplied schema is kept in _rescued_data
        # rather than being dropped.
        .option("rescuedDataColumn", "_rescued_data")
        .option("cloudFiles.schemaEvolutionMode", "rescue")
        .schema(source_schema)
        .load(source_data_path)
        # Audit columns: load time and the originating file of each row.
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", col("_metadata.file_path"))
        .writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .toTable(target_table)
    )

    # toTable() only *starts* the query. Wait for the availableNow batch to
    # complete so callers can safely touch the source files afterwards, and so
    # that a stream failure is raised here.
    query.awaitTermination()


### 3. Execution

In [0]:
# Warn about filter entries that match no configured entity; otherwise a typo
# in the widget value silently results in nothing being ingested for that name.
if target_entities != "all":
    known_entities = {cfg["entity"] for cfg in ingestion_configs}
    unknown_entities = [name for name in target_entities if name not in known_entities]
    if unknown_entities:
        print(f"Ignoring unknown entities in entity_filter: {unknown_entities}")

for config in ingestion_configs:
    entity = config["entity"]

    if target_entities != "all" and entity not in target_entities:
        continue

    source_dir = f"{paths['raw_data']}/{entity}"
    archive_dir = f"{paths['archive_data']}/{entity}"

    # 1. Snapshot the landing folder BEFORE ingesting. Only these files are
    #    archived afterwards, so a file that lands while the stream is running
    #    is left in place for the next run instead of being archived unread.
    try:
        landed_files = dbutils.fs.ls(source_dir)
    except Exception as list_error:
        landed_files = []
        # A missing folder is presumably reported as a wrapped
        # java.io.FileNotFoundException — TODO confirm on this runtime.
        if "FileNotFoundException" in str(list_error):
            print(f"No files found to archive for {entity}")
        else:
            print(f"Could not list {source_dir} for {entity}: {list_error}")

    # 2. Ingest data to Bronze.
    # NOTE(review): moving files below is only safe once the stream has
    # finished reading them; confirm ingest_raw_data blocks until its query
    # completes.
    ingest_raw_data(
        entity_name=entity,
        source_format=config["format"],
        source_schema=config["schema"],
    )

    # 3. Archival: create the archive directory if it doesn't exist, then move
    #    the snapshotted files. Stays best-effort (a failure does not abort the
    #    run) but reports what actually went wrong.
    dbutils.fs.mkdirs(archive_dir)

    moved_count = 0
    try:
        for file_info in landed_files:
            dbutils.fs.mv(file_info.path, f"{archive_dir}/{file_info.name}")
            moved_count += 1
    except Exception as move_error:
        print(
            f"Archival incomplete for {entity}: moved {moved_count} of "
            f"{len(landed_files)} files ({move_error})"
        )
    else:
        if landed_files:
            print(f"Archived {len(landed_files)} files for {entity}")