In [11]:
# NHS GP Registrations – complete downloader/organiser (CSV-only)
# - Gets ALL datasets including LSOA-by-sex (male/female)
# - Saves into by_dataset/<dataset>/period=YYYY-MM/*.csv
# - Optional: split practice datasets into ICB-of-practice subfiles (per-month using that month’s practice_map)
# - Optional: split LSOA-by-sex by ICB-of-residence if you provide an ONS LSOA->ICB lookup CSV
#
# Usage:
#   pip install requests beautifulsoup4
#   # (pandas only needed if you set SPLIT_BY_ICB=True or provide LSOA_TO_ICB_RESIDENCE_LOOKUP)
#   python nhs_gp_downloader.py

from __future__ import annotations
import re
import time
import json
from pathlib import Path
from urllib.parse import urljoin, urlparse
from collections import defaultdict, deque
from io import BytesIO
from zipfile import ZipFile

import requests
from bs4 import BeautifulSoup

# ---- Config ---------------------------------------------------------------

START_URL = "https://digital.nhs.uk/data-and-information/publications/statistical/patients-registered-at-a-gp-practice"
OUT_DIR   = Path("./nhs_gp_data_v2")
HEADERS   = {"User-Agent": "nhs-gp-registrations-downloader/2.2 (+for-analysis)"}

# Keep only the last N years (inclusive of the current year).
# Set to None for ALL releases.
YEARS_BACK = 5

# Optional: split practice-level CSVs by ICB-of-practice using that month’s practice_map
SPLIT_BY_ICB = False

# Optional: if you obtain an ONS LSOA->ICB-of-residence lookup CSV, put its path here
# and the script will also make a by_icb_residence split for LSOA-by-sex.
LSOA_TO_ICB_RESIDENCE_LOOKUP = None  # e.g. Path("ons_lsoa11_to_icb22_lookup.csv")

# ---- Helpers --------------------------------------------------------------

# Month name handling (short and long)
MONTH_RE = r"(jan(?:uary)?|feb(?:ruary)?|mar(?:ch)?|apr(?:il)?|may|jun(?:e)?|jul(?:y)?|aug(?:ust)?|sep(?:t|tember)?|oct(?:ober)?|nov(?:ember)?|dec(?:ember)?)"
MONTH_3 = {"jan":1,"feb":2,"mar":3,"apr":4,"may":5,"jun":6,"jul":7,"aug":8,"sep":9,"oct":10,"nov":11,"dec":12}
def _mon_to_num(m: str) -> int:
    m = m.lower()
    if m.startswith("sept"):  # normalize 'sept' -> 'sep'
        m = "sep"
    return MONTH_3[m[:3]]

ASSET_EXTS = {".csv",".zip",".xls",".xlsx",".parquet"}  # we’ll only keep CSVs; ZIPs are extracted

def _is_html(resp) -> bool:
    return "text/html" in resp.headers.get("Content-Type","").lower()

def _sanitize(name: str) -> str:
    return re.sub(r"[^a-zA-Z0-9._\-+]", "_", name)

def _classify_dataset(url_or_name: str) -> str | None:
    s = url_or_name.lower()

    # LSOA by sex
    if "lsoa" in s and ("male-female" in s or "male_female" in s or "malefemale" in s or "male female" in s):
        return "lsoa_by_sex"

    # Practice map (practice -> ICB/PCN info)
    if "prac-map" in s or "practice_map" in s or re.search(r"\bmap(?:[_\-]?(v2)?)?\b", s):
        return "practice_map"

    # PCN datasets
    if "pat-pcn" in s or re.search(r"\bpcn\b", s):
        return "pcn"

    # Practice totals + age splits
    if "quin-age" in s or "quinary" in s:
        return "practice_quinary_age"

    if "sing-age" in s:
        # guard against 'female' containing 'male'
        if re.search(r"(?:^|[^a-z])female(?:[^a-z]|$)", s):
            return "practice_single_age_female"
        if re.search(r"(?:^|[^a-z])male(?:[^a-z]|$)", s):
            return "practice_single_age_male"

    if "sing-age" in s and "regions" in s:
        return "single_age_regions"

    if re.search(r"prac[-_]?all(\b|[^a-z])", s) or re.search(r"\ball[-_]?jun[-_]?18\b", s):
        return "practice_all"

    # Misc catch-all
    if "patients-registered" in s or "gp-reg" in s or "prac" in s:
        return "misc"
    return None

def _infer_period(asset_url: str, source_urls: list[str]) -> tuple[int|None,int|None]:
    """
    Infer YYYY-MM from either the asset filename or the source page URLs.
    Handles:
      - ...-oct-2025.csv / ...-october-2025.zip  (full or short month)
      - ...-oct-25.csv
      - .../2025/10/... or .../2025-10/...
      - .../2025/aug/..., .../august-2025/..., .../2025-aug/...
    """
    name = Path(urlparse(asset_url).path).name.lower()

    # month name + 4-digit year in filename
    m = re.search(rf"{MONTH_RE}[-_ ]?(20\d{{2}})", name, flags=re.IGNORECASE)
    if m:
        return (int(m.group(2)), _mon_to_num(m.group(1)))

    # month name + 2-digit year in filename
    m = re.search(rf"{MONTH_RE}[-_ ]?(?:20)?(\d{{2}})\b", name, flags=re.IGNORECASE)
    if m:
        return (2000 + int(m.group(2)), _mon_to_num(m.group(1)))

    # 4-digit year + numeric month anywhere in URL
    m = re.search(r"(20\d{2})[-_/](\d{1,2})(?:\D|$)", asset_url)
    if m:
        year = int(m.group(1)); mon = int(m.group(2))
        if 1 <= mon <= 12: return (year, mon)

    # Look in source page URLs
    for u in source_urls:
        p = urlparse(u).path.lower()

        # /YYYY/MM/ or /YYYY-MM/
        m = re.search(r"/(20\d{2})[-/](\d{1,2})(?:/|$)", p)
        if m:
            y, mon = int(m.group(1)), int(m.group(2))
            if 1 <= mon <= 12: return (y, mon)

        # /YYYY/<monthname>/
        m = re.search(rf"/(20\d{{2}})/{MONTH_RE}(?:/|$)", p)
        if m:
            return (int(m.group(1)), _mon_to_num(m.group(2)))

        # /<monthname>-YYYY/  or /YYYY-<monthname>/
        m = re.search(rf"/{MONTH_RE}[-_ ]?(20\d{{2}})(?:/|$)", p)
        if m:
            return (int(m.group(2)), _mon_to_num(m.group(1)))

        m = re.search(rf"/(20\d{{2}})[-_ ]?{MONTH_RE}(?:/|$)", p)
        if m:
            return (int(m.group(1)), _mon_to_num(m.group(2)))

    # Unknown
    return (None, None)

def _year_ok(year: int|None, years_back: int|None) -> bool:
    if years_back is None:
        return True
    if year is None:
        # If the user asked for last N years, drop undated assets.
        return False
    now_year = time.gmtime().tm_year
    earliest = now_year - years_back + 1
    return year >= earliest

def _save_bytes(fp: Path, content: bytes):
    fp.parent.mkdir(parents=True, exist_ok=True)
    with open(fp, "wb") as f:
        f.write(content)

def _extract_zip_to(fp: Path, zip_bytes: bytes):
    with ZipFile(BytesIO(zip_bytes)) as z:
        for zi in z.infolist():
            name = Path(zi.filename)
            if not name.name or name.name.startswith("."):
                continue
            if name.suffix.lower() != ".csv":
                continue  # keep only CSVs
            out = fp / _sanitize(name.name)
            out.parent.mkdir(parents=True, exist_ok=True)
            with z.open(zi) as src, open(out, "wb") as dst:
                dst.write(src.read())

# ---- Crawl to collect assets with referers --------------------------------

def crawl_assets(start_url: str = START_URL):
    """
    Crawl all pages under the publication and collect links to assets on any digital.nhs.uk domain.
    - Accept asset links (csv/zip/xlsx/etc.) regardless of path (e.g., files.digital.nhs.uk).
    - Only restrict by path for *page crawling*.
    """
    sess = requests.Session()
    sess.headers.update(HEADERS)
    to_visit = deque([start_url])
    visited = set()
    referers = defaultdict(set)  # asset_url -> set(page_urls_that_link_to_it)

    while to_visit:
        url = to_visit.popleft()
        if url in visited:
            continue
        visited.add(url)

        try:
            r = sess.get(url, timeout=30)
            if not _is_html(r):
                continue
        except requests.RequestException:
            continue

        soup = BeautifulSoup(r.text, "html.parser")
        for a in soup.select("a[href]"):
            href = a.get("href", "").strip()
            if not href:
                continue
            absu = urljoin(url, href)
            p = urlparse(absu)

            # Only work with digital.nhs.uk domains at all
            if not p.netloc.endswith("digital.nhs.uk"):
                continue

            ext = Path(p.path).suffix.lower()

            # 1) If it's an asset -> record it (no path restriction)
            if ext in ASSET_EXTS:
                referers[absu].add(url)
                continue

            # 2) Otherwise only *crawl* pages under the publication path
            if p.path.startswith("/data-and-information/publications/statistical/patients-registered-at-a-gp-practice"):
                if absu not in visited:
                    to_visit.append(absu)

    assets = [{"url": u, "sources": sorted(list(srcs))} for u, srcs in referers.items()]
    return assets

# ---- Download + organise ---------------------------------------------------

def download_and_organise(
    out_dir: Path = OUT_DIR,
    years_back: int | None = YEARS_BACK,
    split_by_icb: bool = SPLIT_BY_ICB,
    lsoa_to_icb_residence_lookup: Path | None = LSOA_TO_ICB_RESIDENCE_LOOKUP,
):
    out_raw = out_dir / "raw"
    out_by  = out_dir / "by_dataset"
    out_tmp = out_dir / "_tmp"
    for p in (out_raw, out_by, out_tmp):
        p.mkdir(parents=True, exist_ok=True)

    sess = requests.Session()
    sess.headers.update(HEADERS)

    assets = crawl_assets(START_URL)
    print(f"Found {len(assets)} asset URLs.")

    kept = []
    for item in assets:
        u = item["url"]
        dataset = _classify_dataset(u)
        year, month = _infer_period(u, item["sources"])
        if not _year_ok(year, years_back):
            continue
        if dataset is None:
            continue
        kept.append((u, dataset, year, month, item["sources"]))

    print(f"Keeping {len(kept)} assets after filtering by years/dataset.")

    manifest = []
    for (u, dataset, year, month, sources) in sorted(kept, key=lambda x: (x[2] or 0, x[3] or 0, x[0])):
        try:
            r = sess.get(u, stream=True, timeout=180)
            r.raise_for_status()
            ext = Path(urlparse(u).path).suffix.lower()
            period = f"{year:04d}-{month:02d}" if (year and month) else "unknown"
            dataset_dir = out_by / dataset / f"period={period}"
            dataset_dir.mkdir(parents=True, exist_ok=True)

            leaf = _sanitize(Path(urlparse(u).path).name or "download")
            base = f"{dataset}__{leaf}"

            if ext == ".zip":
                _extract_zip_to(dataset_dir, r.content)
                saved = f"{dataset_dir}/(zip-extracted)"
                extracted = True
            else:
                target = dataset_dir / base
                _save_bytes(target, r.content)
                saved = str(target)
                extracted = False

            manifest.append({
                "url": u,
                "dataset": dataset,
                "period": period,
                "saved_as": saved,
                "extracted": extracted,
                "sources": sources
            })
            print(f"✓ {dataset:25s}  {period:10s}  <- {leaf}")

        except Exception as e:
            print(f"× FAILED {u} :: {e}")
            manifest.append({"url": u, "dataset": dataset, "period": None, "error": str(e), "sources": sources})

    (out_dir / "manifest.json").write_text(json.dumps(manifest, indent=2), encoding="utf-8")

    # Optional: split practice datasets by ICB-of-practice
    if split_by_icb:
        try:
            import pandas as pd
        except Exception:
            print("Pandas not available; skipping split_by_icb.")
            return

        def load_practice_map(period_dir: Path):
            maps = list((period_dir/"practice_map").glob("*.csv")) + list((out_by/"practice_map"/period_dir.name).glob("*.csv"))
            if not maps:
                return None
            return pd.read_csv(maps[0], dtype=str, low_memory=False)

        practice_sets = [
            "practice_all",
            "practice_quinary_age",
            "practice_single_age_male",
            "practice_single_age_female"
        ]

        periods = sorted({p.name for p in (out_by/"practice_map").glob("period=*")})
        for pdir in periods:
            map_df = load_practice_map(out_by/pdir)
            if map_df is None:
                print(f"(!) No practice_map found for {pdir}; skipping split.")
                continue

            cols = {c.lower(): c for c in map_df.columns}
            prac_col = cols.get("practice code") or cols.get("practice_code") or cols.get("gp practice code") or cols.get("practicecode")
            icb_col  = cols.get("sub icb location code") or cols.get("icb code") or cols.get("sub_icb_location_code") or cols.get("icb_of_practice_code")
            if not (prac_col and icb_col):
                print(f"(!) Could not find practice/ICB columns in map for {pdir}. Columns = {list(map_df.columns)[:8]}...")
                continue

            map_small = map_df[[prac_col, icb_col]].dropna().drop_duplicates()
            map_small.columns = ["practice_code", "icb_code"]

            for ds in practice_sets:
                ds_dir = out_by/ds/pdir
                if not ds_dir.exists():
                    continue
                out_icb = out_by/(ds + "_by_icb")/pdir
                out_icb.mkdir(parents=True, exist_ok=True)

                for csv_fp in ds_dir.glob("*.csv"):
                    try:
                        df = pd.read_csv(csv_fp, dtype=str, low_memory=False)
                        cols = {c.lower(): c for c in df.columns}
                        d_prac = cols.get("practice code") or cols.get("practice_code") or cols.get("gp practice code") or cols.get("practicecode")
                        if not d_prac:
                            print(f"  (?) No practice code column in {csv_fp.name}; skipping.")
                            continue
                        merged = df.merge(map_small, left_on=d_prac, right_on="practice_code", how="left")
                        for icb, g in merged.groupby("icb_code", dropna=False):
                            icb_key = (icb or "UNKNOWN").replace(" ", "_")
                            g.drop(columns=["practice_code"], errors="ignore").to_csv(out_icb/f"{csv_fp.stem}__icb={icb_key}.csv", index=False)
                        print(f"  ✓ split {csv_fp.name} -> {out_icb}")
                    except Exception as e:
                        print(f"  × split failed for {csv_fp.name}: {e}")

    # Optional: split LSOA-by-sex by ICB-of-residence using provided ONS lookup
    if lsoa_to_icb_residence_lookup and (out_by/"lsoa_by_sex").exists():
        try:
            import pandas as pd
            lkp = pd.read_csv(lsoa_to_icb_residence_lookup, dtype=str)
            lkp_cols = {c.lower(): c for c in lkp.columns}
            lsoa_col = lkp_cols.get("lsoa11cd") or lkp_cols.get("lsoa code") or lkp_cols.get("lsoa_code")
            icb_col  = lkp_cols.get("icb22cd") or lkp_cols.get("icb code") or lkp_cols.get("icb_code")
            if not (lsoa_col and icb_col):
                print("(!) LSOA->ICB lookup missing expected columns; expected LSOA11CD + ICB22CD (or similar). Skipping ICB-of-residence split.")
            else:
                lkp_small = lkp[[lsoa_col, icb_col]].dropna().drop_duplicates()
                lkp_small.columns = ["LSOA_CODE", "ICB_RES_CODE"]

                for pdir in sorted({p.name for p in (out_by/"lsoa_by_sex").glob("period=*")}):
                    in_dir = out_by/"lsoa_by_sex"/pdir
                    out_icb = out_by/"lsoa_by_sex_by_icb_residence"/pdir
                    out_icb.mkdir(parents=True, exist_ok=True)

                    for csv_fp in in_dir.glob("*.csv"):
                        try:
                            df = pd.read_csv(csv_fp, dtype=str, low_memory=False)
                            cols = {c.lower(): c for c in df.columns}
                            d_lsoa = cols.get("lsoa code") or cols.get("lsoa_code") or cols.get("lsoa11cd") or "LSOA_CODE"
                            if d_lsoa not in df.columns:
                                for k, v in cols.items():
                                    if "lsoa" in k and "code" in k:
                                        d_lsoa = v; break
                            if d_lsoa not in df.columns:
                                print(f"  (?) No LSOA code column in {csv_fp.name}; skipping.")
                                continue

                            merged = df.merge(lkp_small, left_on=d_lsoa, right_on="LSOA_CODE", how="left")
                            for icb, g in merged.groupby("ICB_RES_CODE", dropna=False):
                                icb_key = (icb or "UNKNOWN").replace(" ", "_")
                                g.drop(columns=["LSOA_CODE"], errors="ignore").to_csv(out_icb/f"{csv_fp.stem}__icb_res={icb_key}.csv", index=False)
                            print(f"  ✓ LSOA->ICB-res split {csv_fp.name} -> {out_icb}")
                        except Exception as e:
                            print(f"  × LSOA-res split failed for {csv_fp.name}: {e}")
        except Exception as e:
            print(f"Could not split LSOA by ICB-of-residence: {e}")

    print("\nDone. See:", out_by)

# ---- Run -------------------------------------------------------------------

if __name__ == "__main__":
    download_and_organise(
        out_dir=OUT_DIR,
        years_back=YEARS_BACK,           # 3 -> last 3 years from today (inclusive)
        split_by_icb=SPLIT_BY_ICB,       # True if you want per-ICB-of-practice CSVs too
        lsoa_to_icb_residence_lookup=LSOA_TO_ICB_RESIDENCE_LOOKUP
    )

Found 781 asset URLs.
Keeping 376 assets after filtering by years/dataset.
✓ practice_quinary_age       2021-01     <- gp-reg-pat-prac-quin-age.csv
✓ practice_map               2021-01     <- gp-reg-pat-prac-map_v2.csv
✓ practice_single_age_female  2021-01     <- gp-reg-pat-prac-sing-age-female.csv
✓ practice_single_age_male   2021-01     <- gp-reg-pat-prac-sing-age-male.csv
✓ practice_all               2021-01     <- gp-reg-pat-prac-all.csv
✓ single_age_regions         2021-01     <- gp-reg-pat-prac-sing-age-regions.csv
✓ lsoa_by_sex                2021-01     <- gp-reg-pat-prac-lsoa-male-female-Jan-21.zip
✓ practice_all               2021-02     <- gp-reg-pat-prac-all.csv
✓ practice_single_age_male   2021-02     <- gp-reg-pat-prac-sing-age-male.csv
✓ practice_map               2021-02     <- gp-reg-pat-prac-map.csv
✓ practice_quinary_age       2021-02     <- gp-reg-pat-prac-quin-age.csv
✓ practice_single_age_female  2021-02     <- gp-reg-pat-prac-sing-age-female.csv
✓ single_age_regi