# BERDL Lakehouse — Ingest: {TENANT}_{DATASET}

Two-phase ingest: **(1) upload source files to MinIO bronze**, then **(2) ingest into Delta silver**.
Tables larger than `CHUNK_TARGET_GB` are ingested in chunks to avoid Spark session timeouts.
A progress log is kept in MinIO so interrupted jobs can resume where they left off.

**Workflow:**
1. Set `DATA_DIR`, `TENANT`, `DATASET`, `MODE` in the **Configuration** cell.
2. Run all cells through **Pre-flight plan** and review the printed plan.
3. Set `CONFIRMED = True` in the **Configuration** cell, re-run that cell so the new value takes effect, then re-run from the **Pre-flight plan** cell onward.

### Configuration

In [None]:
from pathlib import Path

# ── USER CONFIGURATION ───────────────────────────────────────────────────────
# NOTE(review): the "{...}" values look like template placeholders that are
# substituted when this notebook is generated -- confirm they hold real values
# before running the cell.
DATA_DIR        = Path("{DATA_DIR}")   # directory containing source files
TENANT          = "{TENANT}"           # Lakehouse tenant name
DATASET         = "{DATASET}"          # dataset name (or None to use DATA_DIR.name)
BUCKET          = "cdm-lake"           # bucket used in both the bronze and silver paths below
MODE            = "{MODE}"             # "overwrite" or "append"
CHUNK_TARGET_GB = 20                   # tables above this size are ingested in chunks
CHUNKED_INGEST  = True                 # False = force single-batch ingest (risky for large tables)
CONFIRMED       = False                # set True after reviewing the pre-flight plan
# ─────────────────────────────────────────────────────────────────────────────

# Derived names: everything below is computed from the settings above.
# A falsy DATASET (None or "") falls back to the name of the source directory.
DATASET       = DATASET or DATA_DIR.name
# Database (namespace) name used for the Delta tables.
NAMESPACE     = f"{TENANT}_{DATASET}"
# Object-key prefix (inside BUCKET) under which the source files are uploaded.
BRONZE_PREFIX = f"tenant-general-warehouse/{TENANT}/datasets/{DATASET}"
# Object key of the dataset-level JSON config.
CONFIG_KEY    = f"{BRONZE_PREFIX}/{DATASET}.json"
# s3a:// location of the namespace database; tables are written beneath it.
SILVER_BASE   = f"s3a://{BUCKET}/tenant-sql-warehouse/{TENANT}/{NAMESPACE}.db"
# Object key of the JSONL progress log used to resume interrupted ingests.
PROGRESS_KEY  = f"{BRONZE_PREFIX}/_ingest_progress.jsonl"

# Echo the effective settings so they can be checked before anything is uploaded.
print(f"Tenant    : {TENANT}")
print(f"Dataset   : {DATASET}")
print(f"Namespace : {NAMESPACE}")
print(f"Mode      : {MODE}")
print(f"Chunked   : {CHUNKED_INGEST}  (threshold: {CHUNK_TARGET_GB} GB)")
print(f"Source    : {DATA_DIR.resolve()}")
print(f"Bronze    : s3a://{BUCKET}/{BRONZE_PREFIX}/")
print(f"Silver    : {SILVER_BASE}")

### Imports and `berdl_notebook_utils` stubs

Replaces JupyterHub-only submodules with lightweight stubs before importing
`data_lakehouse_ingest`, then wires in real implementations once clients are built.

In [None]:
import csv
import io
import json
import logging
import math
import re
import sqlite3
import sys
from datetime import datetime, timezone
from types import ModuleType

import pandas as pd

# Dotted module paths that are replaced by empty placeholder modules, so that
# a later import of any of these names resolves to the placeholder.
_STUB_MODULES = [
    "berdl_notebook_utils",
    "berdl_notebook_utils.berdl_settings",
    "berdl_notebook_utils.clients",
    "berdl_notebook_utils.setup_spark_session",
    "berdl_notebook_utils.spark",
    "berdl_notebook_utils.spark.database",
    "berdl_notebook_utils.spark.cluster",
    "berdl_notebook_utils.spark.dataframe",
    "berdl_notebook_utils.minio_governance",
]
# Register one fresh, empty module object per path (overwriting any existing entry).
sys.modules.update((_path, ModuleType(_path)) for _path in _STUB_MODULES)

def _create_namespace_if_not_exists(spark, namespace=None, append_target=True, tenant_name=None):
    """Create the namespace database if needed and return its name.

    Local stand-in installed on the stubbed ``berdl_notebook_utils.spark.database``
    module.

    Args:
        spark: Object exposing ``sql(statement)``; used to run the DDL.
        namespace: Base namespace name.
        append_target: Accepted for signature compatibility; not used here.
        tenant_name: When truthy, the database name becomes
            ``"<tenant_name>_<namespace>"``; otherwise ``namespace`` is used as is.

    Returns:
        The database name that was created or already existed.

    Note:
        The location always embeds ``tenant_name``, so calling this without a
        tenant yields a path containing the literal text ``None``.
    """
    if tenant_name:
        db_name = f"{tenant_name}_{namespace}"
    else:
        db_name = namespace
    db_location = f"s3a://cdm-lake/tenant-sql-warehouse/{tenant_name}/{db_name}.db"
    spark.sql(f"CREATE DATABASE IF NOT EXISTS `{db_name}` LOCATION '{db_location}'")
    print(f"Namespace {db_name} ready at {db_location}")
    return db_name

# Give the stubbed `spark.database` module the one function implemented locally.
sys.modules["berdl_notebook_utils.spark.database"].create_namespace_if_not_exists = (
    _create_namespace_if_not_exists
)
# Placeholders only: both attributes are rebound to real callables once the
# Spark session and MinIO client have been created later in the notebook.
sys.modules["berdl_notebook_utils.setup_spark_session"].get_spark_session = None
sys.modules["berdl_notebook_utils.clients"].get_minio_client = None

# These imports must stay below the stub setup: they run with the placeholder
# modules already registered in sys.modules.
from minio import Minio
from data_lakehouse_ingest import ingest
from get_spark_session import get_spark_session

# INFO-level logging for the rest of the notebook.
logging.basicConfig(level=logging.INFO)
print("Imports OK.")

### Initialize Spark and MinIO clients

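MinIO credentials are read from the `berdl-minio` alias in `~/.mc/config.json`. Spark connects via pproxy on port 8123, and the MinIO client is routed through the same local proxy (`http://127.0.0.1:8123`), so the proxy must be running before this cell is executed.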

In [None]:
import urllib3

from urllib.parse import urlsplit

# Credentials come from the `berdl-minio` alias of the MinIO client (`mc`) config.
_mc_cfg = json.loads(Path.home().joinpath(".mc/config.json").read_text(encoding="utf-8"))
_berdl  = _mc_cfg["aliases"]["berdl-minio"]

# Minio() expects a bare "host[:port]" endpoint. Parse the alias URL instead of
# stripping the scheme textually, so a trailing slash or path component in the
# configured URL does not end up in the endpoint.
_berdl_url = _berdl["url"]
if "://" in _berdl_url:
    _endpoint = urlsplit(_berdl_url).netloc
else:
    # No scheme present: keep everything before the first "/" as host[:port].
    _endpoint = _berdl_url.split("/", 1)[0]

minio_client = Minio(
    endpoint=_endpoint,
    access_key=_berdl["accessKey"],
    secret_key=_berdl["secretKey"],
    secure=_berdl_url.startswith("https"),
    # All MinIO traffic goes through the local HTTP proxy on port 8123.
    http_client=urllib3.ProxyManager("http://127.0.0.1:8123"),
)

spark = get_spark_session()

# Replace the None placeholders on the stub modules with accessors that hand
# back the clients created above, whatever keyword arguments they are given.
sys.modules["berdl_notebook_utils.setup_spark_session"].get_spark_session = lambda **kw: spark
sys.modules["berdl_notebook_utils.clients"].get_minio_client = lambda **kw: minio_client

print("Spark and MinIO clients ready.")

### Detect source format

In [None]:
# Abort early when the configured source directory is missing.
if not DATA_DIR.exists():
    raise FileNotFoundError(f"DATA_DIR not found: {DATA_DIR}")

# Collect candidate inputs by extension; each extension's matches are sorted.
db_files = [
    *sorted(DATA_DIR.glob("*.db")),
    *sorted(DATA_DIR.glob("*.sqlite")),
    *sorted(DATA_DIR.glob("*.sqlite3")),
]
sql_files = sorted(DATA_DIR.glob("*.sql"))
tsv_files = sorted(DATA_DIR.glob("*.tsv"))
csv_files = sorted(DATA_DIR.glob("*.csv"))

print(f"SQLite databases : {[f.name for f in db_files]}")
print(f"SQL schema files : {[f.name for f in sql_files]}")
print(f"TSV files        : {[f.name for f in tsv_files]}")
print(f"CSV files        : {[f.name for f in csv_files]}")

# Nothing usable at all is an error.
if not (db_files or tsv_files or csv_files):
    raise ValueError(f"No recognised source files found in {DATA_DIR}")

# Precedence: SQLite database, then TSV files, then CSV files.
# Only the first SQLite database is used when several are present.
if db_files:
    SOURCE_MODE, SOURCE_DB = "sqlite", db_files[0]
    print(f"\nMode: SQLite -> TSV  (source: {SOURCE_DB.name})")
elif tsv_files:
    SOURCE_MODE = "tsv"
    print(f"\nMode: TSV files ({len(tsv_files)} found)")
else:
    SOURCE_MODE = "csv"
    print(f"\nMode: CSV files ({len(csv_files)} found)")

# Only the first .sql file (if any) is used as the schema source.
SQL_SCHEMA = next(iter(sql_files), None)
print(f"Schema file      : {SQL_SCHEMA.name if SQL_SCHEMA else 'none -- all columns default to STRING'}")

### Parse schema from SQL file

Extracts `CREATE TABLE` statements and maps SQL types to Spark SQL types.
Any table or column without a match defaults to `STRING`.

In [None]:
# SQL type keyword -> Spark SQL type. Anything not listed falls back to STRING.
_TYPE_MAP = {
    "TEXT": "STRING",    "VARCHAR": "STRING",  "CHAR": "STRING",   "CLOB": "STRING",
    "INTEGER": "INT",    "INT": "INT",          "SMALLINT": "INT",  "TINYINT": "INT",
    "MEDIUMINT": "INT",  "BIGINT": "BIGINT",
    "REAL": "DOUBLE",    "FLOAT": "DOUBLE",     "DOUBLE": "DOUBLE",
    "NUMERIC": "DOUBLE", "DECIMAL": "DOUBLE",   "NUMBER": "DOUBLE",
    "BLOB": "BINARY",    "BOOLEAN": "BOOLEAN",  "BOOL": "BOOLEAN",
}

# Matches "CREATE TABLE [IF NOT EXISTS] name ( ... );" -- group 1 is the table
# name, group 2 the text between the outer parentheses.
_CREATE_TABLE_RE = re.compile(
    r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?[`"\[]?(\w+)[`"\]]?\s*\(([^;]+?)\)\s*;',
    re.IGNORECASE | re.DOTALL,
)
# Table-level clauses that are not column definitions.
_TABLE_CONSTRAINT_RE = re.compile(
    r'(PRIMARY\s+KEY|UNIQUE|INDEX|FOREIGN\s+KEY|CHECK|CONSTRAINT)\b', re.IGNORECASE
)


def _split_top_level(body):
    """Split *body* on commas that are not nested inside parentheses."""
    parts, depth, start = [], 0, 0
    for pos, ch in enumerate(body):
        if ch == "(":
            depth += 1
        elif ch == ")":
            depth = max(depth - 1, 0)
        elif ch == "," and depth == 0:
            parts.append(body[start:pos])
            start = pos + 1
    parts.append(body[start:])
    return parts


def parse_sql_schema(sql_path):
    """Return {table_name: spark_schema_ddl} parsed from CREATE TABLE statements.

    Args:
        sql_path: ``pathlib.Path`` of a SQL file; read as UTF-8 with undecodable
            bytes replaced.

    Returns:
        Dict mapping each table name to a ``"col TYPE, col TYPE, ..."`` string
        using Spark SQL type names. Tables with no recognisable columns are
        omitted.

    Column definitions are separated on top-level commas rather than on line
    breaks, so single-line statements (``CREATE TABLE t (a INT, b TEXT);``) and
    constraints spread over several lines are both handled. A column with no
    declared type, or with a type missing from ``_TYPE_MAP``, becomes STRING.
    """
    text = sql_path.read_text(encoding="utf-8", errors="replace")
    # Drop comments up front so commas, parentheses and semicolons inside them
    # cannot confuse statement matching or column splitting.
    text = re.sub(r'/\*.*?\*/', ' ', text, flags=re.DOTALL)
    text = re.sub(r'--[^\n]*', '', text)

    schemas = {}
    for m in _CREATE_TABLE_RE.finditer(text):
        table_name = m.group(1)
        cols = []
        for definition in _split_top_level(m.group(2)):
            tokens = definition.split()
            if not tokens:
                continue
            if _TABLE_CONSTRAINT_RE.match(" ".join(tokens)):
                continue
            col_name = re.sub(r'[`"\[\]]', '', tokens[0])
            if not col_name:
                continue
            # "VARCHAR(255)" -> "VARCHAR"; an absent type maps to STRING below.
            raw_type = re.sub(r'\(.*', '', tokens[1]).upper() if len(tokens) > 1 else ""
            spark_type = _TYPE_MAP.get(raw_type, "STRING")
            cols.append(f"{col_name} {spark_type}")
        if cols:
            schemas[table_name] = ", ".join(cols)
    return schemas

# Build the table -> DDL mapping; it stays empty when no .sql file was found.
SCHEMAS = parse_sql_schema(SQL_SCHEMA) if SQL_SCHEMA else {}
if not SQL_SCHEMA:
    print("No .sql schema file -- all columns will default to STRING")
else:
    print(f"Parsed {len(SCHEMAS)} tables from {SQL_SCHEMA.name}:")
    for name, schema in SCHEMAS.items():
        # Long DDL strings are cut to 100 characters for display only.
        print(f"  {name}: {schema[:100]}{'...' if len(schema) > 100 else ''}")

### Prepare data files and calculate chunk plan

Counts lines in each source file to determine per-table chunk sizes.
Chunk size is calculated proportionally: `chunk_lines = data_lines * (CHUNK_TARGET_GB / file_size_GB)`.
Tables at or below `CHUNK_TARGET_GB` are ingested in a single shot.

In [None]:
def _count_lines(filepath):
    """Count the lines in a file (the result includes the header line).

    The file is read in 1 MiB binary blocks and newline bytes are counted. A
    final line that is not terminated by a newline is counted as well, so
    files with and without a trailing newline report the same number of lines.

    Args:
        filepath: Path of the file to scan.

    Returns:
        Number of lines; 0 for an empty file.
    """
    n = 0
    last_byte = b""
    with open(filepath, "rb") as f:
        for block in iter(lambda: f.read(1 << 20), b""):
            n += block.count(b"\n")
            last_byte = block[-1:]
    # Non-empty file whose last line lacks a terminating newline.
    if last_byte and last_byte != b"\n":
        n += 1
    return n

if SOURCE_MODE == "sqlite":
    # Export every user table of the SQLite database to a TSV file in WORK_DIR.
    WORK_DIR  = Path(f"/tmp/{DATASET}_tsv")
    FILE_EXT  = ".tsv"
    DELIMITER = "\t"
    WORK_DIR.mkdir(exist_ok=True)

    def _clean(v):
        """Return *v* in a form that keeps each record on one TSV line.

        None becomes an empty string; tabs, newlines and carriage returns inside
        strings are replaced by spaces; other values are returned unchanged.
        """
        if v is None:
            return ""
        if isinstance(v, str):
            return v.replace("\t", " ").replace("\n", " ").replace("\r", " ")
        return v

    conn = sqlite3.connect(SOURCE_DB)
    try:
        # Decode text leniently so invalid UTF-8 in the database cannot abort the export.
        conn.text_factory = lambda b: b.decode("utf-8", errors="replace")
        cur = conn.cursor()
        cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
        # Names starting with "sqlite_" are reserved for SQLite's internal
        # tables (e.g. sqlite_sequence); they are not user data.
        TABLES = [r[0] for r in cur.fetchall() if not r[0].lower().startswith("sqlite_")]
        for table in TABLES:
            out = WORK_DIR / f"{table}.tsv"
            # Double any embedded quote so the identifier stays well-formed.
            quoted = table.replace('"', '""')
            cur.execute(f'SELECT * FROM "{quoted}"')
            cols = [d[0] for d in cur.description]
            if table not in SCHEMAS:
                SCHEMAS[table] = ", ".join(f"{c} STRING" for c in cols)
            rows = 0
            with open(out, "w", newline="", encoding="utf-8") as fh:
                w = csv.writer(fh, delimiter="\t", quoting=csv.QUOTE_MINIMAL)
                w.writerow(cols)
                for row in cur:
                    w.writerow([_clean(v) for v in row])
                    rows += 1   # count while writing; no second COUNT(*) query needed
            print(f"  {table:30s}: {rows:>9,} rows  {out.stat().st_size / 1e6:.1f} MB")
    finally:
        conn.close()
    # Only the files exported above -- WORK_DIR lives in /tmp and may still hold
    # TSVs left behind by an earlier run.
    source_files = sorted(WORK_DIR / f"{table}.tsv" for table in TABLES)

elif SOURCE_MODE in ("tsv", "csv"):
    # Delimited files are used in place; nothing is copied or converted.
    source_files = tsv_files if SOURCE_MODE == "tsv" else csv_files
    WORK_DIR  = DATA_DIR
    FILE_EXT  = ".tsv" if SOURCE_MODE == "tsv" else ".csv"
    DELIMITER = "\t"  if SOURCE_MODE == "tsv" else ","
    TABLES    = [f.stem for f in source_files]
    for f in source_files:
        if f.stem not in SCHEMAS:
            # No parsed schema for this table: take column names from the header
            # row and type everything as STRING. "utf-8-sig" drops a leading
            # byte-order mark so it cannot become part of the first column name.
            with open(f, newline="", encoding="utf-8-sig") as fh:
                cols = next(csv.reader(fh, delimiter=DELIMITER), None)
            if not cols:
                raise ValueError(f"{f} has no header row -- cannot derive a schema")
            SCHEMAS[f.stem] = ", ".join(f"{c} STRING" for c in cols)

# ── Calculate chunk plan ─────────────────────────────────────────────────────
# Per-table statistics, keyed by table name (the file stem).
CHUNK_TARGET_BYTES = CHUNK_TARGET_GB * 1e9
TABLE_STATS = {}

print(f"Analyzing {len(source_files)} table(s) -- counting lines...")
for f in source_files:
    table      = f.stem
    size_bytes = f.stat().st_size
    print(f"  {f.name}: {size_bytes / 1e9:.1f} GB", end=" ", flush=True)
    # The first line is the header, so it is not a data row.
    data_lines = max(_count_lines(f) - 1, 0)

    # Default: one chunk holding the whole table. Split only when chunking is
    # enabled, the table has data, and the file exceeds the size target.
    chunk_size, n_chunks = data_lines, 1
    if CHUNKED_INGEST and size_bytes > CHUNK_TARGET_BYTES and data_lines > 0:
        # Scale the line count by target size / file size.
        chunk_size = max(1, round(data_lines * CHUNK_TARGET_BYTES / size_bytes))
        n_chunks   = math.ceil(data_lines / chunk_size)

    TABLE_STATS[table] = dict(
        path=f,
        size_bytes=size_bytes,
        data_lines=data_lines,
        chunk_size=chunk_size,
        n_chunks=n_chunks,
        chunked=n_chunks > 1,
    )
    if n_chunks > 1:
        note = f"{n_chunks} chunks x ~{chunk_size:,} lines"
    else:
        note = "single ingest"
    print(f"-> {data_lines:,} data lines  [{note}]")

### Pre-flight plan

Review the upload and ingest plan below.
When satisfied, set `CONFIRMED = True` in the Configuration cell, re-run that cell so the change takes effect, and then re-run from here.

In [None]:
# Width of the horizontal rules framing the plan.
W = 72
for _line in ("=" * W, "PRE-FLIGHT PLAN", "=" * W):
    print(_line)

# Step 1: what will be uploaded, with per-table and total sizes.
total_gb = sum(s["size_bytes"] for s in TABLE_STATS.values()) / 1e9
print("\nSTEP 1 -- MinIO Upload  (all tables uploaded in full before any ingest begins)")
for table, s in TABLE_STATS.items():
    print(f"  {table:<45s}  {s['size_bytes'] / 1e9:>7.1f} GB")
print(f"  {'TOTAL':<45s}  {total_gb:>7.1f} GB")

# Step 2: how each table will be ingested.
print(f"\nSTEP 2 -- Spark Ingest into Delta  (namespace: {NAMESPACE})")
for table, s in TABLE_STATS.items():
    if not s["chunked"]:
        print(f"  {table:<45s}  {s['data_lines']:,} lines  [single ingest]")
        continue
    # Average chunk size, assuming bytes are spread evenly across chunks.
    chunk_gb = s["size_bytes"] / s["n_chunks"] / 1e9
    print(f"  {table:<45s}  {s['n_chunks']} chunks x ~{s['chunk_size']:,} lines"
          f"  (~{chunk_gb:.1f} GB each)  [CHUNKED]")

print(f"\nProgress log : s3a://{BUCKET}/{PROGRESS_KEY}")
print(f"Ingest mode  : {MODE}")
print("=" * W)

# Safety gate: nothing past this cell runs until the plan has been confirmed.
if CONFIRMED:
    print("CONFIRMED -- proceeding.")
else:
    raise RuntimeError(
        "\nReview the plan above.\n"
        "If it looks correct, set CONFIRMED = True in the Configuration cell "
        "and re-run from this cell onward."
    )

### Step 1 -- Upload all files to MinIO bronze

In [None]:
def _minio_object_size(key):
    """Look up the stored size of ``key`` in ``BUCKET``.

    Returns:
        The object's size in bytes, or -1 when the lookup fails with an error
        whose text indicates a missing object ("NoSuchKey", "does not exist"
        or "404").

    Raises:
        Any other exception raised by the lookup, unchanged.
    """
    missing_markers = ("NoSuchKey", "does not exist", "404")
    try:
        size = minio_client.stat_object(BUCKET, key).size
    except Exception as err:
        message = str(err)
        for marker in missing_markers:
            if marker in message:
                return -1
        raise
    return size

# Upload master dataset config (always overwrite — it's tiny)
_bronze_uri = f"s3a://{BUCKET}/{BRONZE_PREFIX}/"


def _table_entry(table_name):
    """Build the per-table section of the dataset config."""
    return {
        "name": table_name, "enabled": True,
        "schema_sql": SCHEMAS.get(table_name, ""),
        "partition_by": None, "mode": MODE,
        "bronze_path": f"{_bronze_uri}{table_name}{FILE_EXT}",
    }


config = {
    "tenant": TENANT, "dataset": DATASET, "is_tenant": True,
    "paths": {
        "data_plane":  f"s3a://{BUCKET}/tenant-general-warehouse/{TENANT}/",
        "bronze_base": _bronze_uri,
        "silver_base": SILVER_BASE,
    },
    "defaults": {"csv": {"header": True, "delimiter": DELIMITER, "inferSchema": False}},
    "tables": [_table_entry(t) for t in TABLES],
}
config_bytes = json.dumps(config, indent=2).encode("utf-8")
minio_client.put_object(
    BUCKET, CONFIG_KEY, io.BytesIO(config_bytes), len(config_bytes),
    content_type="application/json",
)
print(f"Config -> s3a://{BUCKET}/{CONFIG_KEY}")

# Upload data files — skip any file whose size already matches in MinIO
print("\nChecking / uploading data files...")
for table, s in TABLE_STATS.items():
    key         = f"{BRONZE_PREFIX}/{table}{FILE_EXT}"
    local_size  = s["size_bytes"]
    remote_size = _minio_object_size(key)

    if remote_size != local_size:
        if remote_size == -1:
            # Object is not in MinIO yet.
            print(f"  {table}: {local_size / 1e9:.1f} GB -- uploading...", end=" ", flush=True)
        else:
            # Object exists with a different size: replace it.
            print(f"  {table}: size mismatch (local {local_size:,} B vs MinIO {remote_size:,} B) -- re-uploading")
        minio_client.fput_object(BUCKET, key, str(s["path"]))
        print(f"done  -> s3a://{BUCKET}/{key}")
    else:
        print(f"  {table}: {local_size / 1e9:.1f} GB -- already in MinIO, skipping upload")

print("\nUpload step complete.")

### Step 2 -- Initialize progress log

Loads any existing progress from MinIO so an interrupted ingest can resume.

In [None]:
def _load_progress_log():
    """Load the JSONL progress log from MinIO.

    Returns:
        List of decoded entries (one dict per non-blank line), or an empty list
        when the log object does not exist yet.

    Raises:
        Any error from the download other than "object not found", and any
        decoding or JSON error from a damaged log. Parse errors are deliberately
        not treated as "no log": doing so would restart the ingest from scratch.
    """
    # Only the fetch is guarded by the "not found" check.
    try:
        resp = minio_client.get_object(BUCKET, PROGRESS_KEY)
    except Exception as e:
        if any(tag in str(e) for tag in ("NoSuchKey", "does not exist", "404")):
            return []
        raise
    # Always close the response and hand the connection back to the pool.
    try:
        payload = resp.read().decode()
    finally:
        resp.close()
        resp.release_conn()
    return [json.loads(line) for line in payload.splitlines() if line.strip()]

def _append_progress(entry):
    """Add one entry to the progress log stored in MinIO.

    The whole log is downloaded, the new entry is added at the end, and the
    result is uploaded again as newline-delimited JSON (one object per line,
    with a trailing newline).
    """
    history = _load_progress_log() + [entry]
    payload = "".join(json.dumps(item) + "\n" for item in history).encode()
    minio_client.put_object(
        BUCKET, PROGRESS_KEY, io.BytesIO(payload), len(payload),
        content_type="application/x-ndjson",
    )

# Snapshot of the progress log as it is stored in MinIO right now.
PROGRESS_LOG = _load_progress_log()

# Tables with a "complete" marker, and chunk numbers already written for
# tables that were started but never completed.
complete_tables = {e["table"] for e in PROGRESS_LOG if e.get("status") == "complete"}
partial_chunks  = {}
for e in PROGRESS_LOG:
    if e.get("status") != "ingested":
        continue
    if e["table"] in complete_tables:
        continue
    partial_chunks.setdefault(e["table"], []).append(e["chunk"])

if PROGRESS_LOG:
    print(f"Loaded {len(PROGRESS_LOG)} progress log entries.")
    if complete_tables:
        print(f"  Already complete  : {', '.join(sorted(complete_tables))}")
    if partial_chunks:
        print("  Partially ingested (will resume):")
        for t, chunks in sorted(partial_chunks.items()):
            # Rows already written for this table, summed over its logged chunks.
            done_rows = sum(e["rows_written"] for e in PROGRESS_LOG
                            if e["table"] == t and e.get("status") == "ingested")
            print(f"    {t}: {len(chunks)} chunk(s) done, {done_rows:,} rows ingested so far")
else:
    print("No prior progress -- will ingest all tables from scratch.")

### Step 2 -- Ingest tables into Delta namespace

- Tables **<= CHUNK_TARGET_GB**: ingested in one shot via the `ingest()` pipeline.
- Tables **> CHUNK_TARGET_GB**: local file streamed in chunks via `pandas.read_csv(chunksize=N)`;
  each chunk is written to Delta with `append` mode via `spark.createDataFrame()`.
- Progress is logged to MinIO after every chunk. To resume an interrupted ingest, re-run the **Initialize progress log** cell and then this cell, so the resume state is reloaded from MinIO.

In [None]:
from pyspark.sql import types as T
from pyspark.sql import functions as F

# Spark SQL type name -> PySpark data type instance.
_PY_TYPE = {
    "STRING":  T.StringType(),
    "INT":     T.IntegerType(),
    "BIGINT":  T.LongType(),
    "DOUBLE":  T.DoubleType(),
    "BOOLEAN": T.BooleanType(),
    "BINARY":  T.BinaryType(),
}


def _spark_schema(schema_ddl):
    """Turn a ``"col TYPE, col TYPE, ..."`` string into a PySpark StructType.

    Every field is nullable. A type name missing from ``_PY_TYPE`` becomes a
    string field, and comma-separated parts with fewer than two
    whitespace-separated tokens are ignored.
    """
    def _field(tokens):
        col_name, type_name = tokens[0], tokens[1]
        return T.StructField(col_name, _PY_TYPE.get(type_name, T.StringType()), True)

    token_lists = (part.strip().split() for part in schema_ddl.split(","))
    return T.StructType([_field(tokens) for tokens in token_lists if len(tokens) >= 2])


def _ingest_chunked(table, stats, schema_ddl):
    """Stream a large local file into Delta in line-count chunks.

    Args:
        table: Table name; the Delta data is written to ``SILVER_BASE/<table>``.
        stats: The table's ``TABLE_STATS`` entry; ``path``, ``chunk_size`` and
            ``n_chunks`` are read.
        schema_ddl: ``"col TYPE, ..."`` string, or an empty string to let Spark
            derive the schema from the pandas chunk.

    Returns:
        Cumulative number of rows written for the table, including rows logged
        by earlier (interrupted) runs.

    Resume state is read from the progress log in MinIO at call time, not from
    the in-memory ``PROGRESS_LOG`` snapshot, so re-running the ingest after an
    interruption skips chunks that were already written.

    NOTE(review): a chunk is written to Delta before its progress entry is
    stored, so a crash between the two steps would presumably repeat that
    chunk on resume -- confirm whether that window matters for this workload.
    """
    delta_path    = f"{SILVER_BASE}/{table}"
    target_schema = _spark_schema(schema_ddl) if schema_ddl else None
    # All-string variant of the target schema, built once: each chunk is loaded
    # as strings and then cast column by column.
    str_schema = None
    if target_schema:
        str_schema = T.StructType([
            T.StructField(f.name, T.StringType(), True) for f in target_schema.fields
        ])

    prior = [e for e in _load_progress_log()
             if e.get("status") == "ingested" and e.get("table") == table]
    done_chunks      = {e["chunk"] for e in prior}
    rows_done        = sum(e["rows_written"] for e in prior)
    table_registered = bool(done_chunks)

    reader = pd.read_csv(
        stats["path"], sep=DELIMITER, chunksize=stats["chunk_size"],
        dtype=str, keep_default_na=False, na_values=[],
    )
    try:
        for chunk_num, chunk_df in enumerate(reader):
            if chunk_num in done_chunks:
                print(f"  Chunk {chunk_num + 1}/{stats['n_chunks']}: already ingested -- skipping")
                continue

            # 1-based data-line range covered by this chunk (header excluded).
            start_line = chunk_num * stats["chunk_size"] + 1
            end_line   = start_line + len(chunk_df) - 1

            # Build Spark DataFrame: read all as string, then cast to target types
            if target_schema:
                sdf = spark.createDataFrame(chunk_df, schema=str_schema)
                for field in target_schema.fields:
                    if not isinstance(field.dataType, T.StringType):
                        sdf = sdf.withColumn(field.name, F.col(field.name).cast(field.dataType))
            else:
                sdf = spark.createDataFrame(chunk_df)

            # First chunk of a fresh ingest uses MODE; all subsequent chunks always append
            write_mode = MODE if (chunk_num == 0 and not done_chunks) else "append"
            sdf.write.format("delta").mode(write_mode).save(delta_path)

            # Register table in catalog on first successful write
            if not table_registered:
                spark.sql(f"""
                CREATE TABLE IF NOT EXISTS `{NAMESPACE}`.`{table}`
                USING DELTA LOCATION '{delta_path}'
            """)
                table_registered = True

            rows_done += len(chunk_df)
            _append_progress({
                "table": table, "chunk": chunk_num,
                "start_line": start_line, "end_line": end_line,
                "rows_written": len(chunk_df), "rows_cumulative": rows_done,
                "status": "ingested",
                "timestamp": datetime.now(timezone.utc).isoformat(),
            })
            print(f"  Chunk {chunk_num + 1}/{stats['n_chunks']}: "
                  f"lines {start_line:,}-{end_line:,}  "
                  f"({len(chunk_df):,} rows written, {rows_done:,} cumulative)")
    finally:
        # Release the file handle held by the chunked reader, even on failure.
        reader.close()

    return rows_done


# ── Ensure namespace exists ───────────────────────────────────────────────────
spark.sql(f"CREATE DATABASE IF NOT EXISTS `{NAMESPACE}` LOCATION '{SILVER_BASE}'")
print(f"Namespace {NAMESPACE} ready.\n")

# ── Ingest loop ───────────────────────────────────────────────────────────────
# Reload the progress log every time this cell runs. The snapshot taken by the
# previous cell goes stale as soon as entries are appended, so re-running only
# this cell after an interruption would otherwise repeat finished work.
PROGRESS_LOG = _load_progress_log()
_already_complete = {e["table"] for e in PROGRESS_LOG if e.get("status") == "complete"}

for table, stats in TABLE_STATS.items():
    if table in _already_complete:
        print(f"{table}: already complete -- skipping\n")
        continue

    print(f"{'=' * 60}")
    print(f"Ingesting: {table}")
    print(f"  {stats['data_lines']:,} rows | {stats['n_chunks']} chunk(s) | {stats['size_bytes'] / 1e9:.1f} GB")

    if not stats["chunked"]:
        # Small table: single ingest via pipeline, driven by a one-table config
        # that is uploaded next to the data.
        tbl_cfg = {
            "tenant": TENANT, "dataset": DATASET, "is_tenant": True,
            "paths": {
                "data_plane":  f"s3a://{BUCKET}/tenant-general-warehouse/{TENANT}/",
                "bronze_base": f"s3a://{BUCKET}/{BRONZE_PREFIX}/",
                "silver_base": SILVER_BASE,
            },
            "defaults": {"csv": {"header": True, "delimiter": DELIMITER, "inferSchema": False}},
            "tables": [{
                "name": table, "enabled": True,
                "schema_sql": SCHEMAS.get(table, ""),
                "partition_by": None, "mode": MODE,
                "bronze_path": f"s3a://{BUCKET}/{BRONZE_PREFIX}/{table}{FILE_EXT}",
            }],
        }
        tbl_cfg_key   = f"{BRONZE_PREFIX}/{table}_config.json"
        tbl_cfg_bytes = json.dumps(tbl_cfg, indent=2).encode()
        minio_client.put_object(BUCKET, tbl_cfg_key, io.BytesIO(tbl_cfg_bytes),
                                len(tbl_cfg_bytes), content_type="application/json")
        ingest(f"s3a://{BUCKET}/{tbl_cfg_key}", spark=spark, minio_client=minio_client)
        # The logged row count is the local line count, not a figure reported
        # by the pipeline; the verification cell compares it with Delta.
        rows_done = stats["data_lines"]
        _append_progress({
            "table": table, "chunk": 0,
            "start_line": 1, "end_line": rows_done,
            "rows_written": rows_done, "rows_cumulative": rows_done,
            "status": "ingested",
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })
    else:
        # Large table: chunked streaming ingest
        rows_done = _ingest_chunked(table, stats, SCHEMAS.get(table, ""))

    # Mark the table as finished so later runs skip it.
    _append_progress({
        "table": table, "status": "complete",
        "total_rows": rows_done, "total_chunks": stats["n_chunks"],
        "timestamp": datetime.now(timezone.utc).isoformat(),
    })
    print(f"  {table}: {rows_done:,} rows -- COMPLETE\n")

print("All tables ingested.")

### Verify row counts

In [None]:
for _line in ("=" * 60, "VERIFICATION", "=" * 60):
    print(_line)
spark.sql(f"SHOW TABLES IN `{NAMESPACE}`").show()

# Re-read the log so the summary reflects what the ingest cell just wrote.
PROGRESS_LOG = _load_progress_log()
all_match = True
print("Row counts (Delta vs expected):")
for table, stats in TABLE_STATS.items():
    # Expected = data lines counted in the local source file.
    count    = spark.sql(f"SELECT COUNT(*) FROM `{NAMESPACE}`.`{table}`").collect()[0][0]
    expected = stats["data_lines"]
    if count == expected:
        match = "OK"
    else:
        match = "MISMATCH"
        all_match = False
    print(f"  {table:<45s}  {count:>12,}  expected {expected:>12,}  [{match}]")

print("\nProgress log summary:")
for e in PROGRESS_LOG:
    if e.get("status") != "complete":
        continue
    print(f"  {e['table']}: {e['total_rows']:,} rows, {e['total_chunks']} chunk(s) -- COMPLETE")

print()
if not all_match:
    print("Row count mismatch detected.")
    print("Check the progress log for the last completed line range per chunk:")
    print(f"  s3a://{BUCKET}/{PROGRESS_KEY}")
    print("Or check the quarantine path for rejected rows:")
    print(f"  {SILVER_BASE}/quarantine/")
else:
    print("All row counts match. Ingest successful.")
    print(f"Namespace : {NAMESPACE}")
    print(f"Bronze    : s3a://{BUCKET}/{BRONZE_PREFIX}/")
    print(f"Silver    : {SILVER_BASE}")

In [None]:
# Stop the Spark session created earlier in this notebook; cells that use
# `spark` cannot be re-run after this without creating a new session.
spark.stop()