# In [3]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
fingertips_bronze_all_in_one.py

Local downloader + S3 uploader for Fingertips "bronze" layer.

Key upgrades:
- Treat header-only at every stage as EMPTY (not ERROR) with clear reasons.
- Fallback A: concat per-group CSVs (by_group_id) into one file.
- Fallback B: concat per-indicator CSVs (by_indicator_id, batched) using Indicator Metadata snapshot.
- Completed CSV-only meta backfill.
- Robust metadata CSV parsing + mappings (profile→groups→indicators).
"""

import os, re, gzip, json, hashlib, datetime as dt, time, random, csv
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Iterable
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from requests.adapters import HTTPAdapter, Retry
from dateutil.tz import tzlocal
from tqdm.auto import tqdm
from urllib3.exceptions import ProtocolError as Urllib3ProtocolError
from requests.exceptions import ConnectionError, ReadTimeout, ChunkedEncodingError, HTTPError

# ---------- CONFIG ----------
# Base URL every endpoint path in this file is appended to.
API_BASE = "https://fingertips.phe.org.uk/api"
# User-Agent sent with every request; the mailto address is a placeholder to replace.
UA = "fingertips-bronze-downloader/nb/4.0 (+dataops; mailto:you@example.com)"

OUTPUT_DIR = Path("out")                       # local output root
INGEST_DATE = dt.date.today().isoformat()      # partition date (evaluated once, when this cell/module runs)
PARENT_AREA_TYPE_ID: Optional[int] = 15        # England as parent (safe default) — set None to omit

# Concurrency & backoff
MAX_WORKERS = 6                                # main pass concurrency; lower if server is flaky
GENTLE_JITTER = (0.12, 0.28)                   # delay between task starts (seconds)

# Retry controls
RETRY_MAX_WORKERS = 4                          # retry concurrency
RETRY_PER_TASK_DEADLINE_S = 1800               # 30 min per retry task hard cap (one deadline shared by group + indicator fallbacks)
RETRY_GROUP_LOG_EVERY = 5                      # log every N groups in group-concat fallback

# Indicator fallback controls
INDICATOR_FALLBACK_BATCH_SIZE = 25             # indicator_ids per request
INDICATOR_FALLBACK_MAX_BATCHES = 300           # hard cap per task to avoid runaway (~7,500 indicators)
INDICATOR_FALLBACK_LOG_EVERY = 5               # log cadence in batches

# Scope
ONLY_PROFILES: Optional[List[str]] = None      # e.g. ["public-health-outcomes-framework","local-health"] or None for ALL

# Metadata fetch
DUMP_METADATA = True                           # dump lookups + indicator metadata CSV
INCLUDE_AREAS_LISTS = True                     # dump areas_by_area_type lists (heavy but useful)

# S3 upload controls
# NOTE(review): the upload code is not part of this section; the comments below are the author's.
DO_UPLOAD_TO_S3 = False                        # set True to upload
S3_BUCKET = "your-bucket-name"
S3_PREFIX = "your/prefix"                      # no leading/trailing slash handling below
S3_CLEAN_INGEST_DATE_FIRST = False             # True = delete s3 keys for this ingest_date before uploading
S3_MAX_UPLOAD_WORKERS = 8

# Behavior toggles
TREAT_ZERO_ROWS_IN_FALLBACK_AS_EMPTY = True    # mark as empty when groups/indicators have no rows
OMIT_PARENT_ON_MAIN_IF_500 = False             # optional: try no-parent directly in main on server 5xx

# Throttling (be nice to the API)
# Each pair is a (min, max) range in seconds passed to random.uniform().
THROTTLE_BEFORE_REQUEST = (0.15, 0.35)   # sleep before each HTTP call inside fallbacks
THROTTLE_AFTER_SUCCESS  = (0.05, 0.15)   # short pause after a successful chunk
THROTTLE_AFTER_FAILURE  = (0.8, 1.6)     # longer pause after a failed attempt

# ---------- HTTP session ----------
def make_session() -> requests.Session:
    """
    Build the shared requests.Session used for API calls.

    The session sends the downloader User-Agent plus "Connection: close", and
    mounts an HTTPAdapter on "https://" that retries GETs up to 5 times with
    exponential backoff on 429/500/502/503/504. Because raise_on_status is
    False, an exhausted retry returns the last response instead of raising,
    so callers still have to call raise_for_status() themselves.
    """
    retry_policy = Retry(
        total=5,
        backoff_factor=0.6,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=frozenset(["GET"]),
        raise_on_status=False,
    )
    adapter = HTTPAdapter(max_retries=retry_policy, pool_maxsize=50)

    session = requests.Session()
    session.headers.update({"User-Agent": UA, "Connection": "close"})
    session.mount("https://", adapter)
    return session
SESSION = make_session()

# ---------- Utilities ----------
def slugify(text: str) -> str:
    """
    Turn *text* into a lowercase, dash-separated slug.

    Every run of characters outside ``a-z0-9`` becomes a single dash and
    leading/trailing dashes are removed. Falsy input, or input that leaves
    nothing behind, yields "unknown".
    """
    lowered = (text or "").strip().lower()
    dashed = re.sub(r"[^a-z0-9]+", "-", lowered)
    slug = re.sub(r"-{2,}", "-", dashed).strip("-")
    if not slug:
        return "unknown"
    return slug

def now_iso() -> str:
    """Return the current time, in the local timezone, as an ISO-8601 string."""
    local_now = dt.datetime.now(tz=tzlocal())
    return local_now.isoformat()

def ensure_dir(p: Path):
    """Create directory *p* with any missing parents; a directory that already exists is fine."""
    p.mkdir(exist_ok=True, parents=True)

def write_json(path: Path, obj: dict):
    """
    Write *obj* to *path* as compact UTF-8 JSON, creating parent directories.

    orjson is used when it can be imported and can encode *obj*; any failure
    there falls back to the stdlib json module. Both routes produce UTF-8
    bytes, so the file never depends on the platform's default text encoding
    (the readers in this file decode with encoding="utf-8").

    Although annotated as dict, *obj* may be any JSON-serialisable value;
    callers in this file also pass lists.
    """
    # Same effect as ensure_dir(path.parent), inlined to keep this function self-contained.
    path.parent.mkdir(parents=True, exist_ok=True)
    try:
        import orjson as _oj
        payload = _oj.dumps(obj)
    except Exception:
        # orjson missing or unable to encode obj: fall back to the stdlib encoder.
        text = json.dumps(obj, ensure_ascii=False, separators=(",", ":"))
        payload = text.encode("utf-8")
    path.write_bytes(payload)

def fmt_secs(s: float) -> str:
    """
    Format a duration in seconds for progress output.

    Under a minute gives "<n>s", under an hour "MM:SS", otherwise "H:MM:SS".
    Fractions of a second are truncated.
    """
    if s < 60:
        return f"{int(s)}s"
    minutes, seconds = int(s // 60), int(s % 60)
    if minutes < 60:
        return f"{minutes:02d}:{seconds:02d}"
    hours, minutes = divmod(minutes, 60)
    return f"{hours:d}:{minutes:02d}:{seconds:02d}"

# ---------- Error classification ----------
def classify_exception(e: Exception) -> Tuple[str, Optional[int], str]:
    """
    Map an exception to (reason, status_code, detail).

    reason ∈ {http_4xx,http_5xx,timeout,connection,protocol,other}

    An HTTPError is "http_5xx" when its response status is >= 500 and
    "http_4xx" otherwise, including when no status code is available.
    status_code is None for every non-HTTP category. ConnectionError here is
    the requests exception imported at the top of the file, not the builtin.
    """
    if isinstance(e, HTTPError):
        response = getattr(e, "response", None)
        status = getattr(response, "status_code", None)
        bucket = "http_5xx" if (status and status >= 500) else "http_4xx"
        return bucket, status, str(e)

    # Checked in order; the first matching category wins.
    transport_categories = (
        (ReadTimeout, "timeout"),
        ((ConnectionError, ChunkedEncodingError), "connection"),
        (Urllib3ProtocolError, "protocol"),
    )
    for exc_types, label in transport_categories:
        if isinstance(e, exc_types):
            return label, None, str(e)

    return "other", None, f"{type(e).__name__}: {e}"

# ---------- Robust HTTP helpers ----------
def robust_get_json(url, params=None, max_tries=12, base_sleep=0.8, max_sleep=60.0):
    """
    JSON GET with capped exponential backoff + jitter.

    All attempts but the last go through the shared SESSION; the final
    attempt uses a fresh one-off requests.get() so a broken pooled connection
    cannot poison it.

    Args:
        url: Endpoint to fetch.
        params: Optional query parameters.
        max_tries: Total number of attempts (must be >= 1).
        base_sleep: Base delay in seconds; attempt k waits base_sleep * 2**k.
        max_sleep: Upper bound in seconds on the exponential part of a single
            wait. Without it the default 12 tries would sleep for over 13
            minutes before the last attempt.

    Returns:
        The decoded JSON body.

    Raises:
        ValueError: if max_tries < 1.
        HTTPError: immediately for a status below 500 (not retried), or after
            the final attempt for a 5xx status.
        ConnectionError, ReadTimeout, ChunkedEncodingError, ProtocolError:
            after the final attempt.
    """
    if max_tries < 1:
        # The loop would otherwise never run and silently return None.
        raise ValueError("max_tries must be >= 1")

    retryable = (ConnectionError, ReadTimeout, ChunkedEncodingError, Urllib3ProtocolError, HTTPError)
    for attempt in range(max_tries):
        is_last = attempt == max_tries - 1
        try:
            if is_last:
                r = requests.get(url, params=params, headers={"Connection": "close", "User-Agent": UA}, timeout=(10, 180))
            else:
                r = SESSION.get(url, params=params, headers={"Connection": "close"}, timeout=(10, 180))
            r.raise_for_status()
            return r.json()
        except retryable as e:
            if isinstance(e, HTTPError):
                code = getattr(e.response, "status_code", None)
                if code and code < 500:
                    # Client errors will not heal on retry.
                    raise
            if is_last:
                raise
            delay = min(base_sleep * (2 ** attempt), max_sleep) + random.random() * 0.6
            time.sleep(delay)

def stream_csv_to_gzip(url: str, params: Dict[str, str], dest_path: Path) -> Tuple[int, str]:
    """
    Stream a CSV response into a gzip file at *dest_path*.

    Returns (row_count_estimate, sha256 of UNCOMPRESSED CSV). The estimate is
    the number of lines in the body, header included, so callers treat a
    value <= 1 as "header-only". A final line with no trailing newline is
    counted too; otherwise a body of "header + one row" without a closing
    newline would report 1 and be mistaken for header-only.

    Raises whatever SESSION.get / raise_for_status / iter_content raise. A
    partially written file may be left behind in that case, and
    robust_stream_csv_to_gzip removes it for the exceptions it handles.
    """
    ensure_dir(dest_path.parent)
    sha = hashlib.sha256()
    rows = 0
    last_byte = b""
    with SESSION.get(url, params=params, headers={"Connection": "close"}, stream=True, timeout=(10, 600)) as r:
        r.raise_for_status()
        with gzip.open(dest_path, "wb", compresslevel=6) as gz:
            for chunk in r.iter_content(chunk_size=1024*256):
                if not chunk:
                    continue
                gz.write(chunk)
                sha.update(chunk)
                rows += chunk.count(b"\n")
                last_byte = chunk[-1:]
    if last_byte and last_byte != b"\n":
        # Body ended mid-line: that unterminated line is still a row.
        rows += 1
    return rows, sha.hexdigest()

def robust_stream_csv_to_gzip(url, params, dest_path, max_tries=10, base_sleep=0.8):
    """
    Retry stream_csv_to_gzip with exponential backoff + jitter.

    Returns stream_csv_to_gzip's (rows, sha256) on the first success. After
    each handled failure the partial file at *dest_path* is removed. An
    HTTPError with a status below 500 stops retrying at once. When attempts
    run out the *last* exception is re-raised; RuntimeError is raised only if
    no attempt was made at all.
    """
    failure = None
    final_attempt = max_tries - 1
    for attempt in range(max_tries):
        try:
            return stream_csv_to_gzip(url, params, dest_path)
        except (ConnectionError, ReadTimeout, ChunkedEncodingError, Urllib3ProtocolError, HTTPError) as exc:
            failure = exc

        # Only reached after a failed attempt: drop whatever was partially written.
        try:
            if dest_path.exists():
                dest_path.unlink()
        except Exception:
            pass

        if isinstance(failure, HTTPError):
            status = getattr(failure.response, "status_code", None)
            if status and status < 500:
                break
        if attempt >= final_attempt:
            break
        time.sleep(base_sleep * (2 ** attempt) + random.random() * 0.5)

    if failure:
        raise failure
    raise RuntimeError("unknown failure robust_stream_csv_to_gzip")

def _write_response_stream_to_gz(resp, gz, sha, skip_header=False):
    """
    Stream an HTTP CSV response into an open gzip (or any object with write()).

    If skip_header=True, everything up to and including the first newline is
    dropped, even when that first line is split across several chunks. If the
    body contains no newline at all, nothing is written. Updates sha with the
    UNCOMPRESSED bytes that were written.

    Returns the number of newline characters written (the added row count).
    """
    rows_added = 0
    header_pending = skip_header
    pending = b""   # bytes seen so far while still looking for the end of the header
    for chunk in resp.iter_content(chunk_size=1024*256):
        if not chunk:
            continue
        if header_pending:
            pending += chunk
            newline_at = pending.find(b"\n")
            if newline_at < 0:
                # Header line not complete yet; keep buffering.
                continue
            chunk = pending[newline_at + 1:]
            # Reset the buffer so header bytes are never replayed into later chunks.
            pending = b""
            header_pending = False
            if not chunk:
                continue
        gz.write(chunk)
        sha.update(chunk)
        rows_added += chunk.count(b"\n")
    return rows_added

# ---------- Fingertips API wrappers ----------
def fetch_profiles() -> List[dict]:
    """Fetch the JSON returned by the ``/profiles`` endpoint."""
    endpoint = f"{API_BASE}/profiles"
    return robust_get_json(endpoint)

def fetch_all_area_types() -> List[dict]:
    """Fetch the JSON returned by ``/area_types`` with no profile filter."""
    endpoint = f"{API_BASE}/area_types"
    return robust_get_json(endpoint)

def fetch_area_types_for_profile(profile_id: int) -> List[dict]:
    """Fetch the JSON returned by ``/area_types`` filtered by ``profile_id``."""
    query = {"profile_id": profile_id}
    return robust_get_json(f"{API_BASE}/area_types", params=query)

def fetch_areas_by_type(area_type_id: int) -> List[dict]:
    """Fetch the JSON returned by ``/areas/by_area_type`` for one ``area_type_id``."""
    query = {"area_type_id": area_type_id}
    return robust_get_json(f"{API_BASE}/areas/by_area_type", params=query)

def fetch_ages() -> List[dict]:
    """Fetch the JSON returned by the ``/ages`` endpoint."""
    endpoint = f"{API_BASE}/ages"
    return robust_get_json(endpoint)

def fetch_sexes() -> List[dict]:
    """Fetch the JSON returned by the ``/sexes`` endpoint."""
    endpoint = f"{API_BASE}/sexes"
    return robust_get_json(endpoint)

def fetch_value_notes() -> List[dict]:
    """Fetch the JSON returned by the ``/value_notes`` endpoint."""
    endpoint = f"{API_BASE}/value_notes"
    return robust_get_json(endpoint)

# ---------- Cached metadata helpers ----------
def _latest_meta_dir(out_root: Path) -> Optional[Path]:
    """
    Return the newest ``ingest_date=*`` directory under <out_root>/bronze/metadata.

    "Newest" is the greatest path in sort order. Returns None when the
    metadata root is missing or holds no such directory.
    """
    meta_root = out_root / "bronze" / "metadata"
    if not meta_root.exists():
        return None
    partitions = (
        child for child in meta_root.iterdir()
        if child.is_dir() and child.name.startswith("ingest_date=")
    )
    return max(partitions, default=None)

def load_cached_profiles(out_root: Path) -> list:
    """
    Load profiles.json from the latest metadata snapshot.

    Returns [] when there is no snapshot, no profiles.json in it, or the file
    cannot be read or parsed.
    """
    meta_dir = _latest_meta_dir(out_root)
    snapshot = (meta_dir / "profiles.json") if meta_dir else None
    if snapshot is None or not snapshot.exists():
        return []
    try:
        text = snapshot.read_text(encoding="utf-8")
        return json.loads(text)
    except Exception:
        return []

def load_cached_area_types_for_profile(out_root: Path, pid: int) -> list:
    """
    Load area_types_profile=<pid>.json from the latest metadata snapshot.

    Returns [] when there is no snapshot, no such file in it, or the file
    cannot be read or parsed.
    """
    meta_dir = _latest_meta_dir(out_root)
    snapshot = (meta_dir / f"area_types_profile={pid}.json") if meta_dir else None
    if snapshot is None or not snapshot.exists():
        return []
    try:
        text = snapshot.read_text(encoding="utf-8")
        return json.loads(text)
    except Exception:
        return []

# ---------- Indicator metadata index (profile→groups→indicators) ----------
_META_INDEX = None   # lazy-loaded; filled on the first load_indicator_index() call

def _norm(s: str) -> str:
    """
    Normalise a column name: lowercase it, turn every run of characters
    outside ``a-z0-9`` into one underscore, and trim underscores at the ends.
    """
    cleaned = (s or "").strip().lower()
    return re.sub(r"[^a-z0-9]+", "_", cleaned).strip("_")

def load_indicator_index(out_root: Path) -> dict:
    """
    Build (once) and return the indicator lookup:
      {
        "by_profile": { <profile_id>: set(indicator_id), ... },
        "by_profile_group": { (<profile_id>, <group_id>): set(indicator_id), ... }
      }
    Source is the latest bronze/metadata/*/indicator_metadata.csv.gz.

    The result is cached in the module-level _META_INDEX, so later calls
    return the same object whatever *out_root* they pass. Both maps are empty
    when the snapshot, the CSV, its header, or the indicator_id/profile_id
    columns are missing. The group column is "domain_id" if present,
    otherwise "group_id". Rows whose ids cannot be parsed as integers are
    skipped.
    """
    global _META_INDEX
    if _META_INDEX is not None:
        return _META_INDEX

    by_profile: Dict[int, set] = {}
    by_pg: Dict[Tuple[int, int], set] = {}

    def _populate() -> None:
        # Fills the two maps in place; every early return leaves them empty.
        mdir = _latest_meta_dir(out_root)
        if not mdir:
            return
        gz_path = mdir / "indicator_metadata.csv.gz"
        if not gz_path.exists():
            return

        with gzip.open(gz_path, "rt", encoding="utf-8", errors="ignore", newline="") as handle:
            reader = csv.reader(handle)
            header = next(reader, None)
            if not header:
                return

            positions = {_norm(name): pos for pos, name in enumerate(header)}
            ind_col = positions.get("indicator_id")
            prof_col = positions.get("profile_id")
            grp_col = positions.get("domain_id", None)
            if grp_col is None:
                grp_col = positions.get("group_id", None)
            if ind_col is None or prof_col is None:
                return

            for record in reader:
                try:
                    ind = int(record[ind_col])
                    pid = int(record[prof_col])
                    if grp_col is not None and record[grp_col]:
                        gid = int(record[grp_col])
                    else:
                        gid = None
                except Exception:
                    # Short row or non-numeric id: skip the whole row.
                    continue
                by_profile.setdefault(pid, set()).add(ind)
                if gid is not None:
                    by_pg.setdefault((pid, gid), set()).add(ind)

    _populate()
    _META_INDEX = {"by_profile": by_profile, "by_profile_group": by_pg}
    return _META_INDEX

# ---------- Bronze metadata snapshot ----------
def dump_bronze_metadata(out_root: Path, ingest_date: str, include_area_lists: bool=True):
    """
    Snapshot lookup metadata into <out_root>/bronze/metadata/ingest_date=<ingest_date>/.

    Writes, best effort: profiles.json, indicator_metadata.csv.gz (skipped if
    the file already exists), ages.json, sexes.json, value_notes.json,
    area_types.json, one areas_area_type_id=<id>.json per area type (only
    when include_area_lists is true) and one area_types_profile=<id>.json per
    profile.

    No single fetch failure aborts the snapshot. Each one prints a [WARN]
    line and the function moves on, including the per-area-type and
    per-profile loops, so a partial snapshot is visible in the output.
    """
    meta_dir = out_root / "bronze" / "metadata" / f"ingest_date={ingest_date}"
    ensure_dir(meta_dir)

    # Profiles
    profiles = []
    try:
        profiles = fetch_profiles()
        write_json(meta_dir / "profiles.json", profiles)
    except Exception as e:
        print("[WARN] profiles fetch failed; continuing without profiles.json →", e)

    # Indicator metadata CSV
    ind_csv = meta_dir / "indicator_metadata.csv.gz"
    if not ind_csv.exists():
        try:
            url = f"{API_BASE}/indicator_metadata/csv/all"
            robust_stream_csv_to_gzip(url, {}, ind_csv)
        except Exception as e:
            print("[WARN] indicator_metadata CSV fetch failed; skipping →", e)

    # Entities
    for name, fn in [("ages.json", fetch_ages), ("sexes.json", fetch_sexes), ("value_notes.json", fetch_value_notes)]:
        try:
            write_json(meta_dir / name, fn())
        except Exception as e:
            print(f"[WARN] {name} fetch failed; skipping →", e)

    # Area types list
    try:
        at_all = fetch_all_area_types()
        write_json(meta_dir / "area_types.json", at_all)
    except Exception as e:
        print("[WARN] area_types fetch failed; skipping →", e)
        at_all = []

    # Areas by type (optional/heavy)
    if include_area_lists and at_all:
        # The id may be exposed as "Id" or "AreaTypeId"; non-integer ids are ignored.
        candidate_ids = (a.get("Id", a.get("AreaTypeId")) for a in at_all)
        area_type_ids = {atid for atid in candidate_ids if isinstance(atid, int)}
        for atid in sorted(area_type_ids):
            try:
                write_json(meta_dir / f"areas_area_type_id={atid}.json", fetch_areas_by_type(atid))
            except Exception as e:
                print(f"[WARN] areas for area_type_id={atid} fetch failed; skipping →", e)

    # Per-profile area types (handy cache)
    for p in profiles:
        try:
            ats = fetch_area_types_for_profile(p["Id"])
            write_json(meta_dir / f"area_types_profile={p['Id']}.json", ats)
        except Exception as e:
            label = p.get("Id") if isinstance(p, dict) else p
            print(f"[WARN] area types for profile {label} fetch failed; skipping →", e)

# ---------- Download per (profile, area_type) ----------
def _meta_base_paths(profile_key: str, area_type_id: int, ingest_date: str, out_root: Path):
    """
    Return (partition_dir, csv_path, meta_path) for one (profile, area type, ingest date).

    The partition lives at
    <out_root>/bronze/data/profile_key=…/area_type_id=…/ingest_date=… and
    holds all_data_by_profile.csv.gz plus _meta.json.
    """
    partition = out_root.joinpath(
        "bronze",
        "data",
        f"profile_key={profile_key}",
        f"area_type_id={area_type_id}",
        f"ingest_date={ingest_date}",
    )
    csv_file = partition / "all_data_by_profile.csv.gz"
    meta_file = partition / "_meta.json"
    return partition, csv_file, meta_file

def download_profile_area_csv(profile: dict, area_type_id: int, out_root: Path,
                              ingest_date: str, parent_area_type_id: Optional[int]) -> dict:
    """
    Download all data for one (profile, child area type) pair into its bronze partition.

    Returns dict with one of:
      - {"ok": True, "rows": N, ...}
      - {"empty": True, ...}
      - {"skipped": True, ...} / {"skipped_empty": True, ...} when a previous
        run already produced a CSV / an EMPTY marker for this partition
      - {"error": "...", "reason": <category>, "status": <http code or None>, "url":..., "params":...}

    A download of one line or fewer is header-only: the CSV is deleted and
    _meta.json records the pair as empty. When OMIT_PARENT_ON_MAIN_IF_500 is
    set and the request fails with a 5xx, one more try is made without
    parent_area_type_id. Any failure in that second try is swallowed and the
    original error is reported.
    """
    profile_id = profile["Id"]
    profile_key = profile.get("Key") or slugify(profile["Name"])
    base, csv_path, meta_path = _meta_base_paths(profile_key, area_type_id, ingest_date, out_root)
    ensure_dir(base)

    # Idempotent skip (both OK and EMPTY)
    if meta_path.exists():
        try:
            previous = json.loads(meta_path.read_text(encoding="utf-8"))
        except Exception:
            previous = {}
        if previous.get("empty") is True:
            return {"skipped_empty": True, "profile_key": profile_key, "area_type_id": area_type_id}
        if csv_path.exists():
            return {"skipped": True, "profile_key": profile_key, "area_type_id": area_type_id}

    url = f"{API_BASE}/all_data/csv/by_profile_id"
    params = {
        "profile_id": str(profile_id),
        "child_area_type_id": str(area_type_id),
    }
    if parent_area_type_id is not None:
        params["parent_area_type_id"] = str(parent_area_type_id)

    def _finalise(rows: int, sha: str, used_params: dict, parent_id: Optional[int],
                  note: Optional[str] = None) -> dict:
        # Writes _meta.json for a finished download and returns the result dict.
        header_only = rows <= 1
        if header_only:
            # header-only → record EMPTY and remove CSV
            try:
                csv_path.unlink()
            except Exception:
                pass
        meta = {
            "endpoint": url, "params": used_params,
            "profile": {"Id": profile_id, "Name": profile["Name"], "Key": profile_key},
            "area_type_id": area_type_id, "parent_area_type_id": parent_id,
        }
        if header_only:
            meta["empty"] = True
            meta["reason"] = "header-only (no data rows)"
        else:
            meta["file"] = csv_path.name
            meta["sha256_uncompressed"] = sha
            meta["row_count_estimate"] = rows
        meta["fetched_at"] = now_iso()
        if note is not None:
            meta["note"] = note
        write_json(meta_path, meta)
        if header_only:
            return {"empty": True, "profile_key": profile_key, "area_type_id": area_type_id}
        return {"ok": True, "profile_key": profile_key, "area_type_id": area_type_id, "rows": rows}

    try:
        rows, sha = robust_stream_csv_to_gzip(url, params, csv_path)
    except Exception as e:
        reason, code, detail = classify_exception(e)
        # Optional immediate no-parent retry on 5xx (main pass)
        if OMIT_PARENT_ON_MAIN_IF_500 and reason == "http_5xx":
            try:
                params2 = {"profile_id": str(profile_id), "child_area_type_id": str(area_type_id)}
                rows, sha = robust_stream_csv_to_gzip(url, params2, csv_path)
                return _finalise(rows, sha, params2, None, note="main:used_no_parent")
            except Exception:
                pass
        # give error to retry logic
        return {"error": f"download failed: {detail}", "reason": reason, "status": code,
                "url": url, "params": params,
                "profile_key": profile_key, "profile_id": profile_id, "area_type_id": area_type_id}

    return _finalise(rows, sha, params, parent_area_type_id)

# ---------- Fallback A: concat groups ----------
def _append_csv_response(url: str, params: Dict[str, str], gz, sha,
                         skip_header: bool, log_prefix: str, what: str,
                         max_attempts: int = 6) -> Optional[int]:
    """
    Download one CSV response and append it to *gz* only after it arrived in full.

    The body is first spooled to a temporary file. Only a completely received
    response is copied into *gz* and fed to *sha*, so a connection that drops
    mid-stream and is retried can never leave partial or duplicated rows in
    the combined file. If the body does not end with a newline, one is added
    so the next response starts on its own line.

    HTTP 4xx responses other than 429 are not retried.

    Returns the number of lines appended (header included when it was kept),
    or None when the request failed permanently.
    """
    import tempfile  # function-scope: tempfile is not imported at module level

    for attempt in range(max_attempts):
        try:
            with tempfile.SpooledTemporaryFile(max_size=16 * 1024 * 1024) as spool:
                with SESSION.get(url, params=params, headers={"Connection": "close"},
                                 stream=True, timeout=(10, 600)) as r:
                    r.raise_for_status()
                    # Throwaway digest: the real one must only ever see committed bytes.
                    added = _write_response_stream_to_gz(r, spool, hashlib.sha256(),
                                                         skip_header=skip_header)
                # Response complete: commit the spooled bytes.
                spool.seek(0)
                last_byte = b""
                while True:
                    piece = spool.read(1024 * 256)
                    if not piece:
                        break
                    gz.write(piece)
                    sha.update(piece)
                    last_byte = piece[-1:]
                if last_byte and last_byte != b"\n":
                    gz.write(b"\n")
                    sha.update(b"\n")
                    added += 1
            # tiny breather on success
            time.sleep(random.uniform(*THROTTLE_AFTER_SUCCESS))
            return added
        except (ConnectionError, ReadTimeout, ChunkedEncodingError, Urllib3ProtocolError, HTTPError) as e:
            status = getattr(getattr(e, "response", None), "status_code", None)
            client_error = (isinstance(e, HTTPError) and status is not None
                            and 400 <= status < 500 and status != 429)
            if attempt < max_attempts - 1 and not client_error:
                # exponential backoff + extra pause to be nice after failures
                time.sleep(0.6 * (2 ** attempt) + random.random() * 0.5)
                time.sleep(random.uniform(*THROTTLE_AFTER_FAILURE))
            else:
                print(f"{log_prefix} {what} failed permanently: {e}")
                return None
    return None


def robust_concat_groups_to_single_gzip(profile: dict,
                                        area_type_id: int,
                                        parent_area_type_id: Optional[int],
                                        dest_path: Path,
                                        deadline_ts: Optional[float] = None,
                                        log_every: int = 5) -> Tuple[int, str, List[int]]:
    """
    Fallback A: fetch by_group_id CSVs for every id in profile["GroupIds"] and
    concatenate them into one gzip at *dest_path*, keeping a single header.

    The header comes from the first response that is actually written, so a
    failed first group no longer yields a header-less file. Processing stops
    early once *deadline_ts* (epoch seconds) has passed; whatever was
    collected by then is returned.

    Returns (total_lines, sha256_of_uncompressed_bytes, group_ids_with_data).
    total_lines includes the header line, so "> 1" means at least one data
    row. With no GroupIds the result is (0, "", []) and no file is created.
    """
    ensure_dir(dest_path.parent)
    groups = profile.get("GroupIds") or []
    if not groups:
        return 0, "", []

    sha = hashlib.sha256()
    total_rows = 0
    used: List[int] = []
    key = (profile.get("Key") or slugify(profile["Name"]))
    n = len(groups)
    log_prefix = f"[retry][{key}/{area_type_id}]"
    url = f"{API_BASE}/all_data/csv/by_group_id"

    with gzip.open(dest_path, "wb", compresslevel=6) as gz:
        header_written = False
        for i, gid in enumerate(groups, start=1):
            if deadline_ts and time.time() >= deadline_ts:
                print(f"{log_prefix} timeout after {i-1}/{n} groups")
                break

            params = {"group_id": str(gid), "child_area_type_id": str(area_type_id)}
            if parent_area_type_id is not None:
                params["parent_area_type_id"] = str(parent_area_type_id)

            # gentle throttle before each group request
            time.sleep(random.uniform(*THROTTLE_BEFORE_REQUEST))

            added = _append_csv_response(url, params, gz, sha, header_written,
                                         log_prefix, f"group {gid}")
            if added:
                # The first written response carries the header line; later ones do not.
                data_rows = added if header_written else added - 1
                header_written = True
                total_rows += added
                if data_rows > 0:
                    used.append(gid)

            if (i == 1) or (i % log_every == 0):
                print(f"{log_prefix} groups {i}/{n} (used={len(used)}, rows~{total_rows})")

    return total_rows, sha.hexdigest(), used



# ---------- Fallback B: concat indicators (from metadata index) ----------
def iter_indicator_batches(indicator_ids: Iterable[int], batch_size: int) -> Iterable[List[int]]:
    """Yield the ids, converted to int, in consecutive lists of at most *batch_size* items."""
    batch: List[int] = []
    for i in indicator_ids:
        batch.append(int(i))
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

def robust_concat_indicators_to_single_gzip(profile: dict,
                                            area_type_id: int,
                                            parent_area_type_id: Optional[int],
                                            dest_path: Path,
                                            indicator_ids: List[int],
                                            deadline_ts: Optional[float] = None,
                                            log_every_batches: int = 5,
                                            batch_size: int = 25,
                                            max_batches: int = 300) -> Tuple[int, str, List[int]]:
    """
    Fallback B: fetch by_indicator_id CSVs in batches of *batch_size* ids and
    concatenate them into one gzip at *dest_path*, keeping a single header.

    The header comes from the first response that is actually written.
    Processing stops after *max_batches* batches or once *deadline_ts* (epoch
    seconds) has passed; whatever was collected by then is returned.

    Returns (total_lines, sha256_of_uncompressed_bytes, indicator_ids_used).
    total_lines includes the header line. indicator_ids_used lists every id
    of each batch that returned data rows, so it is an upper bound on the
    indicators that truly have data. With no indicator_ids the result is
    (0, "", []) and no file is created.
    """
    ensure_dir(dest_path.parent)
    if not indicator_ids:
        return 0, "", []

    sha = hashlib.sha256()
    total_rows = 0
    used: List[int] = []
    key = (profile.get("Key") or slugify(profile["Name"]))
    log_prefix = f"[retry][{key}/{area_type_id}]"
    url = f"{API_BASE}/all_data/csv/by_indicator_id"

    with gzip.open(dest_path, "wb", compresslevel=6) as gz:
        header_written = False
        for b_idx, batch in enumerate(iter_indicator_batches(indicator_ids, batch_size), start=1):
            if b_idx > max_batches:
                print(f"{log_prefix} indicator concat hit max_batches={max_batches}")
                break
            if deadline_ts and time.time() >= deadline_ts:
                print(f"{log_prefix} indicator concat deadline reached after {b_idx-1} batches")
                break

            params = {"indicator_ids": ",".join(str(x) for x in batch),
                      "child_area_type_id": str(area_type_id)}
            if parent_area_type_id is not None:
                params["parent_area_type_id"] = str(parent_area_type_id)

            # gentle throttle before each indicator batch
            time.sleep(random.uniform(*THROTTLE_BEFORE_REQUEST))

            added = _append_csv_response(url, params, gz, sha, header_written,
                                         log_prefix, "indicator batch")
            if added:
                # The first written response carries the header line; later ones do not.
                data_rows = added if header_written else added - 1
                header_written = True
                total_rows += added
                if data_rows > 0:
                    used.extend(batch)

            if (b_idx == 1) or (b_idx % log_every_batches == 0):
                print(f"{log_prefix} indicator batches {b_idx} (used≈{len(used)}, rows~{total_rows})")

    return total_rows, sha.hexdigest(), used


def recover_profile_area_into_same_file(profile: dict, area_type_id: int, out_root: Path,
                                        ingest_date: str, parent_area_type_id: Optional[int]) -> dict:
    """
    Retry a stubborn (profile, area type) pair, writing to the same partition
    file the main pass uses. Steps, in order:
      1) by_profile_id WITHOUT parent_area_type_id
      2) concatenate all by_group_id CSVs into ONE (header deduped) with per-task deadline
      3) concatenate by_indicator_id (batched) using Indicator Metadata index
    Steps 2 and 3 share one deadline of RETRY_PER_TASK_DEADLINE_S seconds.

    Returns one of ok/empty/skipped/error dicts (like main). A header-only
    answer in step 1 is recorded as EMPTY straight away. When steps 2 and 3
    produce no rows the pair is recorded as EMPTY if
    TREAT_ZERO_ROWS_IN_FALLBACK_AS_EMPTY is set, otherwise an error dict with
    reason "fallbacks_failed" is returned.
    """
    key = (profile.get("Key") or slugify(profile["Name"]))
    base, csv_path, meta_path = _meta_base_paths(key, area_type_id, ingest_date, out_root)
    ensure_dir(base)
    tag = f"[retry][{key}/{area_type_id}]"

    def profile_stub() -> dict:
        # Built lazily so the profile lookups happen exactly where a meta record is written.
        return {"Id": profile["Id"], "Name": profile["Name"], "Key": key}

    def discard_csv() -> None:
        # Best-effort removal of a leftover or partial CSV.
        try:
            if csv_path.exists():
                csv_path.unlink()
        except Exception:
            pass

    # honor prior empties/ok
    if meta_path.exists():
        try:
            previous = json.loads(meta_path.read_text(encoding="utf-8"))
        except Exception:
            previous = {}
        if previous.get("empty") is True:
            print(f"{tag} skipped (already empty in meta)")
            return {"skipped_empty": True, "profile_key": key, "area_type_id": area_type_id}
        if csv_path.exists():
            print(f"{tag} skipped (csv already exists)")
            return {"skipped": True, "profile_key": key, "area_type_id": area_type_id}

    # 1) by_profile_id WITHOUT parent
    url = f"{API_BASE}/all_data/csv/by_profile_id"
    params_no_parent = {"profile_id": str(profile["Id"]), "child_area_type_id": str(area_type_id)}
    print(f"{tag} trying by_profile_id (no parent)")
    try:
        rows, sha = robust_stream_csv_to_gzip(url, params_no_parent, csv_path)
        if rows <= 1:
            discard_csv()
            write_json(meta_path, {
                "endpoint": url, "params": params_no_parent,
                "profile": profile_stub(),
                "area_type_id": area_type_id, "parent_area_type_id": None,
                "empty": True, "reason": "header-only after no-parent", "fetched_at": now_iso()
            })
            print(f"{tag} empty via no-parent")
            return {"empty": True, "profile_key": key, "area_type_id": area_type_id}
        write_json(meta_path, {
            "endpoint": url, "params": params_no_parent,
            "profile": profile_stub(),
            "area_type_id": area_type_id, "parent_area_type_id": None,
            "file": csv_path.name, "sha256_uncompressed": sha,
            "row_count_estimate": rows, "fetched_at": now_iso(), "note": "fallback:no-parent"
        })
        print(f"{tag} success via no-parent rows~{rows}")
        return {"ok": True, "profile_key": key, "area_type_id": area_type_id, "rows": rows}
    except Exception as e:
        reason, code, detail = classify_exception(e)
        print(f"{tag} no-parent failed: {reason} {code or ''} {detail}")

    # 2) group concat with deadline
    discard_csv()
    deadline_ts = time.time() + RETRY_PER_TASK_DEADLINE_S
    print(f"{tag} trying group concat (deadline {RETRY_PER_TASK_DEADLINE_S}s)")
    rows2, sha2, groups_used = robust_concat_groups_to_single_gzip(
        profile, area_type_id, parent_area_type_id, csv_path,
        deadline_ts=deadline_ts, log_every=RETRY_GROUP_LOG_EVERY
    )
    if rows2 > 1 and groups_used:
        write_json(meta_path, {
            "endpoint": f"{API_BASE}/all_data/csv/by_group_id",
            "params": {"group_ids": groups_used, "child_area_type_id": area_type_id,
                       "parent_area_type_id": parent_area_type_id},
            "profile": profile_stub(),
            "area_type_id": area_type_id,
            "file": csv_path.name, "sha256_uncompressed": sha2,
            "row_count_estimate": rows2, "fetched_at": now_iso(),
            "note": "fallback:combined_from_groups"
        })
        print(f"{tag} success via groups used={len(groups_used)} rows~{rows2}")
        return {"ok": True, "profile_key": key, "area_type_id": area_type_id,
                "rows": rows2, "note": "combined_from_groups"}

    # 3) indicator concat fallback using metadata index
    discard_csv()
    idx = load_indicator_index(out_root)
    pid = profile["Id"]
    indicator_ids = sorted(idx.get("by_profile", {}).get(pid, []))
    if indicator_ids:
        print(f"{tag} trying indicator concat (batches of {INDICATOR_FALLBACK_BATCH_SIZE})")
        rows3, sha3, inds_used = robust_concat_indicators_to_single_gzip(
            profile, area_type_id, parent_area_type_id, csv_path,
            indicator_ids=indicator_ids,
            deadline_ts=deadline_ts,
            log_every_batches=INDICATOR_FALLBACK_LOG_EVERY,
            batch_size=INDICATOR_FALLBACK_BATCH_SIZE,
            max_batches=INDICATOR_FALLBACK_MAX_BATCHES
        )
        if rows3 > 1 and inds_used:
            write_json(meta_path, {
                "endpoint": f"{API_BASE}/all_data/csv/by_indicator_id",
                "params": {"indicator_ids_used_count": len(inds_used), "child_area_type_id": area_type_id,
                           "parent_area_type_id": parent_area_type_id},
                "profile": profile_stub(),
                "area_type_id": area_type_id,
                "file": csv_path.name, "sha256_uncompressed": sha3,
                "row_count_estimate": rows3, "fetched_at": now_iso(),
                "note": "fallback:combined_from_indicators"
            })
            print(f"{tag} success via indicators used≈{len(inds_used)} rows~{rows3}")
            return {"ok": True, "profile_key": key, "area_type_id": area_type_id,
                    "rows": rows3, "note": "combined_from_indicators"}

    # No rows from groups or indicators → treat as EMPTY if configured
    if not TREAT_ZERO_ROWS_IN_FALLBACK_AS_EMPTY:
        print(f"{tag} fallbacks failed")
        return {"error": f"fallbacks_failed for profile_id={profile['Id']}, area_type_id={area_type_id}",
                "reason": "fallbacks_failed", "status": None,
                "profile_key": key, "profile_id": profile["Id"], "area_type_id": area_type_id}

    discard_csv()
    write_json(meta_path, {
        "endpoint": "multiple",
        "params": {"attempts": ["by_profile_id(no_parent)", "by_group_id(*groups)", "by_indicator_id(*batched)"]},
        "profile": profile_stub(),
        "area_type_id": area_type_id, "parent_area_type_id": parent_area_type_id,
        "empty": True, "reason": "no rows from groups/indicators", "fetched_at": now_iso()
    })
    print(f"{tag} marked EMPTY after all fallbacks (no rows)")
    return {"empty": True, "profile_key": key, "area_type_id": area_type_id}

# ---------- Bronze completeness audit ----------
def bronze_completeness_audit(out_root: Path, ingest_date: Optional[str]) -> dict:
    """
    Audit which (profile_key, area_type_id) partitions exist for one ingest date.

    Expected pairs come from the cached metadata snapshot under
    ``<out_root>/bronze/metadata/ingest_date=*/`` (``profiles.json`` plus the
    ``area_types_profile=<id>.json`` files). Actual pairs come from:

    * ``_meta.json`` files under ``<out_root>/bronze/data`` — status "empty"
      when the meta carries a truthy ``empty`` flag, otherwise "ok";
    * ``all_data_by_profile.csv.gz`` files with no sibling ``_meta.json`` —
      status "ok_no_meta" (counted as covered OK).

    When no expected pairs can be built from the cache, the pairs found on disk
    are used as the expected set, so nothing is reported missing.

    Three CSVs (``covered_ok.csv``, ``covered_empty.csv``, ``missing.csv``) are
    written to ``<out_root>/bronze/audits/ingest_date=<date>/``.

    Args:
        out_root: local output root containing the ``bronze`` tree.
        ingest_date: partition date to audit; ``None`` selects the newest
            ``ingest_date=*`` directory found under ``bronze/data``.

    Returns:
        dict with ``ingest_date``, ``expected_pairs``, ``covered_ok``,
        ``covered_empty``, ``missing`` (all counts) and ``audit_dir``.
    """
    print("\nRunning Bronze completeness audit…")
    data_root = out_root / "bronze" / "data"

    if ingest_date is None:
        # Compare the date tokens themselves. Sorting whole paths would rank by
        # whatever directory levels precede ingest_date=… and could return an
        # older date. ISO dates order correctly as plain strings.
        found_dates = {
            p.name.split("=", 1)[1]
            for p in data_root.rglob("ingest_date=*") if p.is_dir()
        }
        ingest_date = max(found_dates) if found_dates else None
    print("Using INGEST_DATE:", ingest_date or "(none found)")

    # load cached meta
    meta_root = out_root / "bronze" / "metadata"

    def pick_meta_dir():
        # Prefer the snapshot taken on the audited date so "expected" reflects
        # what was known then; otherwise fall back to the newest snapshot.
        if ingest_date:
            same_day = meta_root / f"ingest_date={ingest_date}"
            if same_day.is_dir():
                return same_day
        ds = sorted(p for p in meta_root.glob("ingest_date=*") if p.is_dir())
        return ds[-1] if ds else None

    profiles, at_by_profile = [], {}
    m_dir = pick_meta_dir()
    if m_dir and (m_dir / "profiles.json").exists():
        profiles = json.loads((m_dir / "profiles.json").read_text(encoding="utf-8"))
    if m_dir:
        for f in m_dir.glob("area_types_profile=*.json"):
            try:
                pid = int(f.stem.split("=", 1)[1])
                at_by_profile[pid] = json.loads(f.read_text(encoding="utf-8"))
            except (OSError, ValueError) as e:
                # Best-effort: one unreadable cache file must not abort the audit.
                print(f"[WARN] skipping unreadable area-type cache {f}: {e}")

    def token(parts, key):
        """Return the value of the first ``key=value`` path component, or None."""
        prefix = f"{key}="
        for p in parts:
            if isinstance(p, str) and p.startswith(prefix):
                return p.split("=", 1)[1]
        return None

    def partition_of(path: Path):
        """Return (profile_key, area_type_id) for a file in the audited date, else None."""
        parts = path.parts
        pkey = token(parts, "profile_key")
        atid = token(parts, "area_type_id")
        idate = token(parts, "ingest_date")
        if not (pkey and atid and idate):
            return None
        if ingest_date and idate != ingest_date:
            return None
        try:
            return pkey, int(atid)
        except ValueError:
            # A stray non-numeric area_type_id=… directory is not a partition.
            print(f"[WARN] ignoring path with non-numeric area_type_id: {path}")
            return None

    # Build expected (profile_key, profile_id, area_type_id) triples from cached metadata
    expected_pairs = []
    for p in profiles:
        pkey = (p.get("Key") or slugify(p["Name"])).lower()
        ids = set()
        for a in at_by_profile.get(p["Id"], []):
            atid = a.get("Id", a.get("AreaTypeId"))
            if isinstance(atid, int):
                ids.add(atid)
        expected_pairs.extend((pkey, p["Id"], atid) for atid in sorted(ids))
    print("Expected pairs (from cache):", len(expected_pairs))

    # Partitions that have a _meta.json
    actual_idx = {}
    with_meta = 0
    for meta_path in data_root.rglob("_meta.json"):
        pair = partition_of(meta_path)
        if pair is None:
            continue
        try:
            meta = json.loads(meta_path.read_text(encoding="utf-8"))
        except (OSError, ValueError) as e:
            print(f"[WARN] unreadable meta {meta_path}: {e} (counted as ok)")
            meta = {}
        is_empty = isinstance(meta, dict) and bool(meta.get("empty"))
        actual_idx[pair] = "empty" if is_empty else "ok"
        with_meta += 1
    print("Actual pairs with meta:", with_meta)

    # Also treat CSV-only (no meta) as ok_no_meta; never override a meta-derived status
    csv_only = 0
    for csv_path in data_root.rglob("all_data_by_profile.csv.gz"):
        pair = partition_of(csv_path)
        if pair is None or (csv_path.parent / "_meta.json").exists():
            continue
        actual_idx.setdefault(pair, "ok_no_meta")
        csv_only += 1
    if csv_only:
        print("CSV-only pairs (no meta):", csv_only)

    if expected_pairs:
        expected_keys = {(pkey, atid) for (pkey, _, atid) in expected_pairs}
        to_check = expected_pairs
    else:
        # No cache: audit what is on disk (sorted so the CSVs are reproducible).
        expected_keys = set(actual_idx)
        to_check = [(pkey, None, atid) for (pkey, atid) in sorted(expected_keys)]

    covered_ok, covered_empty, missing = [], [], []
    for (pkey, pid, atid) in to_check:
        st = actual_idx.get((pkey, atid))
        if st is None:
            missing.append((pkey, pid, atid))
        elif st == "empty":
            covered_empty.append((pkey, pid, atid))
        else:
            covered_ok.append((pkey, pid, atid))

    print("\n=== Completeness ===")
    print("Expected pairs:", len(expected_keys))
    print("Covered OK (incl. ok_no_meta):", len(covered_ok))
    print("Covered EMPTY:", len(covered_empty))
    print("Missing:", len(missing))

    # Write audit CSVs with the csv module: proper quoting, no pandas needed,
    # and None (unknown profile_id) is written as an empty field.
    audit_dir = out_root / "bronze" / "audits" / (
        f"ingest_date={ingest_date}" if ingest_date else "ingest_date=unknown"
    )
    ensure_dir(audit_dir)
    header = ["profile_key", "profile_id", "area_type_id"]
    for fname, rows in (("covered_ok.csv", covered_ok),
                        ("covered_empty.csv", covered_empty),
                        ("missing.csv", missing)):
        with open(audit_dir / fname, "w", encoding="utf-8", newline="") as f:
            writer = csv.writer(f, lineterminator=os.linesep)
            writer.writerow(header)
            writer.writerows(rows)

    print("Audit written to:", audit_dir)
    return {
        "ingest_date": ingest_date,
        "expected_pairs": len(expected_keys),
        "covered_ok": len(covered_ok),
        "covered_empty": len(covered_empty),
        "missing": len(missing),
        "audit_dir": str(audit_dir),
    }

# ---------- Backfill meta for CSV-only partitions ----------
def backfill_meta_for_csv_only(out_root: Path, ingest_date: str) -> int:
    """
    Create _meta.json for any partition that has all_data_by_profile.csv.gz but no _meta.json.

    For each such CSV the UNCOMPRESSED content is streamed once to compute its
    sha256 and its line count (header included). A file with at most one line
    is treated as header-only: the CSV is removed and an ``empty`` meta is
    written instead.

    Args:
        out_root: local output root containing ``bronze/data``.
        ingest_date: only partitions with this ``ingest_date=`` token are
            processed; a falsy value processes every date.

    Returns:
        Number of meta files written.
    """
    print("\nBackfilling meta for CSV-only partitions (if any)…")
    data_root = out_root / "bronze" / "data"

    def token(parts, key):
        """Return the value of the first ``key=value`` path component, or None."""
        prefix = f"{key}="
        for p in parts:
            if isinstance(p, str) and p.startswith(prefix):
                return p.split("=", 1)[1]
        return None

    def sha256_and_rows_of_gzip_uncompressed(p: Path) -> Tuple[str, int]:
        """Return (sha256 hex digest, number of lines) of the decompressed file."""
        h = hashlib.sha256()
        rows = 0
        last_byte = b""
        with gzip.open(p, "rb") as g:
            while True:
                chunk = g.read(1024 * 256)
                if not chunk:
                    break
                h.update(chunk)
                rows += chunk.count(b"\n")
                last_byte = chunk[-1:]
        # A final line without a trailing newline is still a line. Without this,
        # "header\nrow" counts as 1 and a file with real data would be
        # classified header-only and deleted.
        if last_byte and last_byte != b"\n":
            rows += 1
        return h.hexdigest(), rows

    written = 0
    now_s = now_iso()  # one timestamp shared by every meta written in this run

    for csv_path in data_root.rglob("all_data_by_profile.csv.gz"):
        parts = csv_path.parts
        pkey = token(parts, "profile_key")
        atid = token(parts, "area_type_id")
        idate = token(parts, "ingest_date")
        if not (pkey and atid and idate):
            continue
        if ingest_date and idate != ingest_date:
            continue
        meta_path = csv_path.parent / "_meta.json"
        if meta_path.exists():
            continue

        try:
            area_type_id = int(atid)
        except ValueError:
            print(f"[WARN] non-numeric area_type_id in path, skipping: {csv_path}")
            continue

        try:
            sha, rows = sha256_and_rows_of_gzip_uncompressed(csv_path)
        except Exception as e:
            # Corrupt/truncated gzip or I/O error: leave the partition untouched.
            print(f"[WARN] could not read {csv_path}: {e}")
            continue

        meta = {
            "endpoint": None,
            "params": None,
            "profile": {"Key": pkey},
            "area_type_id": area_type_id,
            "parent_area_type_id": None,
        }
        if rows <= 1:
            # header-only → mark empty and remove CSV for consistency
            try:
                csv_path.unlink()
            except OSError as e:
                print(f"[WARN] could not remove header-only {csv_path}: {e}")
            meta.update({
                "empty": True,
                "reason": "header-only discovered during backfill",
                "fetched_at": now_s
            })
        else:
            meta.update({
                "file": "all_data_by_profile.csv.gz",
                "sha256_uncompressed": sha,
                "row_count_estimate": rows,
                "fetched_at": now_s,
                "note": "backfilled_meta"
            })
        write_json(meta_path, meta)
        written += 1

    print(f"Backfilled meta files: {written}")
    return written

# ---------- S3 upload (bronze/ingest_date=...) ----------
def s3_upload_bronze_ingest(out_root: Path, ingest_date: str,
                            bucket: str, prefix: str,
                            clean_first: bool = False,
                            max_workers: int = 8) -> None:
    """
    Upload every local bronze file of one ingest date to S3.

    A local file is selected when one of its directories below ``out_root`` is
    named ``ingest_date=<ingest_date>``; its S3 key is ``<prefix>/`` plus its
    path relative to ``out_root``.

    NOTE(review): files sitting directly in ``bronze/`` (for example the
    ``manifest_ingest_date=….json`` written by ``main``) have no such directory
    in their path and are therefore not uploaded — confirm this is intended.

    Args:
        out_root: local output root containing the ``bronze`` tree.
        ingest_date: partition date to upload.
        bucket: destination bucket name.
        prefix: key prefix inside the bucket; surrounding slashes are stripped,
            and an empty/None prefix uploads under ``bronze/`` directly.
        clean_first: when true, first delete existing keys under
            ``<prefix>/bronze/`` that contain ``/ingest_date=<ingest_date>/``.
        max_workers: thread count for the upload pool, also used as the
            per-transfer ``max_concurrency``.

    Failures (delete or upload) are printed, not raised.
    """
    import boto3
    from boto3.s3.transfer import TransferConfig

    s3 = boto3.client("s3")
    prefix = (prefix or "").strip("/")
    base_local = out_root / "bronze"
    base_s3 = f"{prefix}/bronze" if prefix else "bronze"
    ingest_marker = f"ingest_date={ingest_date}"

    def _delete_batch(batch: List[dict]) -> None:
        # delete_objects reports per-key failures in the response body rather
        # than raising, so they have to be checked explicitly.
        resp = s3.delete_objects(Bucket=bucket, Delete={"Objects": batch, "Quiet": True})
        for err in resp.get("Errors") or []:
            print(f"[ S3 CLEAN ][ERROR] {err.get('Key')}: {err.get('Code')} {err.get('Message')}")

    # optional clean (delete keys for this ingest_date)
    if clean_first:
        print(f"\n[ S3 CLEAN ] Deleting existing s3://{bucket}/{base_s3}/**/{ingest_marker}/**")
        to_delete = []
        pages = s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=f"{base_s3}/")
        for page in pages:
            for it in page.get("Contents") or []:
                key = it["Key"]
                if f"/{ingest_marker}/" in key:
                    to_delete.append({"Key": key})
                    if len(to_delete) == 1000:  # delete_objects accepts at most 1000 keys
                        _delete_batch(to_delete)
                        to_delete = []
        if to_delete:
            _delete_batch(to_delete)
        print("[ S3 CLEAN ] Done.")

    print(f"\n[ S3 UPLOAD ] Uploading bronze ingest_date={ingest_date} → s3://{bucket}/{base_s3}/")
    files = []
    for p in base_local.rglob("*"):
        if not p.is_file():
            continue
        rel = p.relative_to(out_root)
        # Match on directory components below out_root (not on the filename).
        if ingest_marker in rel.parts[:-1]:
            s3_key = f"{prefix}/{rel.as_posix()}" if prefix else rel.as_posix()
            files.append((p, s3_key))

    print(f"Files to upload: {len(files)}")

    def mime_for(path: Path) -> Dict[str, str]:
        """Return the upload ExtraArgs (content type/encoding) for a file name."""
        name = path.name.lower()
        if name.endswith(".csv.gz"):
            return {"ContentType": "text/csv", "ContentEncoding": "gzip"}
        if name.endswith(".json.gz"):
            return {"ContentType": "application/json", "ContentEncoding": "gzip"}
        if name.endswith(".json"):
            return {"ContentType": "application/json"}
        return {"ContentType": "application/octet-stream"}

    cfg = TransferConfig(
        multipart_threshold=8 * 1024 * 1024,
        max_concurrency=max_workers,
        multipart_chunksize=8 * 1024 * 1024,
        use_threads=True,
    )

    def _upload_one(p: Path, key: str):
        s3.upload_file(str(p), bucket, key, ExtraArgs=mime_for(p), Config=cfg)
        return key

    uploaded = 0
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        # Remember which key each future belongs to so failures can name it.
        key_of = {ex.submit(_upload_one, p, k): k for (p, k) in files}
        for f in tqdm(as_completed(key_of), total=len(key_of), desc="S3 Upload"):
            try:
                f.result()
                uploaded += 1
            except Exception as e:
                print(f"[ERROR] upload failed for {key_of[f]}:", e)

    print(f"[ S3 UPLOAD ] Uploaded {uploaded}/{len(files)} files.")

# ---------- MAIN ----------
def main():
    """
    Run one end-to-end bronze ingest for INGEST_DATE.

    Steps, in order:
      1. Optionally dump the metadata snapshot (DUMP_METADATA).
      2. Fetch profiles (falling back to the cached profiles.json) and apply
         the ONLY_PROFILES filter.
      3. Build one task per (profile, area type) and download them concurrently.
      4. Re-run every failed pair through recover_profile_area_into_same_file.
      5. Run the completeness audit, then backfill missing _meta.json files.
      6. Write the run manifest and, if enabled, upload to S3.

    Raises:
        RuntimeError: if profiles can be neither fetched nor loaded from cache.
    """
    out_root = OUTPUT_DIR
    ensure_dir(out_root)
    t0 = time.time()

    def _profile_key(p):
        return p.get("Key") or slugify(p["Name"])

    def _count_by_reason(rows):
        # Single source for both the console breakdown and the manifest, so a
        # result without a reason is labelled AND counted as "other".
        counts = {}
        for r in rows:
            reason = r.get("reason") or "other"
            counts[reason] = counts.get(reason, 0) + 1
        return counts

    def _run_pool(fn, pairs, workers, desc):
        """Run fn(profile, area_type_id, …) for every pair and collect result dicts."""
        outcomes = []
        with ThreadPoolExecutor(max_workers=workers) as ex:
            owner = {}
            for (p, atid) in pairs:
                fut = ex.submit(fn, p, atid, out_root, INGEST_DATE, PARENT_AREA_TYPE_ID)
                owner[fut] = (p, atid)
                time.sleep(random.uniform(*GENTLE_JITTER))
            for f in tqdm(as_completed(owner), total=len(owner), desc=desc):
                try:
                    outcomes.append(f.result())
                except Exception as e:
                    # Attach the pair to the synthetic error result; without the
                    # ids a task that raised could never be selected for retry.
                    p, atid = owner[f]
                    reason, code, detail = classify_exception(e)
                    outcomes.append({"error": detail, "reason": reason, "status": code,
                                     "profile_key": _profile_key(p),
                                     "profile_id": p["Id"], "area_type_id": atid})
        return outcomes

    # (optional) metadata snapshot
    if DUMP_METADATA:
        print("Dumping Bronze metadata…")
        dump_bronze_metadata(out_root, INGEST_DATE, include_area_lists=INCLUDE_AREAS_LISTS)

    # profiles (with cache fallback)
    try:
        profiles = fetch_profiles()
    except Exception as e:
        print("[WARN] /api/profiles failed; using cached profiles.json if available →", e)
        profiles = load_cached_profiles(out_root)
        if not profiles:
            raise RuntimeError("Could not fetch /api/profiles and no cached profiles.json found.")

    # selection
    if ONLY_PROFILES:
        wanted = {p.lower() for p in ONLY_PROFILES}
        profiles = [p for p in profiles if _profile_key(p).lower() in wanted]

    # build tasks using per-profile area types (with cache fallback)
    tasks = []
    at_cache_misses = 0  # number of profiles whose area types came from the cache
    for p in profiles:
        try:
            ats = fetch_area_types_for_profile(p["Id"])
        except Exception:
            at_cache_misses += 1
            ats = load_cached_area_types_for_profile(out_root, p["Id"])
        ids = set()
        for a in ats or []:
            atid = a.get("Id", a.get("AreaTypeId"))
            if isinstance(atid, int):
                ids.add(atid)
        tasks.extend((p, atid) for atid in sorted(ids))

    print(f"Planned tasks: {len(tasks)} from {len(profiles)} profiles (AT cache misses: {at_cache_misses})")

    # run main pass
    started = time.time()
    results = _run_pool(download_profile_area_csv, tasks, MAX_WORKERS, "Bronze CSV")
    dur_main = time.time() - started

    ok_main = sum(1 for r in results if r and r.get("ok"))
    empty_main = sum(1 for r in results if r and r.get("empty"))
    err_main = [r for r in results if r and r.get("error")]
    print(f"Main pass → OK:{ok_main}  Empties:{empty_main}  Errors:{len(err_main)}  (in {fmt_secs(dur_main)})")

    by_reason = _count_by_reason(err_main)
    if err_main:
        print("Error breakdown:", ", ".join(f"{k}:{v}" for k, v in sorted(by_reason.items())))

    # build failures list (with ids); compare against None so an id of 0 is kept
    failed_pairs = sorted({
        (r["profile_id"], r["area_type_id"])
        for r in err_main
        if r.get("profile_id") is not None and r.get("area_type_id") is not None
    })
    profiles_by_id = {p["Id"]: p for p in profiles}

    print("Failures to retry:", len(failed_pairs))
    if failed_pairs:
        print("Retrying pairs:")
        for (pid, atid) in failed_pairs:
            p = profiles_by_id.get(pid)
            key = _profile_key(p) if p else "?"
            print(f"  - profile_id={pid} key={key} area_type_id={atid}")

    # retry fallbacks (pairs whose profile is no longer in the selection are skipped)
    retry_tasks = [(profiles_by_id[pid], atid) for (pid, atid) in failed_pairs
                   if pid in profiles_by_id]
    started_retry = time.time()
    retry_outcomes = _run_pool(recover_profile_area_into_same_file, retry_tasks,
                               RETRY_MAX_WORKERS, "Retry fallbacks")
    dur_retry = time.time() - started_retry

    ok_retry = sum(1 for r in retry_outcomes if r and r.get("ok"))
    empty_retry = sum(1 for r in retry_outcomes if r and r.get("empty"))
    err_retry = [r for r in retry_outcomes if r and r.get("error")]
    print(f"Retry pass → OK:{ok_retry}  Empties:{empty_retry}  Errors:{len(err_retry)}  (in {fmt_secs(dur_retry)})")

    by_reason_r = _count_by_reason(err_retry)
    if err_retry:
        print("Retry error breakdown:", ", ".join(f"{k}:{v}" for k, v in sorted(by_reason_r.items())))

    # audit + backfill meta
    audit = bronze_completeness_audit(out_root, INGEST_DATE)
    backfilled = backfill_meta_for_csv_only(out_root, INGEST_DATE)

    # write manifest
    manifest = {
        "ingest_date": INGEST_DATE,
        "created_at": now_iso(),
        "ok_main": ok_main,
        "empty_main": empty_main,
        "errors_main": len(err_main),
        "errors_main_breakdown": by_reason,
        "ok_retry": ok_retry,
        "empty_retry": empty_retry,
        "errors_retry": len(err_retry),
        "errors_retry_breakdown": by_reason_r,
        "audit": audit,
        "backfilled_meta": backfilled,
        "total_duration_sec": round(time.time() - t0, 1),
    }
    write_json(out_root / "bronze" / f"manifest_ingest_date={INGEST_DATE}.json", manifest)
    print("Manifest written. Total elapsed:", fmt_secs(time.time() - t0))

    # upload to S3
    if DO_UPLOAD_TO_S3 and S3_BUCKET:
        s3_upload_bronze_ingest(
            out_root,
            INGEST_DATE,
            bucket=S3_BUCKET,
            prefix=S3_PREFIX,
            clean_first=S3_CLEAN_INGEST_DATE_FIRST,
            max_workers=S3_MAX_UPLOAD_WORKERS,
        )
        # Tolerate an unset/empty prefix instead of crashing or printing "//bronze".
        s3_prefix = (S3_PREFIX or "").strip("/")
        dest = f"{s3_prefix}/bronze" if s3_prefix else "bronze"
        print(f"✅ Uploaded bronze ingest {INGEST_DATE} to s3://{S3_BUCKET}/{dest}/")
    else:
        print("S3 upload disabled; set DO_UPLOAD_TO_S3=True and configure S3_BUCKET/S3_PREFIX to enable.")

# Script entry point: run the full ingest only when this file is executed
# directly (or as a notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()


Dumping Bronze metadata…
Planned tasks: 1980 from 45 profiles (AT cache misses: 0)


Bronze CSV:   0%|          | 0/1980 [00:00<?, ?it/s]

Main pass → OK:243  Empties:1736  Errors:1  (in 1:08:12)
Error breakdown: http_5xx:1
Failures to retry: 1
Retrying pairs:
  - profile_id=20 key=general-practice area_type_id=7
[retry][general-practice/7] trying by_profile_id (no parent)


Retry fallbacks:   0%|          | 0/1 [00:00<?, ?it/s]

[retry][general-practice/7] no-parent failed: http_5xx 500 500 Server Error: Internal Server Error for url: https://fingertips.phe.org.uk/api/all_data/csv/by_profile_id?profile_id=20&child_area_type_id=7
[retry][general-practice/7] trying group concat (deadline 1800s)
[retry][general-practice/7] groups 1/14 (used=1, rows~1309061)
[retry][general-practice/7] groups 5/14 (used=5, rows~4459028)
[retry][general-practice/7] groups 10/14 (used=10, rows~7961697)
[retry][general-practice/7] timeout after 13/14 groups
[retry][general-practice/7] success via groups used=13 rows~9647287
Retry pass → OK:1  Empties:0  Errors:0  (in 44:00)

Running Bronze completeness audit…
Using INGEST_DATE: 2025-09-17
Expected pairs (from cache): 1980
Actual pairs with meta: 1980

=== Completeness ===
Expected pairs: 1980
Covered OK (incl. ok_no_meta): 244
Covered EMPTY: 1736
Missing: 0
Audit written to: out\bronze\audits\ingest_date=2025-09-17

Backfilling meta for CSV-only partitions (if any)…
Backfilled meta f