# Step 01 - Bronze Load

Ingest raw JSON sources from `data/raw` into DuckDB bronze tables.

- Basic processing so that JSON or NDJSON can be loaded without data loss.
- Falls back to a simple cleaner if the input file is loosely formatted.

Gibbons, 2025-11-24


In [21]:
from pathlib import Path
import json
import duckdb

"""
bronze_load.py
---------
Ingest from raw JSON sources

- Basic processing to get into DuckDB without data loss

Gibbons 2025_11_24
"""

# ---------------------------
# Project paths
# ---------------------------

if "__file__" in globals():
    # Running as a .py script
    THIS_FILE = Path(__file__).resolve()
    SRC_DIR = THIS_FILE.parents[1]          # .../src
    PROJECT_ROOT = THIS_FILE.parents[2]     # .../sales_test
else:
    # Running inside a notebook or REPL.
    # Walk up from the current working directory until we find a folder
    # that contains src/pipelines. Treat that as the project root.
    cwd = Path.cwd()
    PROJECT_ROOT = cwd
    for candidate in [cwd] + list(cwd.parents):
        if (candidate / "src" / "pipelines").exists():
            PROJECT_ROOT = candidate
            break

    SRC_DIR = PROJECT_ROOT / "src"

DB_PATH = SRC_DIR / "sales.duckdb"
RAW_DIR = PROJECT_ROOT / "data" / "raw"


## JSON validator (optional, for debugging)

Quick sanity check that each non-empty line parses as JSON. Handy if a file fails to load.

In [22]:
def validate_json_lines(path: Path) -> None:
    """Quick sanity check: try to parse each non-empty line as JSON."""
    with path.open("r", encoding="utf-8") as f:
        for i, line in enumerate(f, start=1):
            text = line.strip()
            if not text:
                continue

            # Many line-delimited JSON files use one object per line.
            # If there are trailing commas, strip them before validating.
            if text.endswith(","):
                text = text[:-1]

            try:
                json.loads(text)
            except json.JSONDecodeError as e:
                print(f"[VALIDATION] Possible malformed JSON in {path} at line {i}: {e}")
                break


## Cleaner: loose JSON to NDJSON

Simple fixer that trims blank lines and trailing commas, then writes one JSON object per line.

In [23]:
def fix_to_ndjson(path: Path) -> Path:
    """Convert loosely formatted JSON list into NDJSON (one JSON object per line).
    Removes trailing commas and blank lines, writes a temporary fixed file.
    """
    fixed_path = path.with_name(path.stem + "_fixed.json")

    with path.open("r", encoding="utf-8") as src, fixed_path.open("w", encoding="utf-8") as dst:
        for line in src:
            cleaned = line.strip()

            if not cleaned:
                continue  # skip blank lines

            if cleaned.endswith(","):
                cleaned = cleaned[:-1]  # remove trailing comma

            dst.write(cleaned + "\n")

    return fixed_path


## Bronze loader

Load each JSON file from `data/raw` into its own DuckDB table in the bronze layer.

In [24]:
def load_bronze_table(
    con: duckdb.DuckDBPyConnection,
    filename: str,
    table_name: str,
) -> None:
    """Load a single JSON file from data/raw into a DuckDB table in the bronze layer.

    Strategy:
     1. Try to load as normal JSON with read_json_auto().
     2. If that fails, being malformed, clean it into NDJSON and retry with read_ndjson_auto().
    """
    file_path = RAW_DIR / filename

    if not file_path.exists():
        raise FileNotFoundError(f"Expected file not found: {file_path}")

    print(f"Loading {file_path} -> {table_name}")

    # First attempt: assume the file is valid JSON.
    try:
        con.execute(
            f"""
            CREATE OR REPLACE TABLE {table_name} AS
            SELECT *
            FROM read_json_auto(?)
            """,
            [str(file_path)],
        )
        return  # success, we are done

    except duckdb.InvalidInputException as e:
        print(f"[WARN] Initial JSON load failed for {file_path}")
        print(f"[WARN] DuckDB said: {e}")
        print("[INFO] Attempting to clean file into NDJSON and reload...")

        # validate_json_lines(file_path)

        fixed_path = fix_to_ndjson(file_path)

        try:
            con.execute(
                f"""
                CREATE OR REPLACE TABLE {table_name} AS
                SELECT *
                FROM read_ndjson_auto(?)
                """,
                [str(fixed_path)],
            )
            print(f"[INFO] Loaded cleaned NDJSON file {fixed_path} -> {table_name}")
        except duckdb.InvalidInputException as e2:
            print(f"[ERROR] Even cleaned NDJSON version failed for {file_path}")
            print(f"[ERROR] DuckDB reported: {e2}")
            # make pipeline fail loudly if this happens
            raise


## Entry point

Connect to DuckDB, load all bronze tables, then close the connection.

In [25]:
def main() -> None:
    print(f"Connecting to DuckDB at {DB_PATH}")
    print(f"Raw data folder: {RAW_DIR}")

    con = duckdb.connect(str(DB_PATH))

    tables_to_load = [
        ("customers.json", "bronze_customers"),
        ("products.json", "bronze_products"),
        ("orders.json", "bronze_orders"),
        ("sales.json", "bronze_sales"),
        ("countries.json", "bronze_countries"),
    ]

    for filename, table_name in tables_to_load:
        load_bronze_table(con, filename, table_name)

    con.close()
    print("Bronze layer finished.")


if __name__ == "__main__":
    main()


Connecting to DuckDB at c:\Projects\sales_test\src\sales.duckdb
Raw data folder: c:\Projects\sales_test\data\raw
Loading c:\Projects\sales_test\data\raw\customers.json -> bronze_customers
[WARN] Initial JSON load failed for c:\Projects\sales_test\data\raw\customers.json
[WARN] DuckDB said: Invalid Input Error: Malformed JSON in file "c:\Projects\sales_test\data\raw\customers.json", at byte 1 in record/value 3: unexpected character. 

LINE 4:             FROM read_json_auto(?)
                         ^
[INFO] Attempting to clean file into NDJSON and reload...
[INFO] Loaded cleaned NDJSON file c:\Projects\sales_test\data\raw\customers_fixed.json -> bronze_customers
Loading c:\Projects\sales_test\data\raw\products.json -> bronze_products
[WARN] Initial JSON load failed for c:\Projects\sales_test\data\raw\products.json
[WARN] DuckDB said: Invalid Input Error: Malformed JSON in file "c:\Projects\sales_test\data\raw\products.json", at byte 1 in record/value 3: unexpected character. 

LINE 4