# In [None]:
# === NHS ODS Bronze (Jupyter) — SMART (baseline-or-sync), fast, no re-downloads ===
# - SMART runner: if watermark exists -> /sync (only new/changed); else baseline
# - Baseline: dedupe across ALL past runs (won't re-download existing org_*.json)
# - Concurrency for full-record GETs + global RPS cap + retries/backoff
# - Minimal single-line tqdm bar; per-role bars hidden
# - ROLE_IDS=None -> auto-discover all roles from /roles (no 406)
# - Clean extracts of orgs/addresses/roles/rels/successors
# ------------------------------------------------------------------

import os, re, json, time, math, threading, random
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
from urllib.parse import urlparse

import pandas as pd
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm.auto import tqdm  # notebook-friendly

# ------------------- USER SETTINGS -------------------
# Root of the bronze layer. Set the ODS_BRONZE_ROOT environment variable to
# point this notebook elsewhere without editing the code (e.g. on another
# machine); otherwise the original local path is used.
BRONZE_ROOT = Path(os.environ.get("ODS_BRONZE_ROOT", r"C:\Users\NikhilYadav\Desktop\NHS ODS\bronze\ods"))
ORD_BASE    = "https://directory.spineservices.nhs.uk/ORD/2-0-0"

# Speed knobs (tune carefully)
RATE_LIMIT_RPS        = 6     # global requests/sec across all threads (values < 1 act as 1)
CONCURRENCY_FULL_ORG  = 16    # concurrent full-record GETs (8–24 is typical sweet spot)
PAGE_LIMIT            = 1000  # summary page size

# Role selection: provide list OR None to discover all from /roles
ROLE_IDS: Optional[List[str]] = None

# Fallback to Roles= when PrimaryRoleId is rejected
USE_ROLES_PARAM_IF_NEEDED = True

# Write tidy CSV/Parquet extracts at the end
MAKE_CLEAN_EXTRACTS = True

# ------------------- MINIMAL PROGRESS BAR -------------------
# Bar shows only "<desc>: NN%"; refreshes at most every 1.5s to keep notebook output light.
TQDM_BAR_OVERALL = "{desc}: {percentage:3.0f}%"
TQDM_KW = dict(bar_format=TQDM_BAR_OVERALL, dynamic_ncols=True, mininterval=1.5, smoothing=0.2, ascii=None)

# ------------------- UTILS -------------------
def now_utc_iso() -> str:
    """Current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision, literal ``Z``)."""
    stamp = datetime.now(tz=timezone.utc).replace(microsecond=0)
    return stamp.isoformat().replace("+00:00", "Z")

def ensure_dir(p: Path) -> None:
    """Create directory *p* plus any missing parents; does nothing if it already exists."""
    os.makedirs(p, exist_ok=True)

def write_json(path: Path, obj: Any) -> None:
    """Write *obj* to *path* as pretty-printed UTF-8 JSON, atomically.

    Parent directories are created as needed. The JSON is first written to a
    temporary sibling file and then moved over *path* with ``os.replace``.
    If serialisation fails or the process is interrupted mid-write, the
    previous content of *path* (watermarks, checkpoints, org records) is left
    intact instead of being truncated.

    Raises:
        TypeError / ValueError: if *obj* is not JSON-serialisable (the target
            file is untouched in that case).
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Unique per process and thread: worker threads write org files in parallel.
    tmp = path.with_name(f"{path.name}.{os.getpid()}.{threading.get_ident()}.tmp")
    try:
        with tmp.open("w", encoding="utf-8") as f:
            json.dump(obj, f, indent=2, ensure_ascii=False)
        os.replace(tmp, path)
    except BaseException:
        try:
            tmp.unlink()
        except OSError:
            pass  # temp file never created or already moved
        raise

def read_json(path: Path, default=None):
    """Load JSON from *path*; return *default* when the file does not exist."""
    if path.exists():
        with open(path, encoding="utf-8") as fh:
            return json.load(fh)
    return default

# watermarks
def wm_path() -> Path:
    """Location of the JSON file that holds the run watermarks."""
    return BRONZE_ROOT / "_watermarks.json"


def get_wm() -> Dict[str, Any]:
    """All watermarks as a dict; ``{}`` if the file is missing or holds a falsy value."""
    stored = read_json(wm_path(), default={})
    return stored if stored else {}


def set_wm(key: str, val: Any) -> None:
    """Set a single watermark *key* to *val* and rewrite the watermark file."""
    current = get_wm()
    current[key] = val
    write_json(wm_path(), current)

# checkpoints (resume per role)
CKPT_DIR = BRONZE_ROOT / "_checkpoints"
ensure_dir(CKPT_DIR)  # created eagerly when this cell runs


def _ckpt_file(role_id: str) -> Path:
    """Path of the resume-checkpoint JSON for *role_id*."""
    return CKPT_DIR / f"{role_id}.json"


def load_ckpt(role_id: str) -> int:
    """1-based summary-page offset to resume *role_id* from (1 when no checkpoint is saved)."""
    state = read_json(_ckpt_file(role_id), {})
    return int(state.get("next_offset", 1))


def save_ckpt(role_id: str, next_offset: int):
    """Record *next_offset* (plus a UTC timestamp) as the resume point for *role_id*."""
    payload = {"next_offset": int(next_offset), "saved_at": now_utc_iso()}
    write_json(_ckpt_file(role_id), payload)

# dedupe set across ALL runs (so re-runs won't re-download)
def preload_seen_orgs() -> set:
    """Return the OrgIds of every ``org_<OrgId>.json`` already saved under BRONZE_ROOT.

    Scans the chunks folders of every release (baseline and sync alike), so a
    re-run can skip organisations whose full record is already on disk. Only
    the leading ``org_`` prefix is removed from the file stem; the rest is the
    OrgId verbatim.
    """
    prefix = "org_"
    return {
        p.stem[len(prefix):]
        for p in BRONZE_ROOT.glob("release_date=*/source=ord/**/chunks/org_*.json")
    }

# ------------------- HTTP SESSION (gzip + retries + pools) -------------------
# One Session shared by the main thread and all worker threads: connection
# keep-alive/pooling plus gzip/deflate responses.
session = requests.Session()
session.headers.update({
    "User-Agent": "ods-bronze/2.0",
    "Accept-Encoding": "gzip, deflate",
})

# Transport-level retries via urllib3: up to 8 retries (``total``) for GET
# requests, including responses with status 429/500/502/503/504, with
# exponential backoff (factor 0.5). raise_on_status=False returns the final
# response instead of raising, so callers inspect the status code themselves.
# NOTE(review): these retries happen inside session.get(), i.e. beneath
# rate_sleep(), so they are not counted against RATE_LIMIT_RPS.
retry = Retry(
    total=8,
    backoff_factor=0.5,
    status_forcelist=(429, 500, 502, 503, 504),
    allowed_methods=frozenset(["GET"]),
    raise_on_status=False,
)
# Pool of 64 connections: larger than the default CONCURRENCY_FULL_ORG (16),
# so worker threads should not have to wait for a free connection.
adapter = HTTPAdapter(max_retries=retry, pool_connections=64, pool_maxsize=64)
session.mount("https://", adapter); session.mount("http://", adapter)

# Global RPS limiter shared by every thread. Callers queue on the lock; the holder
# sleeps until at least 1/RATE_LIMIT_RPS seconds have passed since the previous
# request slot was released, so requests are spaced evenly across all threads.
_rate_lock = threading.Lock()
# Monotonic time of the last released slot; -inf lets the very first call go immediately.
_last_tick = [float("-inf")]
def rate_sleep():
    """Block until the next request slot is free under ``RATE_LIMIT_RPS``.

    Sleeping while holding ``_rate_lock`` is deliberate: it serialises callers
    so concurrent threads cannot be released at the same instant. Uses
    ``time.monotonic()`` so system-clock adjustments (e.g. NTP corrections)
    cannot stall the limiter or let a burst through.

    RATE_LIMIT_RPS values below 1 are treated as 1 request/second.
    """
    with _rate_lock:
        min_gap = 1.0 / max(RATE_LIMIT_RPS, 1)
        wait = min_gap - (time.monotonic() - _last_tick[0])
        if wait > 0:
            time.sleep(wait)
        _last_tick[0] = time.monotonic()

def ord_request(url: str, params: Dict[str, Any]) -> requests.Response:
    """Rate-limited GET against ORD that always requests JSON.

    A copy of *params* is sent with ``_format`` forced to ``"json"``, overwriting
    any caller-supplied ``_format``. Redirects are followed, the timeout is
    60s, and the raw response is returned without a status check.
    """
    query = {**params, "_format": "json"}  # spec: lowercase
    rate_sleep()
    return session.get(url, params=query, timeout=60, allow_redirects=True)

def ord_get_json(url: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """GET *url* through ``ord_request`` and return the decoded JSON body.

    Raises:
        RuntimeError: for any non-2xx status, quoting the final URL and the
            first 400 characters of the response body.
    """
    resp = ord_request(url, params)
    if 200 <= resp.status_code < 300:
        return resp.json()
    raise RuntimeError(f"ORD GET failed {resp.status_code}. URL: {resp.url}\nBody: {resp.text[:400]}")

def ord_get_full_org(link: str, retries=3, backoff=1.5) -> Dict[str, Any]:
    """Fetch one organisation's full ORD record from its summary ``OrgLink``.

    ``_format=json`` is appended when the link carries no format parameter.
    Every attempt passes through the global rate limiter. Transient failures
    (``requests`` connection/timeout errors and 408/429/5xx responses) are
    retried for up to *retries* attempts in total (at least one), sleeping
    ``backoff ** attempt`` seconds in between. Other 4xx responses (e.g. 404)
    are treated as permanent and fail immediately.

    Raises:
        RuntimeError: when the record cannot be fetched. A underlying
            ``requests`` exception, if any, is chained as ``__cause__``.
    """
    if "_format=" not in link:
        sep = "&" if "?" in link else "?"
        link = f"{link}{sep}_format=json"
    attempts = max(1, int(retries))  # retries<=0 previously fell through and returned None
    for attempt in range(1, attempts + 1):
        rate_sleep()
        try:
            r = session.get(link, timeout=60, allow_redirects=True)
        except requests.RequestException as err:
            if attempt == attempts:
                raise RuntimeError(f"ORD org GET failed ({type(err).__name__}). URL: {link}\n{err}") from err
        else:
            if 200 <= r.status_code < 300:
                return r.json()
            permanent = 400 <= r.status_code < 500 and r.status_code not in (408, 429)
            if permanent or attempt == attempts:
                raise RuntimeError(f"ORD org GET failed {r.status_code}. URL: {link}\nBody: {r.text[:400]}")
        time.sleep(backoff ** attempt)

# ------------------- ROLE DISCOVERY -------------------
def discover_role_ids() -> List[str]:
    """Return every ORD role code (``RO<digits>``), upper-cased and sorted, from ``/roles``.

    The JSON representation is tried first: every ``"id"`` value starting with
    ``RO`` anywhere in the payload is collected. If that request fails or yields
    nothing, a warning is printed and the XML representation is fetched. The
    XML is scanned for ``RO<digits>`` either as element text (``>RO76<``) or as a
    quoted attribute value (``id="RO76"``).

    Returns:
        Sorted unique role codes; empty if neither representation yields any.

    Raises:
        requests.HTTPError: if the XML request returns an error status.
    """
    roles_url = f"{ORD_BASE.rstrip('/')}/roles"

    ids = set()
    try:
        payload = ord_get_json(roles_url, params={})
    except (RuntimeError, ValueError, requests.RequestException) as err:
        tqdm.write(f"[WARN] JSON /roles lookup failed, trying XML: {err}")
    else:
        def walk(x):
            if isinstance(x, dict):
                v = x.get("id")
                if isinstance(v, str) and v.upper().startswith("RO"):
                    ids.add(v.upper())
                for vv in x.values():
                    walk(vv)
            elif isinstance(x, list):
                for it in x:
                    walk(it)
        walk(payload)
        if ids:
            return sorted(ids)

    # XML fallback. ord_request() always forces _format=json, so it cannot be used
    # here: it would silently re-fetch JSON and the XML pattern would never match.
    rate_sleep()
    r = session.get(roles_url, params={"_format": "xml"}, timeout=60, allow_redirects=True)
    r.raise_for_status()
    pattern = re.compile(r""">\s*(RO\d+)\s*<|=\s*["'](RO\d+)["']""", re.IGNORECASE)
    return sorted({(m.group(1) or m.group(2)).upper() for m in pattern.finditer(r.text)})

# ------------------- TOTAL COUNT DISCOVERY -------------------
def get_total_for_role(search_url: str, role_id: str, use_roles_param: bool) -> Tuple[Optional[int], Dict[str, Any]]:
    """Probe the organisation search for *role_id* and report how many orgs match.

    Sends a one-row search (``Limit=1, Offset=1``) filtered by ``PrimaryRoleId``.
    If that is not 2xx and *use_roles_param* is true, it retries with ``Roles``.

    Returns:
        ``(total, filter)``. *total* is the ``X-Total-Count`` header as an int,
        or None when the header is missing or not all digits. *filter* is the
        one-key dict that succeeded, for reuse when paging.

    Raises:
        RuntimeError: if no attempted filter produced a 2xx response.
    """
    filter_keys = ["PrimaryRoleId"]
    if use_roles_param:
        filter_keys.append("Roles")
    for key in filter_keys:
        resp = ord_request(search_url, {key: role_id, "Limit": 1, "Offset": 1})
        if 200 <= resp.status_code < 300:
            header = resp.headers.get("X-Total-Count")
            count = int(header) if header and header.isdigit() else None
            return count, {key: role_id}
    raise RuntimeError(f"Unable to get total for {role_id}. URL: {resp.url}\nBody: {resp.text[:200]}")

# ------------------- BASELINE (fast, deduped) -------------------
def baseline_roles_with_progress(role_ids: List[str]) -> Path:
    """Full crawl of every organisation holding one of *role_ids*; returns the release folder.

    Steps:
      1. Probe each role's total (``get_total_for_role``). Roles reporting 0 orgs
         are dropped; roles with an unknown total are kept.
      2. Page through ``/organisations`` per role (``PAGE_LIMIT`` rows per page,
         1-based ``Offset``), resuming from the per-role checkpoint. Each summary
         page is saved as ``search_<role>_<offset>.json``.
      3. For every org on a page, fetch and save its full record as
         ``org_<OrgId>.json`` using one thread pool of ``CONCURRENCY_FULL_ORG``
         workers. An org is skipped when its OrgId was saved by any earlier run
         or earlier in this run.
      4. Write ``_manifest.json`` and set the watermarks that switch later runs
         to /sync.

    Failed org fetches do not stop the crawl. They are recorded under the
    manifest's ``errors`` key (with ``OrgId``/``OrgLink``) and reported at the
    end. The watermark is still set, so they will not be retried by a later
    baseline.

    Raises:
        ValueError: if *role_ids* is empty. Otherwise an empty crawl would set
            the watermark and every later run would skip the baseline.
        RuntimeError: propagated from the ORD helpers when a role total or a
            summary page cannot be fetched.
    """
    if not role_ids:
        raise ValueError("baseline_roles_with_progress: role_ids is empty; nothing to crawl.")

    ensure_dir(BRONZE_ROOT)
    release_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    search_url   = ORD_BASE.rstrip("/") + "/organisations"

    base_dir  = BRONZE_ROOT / f"release_date={release_date}" / "source=ord" / "release_type=api_baseline" / "dataset=roles"
    chunk_dir = base_dir / "chunks"
    ensure_dir(chunk_dir)

    # Plan: probe each role's total; drop roles with 0 current orgs, keep unknown totals.
    role_plans = []
    for rid in role_ids:
        total, base_params = get_total_for_role(search_url, rid, USE_ROLES_PARAM_IF_NEEDED)
        if total is None or total > 0:
            role_plans.append({"role": rid, "total": total, "base_params": base_params})
    totals_known = all(p["total"] is not None for p in role_plans)
    grand_total = sum(p["total"] or 0 for p in role_plans)

    manifest = {
        "api": ORD_BASE, "release_date": release_date, "release_type": "api_baseline",
        "downloaded_at_utc": now_utc_iso(), "roles": [p["role"] for p in role_plans],
        "role_plans": role_plans, "summary_chunks": [], "org_records": []
    }
    errors: List[Dict[str, Any]] = []
    written = 0  # full records actually downloaded and saved by THIS run
    seen_orgs = preload_seen_orgs()

    def page_once(params: Dict[str, Any], offset: int, limit: int) -> Dict[str, Any]:
        """Fetch one summary page (1-based *offset*) with the role filter in *params*."""
        p = dict(params); p["Limit"] = limit; p["Offset"] = max(1, offset)
        return ord_get_json(search_url, p)

    def fetch_and_write(org_rec):
        """Download and save one org's full record; returns ``(OrgId, wrote_file)``."""
        link = org_rec.get("OrgLink"); oid = org_rec.get("OrgId")
        if not link or not oid:
            return None, False
        if oid in seen_orgs:
            return oid, False
        full = ord_get_full_org(link)
        write_json(chunk_dir / f"org_{oid}.json", full)
        seen_orgs.add(oid)
        return oid, True

    overall = tqdm(total=grand_total if totals_known else None, unit="org",
                   desc=f"SELECTED ROLES ({len(role_plans)})", leave=True, **TQDM_KW)
    try:
        # One pool for the whole crawl instead of a fresh pool per summary page.
        with ThreadPoolExecutor(max_workers=CONCURRENCY_FULL_ORG) as ex:
            for plan in role_plans:
                rid = plan["role"]; total = plan["total"]; base_params = plan["base_params"]
                tqdm.write(f"→ Crawling {rid} ({total if total is not None else 'unknown'} orgs)")
                offset = load_ckpt(rid)

                while True:
                    try:
                        data = page_once(base_params, offset, PAGE_LIMIT)
                    except RuntimeError:
                        if "PrimaryRoleId" not in base_params or not USE_ROLES_PARAM_IF_NEEDED:
                            raise
                        # NOTE(review): *offset* was reached under the PrimaryRoleId filter;
                        # the Roles result set may differ, so a mid-role switch can skip or
                        # repeat orgs (repeats are deduped via seen_orgs).
                        base_params = {"Roles": rid}
                        data = page_once(base_params, offset, PAGE_LIMIT)

                    orgs = data.get("Organisations", []) or []
                    if not orgs:
                        save_ckpt(rid, 1)  # role finished: a future baseline restarts from page 1
                        break

                    # save summary page
                    chunk_name = f"search_{rid}_{offset:09d}.json"
                    write_json(chunk_dir / chunk_name, data)
                    manifest["summary_chunks"].append({"role": rid, "file": chunk_name, "count": len(orgs)})

                    # concurrent full-record fetches (only for unseen orgs)
                    futures = {ex.submit(fetch_and_write, rec): rec for rec in orgs}
                    for fut in as_completed(futures):
                        try:
                            _, wrote = fut.result()
                            written += int(wrote)
                        except Exception as e:
                            rec = futures[fut] if isinstance(futures[fut], dict) else {}
                            errors.append({"role": rid, "OrgId": rec.get("OrgId"),
                                           "OrgLink": rec.get("OrgLink"), "error": str(e)})
                        finally:
                            overall.update(1)

                    offset += PAGE_LIMIT
                    save_ckpt(rid, offset)
                    if len(orgs) < PAGE_LIMIT:
                        save_ckpt(rid, 1)
                        break
    finally:
        overall.close()  # previously never closed on the success path

    if errors:
        manifest["errors"] = errors
    write_json(base_dir / "_manifest.json", manifest)
    set_wm("ord_api_baseline_date", release_date)
    # One day back: presumably so the first /sync overlaps the crawl window.
    set_wm("ord_last_change_date", (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%d"))

    print(f"[OK] Baseline complete → {base_dir}")
    print("Org files (newly written in this run):", written)
    if errors:
        print(f"[WARN] {len(errors)} org fetch(es) failed; see 'errors' in {base_dir / '_manifest.json'}")
    return base_dir

# ------------------- INCREMENTAL SYNC (preferred for re-runs) -------------------
def incremental_sync_with_progress():
    """Fetch every org ORD reports as changed since the last watermark; returns the release folder.

    Calls ``/sync?LastChangeDate=<watermark>``, saves that list, then downloads
    and saves each changed org's full record (always, even if seen before) as
    ``org_<OrgId>.json``. Downloads run on ``CONCURRENCY_FULL_ORG`` threads.

    A failing org no longer aborts the run. Failures are collected in the
    manifest's ``errors`` key, and the watermark is then left unchanged so the
    next run asks for the same window again. On full success the watermark
    moves to the UTC date on which this run *started*, so a run that crosses
    midnight does not skip changes made on its start date.

    Raises:
        RuntimeError: if no ``ord_last_change_date`` watermark exists, or the
            /sync list itself cannot be fetched.
    """
    wm = get_wm()
    since = wm.get("ord_last_change_date")
    if not since:
        # explicit raise rather than assert: asserts are stripped under `python -O`
        raise RuntimeError("No watermark found. Run baseline first.")

    sync_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")  # also the next watermark
    sync_url  = ORD_BASE.rstrip("/") + "/sync"

    base_dir  = BRONZE_ROOT / f"release_date={sync_date}" / "source=ord" / "release_type=api_sync" / "dataset=all"
    chunk_dir = base_dir / "chunks"
    ensure_dir(chunk_dir)

    data = ord_get_json(sync_url, {"LastChangeDate": since})
    write_json(chunk_dir / f"sync_list_since_{since}.json", data)
    changed = data.get("Organisations", []) or []

    seen_orgs = preload_seen_orgs()  # read-only inside worker threads

    def org_id_of(full, link):
        """OrgId for the file name: record's OrgId (plain or ``extension``), else the link's last path segment."""
        org = full
        if isinstance(full, dict) and isinstance(full.get("Organisation"), dict):
            org = full["Organisation"]
        raw = org.get("OrgId") if isinstance(org, dict) else None
        if isinstance(raw, dict):
            raw = raw.get("extension")
        if raw:
            return str(raw)
        return urlparse(link).path.rstrip("/").split("/")[-1]

    def fetch_and_write(rec):
        """Download and save one changed org; returns its OrgId, or None if it has no link."""
        link = rec.get("OrgLink") if isinstance(rec, dict) else None
        if not link:
            return None
        full = ord_get_full_org(link)
        oid = org_id_of(full, link)
        write_json(chunk_dir / f"org_{oid}.json", full)
        return oid

    new_ids = set()
    errors: List[Dict[str, Any]] = []
    bar = tqdm(total=len(changed), unit="org", desc="SYNC", leave=True, **TQDM_KW)
    try:
        with ThreadPoolExecutor(max_workers=CONCURRENCY_FULL_ORG) as ex:
            futures = {ex.submit(fetch_and_write, rec): rec for rec in changed}
            for fut in as_completed(futures):
                try:
                    oid = fut.result()
                except Exception as e:
                    rec = futures[fut]
                    errors.append({"OrgLink": rec.get("OrgLink") if isinstance(rec, dict) else None,
                                   "error": str(e)})
                else:
                    if oid is not None and oid not in seen_orgs:
                        new_ids.add(oid)
                finally:
                    bar.update(1)
    finally:
        bar.close()

    manifest = {
        "api": ORD_BASE, "release_date": sync_date, "release_type": "api_sync",
        "downloaded_at_utc": now_utc_iso(), "params": {"LastChangeDate": since},
        "changed_count": len(changed), "new_orgs_first_seen_this_run": len(new_ids)
    }
    if errors:
        manifest["errors"] = errors
    write_json(base_dir / "_manifest.json", manifest)

    if errors:
        print(f"[WARN] {len(errors)} org(s) failed; watermark left at {since} so the next run retries them.")
    else:
        set_wm("ord_last_change_date", sync_date)
    print(f"[OK] Sync complete → {base_dir} (changed orgs: {len(changed)}, new orgs: {len(new_ids)})")
    return base_dir

# ------------------- CLEAN EXTRACTS -------------------
def scalar(x):
    """Reduce an ORD JSON value to a single plain value where possible.

    - None / str / int / float / bool are returned unchanged.
    - A dict yields the first scalar found under ``extension``, ``value``, ``_``,
      ``text``, ``#text``, ``displayName``, ``code``, ``id`` (in that order). Failing
      that, a single-key dict yields its value if that value is scalar.
    - A list yields the first item whose own ``scalar`` is not None.
    - Anything left over is serialised to compact JSON, or ``str(x)`` when it
      cannot be JSON-encoded.
    """
    if x is None or isinstance(x, (str, int, float, bool)): return x
    if isinstance(x, dict):
        for k in ("extension","value","_","text","#text","displayName","code","id"):
            if k in x and isinstance(x[k], (str, int, float, bool)): return x[k]
        if len(x)==1:
            v = next(iter(x.values()))
            if isinstance(v,(str,int,float,bool)): return v
    if isinstance(x, list):
        for it in x:
            s = scalar(it)
            if s is not None: return s
    try:
        return json.dumps(x, ensure_ascii=False, separators=(",",":"))
    except (TypeError, ValueError, RecursionError):
        # Not JSON-encodable (e.g. a set), circular, or too deep. A bare `except:`
        # used to be here and also swallowed KeyboardInterrupt/SystemExit.
        return str(x)

def pick_org(j):
    """Return the organisation object from a full ORD record.

    Unwraps ``{"Organisation": {...}}`` when the inner value is a dict. Otherwise
    returns *j* itself if it is a dict, and ``{}`` for any non-dict input.
    """
    if not isinstance(j, dict):
        return {}
    inner = j.get("Organisation")
    return inner if isinstance(inner, dict) else j

def get_org_id(org):
    """OrgId of an organisation dict as a string (``""`` when missing or falsy).

    Uses ``OrgId["extension"]`` when OrgId is a dict carrying that key;
    otherwise the ``scalar`` form of OrgId.
    """
    raw = org.get("OrgId")
    if isinstance(raw, dict) and "extension" in raw:
        return str(raw["extension"])
    value = scalar(raw)
    return str(value) if value else ""

def get_record_class(org):
    """Return ``(code, label)`` for the org's record class.

    *code* comes from ``orgRecordClass`` (falling back to ``OrgRecordClass``) as a
    string. *label* is ``HSCOrg`` for RC1, ``HSCSite`` for RC2
    (case-insensitive) and ``""`` otherwise.
    """
    raw = org.get("orgRecordClass") or org.get("OrgRecordClass")
    code = scalar(raw)
    code = str(code) if code else ""
    labels = {"RC1": "HSCOrg", "RC2": "HSCSite"}
    return code, labels.get(code.upper(), "")

def ensure_str(x):
    """``str(x)``, except that None becomes the empty string."""
    if x is None:
        return ""
    return str(x)

def deep_find_first_key(obj: Any, key_regex: str) -> Any:
    """Search nested dicts/lists for a key matching *key_regex* that has a usable value.

    The pattern is applied case-insensitively with ``re.search``, so it may match
    anywhere inside a key. At each dict, its own keys are checked in iteration
    order before anything nested is explored. A match only counts if
    ``scalar(value)`` is not None, "" or "null". Nested containers are explored
    via a LIFO stack (depth-first), so among siblings the one pushed last is
    searched first. Containers already visited (tracked by ``id``) are skipped,
    which also guards against shared references and cycles.

    Returns:
        The ``scalar`` form of the first acceptable value found, else None.
    """
    pat = re.compile(key_regex, re.IGNORECASE)
    stack = [obj]; seen=set()
    while stack:
        cur = stack.pop()  # LIFO -> depth-first, last-pushed child first
        if id(cur) in seen: continue
        seen.add(id(cur))
        if isinstance(cur, dict):
            # 1) check this dict's own keys before descending
            for k,v in cur.items():
                if isinstance(k, str) and pat.search(k):
                    val = scalar(v)
                    if val not in (None,"","null"): return val
            # 2) queue nested containers for later inspection
            for v in cur.values():
                if isinstance(v,(dict,list)): stack.append(v)
        elif isinstance(cur, list):
            for it in cur:
                if isinstance(it,(dict,list)): stack.append(it)
    return None

def clean_postcode(pc):
    """Normalise a postcode into ``(compact, spaced)`` upper-case forms.

    All whitespace is removed for *compact*. *spaced* inserts one space before
    the last three characters when the compact form is longer than three
    characters. Falsy input gives ``("", "")``.
    """
    if not pc:
        return "", ""
    compact = re.sub(r"\s+", "", str(pc)).upper()
    if len(compact) <= 3:
        return compact, compact
    return compact, f"{compact[:-3]} {compact[-3:]}"

def extract_address_fields(org):
    """Pull a flat, string-valued address out of an organisation dict.

    The address block is the first dict found at ``GeoLoc.Location``,
    ``PostalAddress``, ``Address`` or ``Contact.Address`` (in that order) that has
    at least one recognised address key (case-insensitive). If none qualifies,
    every field comes out empty. The postcode falls back to the first
    postcode-like key anywhere in *org*. ``AddressFull`` joins the non-blank
    lines, town, county and country with ", ".
    """
    address_keys = {"addrln1","addrln2","addrln3","addrln4","addrl1","addrl2","town","city","county","postcode"}

    def follow(keys):
        """Walk *keys* from *org*; the dict found there, or None."""
        node = org
        for key in keys:
            if not (isinstance(node, dict) and key in node):
                return None
            node = node[key]
        return node if isinstance(node, dict) else None

    block = {}
    for keys in (("GeoLoc","Location"), ("PostalAddress",), ("Address",), ("Contact","Address")):
        cand = follow(keys)
        if cand is not None and {k.lower() for k in cand.keys()} & address_keys:
            block = cand
            break

    def first_of(*names):
        """``scalar`` of the first of *names* present in the chosen block, else None."""
        for name in names:
            if name in block:
                return scalar(block[name])
        return None

    line1 = first_of("AddrLn1","Addrl1","Address1")
    line2 = first_of("AddrLn2","Addrl2","Address2")
    line3 = first_of("AddrLn3","Addrl3","Address3")
    line4 = first_of("AddrLn4","Addrl4","Address4")
    city = first_of("Town","City","Locality")
    county = first_of("County")
    country = first_of("Country")
    raw_pc = first_of("PostCode","Postcode","Post_Code","Post Code")
    if not raw_pc:
        raw_pc = deep_find_first_key(org, r"post\s*code|postcode")

    pieces = (line1, line2, line3, line4, city, county, country)
    full_text = ", ".join(ensure_str(piece).strip() for piece in pieces if piece and str(piece).strip() != "")
    compact, spaced = clean_postcode(raw_pc)
    return {
        "AddrLine1": ensure_str(line1), "AddrLine2": ensure_str(line2),
        "AddrLine3": ensure_str(line3), "AddrLine4": ensure_str(line4),
        "TownCity": ensure_str(city), "County": ensure_str(county), "Country": ensure_str(country),
        "PostCode": compact, "PostCodeSpaced": spaced, "AddressFull": full_text
    }

def extract_dates(org):
    """Organisation-level date entries as ``{"DateType", "Start", "End"}`` string dicts.

    Only a list-valued ``Date`` is read; non-dict entries are skipped.
    """
    entries = org.get("Date")
    if not isinstance(entries, list):
        return []
    return [
        {"DateType": ensure_str(e.get("Type")), "Start": ensure_str(e.get("Start")), "End": ensure_str(e.get("End"))}
        for e in entries
        if isinstance(e, dict)
    ]

def extract_roles(org):
    """List the roles held by an organisation dict.

    ``Roles`` may be a list of role dicts, or ``{"Role": [...]}`` /
    ``{"Role": {...}}``; non-dict entries are ignored. Each role becomes
    ``{"RoleId", "PrimaryRole", "RoleStatus", "RoleDates"}``. ``RoleDates`` lists
    ``{"Type", "Start", "End"}`` string dicts taken from a list-valued ``Date``.
    """
    def role_dates(role):
        raw_dates = role.get("Date")
        if not isinstance(raw_dates, list):
            return []
        return [
            {"Type": ensure_str(di.get("Type")), "Start": ensure_str(di.get("Start")), "End": ensure_str(di.get("End"))}
            for di in raw_dates
            if isinstance(di, dict)
        ]

    def to_row(role):
        code = role.get("id") or role.get("idCode") or role.get("Id") or role.get("code")
        return {
            "RoleId": ensure_str(scalar(code)),
            "PrimaryRole": bool(role.get("primaryRole", False)),
            "RoleStatus": ensure_str(scalar(role.get("Status"))),
            "RoleDates": role_dates(role),
        }

    container = org.get("Roles")
    if isinstance(container, list):
        candidates = container
    elif isinstance(container, dict):
        inner = container.get("Role")
        if isinstance(inner, list):
            candidates = inner
        elif isinstance(inner, dict):
            candidates = [inner]
        else:
            candidates = []
    else:
        candidates = []
    return [to_row(c) for c in candidates if isinstance(c, dict)]

def extract_rels(org):
    """List the relationships under ``org["Rels"]["Rel"]`` (a list or a single dict).

    Each becomes ``{"RelId", "RelStatus", "TargetOrgId", "TargetPrimaryRoleId",
    "RelDates"}``. The target OrgId prefers its ``extension`` field, and the target
    role prefers ``PrimaryRoleId["id"]``. ``RelDates`` lists ``{"Type", "Start",
    "End"}`` string dicts.
    """
    container = org.get("Rels")
    if not isinstance(container, dict):
        return []
    entries = container.get("Rel")
    if isinstance(entries, dict):
        entries = [entries]
    elif not isinstance(entries, list):
        entries = []

    rows = []
    for rel in entries:
        if not isinstance(rel, dict):
            continue
        rel_id = ensure_str(scalar(rel.get("id")))
        status = ensure_str(scalar(rel.get("Status")))
        target = rel.get("Target") or {}
        org_ref = target.get("OrgId")
        if isinstance(org_ref, dict) and "extension" in org_ref:
            target_org = org_ref.get("extension")
        else:
            target_org = scalar(org_ref)
        role_ref = target.get("PrimaryRoleId") or {}
        target_role = role_ref.get("id") if isinstance(role_ref, dict) else role_ref
        date_list = rel.get("Date")
        rel_dates = []
        if isinstance(date_list, list):
            rel_dates = [
                {"Type": ensure_str(di.get("Type")), "Start": ensure_str(di.get("Start")), "End": ensure_str(di.get("End"))}
                for di in date_list
                if isinstance(di, dict)
            ]
        rows.append({
            "RelId": rel_id,
            "RelStatus": status,
            "TargetOrgId": ensure_str(target_org),
            "TargetPrimaryRoleId": ensure_str(target_role or ""),
            "RelDates": rel_dates,
        })
    return rows

def extract_succs(org):
    """List successor/predecessor links under ``org["Succs"]["Succ"]`` (a list or a single dict).

    Each becomes ``{"SuccType", "TargetOrgId", "TargetPrimaryRoleId", "SuccDates"}``.
    Targets are resolved the same way as for relationships, and ``SuccDates``
    lists ``{"Type", "Start", "End"}`` string dicts.
    """
    container = org.get("Succs")
    if not isinstance(container, dict):
        return []
    entries = container.get("Succ")
    if isinstance(entries, dict):
        entries = [entries]
    elif not isinstance(entries, list):
        entries = []

    rows = []
    for succ in entries:
        if not isinstance(succ, dict):
            continue
        kind = ensure_str(scalar(succ.get("Type")))
        target = succ.get("Target") or {}
        org_ref = target.get("OrgId")
        if isinstance(org_ref, dict) and "extension" in org_ref:
            target_org = org_ref.get("extension")
        else:
            target_org = scalar(org_ref)
        role_ref = target.get("PrimaryRoleId") or {}
        target_role = role_ref.get("id") if isinstance(role_ref, dict) else role_ref
        date_list = succ.get("Date")
        succ_dates = []
        if isinstance(date_list, list):
            succ_dates = [
                {"Type": ensure_str(di.get("Type")), "Start": ensure_str(di.get("Start")), "End": ensure_str(di.get("End"))}
                for di in date_list
                if isinstance(di, dict)
            ]
        rows.append({
            "SuccType": kind,
            "TargetOrgId": ensure_str(target_org),
            "TargetPrimaryRoleId": ensure_str(target_role or ""),
            "SuccDates": succ_dates,
        })
    return rows

def make_clean_extracts():
    """Flatten every saved ORD org record into tidy tables under ``BRONZE_ROOT/extracts``.

    All ``org_*.json`` files under any
    ``release_date=*/source=ord/*/dataset=*/chunks`` folder are considered. The
    folders are visited in sorted path order: ``release_date=YYYY-MM-DD`` sorts
    oldest to newest, and on the same day ``api_baseline`` sorts before
    ``api_sync``. A later file for the same OrgId replaces an earlier one. The
    result is the newest saved record for every organisation, so baseline orgs
    that a later /sync did not touch stay in the extract. Previously only the
    newest folder was read, so after a sync the extract held just the changed
    subset.

    Files written (``ts`` is today's UTC date): ``orgs_{ts}.csv`` and
    ``orgs_{ts}.parquet``, plus ``org_dates_{ts}.csv``, ``org_roles_{ts}.csv``,
    ``org_rels_{ts}.csv`` and ``org_succs_{ts}.csv``. Roles, rels and succs produce one row per
    date entry, or a single row with blank date columns when they have none.

    Returns:
        ``(orgs_df, roles_df, rels_df, succ_df)``. The dates table is written to
        disk but not returned.

    Raises:
        RuntimeError: if no chunks folder exists yet.
    """
    chunks_dirs = sorted(BRONZE_ROOT.glob("release_date=*/source=ord/*/dataset=*/chunks"))
    if not chunks_dirs:
        raise RuntimeError("No chunks folder found.")

    # OrgId (from the file name) -> newest saved file; later folders overwrite earlier ones.
    latest_file_by_org: Dict[str, Path] = {}
    for chunk_dir in chunks_dirs:
        for fp in sorted(chunk_dir.glob("org_*.json")):
            latest_file_by_org[fp.stem[len("org_"):]] = fp

    def with_dates(base, dates, prefix):
        """Expand *base* into one row per date entry, or a single blank-dated row."""
        if not dates:
            return [{**base, f"{prefix}DateType": "", f"{prefix}Start": "", f"{prefix}End": ""}]
        return [
            {**base, f"{prefix}DateType": d["Type"], f"{prefix}Start": d["Start"], f"{prefix}End": d["End"]}
            for d in dates
        ]

    org_rows, date_rows, role_rows, rel_rows, succ_rows = [], [], [], [], []
    for fp in latest_file_by_org.values():
        org = pick_org(read_json(fp, {}))
        oid = get_org_id(org)
        name = ensure_str(scalar(org.get("Name")))
        status = ensure_str(scalar(org.get("Status")))
        lcd = ensure_str(scalar(org.get("LastChangeDate")))
        rc_code, rc_label = get_record_class(org)

        org_rows.append({
            "OrgId": oid,
            "Name": name,
            "Status": status,
            "IsActive": (status.strip().lower()=="active"),
            "OrgRecordClass": rc_code,
            "OrgRecordClassLabel": rc_label,
            "LastChangeDate": lcd,
            **extract_address_fields(org)
        })

        date_rows.extend({"OrgId": oid, **d} for d in extract_dates(org))

        for r in extract_roles(org):
            role_rows.extend(with_dates(
                {"OrgId": oid, "RoleId": r["RoleId"], "PrimaryRole": r["PrimaryRole"], "RoleStatus": r["RoleStatus"]},
                r["RoleDates"], "Role"))

        for rel in extract_rels(org):
            rel_rows.extend(with_dates(
                {"OrgId": oid, "RelId": rel["RelId"], "RelStatus": rel["RelStatus"],
                 "TargetOrgId": rel["TargetOrgId"], "TargetPrimaryRoleId": rel["TargetPrimaryRoleId"]},
                rel["RelDates"], "Rel"))

        for sc in extract_succs(org):
            succ_rows.extend(with_dates(
                {"OrgId": oid, "SuccType": sc["SuccType"], "TargetOrgId": sc["TargetOrgId"],
                 "TargetPrimaryRoleId": sc["TargetPrimaryRoleId"]},
                sc["SuccDates"], "Succ"))

    # ---- safe builders (handle empty tables + missing cols) ----
    def build_df(rows, required_cols, sort_cols):
        """DataFrame from *rows* with every required column present, sorted by *sort_cols*."""
        df = pd.DataFrame(rows)
        if df.empty:
            df = pd.DataFrame(columns=required_cols)
        else:
            for c in required_cols:
                if c not in df.columns:
                    df[c] = ""
        sort_by = [c for c in sort_cols if c in df.columns]
        if sort_by:
            df = df.sort_values(sort_by)
        return df.reset_index(drop=True)

    orgs_df  = build_df(
        org_rows,
        required_cols=["OrgId","Name","Status","IsActive","OrgRecordClass","OrgRecordClassLabel",
                       "LastChangeDate","AddrLine1","AddrLine2","AddrLine3","AddrLine4",
                       "TownCity","County","Country","PostCode","PostCodeSpaced","AddressFull"],
        sort_cols=["OrgId"]
    ).drop_duplicates(subset=["OrgId"])

    dates_df = build_df(
        date_rows,
        required_cols=["OrgId","DateType","Start","End"],
        sort_cols=["OrgId","DateType","Start"]
    )

    roles_df = build_df(
        role_rows,
        required_cols=["OrgId","RoleId","PrimaryRole","RoleStatus","RoleDateType","RoleStart","RoleEnd"],
        sort_cols=["OrgId","RoleId","RoleStart"]
    )

    rels_df = build_df(
        rel_rows,
        required_cols=["OrgId","RelId","RelStatus","TargetOrgId","TargetPrimaryRoleId",
                       "RelDateType","RelStart","RelEnd"],
        sort_cols=["OrgId","RelId","RelStart"]
    )

    succ_df = build_df(
        succ_rows,
        required_cols=["OrgId","SuccType","TargetOrgId","TargetPrimaryRoleId","SuccDateType","SuccStart","SuccEnd"],
        sort_cols=["OrgId","SuccType","SuccStart"]
    )

    # ---- write extracts ----
    extracts = BRONZE_ROOT / "extracts"
    ensure_dir(extracts)
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%d")  # UTC, like the release_date folders
    orgs_df.to_csv(extracts / f"orgs_{ts}.csv", index=False)
    orgs_df.to_parquet(extracts / f"orgs_{ts}.parquet", index=False)
    dates_df.to_csv(extracts / f"org_dates_{ts}.csv", index=False)
    roles_df.to_csv(extracts / f"org_roles_{ts}.csv", index=False)
    rels_df.to_csv(extracts / f"org_rels_{ts}.csv", index=False)
    succ_df.to_csv(extracts / f"org_succs_{ts}.csv", index=False)

    print("Wrote tidy extracts to:", extracts)
    print(f"Orgs: {len(orgs_df)} (latest record per OrgId across {len(chunks_dirs)} chunk folder(s))")
    preview = orgs_df.head(10)
    try:
        display(preview)  # IPython/Jupyter builtin
    except NameError:
        print(preview.to_string())  # plain Python: no `display`
    return orgs_df, roles_df, rels_df, succ_df


# ------------------- SMART RUNNER -------------------
def smart_run():
    """Run one ingestion pass: /sync if a watermark exists, otherwise a full baseline.

    Role discovery (when ``ROLE_IDS`` is None) now happens only on the baseline
    path. /sync is not role-filtered, so a failing /roles lookup can no longer
    block incremental runs. Discovered IDs are stored back into the global
    ``ROLE_IDS``. The baseline never re-downloads orgs already on disk; /sync
    re-fetches only orgs that ORD reports as changed.

    Returns:
        Path of the release folder written by this run.

    Raises:
        RuntimeError: if there are no role IDs to crawl for a baseline.
    """
    global ROLE_IDS
    ensure_dir(BRONZE_ROOT)
    print("Bronze root:", BRONZE_ROOT.resolve())

    if get_wm().get("ord_last_change_date"):
        print("Watermark found → running INCREMENTAL SYNC (new/changed only)")
        return incremental_sync_with_progress()

    # If ROLE_IDS is None, discover from /roles (only the baseline needs it)
    if ROLE_IDS is None:
        print("Discovering all Role IDs from /roles ...")
        discovered = discover_role_ids()
        if not discovered:
            # Leave ROLE_IDS as None so a re-run retries discovery.
            raise RuntimeError("Role discovery returned no IDs; set ROLE_IDS explicitly or check /roles.")
        ROLE_IDS = discovered
        print(f"Discovered {len(ROLE_IDS)} roles")
    if not ROLE_IDS:
        # An empty crawl would still set the watermark and lock out future baselines.
        raise RuntimeError("ROLE_IDS is empty; nothing to crawl.")

    print("No watermark → running BASELINE (deduped; skips already-downloaded orgs)")
    return baseline_roles_with_progress(ROLE_IDS)

# ------------------- RUN -------------------
# Runs as soon as this cell executes: a baseline or an incremental sync into
# BRONZE_ROOT (smart_run returns the release folder it wrote).
out_dir = smart_run()

if MAKE_CLEAN_EXTRACTS:
    # Rebuild the tidy CSV/Parquet extracts; the DataFrames stay in the
    # notebook namespace for inspection.
    orgs_df, roles_df, rels_df, succ_df = make_clean_extracts()
