# 10 — Bronze Load Worker

This notebook defines the worker function that loads a **single table** from parquet into a Bronze Delta table.

## Architecture: Bronze History Pattern

Bronze picks a write strategy per load mode. Incremental tables are **appended and partitioned by run timestamp**, which preserves the history needed for CDC:
- **Snapshot/Window tables**: Overwrite the entire table
- **Incremental tables**: Append, partitioned by `_bronze_load_ts` (the run's `run_ts`)
- Silver can reconstruct current state and detect deletes from this history
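
As a rough illustration of how a downstream layer could rebuild current state from the appended history, the sketch below keeps the row with the highest `_bronze_load_ts` per business key. The `id` key column, the `latest_per_key` helper, and the sample rows are assumptions for illustration. It uses plain Python rather than Spark, and it relies on run timestamps in a fixed-width format (such as `20251105T142752505`) sorting chronologically as strings.

```python
from typing import Any, Dict, List


def latest_per_key(rows: List[Dict[str, Any]], key: str = "id") -> Dict[Any, Dict[str, Any]]:
    """Return the most recent row per key, based on _bronze_load_ts (hypothetical helper)."""
    current: Dict[Any, Dict[str, Any]] = {}
    for row in rows:
        seen = current.get(row[key])
        # Fixed-width timestamp strings compare in chronological order
        if seen is None or row["_bronze_load_ts"] > seen["_bronze_load_ts"]:
            current[row[key]] = row
    return current


# Sample history: key 1 was loaded in two runs, key 2 in one
history = [
    {"id": 1, "name": "old", "_bronze_load_ts": "20251105T142752505"},
    {"id": 1, "name": "new", "_bronze_load_ts": "20251106T090000000"},
    {"id": 2, "name": "only", "_bronze_load_ts": "20251105T142752505"},
]
state = latest_per_key(history)
```

Delete detection needs more than this: it requires knowing which keys were present in each load, which depends on how the source exports its data.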

## Key Features
- Does **not** write to log table (returns metrics dict)
- Reads parquet from auto-detected path (Fabric or Local)
- Adds metadata columns: `_bronze_load_ts`, `_bronze_filename`
- Handles missing files, empty data, corrupt Delta tables
- Returns comprehensive metrics for logging
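
The metrics dict has the same shape on every exit path, so it could be assembled by one small helper. The sketch below is only an illustration of that shape: `build_result` is a hypothetical name that does not exist in this notebook, and the table name `orders` is made up.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_result(
    base: Dict[str, Any],
    status: str,
    start_time: datetime,
    rows_processed: Optional[int] = None,
    error_message: Optional[str] = None,
) -> Dict[str, Any]:
    """Combine the fixed identifiers with status and timing fields (hypothetical helper)."""
    end_time = datetime.now(timezone.utc)
    return {
        **base,
        "status": status,
        "rows_processed": rows_processed,
        "start_time": start_time,
        "end_time": end_time,
        "duration_seconds": int((end_time - start_time).total_seconds()),
        "error_message": error_message,
    }


start = datetime.now(timezone.utc)
base = {
    "log_id": "vizier:orders:20251105T142752505:abcd1234",  # made-up example values
    "run_id": "run_20251105T142752505",
    "run_ts": "20251105T142752505",
    "source": "vizier",
    "table_name": "orders",
    "load_mode": "snapshot",
    "parquet_path": None,
    "delta_table": "bronze.orders",
}
result = build_result(base, "SKIPPED", start, error_message="No parquet files found")
```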

## Load Modes
- **snapshot**: Complete table refresh (overwrite)
- **incremental**: Delta append with run_ts (CDC history)
- **window**: Overwrite with partitioning support
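
The mapping from load mode to write behaviour can be summarised in a few lines. The sketch below is a simplified, Spark-free restatement, not the worker's actual code path: `plan_write` is a hypothetical helper, and it raises on an unknown mode, whereas the worker returns a `SKIPPED` result instead.

```python
from typing import Any, Dict, List, Optional


def plan_write(load_mode: str, partitioning: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Map a load mode to a Delta save mode and partition columns (hypothetical helper)."""
    mode = (load_mode or "snapshot").lower()
    if mode == "incremental":
        # History is kept by appending one partition per run
        return {"save_mode": "append", "partition_by": ["_bronze_load_ts"]}
    if mode in ("snapshot", "window"):
        partition_by: List[str] = []
        if partitioning and partitioning.get("type") == "year_month":
            partition_by = [
                partitioning.get("year_col", "p_year"),
                partitioning.get("month_col", "p_month"),
            ]
        return {"save_mode": "overwrite", "partition_by": partition_by}
    raise ValueError(f"Unsupported load_mode '{mode}'")


incremental_plan = plan_write("incremental")
window_plan = plan_write("window", {"type": "year_month"})
snapshot_plan = plan_write("snapshot")
```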

This notebook is imported via `%run` from the master orchestrator.

In [None]:
# Parameters (set by orchestrator, not by Papermill in this case)
# These are here for documentation and can be overridden when %run is called
RUN_ID = None  # Will be set by orchestrator
#DEBUG = False  # Enable debug output

## [1] Imports and Path Detection

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    lit, current_timestamp, input_file_name, col, year, month
)
from datetime import datetime
from typing import Dict, Any
from uuid import uuid4
import os
from modules.path_utils import build_parquet_dir

print("✓ Imports loaded")

In [None]:
# Auto-detect base path (Fabric vs Cluster)
# The base path for Files/Tables is defined centrally in 02_utils_config

try:
    # If 02_utils_config has already run, BASE_PATH is already in the globals
    BASE_PATH  # type: ignore[name-defined]
    print(f"✓ Base path (bronze worker): {BASE_PATH}")
    env_type = (
        "Fabric" if "/lakehouse/default/Files" in BASE_PATH
        else "Custom Cluster" if "/data/lakehouse/" in BASE_PATH
        else "Local/Relative"
    )
    print(f"✓ Environment (bronze worker): {env_type}")
except NameError:
    # Fallback so that 10_bronze_load can also be run stand-alone
    import glob
    import os

    def detect_base_path() -> str:
        """
        Same logic as in 02_utils_config:
        1. Fabric: /lakehouse/default/Files
        2. Custom cluster: /data/lakehouse/**/Files
        3. Otherwise: 'Files' (relative, for example in your repo)
        """
        # 1. Fabric
        fabric_path = "/lakehouse/default/Files"
        if os.path.exists(fabric_path):
            return fabric_path

        # 2. Custom cluster
        if os.path.exists("/data/lakehouse"):
            pattern = "/data/lakehouse/**/Files"
            matches = glob.glob(pattern, recursive=True)
            if matches:
                return sorted(matches)[0]

        # 3. Local / relative
        return "Files"

    BASE_PATH = detect_base_path()
    env_type = (
        "Fabric" if "/lakehouse/default/Files" in BASE_PATH
        else "Custom Cluster" if "/data/lakehouse/" in BASE_PATH
        else "Local/Relative"
    )
    print(f"✓ Base path (bronze worker – fallback): {BASE_PATH}")
    print(f"✓ Environment (bronze worker – fallback): {env_type}")


## [2] Helper Functions

In [None]:
def is_missing_path_error(exc: Exception) -> bool:
    """
    Heuristic to detect 'no parquet files found' situations.
    """
    msg = str(exc).lower()
    return (
        "path does not exist" in msg
        or "no such file or directory" in msg
        or "file not found" in msg
        or "cannot find path" in msg
    )


def is_probably_corrupt_delta(exc: Exception) -> bool:
    """
    Heuristic to detect a broken Delta table that may need to be recreated.
    """
    msg = str(exc).lower()
    return (
        "is not a delta table" in msg
        or "failed to merge fields" in msg
        or "incompatible format" in msg
        or ("protocol" in msg and "unsupported" in msg)
        or ("delta log" in msg and "error" in msg)
    )


def get_last_num_output_rows(table_fullname: str) -> int | None:
    """
    Get the number of written rows from the last Delta write
    for this table using DESCRIBE HISTORY.
    """
    history = spark.sql(f"DESCRIBE HISTORY {table_fullname}")
    row = (
        history
        .orderBy(F.col("version").desc())
        .select(F.col("operationMetrics")["numOutputRows"].alias("rows"))
        .first()
    )
    return int(row["rows"]) if row and row["rows"] is not None else None




print("✓ Helper functions defined")

## [3] Core Worker Function

This is the main function that processes a single table.
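
For orientation, the sketch below shows a table definition using the keys the worker reads (`name`, `load_mode`, `delta_schema`, `delta_table`, `partitioning`, `window`), plus a small restatement of how the target table name is resolved. The table name `ledger_entries` and the helper `resolve_target` are made up for illustration; the real definitions come from the DAG.

```python
from typing import Any, Dict

# Hypothetical table definition; only the key names are taken from the worker
table_def: Dict[str, Any] = {
    "name": "ledger_entries",
    "load_mode": "window",
    "delta_schema": "bronze",
    "delta_table": "ledger_entries",
    "partitioning": {"type": "year_month", "year_col": "p_year", "month_col": "p_month"},
    "window": {"column": "Boek_Datum"},
}


def resolve_target(definition: Dict[str, Any]) -> str:
    """Return schema.table, keeping an explicit schema if delta_table already has one."""
    target = definition.get("delta_table") or definition["name"]
    schema = definition.get("delta_schema") or "bronze"
    return target if "." in target else f"{schema}.{target}"


full_name = resolve_target(table_def)
```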

In [None]:
def process_bronze_table(
    table_def: Dict[str, Any],
    source_name: str,
    run_ts: str,
    base_files: str,  # required, e.g. "greenhouse_sources"
    debug: bool = False
) -> Dict[str, Any]:
    """
    Load a single table's parquet files for a given run_ts into Bronze Delta table.
    
    Architecture:
    - Snapshot/Window: Overwrite entire table
    - Incremental: Append with _bronze_load_ts partition (for CDC)
    
    Args:
        table_def: Table definition from DAG
        source_name: Source system name (e.g., "vizier")
        run_ts: Run timestamp (e.g., "20251105T142752505")
        base_files: Base directory for files (required, e.g., "greenhouse_sources")
        debug: Enable debug output
    
    Returns:
        Dict with processing results:
        - log_id, run_id, run_ts, source, table_name, load_mode
        - status (SUCCESS, FAILED, SKIPPED, EMPTY)
        - rows_processed
        - start_time, end_time, duration_seconds
        - error_message, parquet_path, delta_table
    """
    
    # Validate table_def
    table_name = table_def.get("name")
    if not table_name:
        raise ValueError("table_def is missing 'name'")
    
    # Validate RUN_ID is set
    if RUN_ID is None:
        raise ValueError("RUN_ID must be set before calling process_bronze_table")
    
    # Get load mode (default to snapshot)
    load_mode = (table_def.get("load_mode") or "snapshot").lower()
    supported_modes = {"snapshot", "window", "incremental"}
    
    # Initialize metrics
    log_id = f"{source_name}:{table_name}:{run_ts}:{uuid4().hex[:8]}"
    start_time = datetime.utcnow()
    end_time = None
    status = "RUNNING"
    error_message = None
    rows_processed = None
    
    # Early exit for unsupported load modes
    if load_mode not in supported_modes:
        end_time = datetime.utcnow()
        duration = int((end_time - start_time).total_seconds())
        
        if debug:
            print(f"[{table_name}] SKIPPED: unsupported load_mode '{load_mode}'")
        
        return {
            "log_id": log_id,
            "run_id": RUN_ID,
            "run_ts": run_ts,
            "source": source_name,
            "table_name": table_name,
            "load_mode": load_mode,
            "status": "SKIPPED",
            "rows_processed": None,
            "start_time": start_time,
            "end_time": end_time,
            "duration_seconds": duration,
            "error_message": f"Unsupported load_mode '{load_mode}'",
            "parquet_path": None,
            "delta_table": None,
        }
    
    # Build target table name
    target_table = table_def.get("delta_table") or table_name
    delta_schema = table_def.get("delta_schema") or "bronze"
    
    # Handle schema.table format
    if "." not in target_table:
        delta_table_full = f"{delta_schema}.{target_table}"
    else:
        delta_table_full = target_table
    
    # Build parquet path
    parquet_dir = build_parquet_dir(base_files, source_name, run_ts, table_name, spark)
    parquet_glob = f"{parquet_dir}/*.parquet"
    
    if debug:
        print(f"[{table_name}] Starting ({load_mode})")
        print(f"  Parquet: {parquet_dir}")
        print(f"  Target: {delta_table_full}")
    
    # ========================================================================
    # STEP 1: Read Parquet
    # ========================================================================
    
    try:
        df = spark.read.parquet(parquet_glob)
                
    except Exception as e:
        if is_missing_path_error(e):
            # No parquet files - table not exported in this run
            end_time = datetime.utcnow()
            duration = int((end_time - start_time).total_seconds())
            
            if debug:
                print(f"[{table_name}] SKIPPED: No parquet files in {parquet_dir}")
            
            return {
                "log_id": log_id,
                "run_id": RUN_ID,
                "run_ts": run_ts,
                "source": source_name,
                "table_name": table_name,
                "load_mode": load_mode,
                "status": "SKIPPED",
                "rows_processed": 0,
                "start_time": start_time,
                "end_time": end_time,
                "duration_seconds": duration,
                "error_message": f"No parquet files found in {parquet_dir}",
                "parquet_path": parquet_dir,
                "delta_table": delta_table_full,
            }
        else:
            # Other read error
            end_time = datetime.utcnow()
            duration = int((end_time - start_time).total_seconds())
            
            if debug:
                print(f"[{table_name}] FAILED reading parquet: {str(e)[:200]}")
            
            return {
                "log_id": log_id,
                "run_id": RUN_ID,
                "run_ts": run_ts,
                "source": source_name,
                "table_name": table_name,
                "load_mode": load_mode,
                "status": "FAILED",
                "rows_processed": None,
                "start_time": start_time,
                "end_time": end_time,
                "duration_seconds": duration,
                "error_message": f"Read parquet failed: {str(e)[:500]}",
                "parquet_path": parquet_dir,
                "delta_table": delta_table_full,
            }
    
    # ========================================================================
    # STEP 2: Add Metadata Columns
    # ========================================================================
    
    # Add Bronze metadata columns
    df_with_meta = df \
        .withColumn("_bronze_load_ts", lit(run_ts)) \
        .withColumn("_bronze_filename", input_file_name())
    
    # For window tables with partitioning config, add partition columns
    partitioning_config = table_def.get("partitioning")
    # Compare with ==: ("window") is a plain string, so `in` would do a substring match
    if partitioning_config and load_mode == "window":
        partition_type = partitioning_config.get("type")
        
        if partition_type == "year_month":
            year_col = partitioning_config.get("year_col", "p_year")
            month_col = partitioning_config.get("month_col", "p_month")
            
            # Get window column to extract year/month from
            window_config = table_def.get("window", {})
            window_col = window_config.get("column", "Boek_Datum")  # Default
            
            if window_col in df.columns:
                df_with_meta = df_with_meta \
                    .withColumn(year_col, year(col(window_col))) \
                    .withColumn(month_col, month(col(window_col)))
                
                if debug:
                    print(f"  Added partitioning: {year_col}, {month_col} from {window_col}")
    
    # ========================================================================
    # STEP 3: Write to Delta
    # ========================================================================
    
    try:
        
        # Determine write mode based on load_mode
        if load_mode == "incremental":
            # APPEND with partition by _bronze_load_ts (CDC history)
            writer = df_with_meta.write \
                .format("delta") \
                .mode("append") \
                .partitionBy("_bronze_load_ts")
            
            if debug:
                print(f"  Mode: APPEND with partition by _bronze_load_ts (CDC history)")
        
        elif load_mode in ("snapshot", "window"):
            # OVERWRITE entire table
            # For window tables with partitioning, could use dynamic partition overwrite
            # but for simplicity, we overwrite entire table
            
            if partitioning_config:
                # Partitioned overwrite
                partition_type = partitioning_config.get("type")
                if partition_type == "year_month":
                    year_col = partitioning_config.get("year_col", "p_year")
                    month_col = partitioning_config.get("month_col", "p_month")
                    
                    writer = df_with_meta.write \
                        .format("delta") \
                        .mode("overwrite") \
                        .option("overwriteSchema", "true") \
                        .partitionBy(year_col, month_col)
                    
                    if debug:
                        print(f"  Mode: OVERWRITE with partitioning by {year_col}, {month_col}")
                else:
                    # Unknown partition type, just overwrite
                    writer = df_with_meta.write \
                        .format("delta") \
                        .mode("overwrite") \
                        .option("overwriteSchema", "true")
            else:
                # No partitioning, simple overwrite
                writer = df_with_meta.write \
                    .format("delta") \
                    .mode("overwrite") \
                    .option("overwriteSchema", "true")
                
                if debug:
                    print(f"  Mode: OVERWRITE")
        
        # Execute write
        writer.saveAsTable(delta_table_full)
        rows_processed = get_last_num_output_rows(delta_table_full)
        
        # Check for empty result
        if rows_processed == 0:
            end_time = datetime.utcnow()
            duration = int((end_time - start_time).total_seconds())
            
            if debug:
                print(f"[{table_name}] EMPTY: Parquet exists but contains 0 rows")
            
            return {
                "log_id": log_id,
                "run_id": RUN_ID,
                "run_ts": run_ts,
                "source": source_name,
                "table_name": table_name,
                "load_mode": load_mode,
                "status": "EMPTY",
                "rows_processed": 0,
                "start_time": start_time,
                "end_time": end_time,
                "duration_seconds": duration,
                "error_message": "Parquet exists but contains 0 rows",
                "parquet_path": parquet_dir,
                "delta_table": delta_table_full,
            }
        
        # Success!
        status = "SUCCESS"
        
    except Exception as e:
        # Try recovery if Delta table looks corrupt
        if is_probably_corrupt_delta(e):
            if debug:
                print(f"[{table_name}] Write failed, attempting DROP+RECREATE: {str(e)[:200]}")
            
            try:
                # Drop and recreate
                spark.sql(f"DROP TABLE IF EXISTS {delta_table_full}")
                
                # Recreate with appropriate partitioning
                if load_mode == "incremental":
                    writer = df_with_meta.write \
                        .format("delta") \
                        .mode("overwrite") \
                        .option("overwriteSchema", "true") \
                        .partitionBy("_bronze_load_ts")
                elif partitioning_config and partitioning_config.get("type") == "year_month":
                    year_col = partitioning_config.get("year_col", "p_year")
                    month_col = partitioning_config.get("month_col", "p_month")
                    writer = df_with_meta.write \
                        .format("delta") \
                        .mode("overwrite") \
                        .option("overwriteSchema", "true") \
                        .partitionBy(year_col, month_col)
                else:
                    writer = df_with_meta.write \
                        .format("delta") \
                        .mode("overwrite") \
                        .option("overwriteSchema", "true")
                
                writer.saveAsTable(delta_table_full)

                # Count rows directly from the recreated table
                if load_mode == "incremental":
                    rows_processed = spark.table(delta_table_full) \
                        .where(f"_bronze_load_ts = '{run_ts}'") \
                        .count()
                else:
                    rows_processed = spark.table(delta_table_full).count()
                
                status = "SUCCESS"
                error_message = f"Initial write failed but table was recreated. Original error: {str(e)[:300]}"
                
                if debug:
                    print(f"[{table_name}] Recovery successful")
                
            except Exception as e2:
                status = "FAILED"
                error_message = f"Write failed and recovery failed: {str(e2)[:500]}"
                
                if debug:
                    print(f"[{table_name}] Recovery FAILED: {str(e2)[:200]}")
        else:
            status = "FAILED"
            error_message = f"Write failed: {str(e)[:500]}"
            
            if debug:
                print(f"[{table_name}] FAILED: {str(e)[:200]}")
    
    # ========================================================================
    # STEP 4: Return Results
    # ========================================================================
    
    end_time = datetime.utcnow()
    duration = int((end_time - start_time).total_seconds())
    
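    if debug:
        # rows_processed is None on failure, and None does not support the ',' format spec
        rows_text = f"{rows_processed:,}" if rows_processed is not None else "n/a"
        print(f"[{table_name}] {status} in {duration}s ({rows_text} rows)")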
    
    return {
        "log_id": log_id,
        "run_id": RUN_ID,
        "run_ts": run_ts,
        "source": source_name,
        "table_name": table_name,
        "load_mode": load_mode,
        "status": status,
        "rows_processed": rows_processed,
        "start_time": start_time,
        "end_time": end_time,
        "duration_seconds": duration,
        "error_message": error_message,
        "parquet_path": parquet_dir,
        "delta_table": delta_table_full,
    }


print("✓ Bronze worker function defined")

## [4] Function Ready

The `process_bronze_table()` function is now available for use by the orchestrator.

**Usage pattern:**

```python
# In orchestrator notebook:
%run "10_bronze_load"

# Set RUN_ID
RUN_ID = f"run_{run_ts}"

# Process tables
results = []
for table in tables:
    result = process_bronze_table(
        table_def=table,
        source_name=source,
        run_ts=run_ts,
        base_files=base_files,
        debug=True
    )
    results.append(result)

# Log results in batch
log_bronze_batch(results)
```
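
Before logging, the orchestrator may want a quick summary of the returned dicts. The sketch below is one possible way to do that, assuming only the `status`, `rows_processed`, and `table_name` keys that the worker returns. `summarize_results` is a hypothetical helper and the sample data is made up.

```python
from collections import Counter
from typing import Any, Dict, List


def summarize_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate worker results by status (hypothetical helper)."""
    status_counts = Counter(r["status"] for r in results)
    # rows_processed can be None (e.g. on FAILED), so treat it as 0
    total_rows = sum(r["rows_processed"] or 0 for r in results)
    failed = [r["table_name"] for r in results if r["status"] == "FAILED"]
    return {
        "status_counts": dict(status_counts),
        "total_rows": total_rows,
        "failed_tables": failed,
    }


sample_results = [
    {"table_name": "a", "status": "SUCCESS", "rows_processed": 120},
    {"table_name": "b", "status": "FAILED", "rows_processed": None},
    {"table_name": "c", "status": "SKIPPED", "rows_processed": 0},
    {"table_name": "d", "status": "SUCCESS", "rows_processed": 30},
]
summary = summarize_results(sample_results)
```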

In [None]:
print("\n" + "=" * 80)
print("BRONZE WORKER READY")
print("=" * 80)
print(f"Base path: {BASE_PATH}")
print(f"Environment: {env_type}")
print("\nFunction available: process_bronze_table(table_def, source_name, run_ts, ...)")
print("\n⚠️  Remember to set RUN_ID before calling process_bronze_table()")
print("✓ Bronze worker notebook loaded successfully")