
# MIMIC‑IV Hypercapnia Cohort — **ICD ∪ Physiologic Thresholds** (BigQuery)

TODO: 

[ ] want to add ED-rendered diagnoses - a flag to split off whether the hypercapnic respiratory failure ICD was rendered in the ED or during the hospital stay. 
[ ] are the installation instructions at the beginning of this notebook consistent with the way we intend the notebook to be used? is it also consistent with the README.md instructions?

**Goal:** Build an admissions‑level tabular dataset that **enrolls** any hospital admission (`hadm_id`) meeting **_any_** of:

1. **ICD** codes for hypercapnic respiratory failure (legacy cohort).
2. **Any arterial blood gas** (LAB or POC) with **PaCO₂ ≥ 45.0 mmHg** anywhere during the episode.
3. **Any venous blood gas** (LAB or POC) with **PaCO₂ ≥ 50.0 mmHg** anywhere during the episode.

Then, keep all downstream columns/logic from the current workflow:
- Per‑code ICD indicators and an `any_hypercap_icd` flag.
- Robust extraction of **first ABG** and **first VBG** (across LAB + POC) with standardized units (mmHg) and pairing logic.
- Demographics/outcomes, NIH/OMB race & ethnicity, ED triage + first ED vitals, ICU meta (first stay + LOS), ventilation flags.
- Sanity checks.

> **Assumptions**
> - You already configured BigQuery auth (`gcloud auth application-default login`) and `.env` variables as in the previous notebook.
> - The PhysioNet hosting project is `physionet-data`.
> - Datasets exist (e.g., `mimiciv_3_1_hosp`, `mimiciv_3_1_icu`, and an ED dataset such as `mimiciv_ed`). This notebook auto-detects the ED dataset.


**Execution timing note:** To capture per-cell runtimes, enable cell execution timing in VS Code (Notebook: Show Cell Execution Time) or enable ExecuteTime in Jupyter. Then re-run and save to allow runtime summaries.


## Pipeline stages (readable map)
1. **Stage A — Config & helpers**: dataset IDs, thresholds, shared helper functions.
2. **Stage B — Cohort spine**: ICD + gas thresholds → `hadm_list` / `ed_stay_list`.
3. **Stage C — Bulk pulls**: one query per table where possible.
4. **Stage D — Transforms**: panels, flags, timing, comorbidities, vitals.
5. **Stage E — QA checks**: deterministic assertions and range checks.
6. **Stage F — Outputs**: parquet + Excel exports.


## SQL registry (centralized query templates)
All BigQuery SQL is defined in one place below, then referenced by name in subsequent cells.


In [None]:
# Purpose: Build ABG/VBG hypercapnia threshold flags from lab and ICU POC pCO2 measurements.

import sys
print(sys.executable)

# Central SQL registry (define all query templates here)
SQL = {}

def sql(name: str) -> str:
    if name not in SQL:
        raise KeyError(f"SQL template not found: {name}")
    return SQL[name]

# SQL templates (populated below in-place to keep notebook linear)
# Names: admit_sql, co2_thresholds_sql, cohort_icd_sql, counts_sql, demo_sql, ditems_sql, ed_counts_sql, ed_first_vitals_sql, ed_spine_sql, ed_to_icu_sql, ed_triage_sql, ed_vitals_sql, icd_sql, icu_meta_sql, icu_sql, labitems_sql, labs_sql, vent_chart_sql, vent_sql


**Rationale:** Define a reproducible, admission-level cohort that captures hypercapnia using complementary diagnostic (ICD) and physiologic (blood gas) criteria.


# MIMIC‑IV on BigQuery

## Environment Bootstrap & Smoke Test

Purpose: make a clean, reproducible start on a new machine.

Outcome: verify auth, project config, and dataset access; provide a reusable BigQuery runner for the build notebook.

**Rationale:** Establish the BigQuery environment and dataset configuration so queries are consistent and reproducible across runs.


## 0. Prerequisites (one-time)

**Accounts & access**
- PhysioNet access to MIMIC-IV on BigQuery; in BigQuery Console star project `physionet-data`.
- A Google Cloud **Project ID** with BigQuery API enabled (this is your **billing** project).

**CLI & environment**
- Google Cloud SDK (gcloud) installed and on PATH.
- Python environment created with `uv` (see README) and Jupyter kernel selected.
- A project-local **`.env`** with the variables below.

**.env variables**
```ini
MIMIC_BACKEND=bigquery
WORK_PROJECT=<your-billing-project-id>
BQ_PHYSIONET_PROJECT=physionet-data
BQ_DATASET_HOSP=mimiciv_3_1_hosp
BQ_DATASET_ICU=mimiciv_3_1_icu
BQ_DATASET_ED=mimiciv_ed
WORK_DIR=/path/to/Hypercap-CC-NLP
# GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
```

**Command line quickstart**
```bash
brew install --cask google-cloud-sdk
gcloud init
gcloud auth application-default login
gcloud services enable bigquery.googleapis.com --project <your-billing-project-id>
ls -l ~/.config/gcloud/application_default_credentials.json
```


**Rationale:** Verify access and credentials up front to prevent silent failures later in the pipeline.


In [None]:
# Purpose: Set up project paths, environment variables, and BigQuery client connections for reproducible execution.

# --- Imports & environment
import os, re, json, math, textwrap
from pathlib import Path

import numpy as np
import pandas as pd

from google.cloud import bigquery
from google.oauth2 import service_account
from dotenv import load_dotenv

load_dotenv()

WORK_DIR = Path(os.getenv("WORK_DIR", Path.cwd())).expanduser().resolve()
DATA_DIR = WORK_DIR / "MIMIC tabular data"
DATA_DIR.mkdir(parents=True, exist_ok=True)

# ---- Backend selection (we use BigQuery)
BACKEND = os.getenv("MIMIC_BACKEND", "bigquery").strip().lower()
assert BACKEND == "bigquery", "This notebook is BigQuery-specific."

WORK_PROJECT = os.getenv("WORK_PROJECT", "").strip()  # your billing project
PHYS = os.getenv("BQ_PHYSIONET_PROJECT", "physionet-data").strip()  # hosting project (read-only)

# Dataset preferences: resolved to accessible datasets in the next setup cell.
HOSP = os.getenv("BQ_DATASET_HOSP", "mimiciv_3_1_hosp").strip()
ICU  = os.getenv("BQ_DATASET_ICU",  "mimiciv_3_1_icu").strip()
ED   = os.getenv("BQ_DATASET_ED",   "").strip()

# BigQuery client
client = bigquery.Client(project=WORK_PROJECT)

print("Project:", WORK_PROJECT)
print("PhysioNet host:", PHYS)
print("HOSP (pref):", HOSP, "ICU (pref):", ICU, "ED (pref):", ED)
print("WORK_DIR:", WORK_DIR)


In [None]:
# Purpose: Define reusable BigQuery helpers and resolve dataset names across known naming variants.

from datetime import datetime, timezone

# Fail fast on long-running queries instead of hanging indefinitely in nbconvert.
BQ_QUERY_TIMEOUT_SECS = int(os.getenv("BQ_QUERY_TIMEOUT_SECS", "1800"))

# --- Helper: run SQL with optional named parameters
def run_sql_bq(sql: str, params: dict | None = None) -> pd.DataFrame:
    job_config = bigquery.QueryJobConfig()
    job_config.labels = {"pipeline": "hypercapcc", "notebook": "cohort"}
    if params:
        bq_params = []
        for k, v in params.items():
            if isinstance(v, (list, tuple, np.ndarray, pd.Series)):
                # BigQuery ARRAY<INT64> if all ints; else ARRAY<STRING>
                v_list = list(v)
                if all(isinstance(x, (int, np.integer)) for x in v_list):
                    bq_params.append(bigquery.ArrayQueryParameter(k, "INT64", list(map(int, v_list))))
                else:
                    bq_params.append(bigquery.ArrayQueryParameter(k, "STRING", list(map(str, v_list))))
            else:
                # scalar
                if isinstance(v, (int, np.integer)):
                    bq_params.append(bigquery.ScalarQueryParameter(k, "INT64", int(v)))
                elif isinstance(v, float):
                    bq_params.append(bigquery.ScalarQueryParameter(k, "FLOAT64", float(v)))
                else:
                    bq_params.append(bigquery.ScalarQueryParameter(k, "STRING", str(v)))
        job_config.query_parameters = bq_params

    job = client.query(sql, job_config=job_config)
    try:
        result = job.result(timeout=BQ_QUERY_TIMEOUT_SECS)
    except Exception as exc:
        try:
            job.cancel()
        except Exception:
            pass
        raise RuntimeError(
            f"BigQuery query failed or timed out after {BQ_QUERY_TIMEOUT_SECS}s (job_id={job.job_id})."
        ) from exc

    try:
        return result.to_dataframe(create_bqstorage_client=True)
    except TypeError:
        return result.to_dataframe()

# --- Helper: test if a fully-qualified table exists and is accessible
def table_exists(fqtn: str) -> bool:
    try:
        _ = run_sql_bq(f"SELECT 1 FROM `{fqtn}` LIMIT 1")
        return True
    except Exception:
        return False

def resolve_dataset(preferred: str, candidates: list[str], probe_table: str, label: str) -> tuple[str, str]:
    # Keep preferred first, then try known aliases.
    ordered = []
    if preferred:
        ordered.append(preferred)
    for cand in candidates:
        if cand not in ordered:
            ordered.append(cand)

    for dataset in ordered:
        fqtn = f"{PHYS}.{dataset}.{probe_table}"
        if table_exists(fqtn):
            return dataset, fqtn

    raise RuntimeError(
        f"No accessible {label} dataset found for probe table '{probe_table}'. Tried: {ordered}"
    )

# Resolve HOSP/ICU/ED names so notebook runs across v3.1 naming variants.
HOSP, HOSP_PROBE = resolve_dataset(
    preferred=HOSP,
    candidates=["mimiciv_3_1_hosp", "mimiciv_v3_1_hosp", "mimiciv_hosp"],
    probe_table="admissions",
    label="HOSP",
)
ICU, ICU_PROBE = resolve_dataset(
    preferred=ICU,
    candidates=["mimiciv_3_1_icu", "mimiciv_v3_1_icu", "mimiciv_icu"],
    probe_table="icustays",
    label="ICU",
)
if not ED:
    ED = "mimiciv_ed"
ED, ED_PROBE = resolve_dataset(
    preferred=ED,
    candidates=["mimiciv_ed", "mimiciv_3_1_ed", "mimiciv_v3_1_ed"],
    probe_table="edstays",
    label="ED",
)

RUN_METADATA = {
    "run_utc": datetime.now(timezone.utc).isoformat(),
    "work_project": WORK_PROJECT,
    "physionet_project": PHYS,
    "datasets": {"hosp": HOSP, "icu": ICU, "ed": ED},
    "dataset_probes": {"hosp": HOSP_PROBE, "icu": ICU_PROBE, "ed": ED_PROBE},
    "notebook": "MIMICIV_hypercap_EXT_cohort.ipynb",
}

print("Resolved datasets:", RUN_METADATA["datasets"])
print("Dataset probes:", RUN_METADATA["dataset_probes"])


In [None]:
# Purpose: Create reusable data-quality and merge guardrail helpers to prevent silent join errors.

# --- Helper utilities for reproducibility and safe joins
def require_cols(df: pd.DataFrame, cols: list[str], name: str) -> None:
    missing = [c for c in cols if c not in df.columns]
    if missing:
        raise KeyError(f"{name} missing columns: {missing}")

def assert_unique(df: pd.DataFrame, key: str, name: str) -> None:
    if df[key].duplicated().any():
        n = int(df[key].duplicated().sum())
        raise ValueError(f"{name} has {n} duplicate {key} values")

def safe_merge(left: pd.DataFrame, right: pd.DataFrame, on: list[str] | str, how: str, name: str) -> pd.DataFrame:
    # guard against accidental duplicate columns
    overlap = set(left.columns) & set(right.columns)
    if isinstance(on, str):
        on_cols = {on}
    else:
        on_cols = set(on)
    overlap = overlap - on_cols
    if overlap:
        raise ValueError(f"{name} merge would duplicate columns: {sorted(overlap)}")
    return left.merge(right, on=on, how=how)

def check_ranges(df: pd.DataFrame, ranges: dict[str, tuple[float, float]]) -> pd.DataFrame:
    rows = []
    for col, (lo, hi) in ranges.items():
        if col not in df.columns:
            continue
        bad = df[col].notna() & ((df[col] < lo) | (df[col] > hi))
        rows.append({"col": col, "n_bad": int(bad.sum())})
    return pd.DataFrame(rows)



## 1) ICD cohort flags (hypercapnic respiratory failure)

**Rationale:** Capture diagnosis-based hypercapnia from ED and hospital discharge codes to define a broad, clinically recognized cohort.


In [None]:
# Purpose: Define a reusable BigQuery execution helper so SQL calls are consistent across notebook stages.

# Target ICD codes (dotless, uppercase)
ICD10_CODES = ['J9602','J9612','J9622','J9692','E662']
ICD9_CODES  = ['27803']

SQL["cohort_icd_sql"] = f"""
-- ICD-based cohort flags per admission
WITH target_codes AS (
  SELECT 'J9602' AS code, 10 AS ver UNION ALL
  SELECT 'J9612', 10 UNION ALL
  SELECT 'J9622', 10 UNION ALL
  SELECT 'J9692', 10 UNION ALL
  SELECT 'E662',  10 UNION ALL
  SELECT '27803', 9
),

-- Hospital ICDs restricted to target codes
hosp_dx AS (
  SELECT
    d.subject_id,
    d.hadm_id,
    UPPER(REPLACE(d.icd_code, '.', '')) AS code_norm,
    d.icd_version
  FROM `{PHYS}.{HOSP}.diagnoses_icd` d
  JOIN target_codes t
    ON t.ver = d.icd_version AND t.code = UPPER(REPLACE(d.icd_code, '.', ''))
  WHERE d.hadm_id IS NOT NULL
),

-- Hospital flags per admission
hosp_flags AS (
  SELECT
    subject_id, hadm_id,
    MAX(IF(icd_version=10 AND code_norm='J9602',1,0)) AS ICD10_J9602,
    MAX(IF(icd_version=10 AND code_norm='J9612',1,0)) AS ICD10_J9612,
    MAX(IF(icd_version=10 AND code_norm='J9622',1,0)) AS ICD10_J9622,
    MAX(IF(icd_version=10 AND code_norm='J9692',1,0)) AS ICD10_J9692,
    MAX(IF(icd_version=10 AND code_norm='E662', 1,0)) AS ICD10_E662,
    MAX(IF(icd_version=9  AND code_norm='27803',1,0)) AS ICD9_27803
  FROM hosp_dx
  GROUP BY subject_id, hadm_id
),

-- ED ICDs restricted to target codes (map to hadm via edstays)
ed_dx AS (
  SELECT
    s.subject_id,
    s.hadm_id,
    s.stay_id,
    s.intime AS ed_intime,
    UPPER(REPLACE(d.icd_code, '.', '')) AS code_norm,
    d.icd_version
  FROM `{PHYS}.{ED}.diagnosis` d
  JOIN `{PHYS}.{ED}.edstays` s
    ON s.subject_id = d.subject_id AND s.stay_id = d.stay_id
  JOIN target_codes t
    ON t.ver = d.icd_version AND t.code = UPPER(REPLACE(d.icd_code, '.', ''))
  WHERE s.hadm_id IS NOT NULL
),

-- ED flags per ED stay (so we can both: OR flags across stays and also pick earliest stay_id)
ed_flags_by_stay AS (
  SELECT
    subject_id, hadm_id, stay_id, MIN(ed_intime) AS ed_intime,
    MAX(IF(icd_version=10 AND code_norm='J9602',1,0)) AS ICD10_J9602,
    MAX(IF(icd_version=10 AND code_norm='J9612',1,0)) AS ICD10_J9612,
    MAX(IF(icd_version=10 AND code_norm='J9622',1,0)) AS ICD10_J9622,
    MAX(IF(icd_version=10 AND code_norm='J9692',1,0)) AS ICD10_J9692,
    MAX(IF(icd_version=10 AND code_norm='E662', 1,0)) AS ICD10_E662,
    MAX(IF(icd_version=9  AND code_norm='27803',1,0)) AS ICD9_27803
  FROM ed_dx
  GROUP BY subject_id, hadm_id, stay_id
),

-- OR the ED flags across all ED stays mapped to the same hadm
ed_flags_or AS (
  SELECT
    subject_id, hadm_id,
    MAX(ICD10_J9602) AS ICD10_J9602,
    MAX(ICD10_J9612) AS ICD10_J9612,
    MAX(ICD10_J9622) AS ICD10_J9622,
    MAX(ICD10_J9692) AS ICD10_J9692,
    MAX(ICD10_E662 ) AS ICD10_E662,
    MAX(ICD9_27803) AS ICD9_27803
  FROM ed_flags_by_stay
  GROUP BY subject_id, hadm_id
),

-- Earliest ED stay_id per hadm (NO UNNEST of aggregates; use [OFFSET(0)])
ed_earliest AS (
  SELECT
    subject_id,
    hadm_id,
    (ARRAY_AGG(STRUCT(stay_id, ed_intime) ORDER BY ed_intime LIMIT 1))[OFFSET(0)].stay_id AS stay_id
  FROM ed_flags_by_stay
  GROUP BY subject_id, hadm_id
),

-- Bring flags and earliest stay_id together
ed_by_hadm AS (
  SELECT
    f.subject_id,
    f.hadm_id,
    e.stay_id,
    f.ICD10_J9602,
    f.ICD10_J9612,
    f.ICD10_J9622,
    f.ICD10_J9692,
    f.ICD10_E662,
    f.ICD9_27803
  FROM ed_flags_or f
  LEFT JOIN ed_earliest e
    USING (subject_id, hadm_id)
),

-- Combine ED and hospital flags at the admission level
combined AS (
  SELECT
    COALESCE(h.subject_id, e.subject_id) AS subject_id,
    COALESCE(h.hadm_id,     e.hadm_id)   AS hadm_id,
    GREATEST(IFNULL(h.ICD10_J9602,0), IFNULL(e.ICD10_J9602,0)) AS ICD10_J9602,
    GREATEST(IFNULL(h.ICD10_J9612,0), IFNULL(e.ICD10_J9612,0)) AS ICD10_J9612,
    GREATEST(IFNULL(h.ICD10_J9622,0), IFNULL(e.ICD10_J9622,0)) AS ICD10_J9622,
    GREATEST(IFNULL(h.ICD10_J9692,0), IFNULL(e.ICD10_J9692,0)) AS ICD10_J9692,
    GREATEST(IFNULL(h.ICD10_E662 ,0), IFNULL(e.ICD10_E662 ,0)) AS ICD10_E662,
    GREATEST(IFNULL(h.ICD9_27803,0), IFNULL(e.ICD9_27803,0)) AS ICD9_27803,
    IF((IFNULL(h.ICD10_J9602,0)+IFNULL(h.ICD10_J9612,0)+IFNULL(h.ICD10_J9622,0)+IFNULL(h.ICD10_J9692,0)+IFNULL(h.ICD10_E662,0)+IFNULL(h.ICD9_27803,0)) > 0, 1, 0) AS any_hypercap_icd_hosp,
    IF((IFNULL(e.ICD10_J9602,0)+IFNULL(e.ICD10_J9612,0)+IFNULL(e.ICD10_J9622,0)+IFNULL(e.ICD10_J9692,0)+IFNULL(e.ICD10_E662,0)+IFNULL(e.ICD9_27803,0)) > 0, 1, 0) AS any_hypercap_icd_ed
  FROM hosp_flags h
  FULL OUTER JOIN ed_by_hadm e
    ON h.hadm_id = e.hadm_id
)

SELECT
  subject_id, hadm_id,
  ICD10_J9602, ICD10_J9612, ICD10_J9622, ICD10_J9692, ICD10_E662, ICD9_27803,
  IF((ICD10_J9602+ICD10_J9612+ICD10_J9622+ICD10_J9692+ICD10_E662+ICD9_27803) > 0, 1, 0) AS any_hypercap_icd,
  any_hypercap_icd_hosp,
  any_hypercap_icd_ed,
  CASE
    WHEN any_hypercap_icd_hosp=1 AND any_hypercap_icd_ed=1 THEN 'ED+HOSP'
    WHEN any_hypercap_icd_ed=1 THEN 'ED'
    WHEN any_hypercap_icd_hosp=1 THEN 'HOSP'
    ELSE 'NONE'
  END AS icd_source
FROM combined
"""

cohort_icd = run_sql_bq(sql("cohort_icd_sql"))
print("ICD cohort admissions:", len(cohort_icd))
cohort_icd.head(3)



## 2) Blood gas inclusion thresholds — ANY PaCO₂ meeting cutoffs (LAB + POC)

**Rationale:** Identify physiologic hypercapnia using ABG/VBG thresholds, independent of diagnostic coding.


In [None]:
# Purpose: Define a reusable BigQuery execution helper so SQL calls are consistent across notebook stages.

SQL["co2_thresholds_sql"] = f"""
/* ---- LAB (HOSP) pCO2 across entire dataset ---- */
WITH hosp_cand AS (
  SELECT
    le.hadm_id, le.charttime, le.specimen_id,
    COALESCE(
      CAST(le.valuenum AS FLOAT64),
      SAFE_CAST(REGEXP_EXTRACT(LOWER(le.value), r'(-?\d+(?:\.\d+)?)') AS FLOAT64)
    ) AS val,
    LOWER(REPLACE(COALESCE(le.valueuom,''),' ','')) AS uom_nospace,
    LOWER(di.label) AS lbl,
    LOWER(COALESCE(di.fluid,'')) AS fl
  FROM `{PHYS}.{HOSP}.labevents` le
  JOIN `{PHYS}.{HOSP}.d_labitems` di ON di.itemid = le.itemid
  WHERE (le.valuenum IS NOT NULL OR le.value IS NOT NULL)
    AND (
      LOWER(COALESCE(di.category,'')) LIKE '%blood gas%' OR
      LOWER(di.label) LIKE '%pco2%' OR
      REGEXP_CONTAINS(LOWER(di.label), r'\bpa?\s*co(?:2|₂)\b')
    )
    AND NOT REGEXP_CONTAINS(LOWER(di.label),
        r'(et\s*co2|end[- ]?tidal|t\s*co2|tco2|total\s*co2|hco3|bicar)')
),
hosp_spec AS (
  SELECT le.specimen_id, LOWER(COALESCE(le.value,'')) AS spec_val
  FROM `{PHYS}.{HOSP}.labevents` le
  JOIN `{PHYS}.{HOSP}.d_labitems` di ON di.itemid = le.itemid
  WHERE le.specimen_id IS NOT NULL
    AND REGEXP_CONTAINS(LOWER(di.label), r'(specimen|sample)')
),
hosp_pco2 AS (
  SELECT
    c.hadm_id, c.charttime,
    CASE
      WHEN REGEXP_CONTAINS(s.spec_val, r'arter') OR REGEXP_CONTAINS(s.spec_val, r'\bart\b') THEN 'arterial'
      WHEN REGEXP_CONTAINS(s.spec_val, r'ven|mixed|central') THEN 'venous'
      WHEN REGEXP_CONTAINS(c.fl, r'arter') THEN 'arterial'
      WHEN REGEXP_CONTAINS(c.fl, r'ven') THEN 'venous'
      WHEN c.fl LIKE '%arterial%' OR REGEXP_CONTAINS(c.lbl, r'\b(abg|art|arterial|a[- ]?line)\b') THEN 'arterial'
      WHEN c.fl LIKE '%ven%'      OR REGEXP_CONTAINS(c.lbl, r'\b(vbg|ven|venous|mixed|central)\b') THEN 'venous'
      ELSE 'other'
    END AS site,
    CASE WHEN c.uom_nospace='kpa' THEN c.val*7.50062 ELSE c.val END AS pco2_mmHg
  FROM hosp_cand c
  LEFT JOIN hosp_spec s USING (specimen_id)
  WHERE c.val IS NOT NULL
),
hosp_pco2_std AS (
  SELECT hadm_id, site, charttime, pco2_mmHg
  FROM hosp_pco2
  WHERE site IN ('arterial','venous','other') AND pco2_mmHg BETWEEN 5 AND 200
),

/* ---- ICU (POC) pCO2 across entire dataset ---- */
icu_raw AS (
  SELECT
    ie.hadm_id,
    ce.stay_id,
    ce.charttime,
    LOWER(di.label) AS lbl,
    LOWER(REPLACE(COALESCE(ce.valueuom,''),' ','')) AS uom_nospace,
    LOWER(COALESCE(ce.value,'')) AS valstr,
    COALESCE(
      CAST(ce.valuenum AS FLOAT64),
      SAFE_CAST(REGEXP_EXTRACT(LOWER(ce.value), r'(-?\d+(?:\.\d+)?)') AS FLOAT64)
    ) AS val
  FROM `{PHYS}.{ICU}.chartevents` ce
  JOIN `{PHYS}.{ICU}.d_items`  di ON di.itemid = ce.itemid
  JOIN `{PHYS}.{ICU}.icustays` ie ON ie.stay_id = ce.stay_id
),
icu_cand AS (
  SELECT
    hadm_id, stay_id, charttime, lbl, uom_nospace, valstr, val,
    CASE
      WHEN (
            REGEXP_CONTAINS(lbl, r'(^|[^a-z])p(?:a)?\s*co\s*(?:2|₂)([^a-z]|$)')
            OR uom_nospace IN ('mmhg','kpa')
            OR REGEXP_CONTAINS(valstr, r'\b(mm\s*hg|kpa)\b')
           )
           AND NOT REGEXP_CONTAINS(lbl,
               r'(et\s*co2|end[- ]?tidal|t\s*co2|tco2|total\s*co2|hco3|bicar|v\s*co2|vco2|co2\s*(prod|elimin|production|elimination))')
      THEN 'pco2'
      ELSE NULL
    END AS analyte,
    CASE
      WHEN REGEXP_CONTAINS(lbl, r'\b(abg|art|arterial|a[- ]?line)\b') THEN 'arterial'
      WHEN REGEXP_CONTAINS(lbl, r'\b(vbg|ven|venous|mixed|central)\b') THEN 'venous'
      ELSE 'other'
    END AS site
  FROM icu_raw
  WHERE val IS NOT NULL
    AND (
      REGEXP_CONTAINS(lbl, r'(^|[^a-z])p(?:a)?\s*co\s*(?:2|₂)([^a-z]|$)')
      OR uom_nospace IN ('mmhg','kpa')
      OR REGEXP_CONTAINS(valstr, r'\b(mm\s*hg|kpa)\b')
    )
    AND NOT REGEXP_CONTAINS(lbl,
        r'(et\s*co2|end[- ]?tidal|t\s*co2|tco2|total\s*co2|hco3|bicar|v\s*co2|vco2|co2\s*(prod|elimin|production|elimination))')
),
icu_co2_std AS (
  SELECT
    hadm_id,
    site,
    charttime,
    CASE WHEN uom_nospace='kpa' OR REGEXP_CONTAINS(valstr, r'\bkpa\b') THEN val*7.50062 ELSE val END AS pco2_mmHg
  FROM icu_cand
  WHERE analyte='pco2'
    AND site IN ('arterial','venous','other')
    AND (CASE WHEN uom_nospace='kpa' OR REGEXP_CONTAINS(valstr, r'\bkpa\b') THEN val*7.50062 ELSE val END) BETWEEN 5 AND 200
),

/* ---- Combine and threshold per admission ---- */
all_pco2 AS (
  SELECT * FROM hosp_pco2_std
  UNION ALL
  SELECT * FROM icu_co2_std
),
thresh AS (
  SELECT
    hadm_id,
    MAX(IF(site='arterial' AND pco2_mmHg >= 45.0, 1, 0)) AS abg_hypercap_threshold,
    MAX(IF(site='venous'   AND pco2_mmHg >= 50.0, 1, 0)) AS vbg_hypercap_threshold,
    MAX(IF(site='other'    AND pco2_mmHg >= 50.0, 1, 0)) AS other_hypercap_threshold
  FROM all_pco2
  GROUP BY hadm_id
)
SELECT * FROM thresh
"""

co2_thresh = run_sql_bq(sql("co2_thresholds_sql"))
print("Admissions meeting thresholds:", len(co2_thresh))
co2_thresh.head(3)



## 3) Cohort union (ICD ∪ thresholds) and `hadm_list` for downstream queries

**Rationale:** Combine ICD and physiologic routes to maximize sensitivity, then define a stable hadm_id list for downstream joins.


In [None]:
# Purpose: Combine ICD and gas-threshold ascertainment routes to produce the final hadm inclusion list.

# Outer-join because thresholds can identify hadm_id with no ICD codes and vice versa
cohort_any = cohort_icd.merge(co2_thresh, how="outer", on="hadm_id")

# Fill missing flags with 0 where appropriate
icd_cols = ["ICD10_J9602","ICD10_J9612","ICD10_J9622","ICD10_J9692","ICD10_E662","ICD9_27803","any_hypercap_icd","any_hypercap_icd_hosp","any_hypercap_icd_ed"]
for c in icd_cols:
    if c in cohort_any.columns:
        cohort_any[c] = cohort_any[c].fillna(0).astype(int)

for c in ["abg_hypercap_threshold","vbg_hypercap_threshold","other_hypercap_threshold"]:
    if c in cohort_any.columns:
        cohort_any[c] = cohort_any[c].fillna(0).astype(int)

# Final enrollment flag
cohort_any["pco2_threshold_any"] = ((cohort_any["abg_hypercap_threshold"]==1) | (cohort_any["vbg_hypercap_threshold"]==1) | (cohort_any["other_hypercap_threshold"]==1)).astype(int)
cohort_any["enrolled_any"] = ((cohort_any["any_hypercap_icd"]==1) | (cohort_any["pco2_threshold_any"]==1)).astype(int)

print("ICD-only admissions        :", int((cohort_any["any_hypercap_icd"]==1).sum()))
print("Threshold-only admissions  :", int(((cohort_any["pco2_threshold_any"]==1) & (cohort_any["any_hypercap_icd"]==0)).sum()))
print("Both ICD and threshold     :", int(((cohort_any["pco2_threshold_any"]==1) & (cohort_any["any_hypercap_icd"]==1)).sum()))
print("Total enrolled (union)     :", int((cohort_any["enrolled_any"]==1).sum()))

# New hadm list used for the rest of the notebook
hadm_list = cohort_any.loc[cohort_any["enrolled_any"]==1, "hadm_id"].dropna().astype("int64").tolist()
len(hadm_list)



## 4) First ABG and First VBG (LAB + POC, standardized to mmHg)

**Rationale:** Extract earliest ABG/VBG measurements to characterize baseline physiology with standardized units.


In [None]:
# Purpose: Define a reusable BigQuery execution helper so SQL calls are consistent across notebook stages.

params = {"hadms": hadm_list}

bg_pairs_sql = rf"""
WITH hadms AS (SELECT hadm_id FROM UNNEST(@hadms) AS hadm_id),

/* ---------------- LAB (HOSP) ---------------- */
hosp_cand AS (
  SELECT
    le.subject_id, le.hadm_id, le.charttime, le.specimen_id,
    CAST(le.valuenum AS FLOAT64) AS val,
    LOWER(COALESCE(le.valueuom,'')) AS uom,
    LOWER(di.label) AS lbl,
    LOWER(COALESCE(di.fluid,'')) AS fl,
    LOWER(COALESCE(di.category,'')) AS cat
  FROM `{PHYS}.{HOSP}.labevents`  le
  JOIN `{PHYS}.{HOSP}.d_labitems` di ON di.itemid = le.itemid
  JOIN hadms h ON h.hadm_id = le.hadm_id
  WHERE le.valuenum IS NOT NULL
    AND (
         LOWER(COALESCE(di.category,'')) LIKE '%blood gas%' OR
         LOWER(di.label) LIKE '%pco2%' OR
         REGEXP_CONTAINS(LOWER(di.label), r'\bph\b') OR
         REGEXP_CONTAINS(LOWER(di.label), r'\bpa?\s*co(?:2|₂)\b')
        )
    AND NOT REGEXP_CONTAINS(LOWER(di.label), r'(et\s*co2|end[- ]?tidal|t\s*co2|tco2|total\s*co2|hco3|bicar)')
),
hosp_spec AS (
  SELECT le.specimen_id, LOWER(COALESCE(le.value,'')) AS spec_val
  FROM `{PHYS}.{HOSP}.labevents` le
  JOIN `{PHYS}.{HOSP}.d_labitems` di ON di.itemid = le.itemid
  WHERE le.specimen_id IS NOT NULL
    AND REGEXP_CONTAINS(LOWER(di.label), r'(specimen|sample)')
),
hosp_class AS (
  SELECT
    c.hadm_id, c.charttime, c.specimen_id, c.val, c.uom, c.lbl, c.fl,
    CASE
      WHEN REGEXP_CONTAINS(c.lbl, r'\b(?:blood\s*)?ph\b') THEN 'ph'
      WHEN (c.lbl LIKE '%pco2%' OR REGEXP_CONTAINS(c.lbl, r'\bpa?\s*co(?:2|₂)\b')) THEN 'pco2'
      ELSE NULL
    END AS analyte,
    CASE
      WHEN REGEXP_CONTAINS(s.spec_val, r'arter') OR REGEXP_CONTAINS(s.spec_val, r'\bart\b') THEN 'arterial'
      WHEN REGEXP_CONTAINS(s.spec_val, r'ven|mixed|central') THEN 'venous'
      WHEN c.fl LIKE '%arterial%' OR REGEXP_CONTAINS(c.lbl, r'\b(abg|art|arterial|a[- ]?line)\b') THEN 'arterial'
      WHEN c.fl LIKE '%ven%'      OR REGEXP_CONTAINS(c.lbl, r'\b(vbg|ven|venous|mixed|central)\b') THEN 'venous'
      ELSE 'other'
    END AS site
  FROM hosp_cand c
  LEFT JOIN hosp_spec s USING (specimen_id)
),
hosp_pairs AS (
  SELECT
    hadm_id, specimen_id,
    MIN(charttime) AS sample_time,
    MAX(IF(analyte='ph',   val, NULL)) AS ph,
    MAX(IF(analyte='pco2', val, NULL)) AS pco2_raw,
    (ARRAY_AGG(IF(analyte='pco2', uom, NULL) IGNORE NULLS LIMIT 1))[OFFSET(0)] AS pco2_uom,
    (ARRAY_AGG(IF(analyte='ph',   uom, NULL) IGNORE NULLS LIMIT 1))[OFFSET(0)] AS ph_uom,
    (ARRAY_AGG(site IGNORE NULLS LIMIT 1))[OFFSET(0)] AS site
  FROM hosp_class
  GROUP BY hadm_id, specimen_id
  HAVING (ph IS NOT NULL OR pco2_raw IS NOT NULL) AND site IN ('arterial','venous','other')
),
hosp_pairs_std AS (
  SELECT
    hadm_id, specimen_id, sample_time, site,
    ph, ph_uom,
    CASE WHEN pco2_uom = 'kpa' THEN pco2_raw * 7.50062 ELSE pco2_raw END AS pco2_mmHg,
    'mmhg' AS pco2_uom_norm
  FROM hosp_pairs
  WHERE (ph IS NULL OR (ph BETWEEN 6.3 AND 7.8))
    AND (pco2_raw IS NULL OR (CASE WHEN pco2_uom='kpa' THEN pco2_raw*7.50062 ELSE pco2_raw END) BETWEEN 5 AND 200)
),
lab_abg AS (
  SELECT hadm_id,
         ph            AS lab_abg_ph,
         ph_uom        AS lab_abg_ph_uom,
         pco2_mmHg     AS lab_abg_paco2,
         'mmhg'        AS lab_abg_paco2_uom,
         sample_time   AS lab_abg_time
  FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY hadm_id ORDER BY sample_time) rn
        FROM hosp_pairs_std WHERE site='arterial') WHERE rn=1
),
lab_vbg AS (
  SELECT hadm_id,
         ph            AS lab_vbg_ph,
         ph_uom        AS lab_vbg_ph_uom,
         pco2_mmHg     AS lab_vbg_paco2,
         'mmhg'        AS lab_vbg_paco2_uom,
         sample_time   AS lab_vbg_time
  FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY hadm_id ORDER BY sample_time) rn
        FROM hosp_pairs_std WHERE site='venous') WHERE rn=1
),
lab_other AS (
  SELECT hadm_id,
         ph            AS lab_other_ph,
         ph_uom        AS lab_other_ph_uom,
         pco2_mmHg     AS lab_other_paco2,
         'mmhg'        AS lab_other_paco2_uom,
         sample_time   AS lab_other_time
  FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY hadm_id ORDER BY sample_time) rn
        FROM hosp_pairs_std WHERE site='other') WHERE rn=1
),

/* ---------------- POC (ICU) ---------------- */
icu_raw AS (
  SELECT
    ie.hadm_id, ce.stay_id, ce.charttime,
    LOWER(di.label) AS lbl,
    LOWER(REPLACE(COALESCE(ce.valueuom,''),' ','')) AS uom_nospace,
    LOWER(COALESCE(ce.value,'')) AS valstr,
    COALESCE(
      CAST(ce.valuenum AS FLOAT64),
      SAFE_CAST(REGEXP_EXTRACT(LOWER(ce.value), r'(-?\d+(?:\.\d+)?)') AS FLOAT64)
    ) AS val
  FROM `{PHYS}.{ICU}.chartevents` ce
  JOIN `{PHYS}.{ICU}.d_items`  di ON di.itemid = ce.itemid
  JOIN `{PHYS}.{ICU}.icustays` ie ON ie.stay_id = ce.stay_id
  JOIN hadms h ON h.hadm_id = ie.hadm_id
),
icu_cand AS (
  SELECT
    hadm_id, stay_id, charttime, lbl, uom_nospace, valstr, val,
    CASE
      WHEN REGEXP_CONTAINS(lbl, r'(^|[^a-z])ph([^a-z]|$)') OR (val BETWEEN 6.3 AND 7.8) THEN 'ph'
      WHEN (
             REGEXP_CONTAINS(lbl, r'(^|[^a-z])p(?:a)?\s*co\s*(?:2|₂)([^a-z]|$)')
             OR uom_nospace IN ('mmhg','kpa')
             OR REGEXP_CONTAINS(valstr, r'\b(mm\s*hg|kpa)\b')
           )
           AND NOT REGEXP_CONTAINS(lbl, r'(et\s*co2|end[- ]?tidal|t\s*co2|tco2|total\s*co2|hco3|bicar|v\s*co2|vco2|co2\s*(prod|elimin|production|elimination))')
      THEN 'pco2'
      ELSE NULL
    END AS analyte,
    CASE
      WHEN REGEXP_CONTAINS(lbl, r'\b(abg|art|arterial|a[- ]?line)\b') THEN 'arterial'
      WHEN REGEXP_CONTAINS(lbl, r'\b(vbg|ven|venous|mixed|central)\b') THEN 'venous'
      ELSE 'other'
    END AS site
  FROM icu_raw
  WHERE val IS NOT NULL
    AND (
      REGEXP_CONTAINS(lbl, r'(^|[^a-z])ph([^a-z]|$)') OR
      REGEXP_CONTAINS(lbl, r'(^|[^a-z])p(?:a)?\s*co\s*(?:2|₂)([^a-z]|$)') OR
      uom_nospace IN ('mmhg','kpa') OR
      REGEXP_CONTAINS(valstr, r'\b(mm\s*hg|kpa)\b')
    )
    AND NOT REGEXP_CONTAINS(lbl, r'(et\s*co2|end[- ]?tidal|t\s*co2|tco2|total\s*co2|hco3|bicar|v\s*co2|vco2|co2\s*(prod|elimin|production|elimination))')
),
icu_ph AS (
  SELECT hadm_id, stay_id, charttime, val AS ph, site AS site_ph
  FROM icu_cand WHERE analyte='ph'
),
icu_co2 AS (
  SELECT hadm_id, stay_id, charttime, val AS pco2_raw, uom_nospace, valstr, site AS site_co2
  FROM icu_cand WHERE analyte='pco2'
),
icu_pair_win AS (
  SELECT
    p.hadm_id, p.stay_id,
    COALESCE(p.site_ph, c.site_co2) AS site,
    p.charttime AS ph_time, c.charttime AS co2_time,
    p.ph,
    CASE
      WHEN c.uom_nospace='kpa' OR REGEXP_CONTAINS(c.valstr, r'\bkpa\b') THEN 'kpa'
      WHEN c.uom_nospace='mmhg' OR REGEXP_CONTAINS(c.valstr, r'mm\s*hg') THEN 'mmhg'
      ELSE c.uom_nospace
    END AS pco2_uom_norm_raw,
    c.pco2_raw,
    ABS(TIMESTAMP_DIFF(c.charttime, p.charttime, SECOND)) AS dt_sec
  FROM icu_ph p
  JOIN icu_co2 c
    ON c.hadm_id = p.hadm_id
   AND c.stay_id = p.stay_id
   AND (COALESCE(p.site_ph, c.site_co2) IN ('arterial','venous','other'))
   AND ABS(TIMESTAMP_DIFF(c.charttime, p.charttime, MINUTE)) <= 10
  QUALIFY ROW_NUMBER() OVER (
    PARTITION BY p.hadm_id, p.stay_id, p.charttime
    ORDER BY dt_sec
  ) = 1
),
icu_pairs_std AS (
  SELECT
    hadm_id, stay_id, site,
    LEAST(ph_time, co2_time) AS sample_time,
    ph,
    CAST(NULL AS STRING) AS ph_uom,              -- POC pH is unitless/null
    CASE WHEN pco2_uom_norm_raw='kpa' THEN pco2_raw*7.50062 ELSE pco2_raw END AS pco2_mmHg,
    'mmhg' AS pco2_uom_norm
  FROM icu_pair_win
  WHERE (ph BETWEEN 6.3 AND 7.8 OR ph IS NULL)
    AND (CASE WHEN pco2_uom_norm_raw='kpa' THEN pco2_raw*7.50062 ELSE pco2_raw END) BETWEEN 5 AND 200
),
icu_solo_pco2_std AS (
  SELECT
    hadm_id, stay_id, site_co2 AS site,
    charttime AS sample_time,
    CAST(NULL AS FLOAT64) AS ph,
    CAST(NULL AS STRING)  AS ph_uom,            -- no pH here
    CASE WHEN uom_nospace='kpa' OR REGEXP_CONTAINS(valstr, r'\bkpa\b') THEN pco2_raw*7.50062 ELSE pco2_raw END AS pco2_mmHg,
    'mmhg' AS pco2_uom_norm
  FROM icu_co2
  WHERE site_co2 IN ('arterial','venous','other')
    AND pco2_raw BETWEEN 5 AND 200
),
icu_all AS (
  SELECT * FROM icu_pairs_std
  UNION ALL
  SELECT * FROM icu_solo_pco2_std
),

poc_abg AS (
  SELECT hadm_id,
         ph            AS poc_abg_ph,
         ph_uom        AS poc_abg_ph_uom,
         pco2_mmHg     AS poc_abg_paco2,
         'mmhg'        AS poc_abg_paco2_uom,
         sample_time   AS poc_abg_time
  FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY hadm_id ORDER BY sample_time) rn
        FROM icu_all WHERE site='arterial') WHERE rn=1
),
poc_vbg AS (
  SELECT hadm_id,
         ph            AS poc_vbg_ph,
         ph_uom        AS poc_vbg_ph_uom,
         pco2_mmHg     AS poc_vbg_paco2,
         'mmhg'        AS poc_vbg_paco2_uom,
         sample_time   AS poc_vbg_time
  FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY hadm_id ORDER BY sample_time) rn
        FROM icu_all WHERE site='venous') WHERE rn=1
),
poc_other AS (
  SELECT hadm_id,
         ph            AS poc_other_ph,
         ph_uom        AS poc_other_ph_uom,
         pco2_mmHg     AS poc_other_paco2,
         'mmhg'        AS poc_other_paco2_uom,
         sample_time   AS poc_other_time
  FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY hadm_id ORDER BY sample_time) rn
        FROM icu_all WHERE site='other') WHERE rn=1
)

/* ---------------- Final one row per hadm ---------------- */
SELECT
  h.hadm_id,
  -- LAB-ABG / LAB-VBG / LAB-OTHER
  la.lab_abg_ph, la.lab_abg_ph_uom, la.lab_abg_paco2, la.lab_abg_paco2_uom, la.lab_abg_time,
  lv.lab_vbg_ph, lv.lab_vbg_ph_uom, lv.lab_vbg_paco2, lv.lab_vbg_paco2_uom, lv.lab_vbg_time,
  lo.lab_other_ph, lo.lab_other_ph_uom, lo.lab_other_paco2, lo.lab_other_paco2_uom, lo.lab_other_time,
  -- POC-ABG / POC-VBG / POC-OTHER
  pa.poc_abg_ph, pa.poc_abg_ph_uom, pa.poc_abg_paco2, pa.poc_abg_paco2_uom, pa.poc_abg_time,
  pv.poc_vbg_ph, pv.poc_vbg_ph_uom, pv.poc_vbg_paco2, pv.poc_vbg_paco2_uom, pv.poc_vbg_time,
  po.poc_other_ph, po.poc_other_ph_uom, po.poc_other_paco2, po.poc_other_paco2_uom, po.poc_other_time,
  -- First ABG across LAB+POC
  (SELECT AS STRUCT src, t, ph, pco2
   FROM (SELECT 'LAB' AS src, la.lab_abg_time AS t, la.lab_abg_ph AS ph, la.lab_abg_paco2 AS pco2
         UNION ALL
         SELECT 'POC', pa.poc_abg_time, pa.poc_abg_ph, pa.poc_abg_paco2)
   WHERE t IS NOT NULL
   ORDER BY t LIMIT 1) AS first_abg,
  -- First VBG across LAB+POC
  (SELECT AS STRUCT src, t, ph, pco2
   FROM (SELECT 'LAB' AS src, lv.lab_vbg_time AS t, lv.lab_vbg_ph AS ph, lv.lab_vbg_paco2 AS pco2
         UNION ALL
         SELECT 'POC', pv.poc_vbg_time, pv.poc_vbg_ph, pv.poc_vbg_paco2)
   WHERE t IS NOT NULL
   ORDER BY t LIMIT 1) AS first_vbg,
  -- First OTHER-source pCO2 across LAB+POC
  (SELECT AS STRUCT src, t, ph, pco2
   FROM (SELECT 'LAB' AS src, lo.lab_other_time AS t, lo.lab_other_ph AS ph, lo.lab_other_paco2 AS pco2
         UNION ALL
         SELECT 'POC', po.poc_other_time, po.poc_other_ph, po.poc_other_paco2)
   WHERE t IS NOT NULL
   ORDER BY t LIMIT 1) AS first_other
FROM hadms h
LEFT JOIN lab_abg la USING (hadm_id)
LEFT JOIN lab_vbg lv USING (hadm_id)
LEFT JOIN lab_other lo USING (hadm_id)
LEFT JOIN poc_abg pa USING (hadm_id)
LEFT JOIN poc_vbg pv USING (hadm_id)
LEFT JOIN poc_other po USING (hadm_id)
"""

bg_pairs = run_sql_bq(bg_pairs_sql, params)

# Flatten STRUCTs for first_abg, first_vbg, and first_other
for col in ["first_abg","first_vbg","first_other"]:
    if col in bg_pairs.columns:
        bg_pairs[f"{col}_src"]  = bg_pairs[col].apply(lambda x: x.get("src") if isinstance(x, dict) else None)
        bg_pairs[f"{col}_time"] = bg_pairs[col].apply(lambda x: x.get("t")   if isinstance(x, dict) else None)
        bg_pairs[f"{col}_ph"]   = bg_pairs[col].apply(lambda x: x.get("ph")  if isinstance(x, dict) else None)
        bg_pairs[f"{col}_pco2"] = bg_pairs[col].apply(lambda x: x.get("pco2")if isinstance(x, dict) else None)
        bg_pairs = bg_pairs.drop(columns=[col])

bg_pairs.head(3)


## 5) Demographics & outcomes

**Rationale:** Build baseline covariates used for descriptive statistics and potential confounding adjustment.


In [None]:
# Purpose: Define a reusable BigQuery execution helper so SQL calls are consistent across notebook stages.


SQL["demo_sql"] = f"""
WITH hadms AS (SELECT x AS hadm_id FROM UNNEST(@hadms) AS x)
SELECT
  a.hadm_id,
  a.subject_id,
  a.admittime,
  a.dischtime,
  a.deathtime,
  a.admission_type,
  a.admission_location,
  a.discharge_location,
  a.insurance,
  -- LOS (days)
  TIMESTAMP_DIFF(a.dischtime, a.admittime, HOUR) / 24.0 AS hosp_los_days,
  -- in-hospital death
  IF(a.deathtime IS NOT NULL, 1, 0) AS death_in_hosp,
  -- demographics
  p.gender,
  SAFE_CAST(ROUND(p.anchor_age + (EXTRACT(YEAR FROM a.admittime) - p.anchor_year), 1) AS FLOAT64) AS age_at_admit,
  -- 30-day all-cause mortality from admission
  IF(p.dod IS NOT NULL AND DATE_DIFF(DATE(p.dod), DATE(a.admittime), DAY) BETWEEN 0 AND 30, 1, 0) AS death_30d
FROM `{PHYS}.{HOSP}.admissions` a
JOIN hadms h USING (hadm_id)
JOIN `{PHYS}.{HOSP}.patients` p USING (subject_id)
"""
demo = run_sql_bq(sql("demo_sql"), {"hadms": hadm_list})
print("Demo rows:", len(demo))
demo.head(3)



In [None]:
# Purpose: Create reusable data-quality and merge guardrail helpers to prevent silent join errors.

# ==== Drop-in: safe merge utilities (one cell, run once) ====
import pandas as pd
from typing import Iterable, Optional, Literal

def _ensure_Int64(s: pd.Series) -> pd.Series:
    """Coerce to pandas nullable Int64 (preserves NA)."""
    return pd.to_numeric(s, errors="coerce").astype("Int64")

def strip_subject_cols(fr: pd.DataFrame) -> pd.DataFrame:
    """Remove any subject_id-like columns from a frame (e.g., 'subject_id', 'Subject_ID')."""
    return fr.drop(columns=[c for c in fr.columns if c.lower().startswith("subject_id")],
                   errors="ignore")

def safe_merge_on_hadm(
    left: pd.DataFrame,
    right: pd.DataFrame,
    *,
    right_name: str,
    take: Optional[Iterable[str]] = None,
    order_by: Optional[Iterable[str]] = None,
    check_subject: Literal[False, "warn", "raise"] = False,
) -> pd.DataFrame:
    """
    Left-merge 'right' into 'left' on hadm_id, returning a copy of left with right's columns.
    - Dedupes right on hadm_id (optionally using order_by to pick the first row).
    - Optionally restricts right columns via `take`.
    - Optionally audits subject_id agreement before dropping subject_id from right.
    - Always strips subject_id-like columns from the right to prevent *_x/_y suffixes.
    - Raises if any *_x/_y suffixes still appear (indicates overlapping names besides hadm_id).
    """
    if "hadm_id" not in left.columns:
        raise KeyError(f"left frame lacks hadm_id before merging {right_name}")
    if "hadm_id" not in right.columns:
        raise KeyError(f"{right_name} lacks hadm_id")

    L = left.copy()
    R = right.copy()

    # Standardize dtypes of keys
    L["hadm_id"] = _ensure_Int64(L["hadm_id"])
    R["hadm_id"] = _ensure_Int64(R["hadm_id"])
    if "subject_id" in L.columns:
        L["subject_id"] = _ensure_Int64(L["subject_id"])
    if "subject_id" in R.columns:
        R["subject_id"] = _ensure_Int64(R["subject_id"])

    # Dedupe RIGHT by hadm_id (optionally order_by first)
    if order_by:
        R = (R.sort_values(list(order_by))
               .drop_duplicates(subset=["hadm_id"], keep="first"))
    else:
        R = R.drop_duplicates(subset=["hadm_id"], keep="first")

    # Optional subject_id consistency audit (before stripping)
    if check_subject and ("subject_id" in L.columns) and ("subject_id" in R.columns):
        # Join only on hadm_id where both sides have subject_id
        tmp = (L[["hadm_id", "subject_id"]]
                 .merge(R[["hadm_id", "subject_id"]],
                        on="hadm_id", how="inner", suffixes=("_L","_R")))
        mism = (tmp["subject_id_L"].notna() & tmp["subject_id_R"].notna() &
                (tmp["subject_id_L"] != tmp["subject_id_R"]))
        n_mism = int(mism.sum())
        if n_mism > 0:
            sample_ids = tmp.loc[mism, "hadm_id"].head(10).tolist()
            msg = (f"[{right_name}] subject_id mismatch on {n_mism} hadm_id(s). "
                   f"Examples: {sample_ids}")
            if check_subject == "raise":
                raise ValueError(msg)
            else:
                print("WARNING:", msg)

    # Limit right columns (avoid accidental overlaps)
    if take is not None:
        keep = ["hadm_id"] + [c for c in take if c != "hadm_id"]
        R = R[[c for c in keep if c in R.columns]]

    # Always strip subject_id-like columns from right to prevent *_x/_y
    R = strip_subject_cols(R)

    # Final merge
    out = L.merge(R, on="hadm_id", how="left", suffixes=("", ""))

    # Guard: no suffixes should be present
    bad = [c for c in out.columns if c.endswith("_x") or c.endswith("_y")]
    if bad:
        raise RuntimeError(
            f"Merge with {right_name} produced suffixed columns {bad}. "
            "You likely have overlapping column names other than hadm_id."
        )
    return out

print("Safe merge helpers loaded.")


## 6) NIH/OMB race & ethnicity (ED + Hospital)

**Rationale:** Harmonize race/ethnicity across sources using NIH/OMB categories for consistent reporting.


In [None]:
# Purpose: Define a reusable BigQuery execution helper so SQL calls are consistent across notebook stages.

race_eth_sql = rf"""
WITH hadms AS (
  SELECT x AS hadm_id
  FROM UNNEST(@hadms) AS x
),

-- Hospital admission "race" text
hosp AS (
  SELECT a.hadm_id, LOWER(TRIM(a.race)) AS race_hosp_raw
  FROM `{PHYS}.{HOSP}.admissions` a
  JOIN hadms hm USING (hadm_id)
),

-- Earliest ED stay leading to the admission; take its "race" text if present
ed_first AS (
  SELECT
    e.hadm_id,
    (ARRAY_AGG(STRUCT(e.intime AS intime, LOWER(TRIM(e.race)) AS race_ed_raw)
               ORDER BY e.intime ASC LIMIT 1))[OFFSET(0)] AS pick
  FROM `{PHYS}.{ED}.edstays` e
  JOIN hadms hm USING (hadm_id)
  GROUP BY e.hadm_id
),
ed AS (
  SELECT hadm_id, pick.race_ed_raw
  FROM ed_first
),

-- Combine ED + Hospital for maximum coverage
comb AS (
  SELECT
    hm.hadm_id,
    ho.race_hosp_raw,
    ed.race_ed_raw,
    TRIM(REGEXP_REPLACE(CONCAT(COALESCE(ho.race_hosp_raw,''), ' ', COALESCE(ed.race_ed_raw,'')), r'\s+', ' ')) AS race_text_any
  FROM hadms hm
  LEFT JOIN hosp ho USING (hadm_id)
  LEFT JOIN ed   ed USING (hadm_id)
),

-- Tokenization to OMB families + Hispanic ethnicity
tok AS (
  SELECT
    hadm_id, race_hosp_raw, race_ed_raw, race_text_any,

    -- Ethnicity (Hispanic)
    REGEXP_CONTAINS(race_text_any, r'\b(hispanic|latinx|latino|latina)\b') AS is_hisp,

    -- Race families (use boundaries to reduce false positives)
    REGEXP_CONTAINS(race_text_any, r'american\s+indian|\balaska\b') AS is_aian,
    REGEXP_CONTAINS(race_text_any, r'\basian\b') AS is_asian,
    REGEXP_CONTAINS(race_text_any, r'\b(black|african\s+american)\b') AS is_black,
    REGEXP_CONTAINS(race_text_any, r'hawaiian|pacific\s+islander') AS is_nhopi,
    REGEXP_CONTAINS(race_text_any, r'\bwhite\b|caucasian') AS is_white,

    -- Unknown/other indicators
    REGEXP_CONTAINS(race_text_any, r'unknown|other|declined|unable|not\s+reported|missing|null') AS is_unknown_any,

    -- Multi-race hints
    REGEXP_CONTAINS(race_text_any, r'(two|2)\s+or\s+more|multi|biracial|multiracial') AS is_multi_hint
  FROM comb
),

-- Decide ethnicity per NIH
ethn AS (
  SELECT
    hadm_id, race_hosp_raw, race_ed_raw, race_text_any,
    CASE
      WHEN is_hisp THEN 'Hispanic or Latino'
      WHEN (race_text_any IS NULL OR race_text_any = '' OR is_unknown_any) THEN 'Unknown or Not Reported'
      ELSE 'Not Hispanic or Latino'
    END AS nih_ethnicity,
    (CAST(is_aian AS INT64) + CAST(is_asian AS INT64) + CAST(is_black AS INT64)
     + CAST(is_nhopi AS INT64) + CAST(is_white AS INT64)) AS race_hits,
    is_aian, is_asian, is_black, is_nhopi, is_white, is_multi_hint, is_unknown_any
  FROM tok
),

-- Decide race per NIH/OMB (1997)
race_assign AS (
  SELECT
    hadm_id, race_hosp_raw, race_ed_raw, race_text_any, nih_ethnicity,
    CASE
      WHEN race_hits >= 2 OR is_multi_hint THEN 'More than one race'
      WHEN is_aian THEN 'American Indian or Alaska Native'
      WHEN is_asian THEN 'Asian'
      WHEN is_black THEN 'Black or African American'
      WHEN is_nhopi THEN 'Native Hawaiian or Other Pacific Islander'
      WHEN is_white THEN 'White'
      WHEN is_unknown_any OR race_text_any IS NULL OR race_text_any = '' THEN 'Unknown or Not Reported'
      ELSE 'Unknown or Not Reported'
    END AS nih_race
  FROM ethn
)

SELECT hadm_id, race_hosp_raw, race_ed_raw, nih_race, nih_ethnicity
FROM race_assign
"""
race_eth = run_sql_bq(race_eth_sql, {"hadms": hadm_list})
print("Race/Eth rows:", len(race_eth))
race_eth.head(3)



## 7) ED triage (linked to hadm) and first ED vitals

**Rationale:** Capture ED presentation features (vitals and chief complaint) for symptom and severity analyses.


In [None]:
# Purpose: Pull ED triage/first-vitals by loading ED tables once and filtering to cohort hadm_ids in pandas.

hadms_for_ed = set(
    pd.Series(hadm_list)
    .dropna()
    .astype("int64")
    .tolist()
)

# 1) ED stay map (full table), then restrict to cohort hadm_ids locally.
SQL["edmap_all_sql"] = f"""
SELECT stay_id, hadm_id, intime
FROM `{PHYS}.{ED}.edstays`
WHERE hadm_id IS NOT NULL
"""
edmap_all = run_sql_bq(sql("edmap_all_sql"))
edmap_all["hadm_id"] = pd.to_numeric(edmap_all["hadm_id"], errors="coerce").astype("Int64")
edmap_all["stay_id"] = pd.to_numeric(edmap_all["stay_id"], errors="coerce").astype("Int64")
edmap_all["intime"] = pd.to_datetime(edmap_all["intime"], errors="coerce")

edmap = edmap_all[edmap_all["hadm_id"].isin(hadms_for_ed)].copy()
edmap = edmap.drop_duplicates(subset=["stay_id", "hadm_id"])
print("ED map rows (cohort):", len(edmap))

if edmap.empty:
    ed_triage = pd.DataFrame(columns=[
        "hadm_id", "ed_triage_temp", "ed_triage_hr", "ed_triage_rr",
        "ed_triage_o2sat", "ed_triage_sbp", "ed_triage_dbp", "ed_triage_pain",
        "ed_triage_acuity", "ed_triage_cc",
    ])
    ed_first = pd.DataFrame(columns=[
        "hadm_id", "ed_first_vitals_time", "ed_first_temp", "ed_first_hr",
        "ed_first_rr", "ed_first_o2sat", "ed_first_sbp", "ed_first_dbp",
        "ed_first_rhythm", "ed_first_pain",
    ])
else:
    # 2) ED triage (full table), then reduce to earliest ED stay per hadm.
    SQL["ed_triage_all_sql"] = f"""
    SELECT
      stay_id,
      temperature    AS ed_triage_temp,
      heartrate      AS ed_triage_hr,
      resprate       AS ed_triage_rr,
      o2sat          AS ed_triage_o2sat,
      sbp            AS ed_triage_sbp,
      dbp            AS ed_triage_dbp,
      pain           AS ed_triage_pain,
      acuity         AS ed_triage_acuity,
      chiefcomplaint AS ed_triage_cc
    FROM `{PHYS}.{ED}.triage`
    """
    tri_all = run_sql_bq(sql("ed_triage_all_sql"))
    tri_all["stay_id"] = pd.to_numeric(tri_all["stay_id"], errors="coerce").astype("Int64")

    tri_merged = (
        edmap[["stay_id", "hadm_id", "intime"]]
        .merge(tri_all, on="stay_id", how="left")
    )
    ed_triage = (
        tri_merged
        .sort_values(["hadm_id", "intime"], na_position="last")
        .drop_duplicates(subset=["hadm_id"], keep="first")
        .drop(columns=["stay_id", "intime"])
        .reset_index(drop=True)
    )
    print("ED triage rows:", len(ed_triage))

    # 3) First vitals per stay across full ED vitals table, then reduce to earliest vitals-time per hadm.
    SQL["ed_first_vitals_all_sql"] = f"""
    WITH vs_ranked AS (
      SELECT
        v.stay_id,
        v.charttime,
        v.temperature,
        v.heartrate,
        v.resprate,
        v.o2sat,
        v.sbp,
        v.dbp,
        v.rhythm,
        v.pain,
        ROW_NUMBER() OVER (PARTITION BY v.stay_id ORDER BY v.charttime) AS rn
      FROM `{PHYS}.{ED}.vitalsign` v
    )
    SELECT
      stay_id,
      charttime    AS ed_first_vitals_time,
      temperature  AS ed_first_temp,
      heartrate    AS ed_first_hr,
      resprate     AS ed_first_rr,
      o2sat        AS ed_first_o2sat,
      sbp          AS ed_first_sbp,
      dbp          AS ed_first_dbp,
      rhythm       AS ed_first_rhythm,
      pain         AS ed_first_pain
    FROM vs_ranked
    WHERE rn = 1
    """
    first_stay_all = run_sql_bq(sql("ed_first_vitals_all_sql"))
    first_stay_all["stay_id"] = pd.to_numeric(first_stay_all["stay_id"], errors="coerce").astype("Int64")
    first_stay_all["ed_first_vitals_time"] = pd.to_datetime(first_stay_all["ed_first_vitals_time"], errors="coerce")

    first_merged = (
        edmap[["stay_id", "hadm_id"]]
        .merge(first_stay_all, on="stay_id", how="left")
    )
    ed_first = (
        first_merged
        .sort_values(["hadm_id", "ed_first_vitals_time"], na_position="last")
        .drop_duplicates(subset=["hadm_id"], keep="first")
        .drop(columns=["stay_id"])
        .reset_index(drop=True)
    )
    print("ED first vitals rows:", len(ed_first))


## 8) ICU meta (first ICU stay, LOS days)

**Rationale:** Summarize ICU exposure and length of stay to contextualize disease severity.


In [None]:
# Purpose: Build first-ICU metadata via full-table pull + local hadm filter to avoid large ARRAY parameter stalls.

SQL["icu_all_sql"] = f"""
SELECT
  hadm_id,
  stay_id AS first_icu_stay_id,
  intime  AS icu_intime,
  outtime AS icu_outtime
FROM `{PHYS}.{ICU}.icustays`
WHERE hadm_id IS NOT NULL
"""
icu_all = run_sql_bq(sql("icu_all_sql"))
icu_all["hadm_id"] = pd.to_numeric(icu_all["hadm_id"], errors="coerce").astype("Int64")
icu_all["first_icu_stay_id"] = pd.to_numeric(icu_all["first_icu_stay_id"], errors="coerce").astype("Int64")
icu_all["icu_intime"] = pd.to_datetime(icu_all["icu_intime"], errors="coerce")
icu_all["icu_outtime"] = pd.to_datetime(icu_all["icu_outtime"], errors="coerce")

hadms_for_icu = set(pd.Series(hadm_list).dropna().astype("int64").tolist())
icu_all = icu_all[icu_all["hadm_id"].isin(hadms_for_icu)].copy()

icu_meta = (
    icu_all
    .sort_values(["hadm_id", "icu_intime", "first_icu_stay_id"], na_position="last")
    .drop_duplicates(subset=["hadm_id"], keep="first")
    .reset_index(drop=True)
)
icu_meta["icu_los_days"] = (
    (icu_meta["icu_outtime"] - icu_meta["icu_intime"]).dt.total_seconds() / 86400.0
)

print("ICU meta rows:", len(icu_meta))


## 9) Ventilation flags (ICD procedures)

**Rationale:** Identify IMV/NIV exposure as clinically relevant respiratory support indicators.


In [None]:
# Purpose: Build IMV/NIV ICD flags from targeted procedure codes, then filter to cohort hadm_ids locally.

SQL["vent_proc_sql"] = f"""
SELECT
  hadm_id,
  icd_version,
  REPLACE(icd_code, '.', '') AS code_norm
FROM `{PHYS}.{HOSP}.procedures_icd`
WHERE
  (icd_version = 10 AND icd_code IN ('5A1935Z','5A1945Z','5A1955Z','0BH17EZ','0BH18EZ','5A09357','5A09457','5A09557'))
  OR
  (icd_version = 9 AND REPLACE(icd_code, '.', '') IN ('9670','9671','9672','9604','9390','9391','9399'))
"""

vent_proc = run_sql_bq(sql("vent_proc_sql"))
vent_proc["hadm_id"] = pd.to_numeric(vent_proc["hadm_id"], errors="coerce").astype("Int64")
vent_proc["icd_version"] = pd.to_numeric(vent_proc["icd_version"], errors="coerce").astype("Int64")
vent_proc["code_norm"] = vent_proc["code_norm"].astype(str)

hadms_for_vent = set(pd.Series(hadm_list).dropna().astype("int64").tolist())
vent_proc = vent_proc[vent_proc["hadm_id"].isin(hadms_for_vent)].copy()

imv_codes_10 = {"5A1935Z", "5A1945Z", "5A1955Z", "0BH17EZ", "0BH18EZ"}
imv_codes_9 = {"9670", "9671", "9672", "9604"}
niv_codes_10 = {"5A09357", "5A09457", "5A09557"}
niv_codes_9 = {"9390", "9391", "9399"}

vent_proc["imv_hit"] = (
    ((vent_proc["icd_version"] == 10) & vent_proc["code_norm"].isin(imv_codes_10)) |
    ((vent_proc["icd_version"] == 9) & vent_proc["code_norm"].isin(imv_codes_9))
)
vent_proc["niv_hit"] = (
    ((vent_proc["icd_version"] == 10) & vent_proc["code_norm"].isin(niv_codes_10)) |
    ((vent_proc["icd_version"] == 9) & vent_proc["code_norm"].isin(niv_codes_9))
)

vent = (
    vent_proc
    .groupby("hadm_id", as_index=False)
    .agg(
        imv_flag=("imv_hit", "max"),
        niv_flag=("niv_hit", "max"),
    )
)
vent["imv_flag"] = vent["imv_flag"].astype(int)
vent["niv_flag"] = vent["niv_flag"].astype(int)
vent["any_vent_flag"] = ((vent["imv_flag"] == 1) | (vent["niv_flag"] == 1)).astype(int)

# Preserve one row per hadm in hadm_list even if no vent procedure codes were found.
vent = pd.DataFrame({"hadm_id": pd.Series(hadm_list, dtype="Int64")}).drop_duplicates().merge(vent, on="hadm_id", how="left")
vent[["imv_flag", "niv_flag", "any_vent_flag"]] = vent[["imv_flag", "niv_flag", "any_vent_flag"]].fillna(0).astype(int)

print("Vent rows:", len(vent))


and from charts

In [None]:
# Purpose: Extract earliest NIV/IMV charttimes using prefiltered ventilation itemids for better performance.

SQL["vent_chart_sql"] = f"""
WITH hadms AS (SELECT x AS hadm_id FROM UNNEST(@hadms) AS x),

stays AS (
  SELECT DISTINCT hadm_id, stay_id
  FROM `{PHYS}.{ICU}.icustays`
  WHERE hadm_id IN (SELECT hadm_id FROM hadms)
),

vent_itemids AS (
  SELECT itemid, LOWER(label) AS lbl
  FROM `{PHYS}.{ICU}.d_items`
  WHERE REGEXP_CONTAINS(LOWER(label), r'(vent|ventilator|mode|bipap|bi[- ]?pap|cpap|nippv|niv|mask|ett|endotracheal)')
),

cand AS (
  SELECT
    s.hadm_id,
    ce.charttime,
    vi.lbl,
    LOWER(COALESCE(ce.value, '')) AS valstr
  FROM `{PHYS}.{ICU}.chartevents` ce
  JOIN stays s ON s.stay_id = ce.stay_id
  JOIN vent_itemids vi ON vi.itemid = ce.itemid
),

flags AS (
  SELECT
    hadm_id,
    MIN(IF(
          REGEXP_CONTAINS(lbl, r'(non[- ]?invasive|niv|nippv|bipap|bi[- ]?pap|cpap)')
          OR REGEXP_CONTAINS(valstr, r'(non[- ]?invasive|niv|nippv|bipap|bi[- ]?pap|cpap)'),
          charttime, NULL)) AS first_niv_time,
    MIN(IF(
          REGEXP_CONTAINS(lbl, r'(invasive ventilation|endotracheal|ett|mech(|anical)? vent|ventilator mode|ac/|simv|prvc|aprv|pcv|vcv|assist\s*control)')
          OR REGEXP_CONTAINS(valstr, r'(invasive ventilation|endotracheal|ett|ac/|simv|prvc|aprv|pcv|vcv|assist\s*control)'),
          charttime, NULL)) AS first_imv_time,
    MAX(CASE
          WHEN REGEXP_CONTAINS(lbl, r'(non[- ]?invasive|niv|nippv|bipap|bi[- ]?pap|cpap)')
            OR REGEXP_CONTAINS(valstr, r'(non[- ]?invasive|niv|nippv|bipap|bi[- ]?pap|cpap)')
          THEN 1 ELSE 0 END) AS niv_chart_flag,
    MAX(CASE
          WHEN REGEXP_CONTAINS(lbl, r'(invasive ventilation|endotracheal|ett|mech(|anical)? vent|ventilator mode|ac/|simv|prvc|aprv|pcv|vcv|assist\s*control)')
            OR REGEXP_CONTAINS(valstr, r'(invasive ventilation|endotracheal|ett|ac/|simv|prvc|aprv|pcv|vcv|assist\s*control)')
          THEN 1 ELSE 0 END) AS imv_chart_flag
  FROM cand
  GROUP BY hadm_id
)

SELECT hadm_id, niv_chart_flag, imv_chart_flag, first_niv_time, first_imv_time
FROM flags
"""

try:
    vent_chart = run_sql_bq(sql("vent_chart_sql"), {"hadms": hadm_list})
except Exception as e:
    print("WARNING: vent chart extraction failed; falling back to ICD-only vent timing.", e)
    vent_chart = pd.DataFrame(columns=["hadm_id", "niv_chart_flag", "imv_chart_flag", "first_niv_time", "first_imv_time"])


In [None]:
# Purpose: Derive respiratory support flags and timing fields for IMV/NIV exposure.

# If your existing ICD-only result is called `vent`, rename for clarity:
vent_proc = vent.copy()

# Outer merge so we keep hadm_ids that appear in only one source
vent_combined = vent_proc.merge(vent_chart, on="hadm_id", how="outer")

# Fill missing with 0 before taking maxima
for c in ["imv_flag","niv_flag","any_vent_flag","imv_chart_flag","niv_chart_flag"]:
    if c in vent_combined.columns:
        vent_combined[c] = vent_combined[c].fillna(0).astype("Int64")

# Final "any-source" flags
vent_combined["imv_flag"]       = vent_combined[["imv_flag","imv_chart_flag"]].max(axis=1).astype("Int64")
vent_combined["niv_flag"]       = vent_combined[["niv_flag","niv_chart_flag"]].max(axis=1).astype("Int64")
vent_combined["any_vent_flag"]  = vent_combined[["imv_flag","niv_flag"]].max(axis=1).astype("Int64")

vent_combined = vent_combined[["hadm_id","imv_flag","niv_flag","any_vent_flag","first_imv_time","first_niv_time"]]
print("After combining ICD + chart signals:",
      "\nIMV=1:", int((vent_combined["imv_flag"]==1).sum()),
      "\nNIV=1:", int((vent_combined["niv_flag"]==1).sum()))



## 10) Assemble final DataFrame

**Rationale:** Merge all derived features into a single analytic table keyed by hadm_id.


In [None]:
# Purpose: Extract and standardize first ABG/VBG physiology fields for baseline characterization.

# Canonical base (carries authoritative subject_id)
df = demo.copy()

# Cohort flags / thresholds / labs / etc.
df = safe_merge_on_hadm(df, cohort_any, right_name="cohort_any", check_subject="warn")
df = safe_merge_on_hadm(df, bg_pairs,   right_name="bg_pairs")
df = safe_merge_on_hadm(df, race_eth,   right_name="race_eth")
df = safe_merge_on_hadm(df, ed_triage,  right_name="ed_triage")
df = safe_merge_on_hadm(df, ed_first,   right_name="ed_first")
df = safe_merge_on_hadm(df, icu_meta,   right_name="icu_meta")
df = safe_merge_on_hadm(df, vent_combined,       right_name="vent_combined")

# Anchor to first ED presentation (per admission)
if "ed_intime_first" in globals():
    df = safe_merge_on_hadm(df, ed_intime_first, right_name="ed_intime_first")

# Derived timing: first NIV/IMV relative to ED presentation
if "ed_intime_first" in df.columns and "first_imv_time" in df.columns:
    df["dt_first_imv_hours"] = (df["first_imv_time"] - df["ed_intime_first"]).dt.total_seconds() / 3600.0
if "ed_intime_first" in df.columns and "first_niv_time" in df.columns:
    df["dt_first_niv_hours"] = (df["first_niv_time"] - df["ed_intime_first"]).dt.total_seconds() / 3600.0

# ABG/VBG before IMV (hadm-level)
if {"first_abg_time", "first_imv_time"}.issubset(df.columns):
    df["abg_before_imv"] = (
        df["first_abg_time"].notna() & df["first_imv_time"].notna() &
        (df["first_abg_time"] < df["first_imv_time"])
    ).astype("Int64")
if {"first_vbg_time", "first_imv_time"}.issubset(df.columns):
    df["vbg_before_imv"] = (
        df["first_vbg_time"].notna() & df["first_imv_time"].notna() &
        (df["first_vbg_time"] < df["first_imv_time"])
    ).astype("Int64")

# Canonical hadm-level threshold normalization.
# This consolidates explicit threshold columns with parsed ABG/VBG/other pCO2 fields
# so later merges cannot silently zero-out source-specific thresholds.
def _num_series(frame: pd.DataFrame, col: str) -> pd.Series:
    if col in frame.columns:
        return pd.to_numeric(frame[col], errors="coerce")
    return pd.Series(np.nan, index=frame.index)


def _any_ge(frame: pd.DataFrame, cols: list[str], threshold: float) -> pd.Series:
    avail = [c for c in cols if c in frame.columns]
    if not avail:
        return pd.Series(0, index=frame.index, dtype="Int64")
    mat = pd.concat([_num_series(frame, c) for c in avail], axis=1)
    return mat.ge(threshold).any(axis=1).astype("Int64")


existing_abg = _num_series(df, "abg_hypercap_threshold").fillna(0).astype(int)
existing_vbg = _num_series(df, "vbg_hypercap_threshold").fillna(0).astype(int)
existing_other = _num_series(df, "other_hypercap_threshold").fillna(0).astype(int)

abg_from_values = _any_ge(df, ["lab_abg_paco2", "poc_abg_paco2", "first_abg_pco2"], 45.0)
vbg_from_values = _any_ge(df, ["lab_vbg_paco2", "poc_vbg_paco2", "first_vbg_pco2"], 50.0)
other_from_values = _any_ge(df, ["first_other_pco2"], 50.0)

df["abg_hypercap_threshold"] = ((existing_abg == 1) | (abg_from_values == 1)).astype("Int64")
df["vbg_hypercap_threshold"] = ((existing_vbg == 1) | (vbg_from_values == 1)).astype("Int64")
df["other_hypercap_threshold"] = ((existing_other == 1) | (other_from_values == 1)).astype("Int64")

df["pco2_threshold_any"] = (
    (df["abg_hypercap_threshold"] == 1)
    | (df["vbg_hypercap_threshold"] == 1)
    | (df["other_hypercap_threshold"] == 1)
).astype("Int64")

if "any_hypercap_icd" in df.columns:
    df["enrolled_any"] = ((pd.to_numeric(df["any_hypercap_icd"], errors="coerce").fillna(0).astype(int) == 1) | (df["pco2_threshold_any"] == 1)).astype("Int64")

print(
    "Threshold sanity (hadm-level):",
    "ABG=", int(df["abg_hypercap_threshold"].fillna(0).sum()),
    "VBG=", int(df["vbg_hypercap_threshold"].fillna(0).sum()),
    "OTHER=", int(df["other_hypercap_threshold"].fillna(0).sum()),
    "ANY=", int(df["pco2_threshold_any"].fillna(0).sum()),
)

print("Final df rows:", len(df), "cols:", len(df.columns))

# Safety checks
assert "subject_id" in df.columns, "subject_id missing from final df"
assert not any(c.endswith("_x") or c.endswith("_y") for c in df.columns), "Found suffixed columns"
print("Final df rows:", len(df), "cols:", len(df.columns))
df.head(3)



In [None]:
# Purpose: Define a reusable BigQuery execution helper so SQL calls are consistent across notebook stages.

# --- Cohort flow counts (ED / ICU / blood gas / hypercapnia / CC) ---

# 1) Dataset-level ED counts
SQL["ed_counts_sql"] = f"""
SELECT
  COUNT(*) AS total_ed_encounters,
  COUNTIF(hadm_id IS NOT NULL) AS ed_encounters_with_hadm
FROM `{PHYS}.{ED}.edstays`
"""

SQL["ed_to_icu_sql"] = f"""
SELECT
  COUNT(DISTINCT e.hadm_id) AS ed_to_icu_hadm,
  COUNT(DISTINCT e.stay_id) AS ed_to_icu_edstays
FROM `{PHYS}.{ED}.edstays` e
JOIN `{PHYS}.{ICU}.icustays` i USING (hadm_id)
WHERE e.hadm_id IS NOT NULL
"""

try:
    ed_counts = run_sql_bq(sql("ed_counts_sql"))
    ed_to_icu = run_sql_bq(sql("ed_to_icu_sql"))
except Exception as e:
    print("Warning: ED/ICU counts query failed:", e)
    ed_counts = None
    ed_to_icu = None

# 2) Cohort-level counts (admission-level)
cohort_union = int((cohort_any["enrolled_any"] == 1).sum()) if "cohort_any" in globals() and "enrolled_any" in cohort_any.columns else len(hadm_list)
cohort_df_n = len(df)

# Any blood gas present (ABG/VBG, LAB/POC)
co2_cols = [c for c in [
    "lab_abg_paco2", "lab_vbg_paco2", "poc_abg_paco2", "poc_vbg_paco2"
] if c in df.columns]
any_bg = int(df[co2_cols].notna().any(axis=1).sum()) if co2_cols else None

# Hypercapnia thresholds and ICD
icd_count = int((df["any_hypercap_icd"] == 1).sum()) if "any_hypercap_icd" in df.columns else None
threshold_count = int((df["pco2_threshold_any"] == 1).sum()) if "pco2_threshold_any" in df.columns else None

# ED chief complaint missing / present (within cohort)
if "ed_triage_cc" in df.columns:
    mask_cc_present = df["ed_triage_cc"].notna() & (df["ed_triage_cc"].astype(str).str.strip() != "")
    cc_present = int(mask_cc_present.sum())
    cc_missing = int((~mask_cc_present).sum())
else:
    cc_present = None
    cc_missing = None

# ED→ICU within cohort (admissions with ED triage data and ICU stay)
if "first_icu_stay_id" in df.columns and "ed_triage_cc" in df.columns:
    cohort_ed_to_icu = int((df["first_icu_stay_id"].notna() & mask_cc_present).sum())
else:
    cohort_ed_to_icu = None

rows = []
if ed_counts is not None:
    rows.append({"step": "Total ED encounters (edstays)", "count": int(ed_counts.loc[0, "total_ed_encounters"]), "scope": "All ED dataset"})
    rows.append({"step": "ED encounters with hadm_id", "count": int(ed_counts.loc[0, "ed_encounters_with_hadm"]), "scope": "All ED dataset"})
if ed_to_icu is not None:
    rows.append({"step": "ED→ICU admissions (distinct hadm_id)", "count": int(ed_to_icu.loc[0, "ed_to_icu_hadm"]), "scope": "All ED+ICU"})
    rows.append({"step": "ED→ICU ED-stays (distinct stay_id)", "count": int(ed_to_icu.loc[0, "ed_to_icu_edstays"]), "scope": "All ED+ICU"})

rows.append({"step": "Cohort admissions (union ICD ∪ thresholds)", "count": cohort_union, "scope": "Cohort"})
rows.append({"step": "Cohort admissions after merges (df rows)", "count": cohort_df_n, "scope": "Cohort"})
if any_bg is not None:
    rows.append({"step": "Cohort with any ABG/VBG (LAB or POC)", "count": any_bg, "scope": "Cohort"})
if threshold_count is not None:
    rows.append({"step": "Cohort meeting hypercapnia thresholds", "count": threshold_count, "scope": "Cohort"})
if icd_count is not None:
    rows.append({"step": "Cohort meeting ICD code criteria", "count": icd_count, "scope": "Cohort"})
if cc_present is not None:
    rows.append({"step": "Cohort with ED chief complaint present", "count": cc_present, "scope": "Cohort"})
if cc_missing is not None:
    rows.append({"step": "Cohort excluded for missing ED chief complaint", "count": cc_missing, "scope": "Cohort"})
if cohort_ed_to_icu is not None:
    rows.append({"step": "Cohort ED→ICU (ED CC present + ICU stay)", "count": cohort_ed_to_icu, "scope": "Cohort"})

flow_counts = pd.DataFrame(rows)
flow_counts



In [None]:
# Purpose: Quantify overlap between ascertainment routes so non-exclusive cohorts are explicit.

# --- Ascertainment overlap counts (ABG/VBG/ICD) ---

required = ["abg_hypercap_threshold", "vbg_hypercap_threshold", "other_hypercap_threshold", "any_hypercap_icd"]
missing = [c for c in required if c not in df.columns]
if missing:
    raise KeyError(f"Missing required columns for overlap counts: {missing}")

abg = pd.to_numeric(df["abg_hypercap_threshold"], errors="coerce").fillna(0).astype(int)
vbg = pd.to_numeric(df["vbg_hypercap_threshold"], errors="coerce").fillna(0).astype(int)
other = pd.to_numeric(df["other_hypercap_threshold"], errors="coerce").fillna(0).astype(int)

gas_any = (
    pd.to_numeric(df.get("pco2_threshold_any", None), errors="coerce")
    if "pco2_threshold_any" in df.columns else (abg | vbg | other)
)
if hasattr(gas_any, "fillna"):
    gas_any = gas_any.fillna(0).astype(int)
else:
    gas_any = gas_any.astype(int)

icd = pd.to_numeric(df["any_hypercap_icd"], errors="coerce").fillna(0).astype(int)

total_n = len(df)
ngas = int((gas_any == 1).sum())

abg_vbg_overlap = pd.DataFrame([
    {"group": "ABG-only", "count": int(((abg==1) & (vbg==0) & (other==0)).sum())},
    {"group": "VBG-only", "count": int(((vbg==1) & (abg==0) & (other==0)).sum())},
    {"group": "Other-only", "count": int(((other==1) & (abg==0) & (vbg==0)).sum())},
    {"group": "Mixed (>=2 routes)", "count": int((((abg + vbg + other) >= 2).sum()))},
])
if ngas > 0:
    abg_vbg_overlap["pct_of_gas"] = (abg_vbg_overlap["count"] / ngas * 100).round(1)
else:
    abg_vbg_overlap["pct_of_gas"] = 0.0
abg_vbg_overlap["pct_of_cohort"] = (abg_vbg_overlap["count"] / max(total_n,1) * 100).round(1)

icd_gas_overlap = pd.DataFrame([
    {"group": "ICD+Gas", "count": int(((icd==1) & (gas_any==1)).sum())},
    {"group": "ICD-only", "count": int(((icd==1) & (gas_any==0)).sum())},
    {"group": "Gas-only", "count": int(((icd==0) & (gas_any==1)).sum())},
    {"group": "Neither", "count": int(((icd==0) & (gas_any==0)).sum())},
])
icd_gas_overlap["pct_of_cohort"] = (icd_gas_overlap["count"] / max(total_n,1) * 100).round(1)

print("ABG/VBG/Other overlap (among gas-positive):")
print(abg_vbg_overlap.to_string(index=False))
print("ICD vs Gas overlap (cohort-level):")
print(icd_gas_overlap.to_string(index=False))



In [None]:
# Purpose: Capture ED presentation context (chief complaint, acuity, and early vitals) at the ED-stay level.

# --- Missingness summary (chief complaint, race/ethnicity, ED triage/vitals) ---

import numpy as np

summary_rows = []

# Chief complaint missingness (ED triage CC)
if "ed_triage_cc" in df.columns:
    cc_present = df["ed_triage_cc"].notna() & (df["ed_triage_cc"].astype(str).str.strip() != "")
    summary_rows.append({
        "variable": "ed_triage_cc_present",
        "missing_n": int((~cc_present).sum()),
        "missing_pct": float((~cc_present).mean())
    })

# Race/Ethnicity missingness (NIH categories + raw sources)
unknown_tokens = {
    "unknown or not reported",
    "unknown",
    "not reported",
    "missing",
    "declined",
    "unable"
}

def _missing_rate(series):
    if series is None:
        return None, None
    s = series.astype(str).str.strip()
    is_missing = series.isna() | (s == "") | s.str.lower().isin(unknown_tokens)
    return int(is_missing.sum()), float(is_missing.mean())

for col in ["nih_race", "nih_ethnicity", "race_hosp_raw", "race_ed_raw"]:
    if col in df.columns:
        m_n, m_p = _missing_rate(df[col])
        summary_rows.append({
            "variable": col,
            "missing_n": m_n,
            "missing_pct": m_p
        })

missing_summary = pd.DataFrame(summary_rows)
print("Missingness summary (key variables):")
missing_summary

# ED triage + first ED vitals missingness
triage_cols = [c for c in df.columns if c.startswith("ed_triage_")]
first_cols  = [c for c in df.columns if c.startswith("ed_first_")]

vital_cols = triage_cols + first_cols
if vital_cols:
    miss_tbl = (
        pd.DataFrame({"variable": vital_cols})
        .assign(
            missing_n=lambda d: [int(df[c].isna().sum()) for c in d["variable"]],
            missing_pct=lambda d: [float(df[c].isna().mean()) for c in d["variable"]]
        )
        .sort_values("missing_pct", ascending=False)
    )
    print("Missingness summary (ED triage + first ED vitals):")
    miss_tbl
else:
    miss_tbl = pd.DataFrame(columns=["variable", "missing_n", "missing_pct"])



## 11) Sanity checks

**Rationale:** Run QC checks to validate units, flags, and basic data integrity before export.


In [None]:
# Purpose: Derive respiratory support flags and timing fields for IMV/NIV exposure.


# ICU LOS negative?
if {"icu_los_days","first_icu_stay_id"}.issubset(df.columns):
    neg_los = int((df["icu_los_days"] < 0).fillna(False).sum())
    print("Negative ICU LOS rows:", neg_los)

# Vent flags consistency
vent_cols = {"imv_flag","niv_flag","any_vent_flag"}
if vent_cols.issubset(df.columns):
    any_calc = ((df["imv_flag"]==1) | (df["niv_flag"]==1)).fillna(False).astype(int)
    any_flag = pd.to_numeric(df["any_vent_flag"], errors="coerce").fillna(0).astype(int)
    mism = int((any_calc != any_flag).sum())
    print("any_vent_flag mismatches vs (imv|niv):", mism)

# UOMs: expect mmhg only
uom_cols = [c for c in df.columns if c.endswith("_paco2_uom")]
for c in uom_cols:
    vals = sorted(pd.Series(df[c]).dropna().astype(str).str.lower().str.strip().unique().tolist())
    print(c, vals)

# ABG/VBG coverage QC
def qc_pair(df, ph_col, co2_col, label, ph_lo=6.3, ph_hi=7.8, co2_lo=5, co2_hi=200):
    ph  = pd.to_numeric(df.get(ph_col), errors="coerce")
    co2 = pd.to_numeric(df.get(co2_col), errors="coerce")
    return {
        "pair": label,
        "present_any":  int(((ph.notna()) | (co2.notna())).sum()),
        "present_both": int(((ph.notna()) & (co2.notna())).sum()),
        "only_ph":      int(((ph.notna()) & (~co2.notna())).sum()),
        "only_pco2":    int(((co2.notna()) & (~ph.notna())).sum()),
        "ph_oob":       int((((ph  < ph_lo)  | (ph  > ph_hi))  & ph.notna()).sum()),
        "pco2_oob":     int((((co2 < co2_lo) | (co2 > co2_hi)) & co2.notna()).sum()),
    }

qc = pd.DataFrame([
    qc_pair(df, "lab_abg_ph","lab_abg_paco2","LAB ABG"),
    qc_pair(df, "lab_vbg_ph","lab_vbg_paco2","LAB VBG"),
    qc_pair(df, "poc_abg_ph","poc_abg_paco2","POC ABG"),
    qc_pair(df, "poc_vbg_ph","poc_vbg_paco2","POC VBG"),
])
qc


## 12) Save to Excel

**Rationale:** Persist cohort outputs for downstream annotation and NLP analyses.


In [None]:
# Purpose: Optional archive export of the full hadm-level cohort workbook.

from datetime import datetime

WRITE_ARCHIVE_XLSX_EXPORTS = os.getenv("WRITE_ARCHIVE_XLSX_EXPORTS", "0") == "1"

if WRITE_ARCHIVE_XLSX_EXPORTS:
    prior_runs_dir = DATA_DIR / "prior runs"
    prior_runs_dir.mkdir(parents=True, exist_ok=True)
    out_path = prior_runs_dir / f"mimic_hypercap_EXT_bq_abg_vbg_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
    with pd.ExcelWriter(out_path, engine="openpyxl") as xw:
        df.to_excel(xw, sheet_name="cohort", index=False)
        try:
            qc.to_excel(xw, sheet_name="qc_abg_vbg", index=False)
        except Exception:
            pass
    out_path
else:
    print("Skipping archive hadm-level workbook export (set WRITE_ARCHIVE_XLSX_EXPORTS=1 to enable).")


## Create Annotation Dataset

**Rationale:** Create ED chief-complaint subsets and a reproducible sample for manual annotation.


In [None]:
# Purpose: Capture ED presentation context (chief complaint, acuity, and early vitals) at the ED-stay level.

# ---- Extra exports: (1) ED chief-complaint only; (2) random sample of 160 patients ----
from datetime import datetime

# Dated artifacts go to archive folder (optional)
prior_runs_dir = DATA_DIR / "prior runs"
prior_runs_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
WRITE_ARCHIVE_XLSX_EXPORTS = os.getenv("WRITE_ARCHIVE_XLSX_EXPORTS", "0") == "1"

# 1) Filter to rows with a non-empty ED chief complaint
if "ed_triage_cc" not in df.columns:
    raise KeyError(
        "Column 'ed_triage_cc' not found in df. "
        "Ensure the ED triage merge cell ran earlier."
    )

mask_cc = df["ed_triage_cc"].notna() & (df["ed_triage_cc"].astype(str).str.strip() != "")
df_cc = df.loc[mask_cc].copy()

print(f"ED-CC present rows: {len(df_cc)} of {len(df)} "
      f"({(len(df_cc) / max(len(df),1)):.1%} of cohort).")

# Save ED-CC-only cohort only when archive exports are enabled
if WRITE_ARCHIVE_XLSX_EXPORTS:
    out_path_cc = prior_runs_dir / f"mimic_hypercap_EXT_EDcc_only_bq_abg_vbg_{timestamp}.xlsx"
    with pd.ExcelWriter(out_path_cc, engine="openpyxl") as xw:
        df_cc.to_excel(xw, sheet_name="cohort_cc_only", index=False)
        try:
            qc.to_excel(xw, sheet_name="qc_abg_vbg", index=False)
        except Exception:
            pass
    print("Saved:", out_path_cc)

# 2) Random sample of n = 160 patients (distinct subject_id), one row per patient
if "subject_id" not in df_cc.columns:
    raise KeyError("Column 'subject_id' missing; cannot sample by patient.")

# Make a one-row-per-patient frame by earliest admission
if "admittime" in df_cc.columns:
    df_cc_one = (
        df_cc.sort_values(["subject_id", "admittime"])
             .groupby("subject_id", as_index=False)
             .head(1)
    )
else:
    # Fallback if admittime not present: choose the smallest hadm_id per patient
    df_cc_one = (
        df_cc.sort_values(["subject_id", "hadm_id"])
             .groupby("subject_id", as_index=False)
             .head(1)
    )

N = 160
n_avail = len(df_cc_one)
n_take = min(N, n_avail)
if n_avail < N:
    print(f"Warning: only {n_avail} unique patients with ED chief complaint; sampling all of them.")

RANDOM_SEED = 42
df_cc_sample = df_cc_one.sample(n=n_take, random_state=RANDOM_SEED)

# Save the sample only when archive exports are enabled
if WRITE_ARCHIVE_XLSX_EXPORTS:
    out_path_cc_sample = prior_runs_dir / f"mimic_hypercap_EXT_EDcc_sample{n_take}_bq_abg_vbg_{timestamp}.xlsx"
    with pd.ExcelWriter(out_path_cc_sample, engine="openpyxl") as xw:
        df_cc_sample.to_excel(xw, sheet_name="cohort_cc_sample", index=False)
        try:
            qc.to_excel(xw, sheet_name="qc_abg_vbg", index=False)
        except Exception:
            pass
    print("Saved:", out_path_cc_sample)
else:
    print("Skipping ED-CC archive exports (set WRITE_ARCHIVE_XLSX_EXPORTS=1 to enable).")


# ED-stay cohort expansion (timing, severity, comorbidity, outcomes)

**Rationale:** Build a one-row-per-ED-stay analytic extract with time-anchored gas phenotypes, key comorbidities, and outcomes.


## Phase 0 — Inventory & missing-field registry

**Rationale:** Detect which fields already exist and only add missing fields to avoid redundant extraction or join explosions.


In [None]:
# Purpose: Inventory available fields and identify missing target variables before enrichment steps.

from pathlib import Path
import json
import pandas as pd

# Identify current cohort dataframe (admission-level)
if 'df' not in globals():
    raise NameError("Expected admission-level df to exist before inventory step.")

ED_KEY = "ed_stay_id"  # target key for ED-level cohort

print("Current admission-level df:", df.shape)
print("Current columns count:", len(df.columns))
if ED_KEY in df.columns:
    print("ED stay unique count:", int(df[ED_KEY].nunique()))
else:
    print("ED stay unique count: ED_KEY not in columns")

# Persist columns snapshot
cols_out = WORK_DIR / "current_columns.json"
cols_out.write_text(json.dumps(sorted(df.columns), indent=2))
print("Wrote:", cols_out)



# Persist ED-stay columns snapshot (if ed_df exists)
if "ed_df" in globals():
    ed_cols_out = WORK_DIR / "ed_columns.json"
    ed_cols_out.write_text(json.dumps(sorted(ed_df.columns), indent=2))
    print("Wrote:", ed_cols_out)


In [None]:
# Purpose: Capture ED presentation context (chief complaint, acuity, and early vitals) at the ED-stay level.

# Target field registry
TARGET_RAW_FIELDS = {
    "ed_edstays": [
        "ed_stay_id","subject_id","hadm_id","ed_intime","ed_outtime","ed_intime_first",
        "arrival_transport","disposition","ed_gender","ed_race",
    ],
    "ed_triage": [
        "ed_triage_temp","ed_triage_hr","ed_triage_rr","ed_triage_o2sat",
        "ed_triage_sbp","ed_triage_dbp","ed_triage_pain","ed_triage_acuity","ed_triage_cc",
    ],
    "ed_vitals_first": [
        "ed_first_vitals_time","ed_first_temp","ed_first_hr","ed_first_rr",
        "ed_first_o2sat","ed_first_sbp","ed_first_dbp","ed_first_rhythm","ed_first_pain",
    ],
    "admissions": [
        "admittime","dischtime","deathtime","hospital_expire_flag",
        "admission_type","admission_location","discharge_location",
        "insurance","language","marital_status","hosp_race",
    ],
    "icu": [
        "icu_stay_id","icu_intime_first","icu_outtime_last","icu_los_total","n_icu_stays",
        "first_careunit","last_careunit",
    ],
    "labs_gas": [
        "first_gas_time","first_pco2","first_ph","first_hco3","first_lactate",
        "max_pco2_0_6h","min_ph_0_6h","max_pco2_0_24h","min_ph_0_24h",
        "flag_abg_hypercapnia","flag_vbg_hypercapnia","flag_other_hypercapnia","flag_any_gas_hypercapnia",
        "gas_source_other_rate",
        "dt_first_imv_hours","dt_first_niv_hours","first_other_time",
    ],
    "omr": [
        "bmi_closest_pre_ed","height_closest_pre_ed","weight_closest_pre_ed",
    ],
    "dx_flags": [
        "flag_copd","flag_osa_ohs","flag_chf","flag_neuromuscular",
        "flag_opioid_substance","flag_pneumonia",
    ],
    "timing": [
        "dt_first_qualifying_gas_hours","presenting_hypercapnia","late_hypercapnia",
        "dt_first_imv_hours","dt_first_niv_hours","abg_before_imv","vbg_before_imv",
        "ph_band","hco3_band","lactate_band",
    ],
}

TARGET_DERIVED_FIELDS = [
    "hospital_los_hours","in_hospital_death",
]

TARGET_FIELDS = sorted({c for v in TARGET_RAW_FIELDS.values() for c in v} | set(TARGET_DERIVED_FIELDS))

# Grouped missing report
missing_by_group = {}
for group, cols in TARGET_RAW_FIELDS.items():
    missing_by_group[group] = [c for c in cols if c not in df.columns]

missing_derived = [c for c in TARGET_DERIVED_FIELDS if c not in df.columns]
missing_by_group["derived"] = missing_derived

missing_flat = [c for cols in missing_by_group.values() for c in cols]
print("Missing fields total:", len(missing_flat))
for group, cols in missing_by_group.items():
    if cols:
        print(f"- {group}: {cols}")





## Phase 1 — ED encounter spine and ED enrichment (one row per ED stay)

**Rationale:** Create a dedicated ED-stay-level cohort with ED-specific attributes to avoid mixing admission- and ED-level grains.


In [None]:
# Purpose: Define a reusable BigQuery execution helper so SQL calls are consistent across notebook stages.

# ED stay spine (rename stay_id to ed_stay_id)

SQL["ed_spine_sql"] = f"""
WITH hadms AS (SELECT x AS hadm_id FROM UNNEST(@hadms) AS x)
SELECT
  s.stay_id AS ed_stay_id,
  s.subject_id,
  s.hadm_id,
  s.intime AS ed_intime,
  s.outtime AS ed_outtime,
  s.arrival_transport,
  s.disposition,
  s.gender AS ed_gender,
  s.race   AS ed_race
FROM `{PHYS}.{ED}.edstays` s
JOIN hadms h ON h.hadm_id = s.hadm_id
"""

ed_spine = run_sql_bq(sql("ed_spine_sql"), {"hadms": hadm_list})
print("ED spine rows:", len(ed_spine), "unique ed_stay_id:", ed_spine["ed_stay_id"].nunique())

# ensure uniqueness
if ed_spine["ed_stay_id"].nunique() != len(ed_spine):
    raise ValueError("ed_stay_id not unique in ED spine")

# First ED presentation time per admission
ed_intime_first = (
    ed_spine.groupby("hadm_id", as_index=False)["ed_intime"]
    .min()
    .rename(columns={"ed_intime": "ed_intime_first"})
)

# Start ED-level df
ed_df = ed_spine.copy()
ed_df = ed_df.merge(ed_intime_first, on="hadm_id", how="left")



In [None]:
# Purpose: Define a reusable BigQuery execution helper so SQL calls are consistent across notebook stages.

# ED triage and first ED vitals (reuse existing logic if present; otherwise join)

def _needs_cols(df, cols):
    return (df is None) or any(c not in df.columns for c in cols)

# Use existing ed_triage / ed_first if already in memory from earlier cells
try:
    _ = ed_triage
except NameError:
    ed_triage = None

try:
    _ = ed_first
except NameError:
    ed_first = None

# Force re-query if required keys are missing
if _needs_cols(locals().get('ed_triage', None), ['ed_stay_id', 'hadm_id']):
    ed_triage = None
if _needs_cols(locals().get('ed_first', None), ['ed_stay_id']):
    ed_first = None

# If missing, prefer in-memory tables loaded earlier in the notebook.
# This avoids re-running expensive ED queries during nbconvert.
if ed_triage is None:
    if 'edmap' in globals() and 'tri_all' in globals():
        ed_triage = (
            edmap[['stay_id', 'hadm_id']]
            .merge(tri_all, on='stay_id', how='left')
            .rename(columns={'stay_id': 'ed_stay_id'})
        )
        print('ED triage rows (from in-memory tables):', len(ed_triage))
    else:
        ed_triage_sql = f"""
        WITH hadms AS (SELECT x AS hadm_id FROM UNNEST(@hadms) AS x)
        SELECT
          s.stay_id AS ed_stay_id,
          s.hadm_id,
          t.temperature    AS ed_triage_temp,
          t.heartrate      AS ed_triage_hr,
          t.resprate       AS ed_triage_rr,
          t.o2sat          AS ed_triage_o2sat,
          t.sbp            AS ed_triage_sbp,
          t.dbp            AS ed_triage_dbp,
          t.pain           AS ed_triage_pain,
          t.acuity         AS ed_triage_acuity,
          t.chiefcomplaint AS ed_triage_cc
        FROM `{PHYS}.{ED}.edstays` s
        JOIN hadms h ON h.hadm_id = s.hadm_id
        LEFT JOIN `{PHYS}.{ED}.triage` t ON t.stay_id = s.stay_id
        """
        ed_triage = run_sql_bq(ed_triage_sql, {'hadms': hadm_list})
        print('ED triage rows (fallback query):', len(ed_triage))

if ed_first is None:
    if 'edmap' in globals() and 'first_stay_all' in globals():
        ed_first = (
            edmap[['stay_id', 'hadm_id']]
            .merge(first_stay_all, on='stay_id', how='left')
            .rename(columns={'stay_id': 'ed_stay_id'})
        )
        print('ED first vitals rows (from in-memory tables):', len(ed_first))
    else:
        ed_first_vitals_sql = f"""
        WITH hadms AS (SELECT x AS hadm_id FROM UNNEST(@hadms) AS x),
        edmap AS (
          SELECT stay_id, hadm_id
          FROM `{PHYS}.{ED}.edstays`
          WHERE hadm_id IN (SELECT hadm_id FROM hadms)
        ),
        vs AS (
          SELECT stay_id, charttime, temperature, heartrate, resprate, o2sat, sbp, dbp, rhythm, pain
          FROM `{PHYS}.{ED}.vitalsign`
        ),
        first_vs AS (
          SELECT
            v.stay_id,
            (ARRAY_AGG(STRUCT(v.charttime, v.temperature, v.heartrate, v.resprate, v.o2sat, v.sbp, v.dbp, v.rhythm, v.pain)
                       ORDER BY v.charttime LIMIT 1))[OFFSET(0)] AS pick
          FROM vs v
          JOIN edmap m USING (stay_id)
          GROUP BY v.stay_id
        )
        SELECT
          f.stay_id AS ed_stay_id,
          m.hadm_id,
          pick.charttime AS ed_first_vitals_time,
          pick.temperature AS ed_first_temp,
          pick.heartrate AS ed_first_hr,
          pick.resprate AS ed_first_rr,
          pick.o2sat AS ed_first_o2sat,
          pick.sbp AS ed_first_sbp,
          pick.dbp AS ed_first_dbp,
          pick.rhythm AS ed_first_rhythm,
          pick.pain AS ed_first_pain
        FROM first_vs f
        JOIN edmap m ON m.stay_id = f.stay_id
        """
        ed_first = run_sql_bq(ed_first_vitals_sql, {'hadms': hadm_list})
        print('ED first vitals rows (fallback query):', len(ed_first))

# Debug columns before merge
print('ed_triage cols:', list(ed_triage.columns))
print('ed_first cols:', list(ed_first.columns))
print('ed_df cols:', list(ed_df.columns))

if 'ed_stay_id' not in ed_df.columns:
    raise KeyError('ed_df missing ed_stay_id; ensure ED spine cell ran.')

# Merge ED triage + vitals onto ed_df with available keys
merge_keys_triage = [k for k in ["ed_stay_id", "hadm_id"] if k in ed_df.columns and k in ed_triage.columns]
if not merge_keys_triage:
    raise KeyError("No common keys between ed_df and ed_triage")
ed_df = ed_df.merge(ed_triage, on=merge_keys_triage, how="left")
merge_keys_first = [k for k in ["ed_stay_id", "hadm_id"] if k in ed_df.columns and k in ed_first.columns]
if not merge_keys_first:
    raise KeyError("No common keys between ed_df and ed_first")
ed_df = ed_df.merge(ed_first, on=merge_keys_first, how="left")






## Phase 2 — Hospital admission context and outcomes

**Rationale:** Add admission-level outcomes and demographics for downstream stratification and analysis.


In [None]:
# Purpose: Define a reusable BigQuery execution helper so SQL calls are consistent across notebook stages.

# Admissions fields
SQL["admit_sql"] = f"""
WITH hadms AS (SELECT x AS hadm_id FROM UNNEST(@hadms) AS x)
SELECT
  a.hadm_id,
  a.admittime,
  a.dischtime,
  a.deathtime,
  a.hospital_expire_flag,
  a.admission_type,
  a.admission_location,
  a.discharge_location,
  a.insurance,
  a.language,
  a.marital_status,
  a.race AS hosp_race
FROM `{PHYS}.{HOSP}.admissions` a
JOIN hadms h USING (hadm_id)
"""

admit = run_sql_bq(sql("admit_sql"), {"hadms": hadm_list})
print("Admissions rows:", len(admit))

ed_df = ed_df.merge(admit, on="hadm_id", how="left")

# Merge ventilation flags/times (hadm-level)
if "vent_combined" in globals():
    ed_df = ed_df.merge(vent_combined, on="hadm_id", how="left")

# Derived outcomes
ed_df["hospital_los_hours"] = (ed_df["dischtime"] - ed_df["admittime"]).dt.total_seconds() / 3600.0
ed_df["in_hospital_death"] = ((ed_df["hospital_expire_flag"] == 1) | ed_df["deathtime"].notna()).astype("int64")

# Concordance check
discord = (
    ((ed_df["hospital_expire_flag"] == 1) & ed_df["deathtime"].isna()) |
    ((ed_df["hospital_expire_flag"] == 0) & ed_df["deathtime"].notna())
)
print("Admissions discordance (expire_flag vs deathtime):", int(discord.sum()))



## Phase 3 — ICU timing and LOS

**Rationale:** Capture ICU exposure, timing, and total LOS for severity stratification.


In [None]:
# Purpose: Define a reusable BigQuery execution helper so SQL calls are consistent across notebook stages.

# ICU stays (aggregate per hadm)
SQL["icu_sql"] = f"""
WITH hadms AS (SELECT x AS hadm_id FROM UNNEST(@hadms) AS x)
SELECT
  i.hadm_id,
  i.stay_id AS icu_stay_id,
  i.intime,
  i.outtime,
  i.los,
  i.first_careunit,
  i.last_careunit
FROM `{PHYS}.{ICU}.icustays` i
JOIN hadms h USING (hadm_id)
"""

icu = run_sql_bq(sql("icu_sql"), {"hadms": hadm_list})
print("ICU stay rows:", len(icu))

if len(icu) > 0:
    icu_agg = (
        icu.sort_values(["hadm_id", "intime"]).groupby("hadm_id", as_index=False)
        .agg(
            icu_intime_first=("intime", "min"),
            icu_outtime_last=("outtime", "max"),
            icu_los_total=("los", "sum"),
            n_icu_stays=("icu_stay_id", "nunique"),
            first_careunit=("first_careunit", "first"),
            last_careunit=("last_careunit", "last"),
        )
    )
    ed_df = ed_df.merge(icu_agg, on="hadm_id", how="left")
else:
    print("No ICU stays found for cohort.")



## Phase 4 — ED longitudinal vitals (0–6h)

**Rationale:** Summarize early ED vitals for severity phenotyping.


In [None]:
# Purpose: Build ED vitals features robustly without a single large BigQuery join that can stall.

# ED vitals long + aggregates (0-6h)

# Build the ED stay map from in-memory cohort tables to keep query scope tight and deterministic.
if "ed_df" not in globals():
    raise NameError("ed_df is required before ED vitals extraction")

required_cols = ["ed_stay_id", "hadm_id", "ed_intime"]
missing_required = [c for c in required_cols if c not in ed_df.columns]
if missing_required:
    raise KeyError(f"ed_df missing required columns for ED vitals extraction: {missing_required}")

edmap_local = (
    ed_df[required_cols]
    .dropna(subset=["ed_stay_id"])
    .drop_duplicates(subset=["ed_stay_id"])
    .copy()
)
edmap_local["ed_stay_id"] = pd.to_numeric(edmap_local["ed_stay_id"], errors="coerce")
edmap_local = edmap_local.dropna(subset=["ed_stay_id"])
edmap_local["ed_stay_id"] = edmap_local["ed_stay_id"].astype(int)

stay_ids = sorted(edmap_local["ed_stay_id"].unique().tolist())
print("ED stays for vitals pull:", len(stay_ids))

SQL["ed_vitals_sql"] = f"""
SELECT
  stay_id AS ed_stay_id,
  charttime,
  temperature,
  heartrate,
  resprate,
  o2sat,
  sbp,
  dbp,
  rhythm,
  pain
FROM `{PHYS}.{ED}.vitalsign`
WHERE stay_id IN UNNEST(@stay_ids)
"""

# Query in chunks so we avoid one very large parameter payload/job.
chunk_size = 5000
vitals_chunks = []
for i in range(0, len(stay_ids), chunk_size):
    chunk = stay_ids[i:i + chunk_size]
    part = run_sql_bq(sql("ed_vitals_sql"), {"stay_ids": chunk})
    if len(part) > 0:
        part["ed_stay_id"] = pd.to_numeric(part["ed_stay_id"], errors="coerce")
        part = part.dropna(subset=["ed_stay_id"])
        part["ed_stay_id"] = part["ed_stay_id"].astype(int)
        vitals_chunks.append(part)
    print(f"ED vitals chunk {i // chunk_size + 1}: rows={len(part)}")

if vitals_chunks:
    ed_vitals_long = pd.concat(vitals_chunks, ignore_index=True)
else:
    ed_vitals_long = pd.DataFrame(
        columns=[
            "ed_stay_id", "charttime", "temperature", "heartrate", "resprate",
            "o2sat", "sbp", "dbp", "rhythm", "pain"
        ]
    )

ed_vitals_long = ed_vitals_long.merge(edmap_local, on="ed_stay_id", how="inner")
print("ED vitals long rows:", len(ed_vitals_long))

# Window filter: 0-6h from ED intime
ed_vitals_long["dt_hours"] = (ed_vitals_long["charttime"] - ed_vitals_long["ed_intime"]).dt.total_seconds() / 3600.0
in_6h = ed_vitals_long["dt_hours"].between(0, 6, inclusive="both")

agg = (
    ed_vitals_long.loc[in_6h]
    .groupby("ed_stay_id", as_index=False)
    .agg(
        max_heartrate_0_6h=("heartrate", "max"),
        max_resprate_0_6h=("resprate", "max"),
        min_o2sat_0_6h=("o2sat", "min"),
        min_sbp_0_6h=("sbp", "min"),
        n_vitals_0_6h=("charttime", "count"),
    )
)

ed_df = ed_df.merge(agg, on="ed_stay_id", how="left")

# Range warnings (report only, do not drop)
range_checks = {
    "heartrate": (0, 300),
    "resprate": (0, 80),
    "o2sat": (0, 100),
    "sbp": (0, 300),
}
for col, (lo, hi) in range_checks.items():
    bad = ed_vitals_long[col].notna() & (~ed_vitals_long[col].between(lo, hi))
    if bad.any():
        print(f"Warning: {col} out of range count:", int(bad.sum()))



## Phase 5 — Robust lab discovery + gas panels

**Rationale:** Capture blood gas and key chemistry labs with label-robust item discovery and unit normalization.


In [None]:
# Purpose: Discover lab item IDs robustly and record cohort-aware item frequency for auditability.

import re
import json

# Discover itemids from d_labitems
SQL["labitems_sql"] = f"""
SELECT itemid, label, fluid, category
FROM `{PHYS}.{HOSP}.d_labitems`
"""

labitems = run_sql_bq(sql("labitems_sql"))

patterns = {
    "gas_pco2": re.compile(r"\bp\s*co2\b|pco2|pco₂", re.I),
    "gas_ph": re.compile(r"\bph\b", re.I),
    "gas_hco3": re.compile(r"hco3|bicarbonate", re.I),
    "gas_lactate": re.compile(r"lactate", re.I),
    "gas_specimen": re.compile(r"specimen|source|type", re.I),
    "chem_creatinine": re.compile(r"creatinine", re.I),
    "chem_sodium": re.compile(r"\bsodium\b", re.I),
    "chem_chloride": re.compile(r"\bchloride\b", re.I),
    "chem_total_co2": re.compile(r"carbon dioxide|total co2|\bco2\b", re.I),
    "cbc_hemoglobin": re.compile(r"hemoglobin", re.I),
}

# Category filters limit false matches (for example chemistry CO2 vs blood-gas pCO2)
cat_gas = re.compile(r"blood\s*gas|blood gas|arterial|venous", re.I)
cat_chem = re.compile(r"chemistry|chem|blood", re.I)
cat_cbc = re.compile(r"hematology|cbc", re.I)

matches = {}
for name, pat in patterns.items():
    dfm = labitems.copy()
    dfm = dfm[dfm["label"].str.contains(pat, na=False)]
    if name.startswith("gas_"):
        dfm = dfm[dfm["category"].str.contains(cat_gas, na=False)]
    elif name.startswith("chem_"):
        dfm = dfm[dfm["category"].str.contains(cat_chem, na=False)]
    elif name.startswith("cbc_"):
        dfm = dfm[dfm["category"].str.contains(cat_cbc, na=False)]
    matches[name] = dfm[["itemid", "label", "category"]]

# Build lab_item_map with cohort counts. Run hadm counts in chunks to avoid one oversized query.
itemids_all = sorted({int(i) for dfm in matches.values() for i in dfm["itemid"].tolist()})

SQL["counts_sql"] = f"""
WITH hadms AS (SELECT x AS hadm_id FROM UNNEST(@hadms) AS x)
SELECT itemid, COUNT(*) AS n
FROM `{PHYS}.{HOSP}.labevents`
WHERE hadm_id IN (SELECT hadm_id FROM hadms)
  AND itemid IN UNNEST(@itemids)
GROUP BY itemid
"""

counts_cols = ["itemid", "n"]
if itemids_all and len(hadm_list) > 0:
    hadm_chunk_size = 10000
    count_parts = []
    for i in range(0, len(hadm_list), hadm_chunk_size):
        hadm_chunk = hadm_list[i:i + hadm_chunk_size]
        part = run_sql_bq(sql("counts_sql"), {"hadms": hadm_chunk, "itemids": itemids_all})
        if len(part) > 0:
            count_parts.append(part)
        print(f"Lab item count chunk {i // hadm_chunk_size + 1}: rows={len(part)}")

    if count_parts:
        counts = (
            pd.concat(count_parts, ignore_index=True)
            .groupby("itemid", as_index=False)["n"]
            .sum()
        )
    else:
        counts = pd.DataFrame(columns=counts_cols)
else:
    counts = pd.DataFrame(columns=counts_cols)

lab_item_map = {}
for name, dfm in matches.items():
    tmp = dfm.merge(counts, on="itemid", how="left").fillna({"n": 0})
    lab_item_map[name] = {
        "pattern": patterns[name].pattern,
        "items": tmp.sort_values("n", ascending=False).to_dict(orient="records"),
    }

lab_item_map_path = WORK_DIR / "lab_item_map.json"
lab_item_map_path.write_text(json.dumps(lab_item_map, indent=2))
print("Wrote:", lab_item_map_path)



In [None]:
# Purpose: Extract cohort labs and apply ED-time windows in pandas for predictable runtime.

# Extract labevents and apply ED 0-24h windows locally

# Assemble itemid lists
itemid_sets = {k: [int(x["itemid"]) for x in v["items"]] for k, v in lab_item_map.items()}
all_itemids = sorted({i for v in itemid_sets.values() for i in v})

if "ed_df" not in globals():
    raise NameError("ed_df is required before lab window extraction")

ed_windows = (
    ed_df[["ed_stay_id", "hadm_id", "ed_intime"]]
    .dropna(subset=["ed_stay_id", "hadm_id", "ed_intime"])
    .drop_duplicates()
    .copy()
)
ed_windows["hadm_id"] = pd.to_numeric(ed_windows["hadm_id"], errors="coerce").astype("Int64")
ed_windows = ed_windows.dropna(subset=["hadm_id"])
ed_windows["hadm_id"] = ed_windows["hadm_id"].astype(int)

SQL["labs_sql"] = f"""
SELECT
  subject_id,
  hadm_id,
  itemid,
  charttime,
  specimen_id,
  valuenum,
  valueuom
FROM `{PHYS}.{HOSP}.labevents`
WHERE hadm_id IN UNNEST(@hadms)
  AND itemid IN UNNEST(@itemids)
"""

# Run in hadm chunks; merge to ED windows in pandas to keep each query lightweight.
labs_parts = []
if all_itemids and len(hadm_list) > 0:
    hadm_chunk_size = 5000
    for i in range(0, len(hadm_list), hadm_chunk_size):
        hadm_chunk = hadm_list[i:i + hadm_chunk_size]
        part = run_sql_bq(sql("labs_sql"), {"hadms": hadm_chunk, "itemids": all_itemids})
        raw_n = len(part)

        if raw_n > 0:
            part["hadm_id"] = pd.to_numeric(part["hadm_id"], errors="coerce")
            part = part.dropna(subset=["hadm_id"])
            part["hadm_id"] = part["hadm_id"].astype(int)

            part = part.merge(ed_windows, on="hadm_id", how="inner")
            in_window = (
                (part["charttime"] >= part["ed_intime"]) &
                (part["charttime"] <= (part["ed_intime"] + pd.Timedelta(hours=24)))
            )
            part = part.loc[in_window, [
                "ed_stay_id", "subject_id", "hadm_id", "itemid", "charttime", "specimen_id", "valuenum", "valueuom"
            ]]
            if len(part) > 0:
                labs_parts.append(part)

        print(f"Labs window chunk {i // hadm_chunk_size + 1}: raw={raw_n}, kept={len(part) if raw_n > 0 else 0}")

if labs_parts:
    labs_long = pd.concat(labs_parts, ignore_index=True)
else:
    labs_long = pd.DataFrame(
        columns=["ed_stay_id", "subject_id", "hadm_id", "itemid", "charttime", "specimen_id", "valuenum", "valueuom"]
    )

print("Labs long rows:", len(labs_long))

# Unit audit for pCO2
pco2_ids = itemid_sets.get("gas_pco2", [])
unit_audit = (
    labs_long.loc[labs_long["itemid"].isin(pco2_ids)]
    .groupby("valueuom", dropna=False)
    .size().reset_index(name="n")
)
unit_audit_path = WORK_DIR / "lab_unit_audit.csv"
unit_audit.to_csv(unit_audit_path, index=False)
print("Wrote:", unit_audit_path)



In [None]:
# Purpose: Reconstruct gas panels and normalize units before ED-stay level aggregation.

# Reconstruct gas panels by specimen_id within ED stay

pco2_ids = set(itemid_sets.get("gas_pco2", []))
ph_ids = set(itemid_sets.get("gas_ph", []))
hco3_ids = set(itemid_sets.get("gas_hco3", []))
lact_ids = set(itemid_sets.get("gas_lactate", []))

labs = labs_long.copy()

# Convert pCO2 kPa to mmHg when needed
is_kpa = labs["valueuom"].astype(str).str.lower().str.contains("kpa", na=False)
mask_pco2 = labs["itemid"].isin(pco2_ids)
if mask_pco2.any() and is_kpa.any():
    labs.loc[mask_pco2 & is_kpa, "valuenum"] = labs.loc[mask_pco2 & is_kpa, "valuenum"] * 7.50062
    labs.loc[mask_pco2 & is_kpa, "valueuom"] = "mmHg"

# panel by specimen_id
panel = (
    labs.groupby(["ed_stay_id","specimen_id"], as_index=False)
    .agg(panel_time=("charttime","min"))
)

# attach analytes

def pick_analyte(df, ids, name):
    tmp = df.loc[df["itemid"].isin(ids), ["ed_stay_id","specimen_id","valuenum"]]
    tmp = tmp.rename(columns={"valuenum": name})
    return tmp.groupby(["ed_stay_id","specimen_id"], as_index=False).first()

if pco2_ids:
    panel = panel.merge(pick_analyte(labs, pco2_ids, "pco2"), on=["ed_stay_id","specimen_id"], how="left")
if ph_ids:
    panel = panel.merge(pick_analyte(labs, ph_ids, "ph"), on=["ed_stay_id","specimen_id"], how="left")
if hco3_ids:
    panel = panel.merge(pick_analyte(labs, hco3_ids, "hco3"), on=["ed_stay_id","specimen_id"], how="left")
if lact_ids:
    panel = panel.merge(pick_analyte(labs, lact_ids, "lactate"), on=["ed_stay_id","specimen_id"], how="left")

# First panel per ED stay
first_panel = (
    panel.sort_values(["ed_stay_id","panel_time"]).groupby("ed_stay_id", as_index=False).first()
)

# 0–6h and 0–24h extrema
panel = panel.merge(ed_df[["ed_stay_id","ed_intime"]], on="ed_stay_id", how="left")
panel["dt_hours"] = (panel["panel_time"] - panel["ed_intime"]).dt.total_seconds() / 3600.0

# Ensure expected panel columns exist even if analyte is absent
for col in ["pco2", "ph", "hco3", "lactate"]:
    if col not in panel.columns:
        panel[col] = pd.NA

p06 = panel.loc[panel["dt_hours"].between(0,6, inclusive="both")]
p24 = panel.loc[panel["dt_hours"].between(0,24, inclusive="both")]

agg06 = p06.groupby("ed_stay_id", as_index=False).agg(max_pco2_0_6h=("pco2","max"), min_ph_0_6h=("ph","min"))
agg24 = p24.groupby("ed_stay_id", as_index=False).agg(max_pco2_0_24h=("pco2","max"), min_ph_0_24h=("ph","min"))

ed_df = ed_df.merge(first_panel[["ed_stay_id","panel_time","pco2","ph","hco3","lactate"]].rename(
    columns={"panel_time":"first_gas_time","pco2":"first_pco2","ph":"first_ph","hco3":"first_hco3","lactate":"first_lactate"}
), on="ed_stay_id", how="left")

ed_df = ed_df.merge(agg06, on="ed_stay_id", how="left")
ed_df = ed_df.merge(agg24, on="ed_stay_id", how="left")




In [None]:
# Purpose: Reconstruct gas panels and normalize units before ED-stay level aggregation.

# ABG vs VBG classification (specimen- and label-based inference)

# Use label heuristics from d_labitems if available (arterial/venous hints)
labitems_map = (
    labitems.set_index("itemid")[["label", "fluid"]]
    .fillna("")
    .to_dict("index")
)

def _infer_source_text(text: str) -> str | None:
    if re.search(r"(arterial|abg|a[- ]?line|\bart\b)", text):
        return "arterial"
    if re.search(r"(venous|vbg|central|mixed|\bven\b)", text):
        return "venous"
    return None


def infer_source_item(itemid):
    meta = labitems_map.get(itemid, {})
    text = f"{meta.get('label','')} {meta.get('fluid','')}".lower()
    return _infer_source_text(text)

# Infer source per specimen_id using ANY item label within the specimen
spec_source = None
if "specimen_id" in labs.columns and labs["specimen_id"].notna().any():
    spec_items = labs.loc[labs["specimen_id"].notna(), ["ed_stay_id", "specimen_id", "itemid"]].drop_duplicates()
    spec_items["source_hint"] = spec_items["itemid"].map(infer_source_item)

    def _resolve_source(s):
        s = s.dropna()
        if (s == "arterial").any():
            return "arterial"
        if (s == "venous").any():
            return "venous"
        return "other"

    spec_source = (
        spec_items.groupby(["ed_stay_id", "specimen_id"], as_index=False)["source_hint"]
        .apply(_resolve_source)
        .rename(columns={"source_hint": "source"})
    )

if spec_source is not None:
    panel = panel.merge(spec_source, on=["ed_stay_id", "specimen_id"], how="left")

# Fallback: assign source based on pco2 item label
if pco2_ids:
    pco2_itemid = (
        labs.loc[labs["itemid"].isin(pco2_ids), ["ed_stay_id", "specimen_id", "itemid"]]
        .groupby(["ed_stay_id", "specimen_id"], as_index=False)["itemid"].first()
    )
    pco2_itemid["source_pco2"] = pco2_itemid["itemid"].map(infer_source_item)
    panel = panel.merge(pco2_itemid[["ed_stay_id", "specimen_id", "source_pco2"]], on=["ed_stay_id", "specimen_id"], how="left")

    if "source" in panel.columns:
        panel["source"] = panel["source"].where(panel["source"] != "other", pd.NA)
        panel["source"] = panel["source"].fillna(panel["source_pco2"])
    else:
        panel["source"] = panel["source_pco2"]
    panel = panel.drop(columns=["source_pco2"])
else:
    if "source" not in panel.columns:
        panel["source"] = "other"

panel["source"] = panel["source"].fillna("other")

# flags
panel["flag_abg_hypercapnia"] = ((panel["source"]=="arterial") & (panel["pco2"]>=45)).astype(int)
panel["flag_vbg_hypercapnia"] = ((panel["source"]=="venous") & (panel["pco2"]>=50)).astype(int)
panel["flag_other_hypercapnia"] = ((panel["source"]=="other") & (panel["pco2"]>=50)).astype(int)
panel["flag_any_gas_hypercapnia"] = ((panel["flag_abg_hypercapnia"]==1) | (panel["flag_vbg_hypercapnia"]==1) | (panel["flag_other_hypercapnia"]==1)).astype(int)

# per-stay unknown source rate
if len(panel) > 0:
    unk_rate = (
        panel.assign(_unk=(panel["source"].fillna("other") == "other"))
             .groupby("ed_stay_id", as_index=False)["_unk"].mean()
             .rename(columns={"_unk": "gas_source_other_rate"})
    )
else:
    unk_rate = panel[["ed_stay_id"]].drop_duplicates()
    unk_rate["gas_source_other_rate"] = 1.0

# collapse to ED stay flags
flags = panel.groupby("ed_stay_id", as_index=False).agg(
    flag_abg_hypercapnia=("flag_abg_hypercapnia","max"),
    flag_vbg_hypercapnia=("flag_vbg_hypercapnia","max"),
    flag_other_hypercapnia=("flag_other_hypercapnia","max"),
    flag_any_gas_hypercapnia=("flag_any_gas_hypercapnia","max"),
)

ed_df = ed_df.merge(flags, on="ed_stay_id", how="left")
ed_df = ed_df.merge(unk_rate, on="ed_stay_id", how="left")
ed_df["gas_source_other_rate"] = ed_df["gas_source_other_rate"].fillna(1.0)


## Phase 5C — ICU POC blood gases (chartevents, optional)

**Rationale:** Capture ICU point-of-care gases if central lab labevents miss early measurements.


In [None]:
# Purpose: Define a reusable BigQuery execution helper so SQL calls are consistent across notebook stages.

# Discover ICU POC itemids from d_items

SQL["ditems_sql"] = f"""
SELECT itemid, label, category
FROM `{PHYS}.{ICU}.d_items`
"""

ditems = run_sql_bq(sql("ditems_sql"))

icu_patterns = {
    "pco2": re.compile(r"\bp\s*co2\b|pco2|pco₂", re.I),
    "ph": re.compile(r"\bph\b", re.I),
    "hco3": re.compile(r"hco3|bicarbonate", re.I),
    "lactate": re.compile(r"lactate", re.I),
    "specimen": re.compile(r"specimen|source|type", re.I),
}

icu_cat = re.compile(r"blood\s*gas|blood gas|resp|arterial|venous", re.I)

icu_matches = {}
for name, pat in icu_patterns.items():
    dfm = ditems[ditems["label"].str.contains(pat, na=False)]
    dfm = dfm[dfm["category"].str.contains(icu_cat, na=False)]
    icu_matches[name] = dfm[["itemid","label","category"]]
# Exclude non-blood pCO2 measurements from ICU candidates
if "pco2" in icu_matches and len(icu_matches["pco2"]) > 0:
    icu_matches["pco2"] = icu_matches["pco2"][~icu_matches["pco2"]["label"].str.contains(r"co2\s*(prod|production|elimin|elimination)|\bvco2\b", case=False, na=False)]

icu_itemids = sorted({int(i) for dfm in icu_matches.values() for i in dfm["itemid"].tolist()})
print("ICU POC candidate itemids:", len(icu_itemids))



In [None]:
# Purpose: Define a reusable BigQuery execution helper so SQL calls are consistent across notebook stages.

# Extract ICU chartevents within ED 0–24h window for cohort ICU stays

if len(icu_itemids) == 0:
    icu_poc_long = pd.DataFrame()
    print("No ICU POC itemids found.")
else:
    # ensure icu stay ids available
    if 'icu' not in globals():
        raise NameError("ICU stays table 'icu' not found; run ICU phase first.")

    icu_stays = icu[["icu_stay_id","hadm_id","intime"]].copy()
    icu_stays = icu_stays.dropna(subset=["icu_stay_id"])

    icu_poc_sql = f"""
    WITH icu_stays AS (
      SELECT stay_id AS icu_stay_id, hadm_id
      FROM `{PHYS}.{ICU}.icustays`
      WHERE stay_id IN UNNEST(@icu_stay_ids)
    ),
    eds AS (
      SELECT stay_id AS ed_stay_id, hadm_id, intime AS ed_intime
      FROM `{PHYS}.{ED}.edstays`
      WHERE hadm_id IN (SELECT hadm_id FROM icu_stays)
    )
    SELECT
      s.icu_stay_id,
      e.ed_stay_id,
      e.hadm_id,
      ce.charttime,
      ce.itemid,
      ce.valuenum,
      ce.valueuom
    FROM `{PHYS}.{ICU}.chartevents` ce
    JOIN icu_stays s ON s.icu_stay_id = ce.stay_id
    JOIN eds e ON e.hadm_id = s.hadm_id
    WHERE ce.itemid IN UNNEST(@itemids)
      AND ce.charttime BETWEEN e.ed_intime AND TIMESTAMP_ADD(e.ed_intime, INTERVAL 24 HOUR)
    """

    icu_poc_long = run_sql_bq(icu_poc_sql, {"icu_stay_ids": icu_stays["icu_stay_id"].astype(int).tolist(), "itemids": icu_itemids})
    print("ICU POC long rows:", len(icu_poc_long))

# Build panels by 5-minute bins per ICU stay
if len(icu_poc_long) > 0:
    icu_poc_long["time_bin"] = icu_poc_long["charttime"].dt.floor("5min")

    def pick_from_ids(df, ids, name):
        tmp = df.loc[df["itemid"].isin(ids), ["icu_stay_id","time_bin","valuenum"]]
        tmp = tmp.rename(columns={"valuenum": name})
        return tmp.groupby(["icu_stay_id","time_bin"], as_index=False).first()

    pco2_ids = set(icu_matches.get("pco2", pd.DataFrame()).get("itemid", []).tolist())
    ph_ids = set(icu_matches.get("ph", pd.DataFrame()).get("itemid", []).tolist())
    hco3_ids = set(icu_matches.get("hco3", pd.DataFrame()).get("itemid", []).tolist())
    lact_ids = set(icu_matches.get("lactate", pd.DataFrame()).get("itemid", []).tolist())

    panel_poc = (
        icu_poc_long.groupby(["icu_stay_id","time_bin"], as_index=False)
        .agg(panel_time=("charttime","min"))
    )
    if pco2_ids:
        panel_poc = panel_poc.merge(pick_from_ids(icu_poc_long, pco2_ids, "pco2"), on=["icu_stay_id","time_bin"], how="left")
    if ph_ids:
        panel_poc = panel_poc.merge(pick_from_ids(icu_poc_long, ph_ids, "ph"), on=["icu_stay_id","time_bin"], how="left")
    if hco3_ids:
        panel_poc = panel_poc.merge(pick_from_ids(icu_poc_long, hco3_ids, "hco3"), on=["icu_stay_id","time_bin"], how="left")
    if lact_ids:
        panel_poc = panel_poc.merge(pick_from_ids(icu_poc_long, lact_ids, "lactate"), on=["icu_stay_id","time_bin"], how="left")

    # map to ED stay via hadm_id
    panel_poc = panel_poc.merge(icu[["icu_stay_id","hadm_id"]], on="icu_stay_id", how="left")
    panel_poc = panel_poc.merge(ed_df[["ed_stay_id","hadm_id","ed_intime"]], on="hadm_id", how="left")
    panel_poc["dt_hours"] = (panel_poc["panel_time"] - panel_poc["ed_intime"]).dt.total_seconds() / 3600.0
else:
    panel_poc = pd.DataFrame()

# Ensure expected panel_poc columns exist even if analyte is absent
for col in ["pco2", "ph", "hco3", "lactate"]:
    if col not in panel_poc.columns:
        panel_poc[col] = pd.NA

poc_flags = pd.DataFrame(columns=["ed_stay_id", "flag_any_gas_hypercapnia_poc"])
if len(panel_poc) > 0:
    p24_poc = panel_poc.loc[panel_poc["dt_hours"].between(0,24, inclusive="both")].copy()
    p24_poc.loc[:, "flag_any_gas_hypercapnia_poc"] = (p24_poc["pco2"] >= 50).astype("Int64")

    poc_flags = p24_poc.groupby("ed_stay_id", as_index=False).agg(
        flag_any_gas_hypercapnia_poc=("flag_any_gas_hypercapnia_poc","max")
    )

# Avoid duplicate column on re-run
if "flag_any_gas_hypercapnia_poc" in ed_df.columns:
    ed_df = ed_df.drop(columns=["flag_any_gas_hypercapnia_poc"])

if len(poc_flags) > 0:
    ed_df = ed_df.merge(poc_flags, on="ed_stay_id", how="left")

# incremental yield
base = ed_df.get("flag_any_gas_hypercapnia", pd.Series(0, index=ed_df.index)).fillna(0).astype(int)
poc = ed_df.get("flag_any_gas_hypercapnia_poc", pd.Series(0, index=ed_df.index)).fillna(0).astype(int)
inc = ((base == 0) & (poc == 1)).sum()
print("ICU POC incremental hypercapnia cases (ED stays):", int(inc))

# optional export
if len(panel_poc) > 0:
    from datetime import datetime

    prior_runs_dir = DATA_DIR / "prior runs"
    prior_runs_dir.mkdir(parents=True, exist_ok=True)
    poc_path = prior_runs_dir / f"gas_panels_poc_{datetime.now().strftime('%Y%m%d_%H%M%S')}.parquet"
    panel_poc.to_parquet(poc_path, index=False)
    print("Wrote:", poc_path)


## Phase 6 — BMI/anthropometrics (OMR)

**Rationale:** Add BMI/height/weight closest to ED presentation when available.


In [None]:
# Purpose: Define a reusable BigQuery execution helper so SQL calls are consistent across notebook stages.

# OMR BMI/height/weight

try:
    omr_sql = f"""
    SELECT subject_id, chartdate, result_name, result_value
    FROM `{PHYS}.{HOSP}.omr`
    WHERE LOWER(result_name) IN ('bmi','height','weight')
    """
    omr = run_sql_bq(omr_sql)
    print("OMR rows:", len(omr))

    if len(omr) == 0:
        print("OMR query returned 0 rows; BMI/height/weight will remain missing.")
    else:
        omr["result_name"] = omr["result_name"].str.lower()
        omr["result_value_num"] = (
            omr["result_value"].astype(str)
            .str.extract(r"(-?\d+(?:\.\d+)?)", expand=False)
        )
        omr["result_value_num"] = pd.to_numeric(omr["result_value_num"], errors="coerce")

        omr_pivot = (
            omr.pivot_table(
                index=["subject_id","chartdate"],
                columns="result_name",
                values="result_value_num",
                aggfunc="first",
            )
            .reset_index()
        )

        # attach closest pre-ED (within 365 days)
        ed_dates = ed_df[["ed_stay_id","subject_id","ed_intime"]].copy()
        ed_dates["ed_date"] = ed_dates["ed_intime"].dt.date
        omr_pivot["chartdate"] = pd.to_datetime(omr_pivot["chartdate"]).dt.date

        merged = ed_dates.merge(omr_pivot, on="subject_id", how="left")
        merged["days_before"] = (pd.to_datetime(merged["ed_date"]) - pd.to_datetime(merged["chartdate"])).dt.days
        merged = merged.loc[(merged["days_before"] >= 0) & (merged["days_before"] <= 365)]

        closest = (
            merged.sort_values(["ed_stay_id","days_before"]).groupby("ed_stay_id", as_index=False).first()
            .rename(columns={
                "bmi":"bmi_closest_pre_ed",
                "height":"height_closest_pre_ed",
                "weight":"weight_closest_pre_ed",
            })
        )

        ed_df = ed_df.merge(
            closest[["ed_stay_id","bmi_closest_pre_ed","height_closest_pre_ed","weight_closest_pre_ed"]],
            on="ed_stay_id", how="left"
        )
except Exception as e:
    print("OMR not available or failed:", e)



## Phase 7 — ICD comorbidity flags

**Rationale:** Derive comorbidity indicators from index admission ICD codes for stratified analyses.


In [None]:
# Purpose: Compute comorbidity flags from hospital + ED ICD codes with chunked hadm queries.

# ICD code pulls for comorbidity flags (hospital + ED; combined OR)
# NOTE: Use prefix filters to reduce CPU and avoid regex-heavy scans.

FLAGS = [
    "flag_copd", "flag_osa_ohs", "flag_chf", "flag_neuromuscular",
    "flag_opioid_substance", "flag_pneumonia",
]

SQL["icd_flags_sql"] = f"""
WITH hadms AS (SELECT x AS hadm_id FROM UNNEST(@hadms) AS x),
icd AS (
  SELECT
    hadm_id,
    UPPER(REPLACE(icd_code, ".", "")) AS code_norm
  FROM `{PHYS}.{HOSP}.diagnoses_icd`
  WHERE hadm_id IN (SELECT hadm_id FROM hadms)
    AND (
      STARTS_WITH(UPPER(REPLACE(icd_code, ".", "")), "J43") OR
      STARTS_WITH(UPPER(REPLACE(icd_code, ".", "")), "J44") OR
      STARTS_WITH(UPPER(REPLACE(icd_code, ".", "")), "G473") OR
      STARTS_WITH(UPPER(REPLACE(icd_code, ".", "")), "E662") OR
      STARTS_WITH(UPPER(REPLACE(icd_code, ".", "")), "I50") OR
      STARTS_WITH(UPPER(REPLACE(icd_code, ".", "")), "G12") OR
      STARTS_WITH(UPPER(REPLACE(icd_code, ".", "")), "G70") OR
      STARTS_WITH(UPPER(REPLACE(icd_code, ".", "")), "G71") OR
      STARTS_WITH(UPPER(REPLACE(icd_code, ".", "")), "F11") OR
      STARTS_WITH(UPPER(REPLACE(icd_code, ".", "")), "T40") OR
      STARTS_WITH(UPPER(REPLACE(icd_code, ".", "")), "F13") OR
      STARTS_WITH(UPPER(REPLACE(icd_code, ".", "")), "J12") OR
      STARTS_WITH(UPPER(REPLACE(icd_code, ".", "")), "J13") OR
      STARTS_WITH(UPPER(REPLACE(icd_code, ".", "")), "J14") OR
      STARTS_WITH(UPPER(REPLACE(icd_code, ".", "")), "J15") OR
      STARTS_WITH(UPPER(REPLACE(icd_code, ".", "")), "J16") OR
      STARTS_WITH(UPPER(REPLACE(icd_code, ".", "")), "J17") OR
      STARTS_WITH(UPPER(REPLACE(icd_code, ".", "")), "J18")
    )
)
SELECT
  hadm_id,
  MAX(IF(STARTS_WITH(code_norm, "J43") OR STARTS_WITH(code_norm, "J44"), 1, 0)) AS flag_copd,
  MAX(IF(STARTS_WITH(code_norm, "G473") OR STARTS_WITH(code_norm, "E662"), 1, 0)) AS flag_osa_ohs,
  MAX(IF(STARTS_WITH(code_norm, "I50"), 1, 0)) AS flag_chf,
  MAX(IF(STARTS_WITH(code_norm, "G12") OR STARTS_WITH(code_norm, "G70") OR STARTS_WITH(code_norm, "G71"), 1, 0)) AS flag_neuromuscular,
  MAX(IF(STARTS_WITH(code_norm, "F11") OR STARTS_WITH(code_norm, "T40") OR STARTS_WITH(code_norm, "F13"), 1, 0)) AS flag_opioid_substance,
  MAX(IF(STARTS_WITH(code_norm, "J12") OR STARTS_WITH(code_norm, "J13") OR STARTS_WITH(code_norm, "J14") OR STARTS_WITH(code_norm, "J15") OR STARTS_WITH(code_norm, "J16") OR STARTS_WITH(code_norm, "J17") OR STARTS_WITH(code_norm, "J18"), 1, 0)) AS flag_pneumonia
FROM icd
GROUP BY hadm_id
"""

SQL["ed_icd_flags_sql"] = f"""
WITH hadms AS (SELECT x AS hadm_id FROM UNNEST(@hadms) AS x),
ed_dx AS (
  SELECT
    s.hadm_id,
    UPPER(REPLACE(d.icd_code, ".", "")) AS code_norm
  FROM `{PHYS}.{ED}.diagnosis` d
  JOIN `{PHYS}.{ED}.edstays` s
    ON s.stay_id = d.stay_id
  WHERE s.hadm_id IN (SELECT hadm_id FROM hadms)
    AND (
      STARTS_WITH(UPPER(REPLACE(d.icd_code, ".", "")), "J43") OR
      STARTS_WITH(UPPER(REPLACE(d.icd_code, ".", "")), "J44") OR
      STARTS_WITH(UPPER(REPLACE(d.icd_code, ".", "")), "G473") OR
      STARTS_WITH(UPPER(REPLACE(d.icd_code, ".", "")), "E662") OR
      STARTS_WITH(UPPER(REPLACE(d.icd_code, ".", "")), "I50") OR
      STARTS_WITH(UPPER(REPLACE(d.icd_code, ".", "")), "G12") OR
      STARTS_WITH(UPPER(REPLACE(d.icd_code, ".", "")), "G70") OR
      STARTS_WITH(UPPER(REPLACE(d.icd_code, ".", "")), "G71") OR
      STARTS_WITH(UPPER(REPLACE(d.icd_code, ".", "")), "F11") OR
      STARTS_WITH(UPPER(REPLACE(d.icd_code, ".", "")), "T40") OR
      STARTS_WITH(UPPER(REPLACE(d.icd_code, ".", "")), "F13") OR
      STARTS_WITH(UPPER(REPLACE(d.icd_code, ".", "")), "J12") OR
      STARTS_WITH(UPPER(REPLACE(d.icd_code, ".", "")), "J13") OR
      STARTS_WITH(UPPER(REPLACE(d.icd_code, ".", "")), "J14") OR
      STARTS_WITH(UPPER(REPLACE(d.icd_code, ".", "")), "J15") OR
      STARTS_WITH(UPPER(REPLACE(d.icd_code, ".", "")), "J16") OR
      STARTS_WITH(UPPER(REPLACE(d.icd_code, ".", "")), "J17") OR
      STARTS_WITH(UPPER(REPLACE(d.icd_code, ".", "")), "J18")
    )
)
SELECT
  hadm_id,
  MAX(IF(STARTS_WITH(code_norm, "J43") OR STARTS_WITH(code_norm, "J44"), 1, 0)) AS flag_copd,
  MAX(IF(STARTS_WITH(code_norm, "G473") OR STARTS_WITH(code_norm, "E662"), 1, 0)) AS flag_osa_ohs,
  MAX(IF(STARTS_WITH(code_norm, "I50"), 1, 0)) AS flag_chf,
  MAX(IF(STARTS_WITH(code_norm, "G12") OR STARTS_WITH(code_norm, "G70") OR STARTS_WITH(code_norm, "G71"), 1, 0)) AS flag_neuromuscular,
  MAX(IF(STARTS_WITH(code_norm, "F11") OR STARTS_WITH(code_norm, "T40") OR STARTS_WITH(code_norm, "F13"), 1, 0)) AS flag_opioid_substance,
  MAX(IF(STARTS_WITH(code_norm, "J12") OR STARTS_WITH(code_norm, "J13") OR STARTS_WITH(code_norm, "J14") OR STARTS_WITH(code_norm, "J15") OR STARTS_WITH(code_norm, "J16") OR STARTS_WITH(code_norm, "J17") OR STARTS_WITH(code_norm, "J18"), 1, 0)) AS flag_pneumonia
FROM ed_dx
GROUP BY hadm_id
"""

def _run_flag_query_chunked(sql_name: str, label: str) -> pd.DataFrame:
    parts = []
    hadm_chunk_size = 10000
    for i in range(0, len(hadm_list), hadm_chunk_size):
        hadm_chunk = hadm_list[i:i + hadm_chunk_size]
        part = run_sql_bq(sql(sql_name), {"hadms": hadm_chunk})
        if len(part) > 0:
            parts.append(part)
        print(f"{label} chunk {i // hadm_chunk_size + 1}: rows={len(part)}")

    if parts:
        out = pd.concat(parts, ignore_index=True)
        out = out.groupby("hadm_id", as_index=False)[FLAGS].max()
        return out

    return pd.DataFrame(columns=["hadm_id"] + FLAGS)

flag_hosp = _run_flag_query_chunked("icd_flags_sql", "Hosp ICD flags")
flag_ed = _run_flag_query_chunked("ed_icd_flags_sql", "ED ICD flags")

flag_hosp = flag_hosp.rename(columns={k: f"{k}_hosp" for k in FLAGS})
flag_ed = flag_ed.rename(columns={k: f"{k}_ed" for k in FLAGS})

flag_df = flag_hosp.merge(flag_ed, on="hadm_id", how="outer")
for k in FLAGS:
    hosp_col = f"{k}_hosp"
    ed_col = f"{k}_ed"
    if hosp_col not in flag_df.columns:
        flag_df[hosp_col] = 0
    if ed_col not in flag_df.columns:
        flag_df[ed_col] = 0
    flag_df[k] = ((flag_df[hosp_col].fillna(0).astype(int) == 1) | (flag_df[ed_col].fillna(0).astype(int) == 1)).astype(int)

# Merge into admission-level and ED-stay-level frames (override if present)
for _df_name in ["df", "ed_df"]:
    if _df_name in globals():
        _df = globals()[_df_name]
        drop_cols = [c for c in flag_df.columns if c != "hadm_id" and c in _df.columns]
        if drop_cols:
            _df = _df.drop(columns=drop_cols)
        _df = _df.merge(flag_df, on="hadm_id", how="left")
        globals()[_df_name] = _df

# Prevalence (combined flags)
for k in FLAGS:
    if k in ed_df.columns:
        print(k, int(ed_df[k].fillna(0).sum()))



## Phase 8 — Timing phenotypes and derived bands

**Rationale:** Compute time-anchored hypercapnia/acidemia phenotypes for ED presentation vs later course.


In [None]:
# Purpose: Create time-anchored phenotypes and severity bands relative to first ED presentation.

# Timing phenotypes

anchor = "ed_intime_first" if "ed_intime_first" in ed_df.columns else "ed_intime"

ed_df["dt_first_qualifying_gas_hours"] = (ed_df["first_gas_time"] - ed_df[anchor]).dt.total_seconds() / 3600.0
ed_df["presenting_hypercapnia"] = (ed_df["dt_first_qualifying_gas_hours"] <= 6).astype("Int64")
ed_df["late_hypercapnia"] = (ed_df["dt_first_qualifying_gas_hours"] > 6).astype("Int64")

# NIV/IMV timing relative to first ED presentation
if "first_imv_time" in ed_df.columns:
    ed_df["dt_first_imv_hours"] = (ed_df["first_imv_time"] - ed_df[anchor]).dt.total_seconds() / 3600.0
if "first_niv_time" in ed_df.columns:
    ed_df["dt_first_niv_hours"] = (ed_df["first_niv_time"] - ed_df[anchor]).dt.total_seconds() / 3600.0

# Bands
bins_ph = [-1, 7.20, 7.30, 7.35, 99]
labels_ph = ["<7.20","7.20–7.29","7.30–7.34","≥7.35"]
ed_df["ph_band"] = pd.cut(ed_df["first_ph"], bins=bins_ph, labels=labels_ph)

bins_hco3 = [-1, 24, 30, 999]
labels_hco3 = ["<24","24–29","≥30"]
ed_df["hco3_band"] = pd.cut(ed_df["first_hco3"], bins=bins_hco3, labels=labels_hco3)

bins_lac = [-1, 2, 4, 999]
labels_lac = ["<2","2–4",">4"]
ed_df["lactate_band"] = pd.cut(ed_df["first_lactate"], bins=bins_lac, labels=labels_lac)



# Align ED-stay gas source flags to hadm-level thresholds and ICU POC signal.
# This keeps source-specific flags internally consistent in the final ED-stay export.
if "df" in globals() and "hadm_id" in ed_df.columns and "hadm_id" in df.columns:
    hadm_thr_cols = [c for c in ["abg_hypercap_threshold", "vbg_hypercap_threshold", "other_hypercap_threshold"] if c in df.columns]
    if hadm_thr_cols:
        hadm_thr = df[["hadm_id"] + hadm_thr_cols].drop_duplicates("hadm_id")
        hadm_thr_map = hadm_thr.set_index("hadm_id")

        for col in hadm_thr_cols:
            mapped = pd.to_numeric(ed_df["hadm_id"].map(hadm_thr_map[col]), errors="coerce")
            existing = pd.to_numeric(ed_df[col], errors="coerce") if col in ed_df.columns else pd.Series(np.nan, index=ed_df.index)
            ed_df[col] = existing.fillna(mapped).fillna(0).astype("Int64")

# Source-specific flags must include any matching hadm-level threshold.
flag_specs = [
    ("flag_abg_hypercapnia", "abg_hypercap_threshold"),
    ("flag_vbg_hypercapnia", "vbg_hypercap_threshold"),
    ("flag_other_hypercapnia", "other_hypercap_threshold"),
]
for flag_col, thr_col in flag_specs:
    base = pd.to_numeric(ed_df.get(flag_col, pd.Series(0, index=ed_df.index)), errors="coerce").fillna(0).astype(int)
    thr = pd.to_numeric(ed_df.get(thr_col, pd.Series(0, index=ed_df.index)), errors="coerce").fillna(0).astype(int)
    ed_df[flag_col] = ((base == 1) | (thr == 1)).astype("Int64")

any_from_sources = (
    (pd.to_numeric(ed_df.get("flag_abg_hypercapnia", 0), errors="coerce").fillna(0).astype(int) == 1)
    | (pd.to_numeric(ed_df.get("flag_vbg_hypercapnia", 0), errors="coerce").fillna(0).astype(int) == 1)
    | (pd.to_numeric(ed_df.get("flag_other_hypercapnia", 0), errors="coerce").fillna(0).astype(int) == 1)
)

poc_any = pd.to_numeric(ed_df.get("flag_any_gas_hypercapnia_poc", pd.Series(0, index=ed_df.index)), errors="coerce").fillna(0).astype(int) == 1
ed_df["flag_any_gas_hypercapnia"] = (any_from_sources | poc_any).astype("Int64")


## QA / audits

**Rationale:** Validate joins, missingness, lab completeness, and produce reproducibility artifacts.


In [None]:
# Purpose: Create reusable data-quality and merge guardrail helpers to prevent silent join errors.

# --- QA checks (lightweight, deterministic)
STRICT_QA = False

# 1) Key uniqueness
if 'ed_df' in globals() and 'ed_stay_id' in ed_df.columns:
    try:
        assert_unique(ed_df, 'ed_stay_id', 'ed_df')
        print('QA: ed_stay_id unique OK')
    except Exception as e:
        print('QA FAIL:', e)
        if STRICT_QA:
            raise

# 2) Inclusion sanity: any_hypercap_icd==0 implies gas criteria met
if 'ed_df' in globals() and 'any_hypercap_icd' in ed_df.columns:
    gas_flag = ed_df.get('flag_any_gas_hypercapnia', pd.Series(0, index=ed_df.index)).fillna(0).astype(int)
    icd_flag = ed_df['any_hypercap_icd'].fillna(0).astype(int)
    viol = (icd_flag == 0) & (gas_flag == 0)
    n_viol = int(viol.sum())
    print('QA: ICD==0 & Gas==0 count:', n_viol)
    if STRICT_QA and n_viol > 0:
        raise AssertionError('Found rows without ICD and without gas criteria.')

# 3) Temporal sanity: first_gas_time >= ed_intime (allow small negative drift)
if 'ed_df' in globals() and 'first_gas_time' in ed_df.columns and 'ed_intime' in ed_df.columns:
    dt = (pd.to_datetime(ed_df['first_gas_time']) - pd.to_datetime(ed_df['ed_intime'])).dt.total_seconds() / 3600
    n_neg = int((dt < -1).sum())  # allow 1h clock drift
    print('QA: first_gas_time < ed_intime by >1h:', n_neg)
    if STRICT_QA and n_neg > 0:
        raise AssertionError('first_gas_time before ed_intime by >1h')

# 4) Range checks (report only)
if 'ed_df' in globals():
    ranges = {
        'first_ph': (6.8, 7.8),
        'first_pco2': (10, 200),
        'first_lactate': (0, 30),
        'creatinine': (0, 20),
    }
    rc = check_ranges(ed_df, ranges)
    if not rc.empty:
        print('QA range check (n out-of-range):')
        print(rc.to_string(index=False))



In [None]:
# Purpose: Report missingness for key variables to support transparent data-quality reporting.

import json

# T1 — uniqueness and join explosion guard
if ed_df["ed_stay_id"].nunique() != len(ed_df):
    raise ValueError("ed_stay_id not unique after merges")

# T2 — missingness summary for new fields
new_fields = [c for c in TARGET_FIELDS if c in ed_df.columns]
miss = pd.DataFrame({
    "field": new_fields,
    "missing_n": [int(ed_df[c].isna().sum()) for c in new_fields],
    "missing_pct": [float(ed_df[c].isna().mean()) for c in new_fields],
})
print(miss.sort_values("missing_pct", ascending=False).head(30))

# T3 — lab capture completeness
pct_any_6h = float(p06["ed_stay_id"].nunique() / max(ed_df["ed_stay_id"].nunique(),1))
pct_any_24h = float(p24["ed_stay_id"].nunique() / max(ed_df["ed_stay_id"].nunique(),1))
print("% any gas panel 0–6h:", round(pct_any_6h*100,1))
print("% any gas panel 0–24h:", round(pct_any_24h*100,1))

# Use per-stay source-other rate if available
if "gas_source_other_rate" in ed_df.columns:
    gas_source_other_rate = float(ed_df["gas_source_other_rate"].mean())
else:
    gas_source_other_rate = 1.0
print("% source other (panel-level):", round(gas_source_other_rate*100,1))

hadm_rows = int(len(df)) if "df" in globals() else None
hadm_cc_rows = None
if "df" in globals() and "ed_triage_cc" in df.columns:
    _hadm_cc_mask = df["ed_triage_cc"].notna() & (df["ed_triage_cc"].astype(str).str.strip() != "")
    hadm_cc_rows = int(_hadm_cc_mask.sum())

# T4 — QA summary artifact
qa_summary = {
    "ed_spine_rows": int(len(ed_df)),
    "ed_rows": int(len(ed_df)),
    "ed_unique": int(ed_df["ed_stay_id"].nunique()),
    "hadm_rows": hadm_rows,
    "hadm_cc_rows": hadm_cc_rows,
    "icu_link_rate": float(ed_df["icu_intime_first"].notna().mean()) if "icu_intime_first" in ed_df.columns else None,
    "pct_any_gas_0_6h": pct_any_6h,
    "pct_any_gas_0_24h": pct_any_24h,
    "gas_source_other_rate": gas_source_other_rate,
    "source_unknown_rate": gas_source_other_rate,
    "run_metadata": RUN_METADATA if "RUN_METADATA" in globals() else None,
}
qa_path = WORK_DIR / "qa_summary.json"
qa_path.write_text(json.dumps(qa_summary, indent=2))
print("Wrote:", qa_path)



## Outputs (ED-stay cohort + long tables)

**Rationale:** Persist ED-stay analytic datasets and supporting long tables.


In [None]:
# Purpose: Write finalized cohort outputs and derivative tables for downstream analysis workflows.

# Save outputs (dated parquet artifacts archived under prior runs)
from datetime import datetime

prior_runs_dir = DATA_DIR / "prior runs"
prior_runs_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

cohort_path = prior_runs_dir / f"cohort_ed_stay_{timestamp}.parquet"
ed_vitals_path = prior_runs_dir / f"ed_vitals_long_{timestamp}.parquet"
labs_path = prior_runs_dir / f"labs_long_{timestamp}.parquet"
gas_panels_path = prior_runs_dir / f"gas_panels_{timestamp}.parquet"

ed_df.to_parquet(cohort_path, index=False)
ed_vitals_long.to_parquet(ed_vitals_path, index=False)
labs_long.to_parquet(labs_path, index=False)
panel.to_parquet(gas_panels_path, index=False)

print("Wrote:", cohort_path)
print("Wrote:", ed_vitals_path)
print("Wrote:", labs_path)
print("Wrote:", gas_panels_path)


In [None]:
# Purpose: Report missingness for key variables to support transparent data-quality reporting.

# Also export to Excel (tabular dataset) - optional archive output
from datetime import datetime

WRITE_ARCHIVE_XLSX_EXPORTS = os.getenv("WRITE_ARCHIVE_XLSX_EXPORTS", "0") == "1"
if WRITE_ARCHIVE_XLSX_EXPORTS:
    prior_runs_dir = DATA_DIR / "prior runs"
    prior_runs_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    xlsx_path = prior_runs_dir / f"mimic_hypercap_EXT_EDstay_bq_gas_{timestamp}.xlsx"

    with pd.ExcelWriter(xlsx_path, engine="openpyxl") as xw:
        ed_df.to_excel(xw, sheet_name="cohort_ed_stay", index=False)
        # Optional: include QA tables if present
        try:
            miss.to_excel(xw, sheet_name="missingness", index=False)
        except Exception:
            pass

    print("Saved:", xlsx_path)
else:
    print("Skipping ED-stay archive workbook export (set WRITE_ARCHIVE_XLSX_EXPORTS=1 to enable).")


In [None]:
# Purpose: Capture ED presentation context (chief complaint, acuity, and early vitals) at the ED-stay level.

# Final Excel outputs requested: all encounters + ED chief-complaint only
from datetime import datetime

prior_runs_dir = DATA_DIR / "prior runs"
prior_runs_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

# 1) All encounters meeting inclusion criteria (admission-level), with ED linkage flags
ed_link = (
    ed_df.groupby("hadm_id", as_index=False)
         .agg(ed_stay_id_first=("ed_stay_id", "first"), n_ed_stays=("ed_stay_id", "nunique"))
)
all_encounters = df.merge(ed_link, on="hadm_id", how="left")
all_encounters["has_ed_encounter"] = all_encounters["n_ed_stays"].fillna(0).astype(int).gt(0).astype(int)
all_encounters["encounter_source"] = all_encounters["has_ed_encounter"].map({1: "ED-linked", 0: "Inpatient-only"})

if WRITE_ARCHIVE_XLSX_EXPORTS:
    all_path = prior_runs_dir / f"mimic_hypercap_EXT_all_encounters_bq_{timestamp}.xlsx"
    with pd.ExcelWriter(all_path, engine="openpyxl") as xw:
        all_encounters.to_excel(xw, sheet_name="all_encounters", index=False)
    print("Saved:", all_path)

# 2) ED chief-complaint-only (ED-stay level)
if "ed_triage_cc" not in ed_df.columns:
    raise KeyError("Column 'ed_triage_cc' not found in ed_df. Ensure ED triage merge ran.")

mask_cc = ed_df["ed_triage_cc"].notna() & (ed_df["ed_triage_cc"].astype(str).str.strip() != "")
ed_cc_only = ed_df.loc[mask_cc].copy()

if WRITE_ARCHIVE_XLSX_EXPORTS:
    cc_path = prior_runs_dir / f"mimic_hypercap_EXT_EDcc_only_edstay_bq_{timestamp}.xlsx"
    with pd.ExcelWriter(cc_path, engine="openpyxl") as xw:
        ed_cc_only.to_excel(xw, sheet_name="ed_cc_only", index=False)
    print("Saved:", cc_path)
else:
    print("Skipping all-encounters/ED-CC archive exports (set WRITE_ARCHIVE_XLSX_EXPORTS=1 to enable).")


In [None]:
# Purpose: Capture ED presentation context (chief complaint, acuity, and early vitals) at the ED-stay level.

# Final CC output (cohort-only; excludes NLP-derived columns)
from datetime import datetime

# Ensure cc-only frames exist (build from df if upstream optional export cells were skipped)
if "df_cc" not in globals():
    if "df" not in globals():
        raise NameError("df not found. Run cohort assembly cells first.")
    if "ed_triage_cc" not in df.columns:
        raise KeyError("ed_triage_cc missing in df; cannot build df_cc.")
    _mask_df_cc = df["ed_triage_cc"].notna() & (df["ed_triage_cc"].astype(str).str.strip() != "")
    df_cc = df.loc[_mask_df_cc].copy()

if "ed_cc_only" not in globals():
    if "ed_df" not in globals():
        raise NameError("ed_df not found. Run the ED-stay build cells first.")
    if "ed_triage_cc" not in ed_df.columns:
        raise KeyError("ed_triage_cc missing in ed_df; cannot build ed_cc_only.")
    _mask_cc = ed_df["ed_triage_cc"].notna() & (ed_df["ed_triage_cc"].astype(str).str.strip() != "")
    ed_cc_only = ed_df.loc[_mask_cc].copy()

# Add ED-stay columns (dedup to earliest ED stay per hadm_id)
ed_tmp = ed_cc_only.copy()
if "ed_intime" in ed_tmp.columns:
    ed_tmp = ed_tmp.sort_values(["hadm_id", "ed_intime", "ed_stay_id"], na_position="last")
else:
    ed_tmp = ed_tmp.sort_values(["hadm_id", "ed_stay_id"], na_position="last")
ed_first = ed_tmp.drop_duplicates("hadm_id", keep="first")

add_ed_cols = [c for c in ed_first.columns if c not in df_cc.columns]

final_cc = df_cc.merge(ed_first[["hadm_id"] + add_ed_cols], on="hadm_id", how="left")

prior_runs_dir = DATA_DIR / "prior runs"
prior_runs_dir.mkdir(parents=True, exist_ok=True)
out_date = datetime.now().strftime("%Y-%m-%d")

dated_out_path = prior_runs_dir / f"{out_date} MIMICIV all with CC.xlsx"
canonical_out_path = DATA_DIR / "MIMICIV all with CC.xlsx"

# Write via temporary files then atomically replace targets to avoid zero-byte outputs on interruption.
def _atomic_write_excel(df_obj, target_path: Path) -> None:
    tmp_path = target_path.with_suffix(target_path.suffix + ".tmp")
    df_obj.to_excel(tmp_path, index=False)
    os.replace(tmp_path, target_path)

_atomic_write_excel(final_cc, dated_out_path)
_atomic_write_excel(final_cc, canonical_out_path)

print("Saved:", dated_out_path)
print("Saved:", canonical_out_path)
print("Base rows:", len(df_cc), "| Added from ED-stay:", len(add_ed_cols))
print("Total columns:", len(final_cc.columns))


In [None]:
# Purpose: Extract and standardize first ABG/VBG physiology fields for baseline characterization.

# Data dictionary (OSF-style) for final cohort output
from datetime import datetime
import numpy as np

# Choose target dataset for dictionary
if "final_cc" in globals():
    dd_target = final_cc
elif "df_cc" in globals():
    dd_target = df_cc
elif "df" in globals():
    dd_target = df
else:
    raise NameError("No cohort dataframe found for data dictionary (expected final_cc/df_cc/df).")

# Helper functions

def _readable(name: str) -> str:
    return name.replace("_", " ").strip().title()


def _infer_units(col: str) -> str:
    if col.endswith("_hours"):
        return "hours"
    if col.endswith("_time") or col.endswith("_intime") or col.endswith("_outtime") or col.endswith("_date"):
        return "datetime"
    if col.startswith("flag_") or col.endswith("_flag") or col.endswith("_before_imv"):
        return "binary (0/1)"
    if col.endswith("_paco2") or col.endswith("_pco2"):
        return "mmHg"
    if col.endswith("_ph"):
        return "pH"
    if col.endswith("_lactate"):
        return "mmol/L"
    return ""


def _allowed_values(series):
    s = series.dropna()
    if s.empty:
        return ""
    if s.dtype.kind in "biu":
        vals = sorted(map(int, s.unique()))
        if len(vals) <= 15:
            return ", ".join(map(str, vals))
        return f"{min(vals)}–{max(vals)}"
    if s.dtype.kind in "f":
        return f"{np.nanmin(s):.3g}–{np.nanmax(s):.3g}"
    # object/categorical
    vals = sorted(map(str, s.unique()))
    return ", ".join(vals[:15]) + (" …" if len(vals) > 15 else "")


META = {
    # IDs
    "subject_id": {
        "label": "Subject identifier",
        "definition": "Identifier for a unique patient in MIMIC-IV.",
        "source": "mimiciv_hosp.patients",
        "derivation": "as recorded",
    },
    "hadm_id": {
        "label": "Hospital admission identifier",
        "definition": "Identifier for a hospital admission.",
        "source": "mimiciv_hosp.admissions",
        "derivation": "as recorded",
    },
    "ed_stay_id": {
        "label": "ED stay identifier",
        "definition": "Identifier for an ED stay.",
        "source": "mimiciv_ed.edstays",
        "derivation": "as recorded",
    },
    "ed_intime": {
        "label": "ED arrival time",
        "definition": "ED arrival time for this ED stay.",
        "source": "mimiciv_ed.edstays",
        "derivation": "as recorded",
        "units": "datetime",
    },
    "ed_outtime": {
        "label": "ED departure time",
        "definition": "ED departure time for this ED stay.",
        "source": "mimiciv_ed.edstays",
        "derivation": "as recorded",
        "units": "datetime",
    },
    "ed_intime_first": {
        "label": "First ED arrival time (admission)",
        "definition": "Earliest ED arrival time among ED stays linked to the admission.",
        "source": "mimiciv_ed.edstays",
        "derivation": "min(edstays.intime) per hadm_id",
        "units": "datetime",
    },
    # Demographics / triage
    "ed_gender": {
        "label": "ED gender",
        "definition": "Gender as recorded in ED stay table.",
        "source": "mimiciv_ed.edstays",
        "derivation": "as recorded",
    },
    "ed_race": {
        "label": "ED race",
        "definition": "Race as recorded in ED stay table.",
        "source": "mimiciv_ed.edstays",
        "derivation": "as recorded",
    },
    "hosp_race": {
        "label": "Hospital race",
        "definition": "Race as recorded in admissions table.",
        "source": "mimiciv_hosp.admissions",
        "derivation": "as recorded",
    },
    "ed_triage_cc": {
        "label": "ED chief complaint",
        "definition": "Chief complaint recorded at ED triage.",
        "source": "mimiciv_ed.triage",
        "derivation": "as recorded",
    },
    "ed_triage_acuity": {
        "label": "ED triage acuity",
        "definition": "Triage acuity level recorded at ED presentation.",
        "source": "mimiciv_ed.triage",
        "derivation": "as recorded",
    },
    # Outcomes
    "hospital_expire_flag": {
        "label": "Hospital expire flag",
        "definition": "Hospital mortality indicator from admissions table.",
        "source": "mimiciv_hosp.admissions",
        "derivation": "as recorded",
        "units": "binary (0/1)",
    },
    "in_hospital_death": {
        "label": "In-hospital death",
        "definition": "Indicator for in-hospital death during admission.",
        "source": "mimiciv_hosp.admissions",
        "derivation": "(hospital_expire_flag == 1) OR (deathtime not null)",
        "units": "binary (0/1)",
    },
    # Ventilation
    "imv_flag": {
        "label": "IMV flag",
        "definition": "Indicator of invasive mechanical ventilation during admission.",
        "source": "ICD procedures + ICU chartevents",
        "derivation": "max(IMV ICD flag, IMV chart flag)",
        "units": "binary (0/1)",
    },
    "niv_flag": {
        "label": "NIV flag",
        "definition": "Indicator of noninvasive ventilation during admission.",
        "source": "ICD procedures + ICU chartevents",
        "derivation": "max(NIV ICD flag, NIV chart flag)",
        "units": "binary (0/1)",
    },
    "first_imv_time": {
        "label": "First IMV time",
        "definition": "Earliest charted time of IMV in ICU chartevents (if present).",
        "source": "mimiciv_icu.chartevents",
        "derivation": "min charttime among IMV charted events",
        "units": "datetime",
    },
    "first_niv_time": {
        "label": "First NIV time",
        "definition": "Earliest charted time of NIV in ICU chartevents (if present).",
        "source": "mimiciv_icu.chartevents",
        "derivation": "min charttime among NIV charted events",
        "units": "datetime",
    },
    "dt_first_imv_hours": {
        "label": "Hours to first IMV",
        "definition": "Hours from first ED arrival to first IMV time.",
        "source": "Derived",
        "derivation": "(first_imv_time - ed_intime_first) in hours",
        "units": "hours",
    },
    "dt_first_niv_hours": {
        "label": "Hours to first NIV",
        "definition": "Hours from first ED arrival to first NIV time.",
        "source": "Derived",
        "derivation": "(first_niv_time - ed_intime_first) in hours",
        "units": "hours",
    },
    "abg_before_imv": {
        "label": "ABG before IMV",
        "definition": "Indicator that first ABG time precedes first IMV time.",
        "source": "Derived",
        "derivation": "first_abg_time < first_imv_time",
        "units": "binary (0/1)",
    },
    "vbg_before_imv": {
        "label": "VBG before IMV",
        "definition": "Indicator that first VBG time precedes first IMV time.",
        "source": "Derived",
        "derivation": "first_vbg_time < first_imv_time",
        "units": "binary (0/1)",
    },
    # Gas/ABG/VBG
    "abg_hypercap_threshold": {
        "label": "ABG hypercapnia threshold met",
        "definition": "Indicator that any arterial pCO2 ≥ 45 mmHg in hosp/ICU pCO2 extraction.",
        "source": "mimiciv_hosp.labevents + mimiciv_icu.chartevents",
        "derivation": "max(arterial pCO2 ≥ 45) per hadm_id",
        "units": "binary (0/1)",
    },
    "vbg_hypercap_threshold": {
        "label": "VBG hypercapnia threshold met",
        "definition": "Indicator that any venous pCO2 ≥ 50 mmHg in hosp/ICU pCO2 extraction.",
        "source": "mimiciv_hosp.labevents + mimiciv_icu.chartevents",
        "derivation": "max(venous pCO2 ≥ 50) per hadm_id",
        "units": "binary (0/1)",
    },
    "other_hypercap_threshold": {
        "label": "Other-source hypercapnia threshold met",
        "definition": "Indicator that any other/unspecified-source pCO2 ≥ 50 mmHg in hosp/ICU pCO2 extraction.",
        "source": "mimiciv_hosp.labevents + mimiciv_icu.chartevents",
        "derivation": "max(other-source pCO2 ≥ 50) per hadm_id",
        "units": "binary (0/1)",
    },
    "first_gas_time": {
        "label": "First gas panel time",
        "definition": "Earliest gas panel time in the ED 0–24h window.",
        "source": "mimiciv_hosp.labevents (gas panels)",
        "derivation": "min panel_time within ED 0–24h window",
        "units": "datetime",
    },
    "dt_first_qualifying_gas_hours": {
        "label": "Hours to first gas panel",
        "definition": "Hours from first ED arrival to first gas panel time.",
        "source": "Derived",
        "derivation": "(first_gas_time - ed_intime_first) in hours",
        "units": "hours",
    },
    "first_other_time": {
        "label": "First other-source gas time",
        "definition": "Earliest time of a pCO2 measurement with other/unspecified source classification.",
        "source": "mimiciv_hosp.labevents + mimiciv_icu.chartevents",
        "derivation": "min(other-source pCO2 charttime) across LAB and POC",
        "units": "datetime",
    },
    "presenting_hypercapnia": {
        "label": "Presenting (≤6h) gas timing",
        "definition": "Indicator that first gas panel occurred within 6h of first ED arrival.",
        "source": "Derived",
        "derivation": "dt_first_qualifying_gas_hours ≤ 6",
        "units": "binary (0/1)",
    },
    "late_hypercapnia": {
        "label": "Late (>6h) gas timing",
        "definition": "Indicator that first gas panel occurred >6h after first ED arrival.",
        "source": "Derived",
        "derivation": "dt_first_qualifying_gas_hours > 6",
        "units": "binary (0/1)",
    },
    "gas_source_other_rate": {
        "label": "Gas source other/unknown rate",
        "definition": "Proportion of gas panels classified as other/unspecified source per ED stay.",
        "source": "Derived from gas panel source inference",
        "derivation": "mean(source == 'other') per ed_stay_id",
        "units": "proportion",
    },
    # Comorbidities
    "flag_copd": {
        "label": "COPD/emphysema flag",
        "definition": "Indicator for COPD/emphysema ICD codes in ED or hospital diagnoses.",
        "source": "mimiciv_hosp.diagnoses_icd + mimiciv_ed.diagnosis",
        "derivation": "ICD prefixes J43/J44 (ED OR hospital)",
        "units": "binary (0/1)",
    },
    "flag_osa_ohs": {
        "label": "OSA/OHS flag",
        "definition": "Indicator for OSA/OHS ICD codes in ED or hospital diagnoses.",
        "source": "mimiciv_hosp.diagnoses_icd + mimiciv_ed.diagnosis",
        "derivation": "ICD prefixes G473/E662 (ED OR hospital)",
        "units": "binary (0/1)",
    },
    "flag_chf": {
        "label": "CHF flag",
        "definition": "Indicator for CHF ICD codes in ED or hospital diagnoses.",
        "source": "mimiciv_hosp.diagnoses_icd + mimiciv_ed.diagnosis",
        "derivation": "ICD prefix I50 (ED OR hospital)",
        "units": "binary (0/1)",
    },
    "flag_neuromuscular": {
        "label": "Neuromuscular flag",
        "definition": "Indicator for neuromuscular ICD codes in ED or hospital diagnoses.",
        "source": "mimiciv_hosp.diagnoses_icd + mimiciv_ed.diagnosis",
        "derivation": "ICD prefixes G12/G70/G71 (ED OR hospital)",
        "units": "binary (0/1)",
    },
    "flag_opioid_substance": {
        "label": "Opioid/substance flag",
        "definition": "Indicator for opioid/substance ICD codes in ED or hospital diagnoses.",
        "source": "mimiciv_hosp.diagnoses_icd + mimiciv_ed.diagnosis",
        "derivation": "ICD prefixes F11/T40/F13 (ED OR hospital)",
        "units": "binary (0/1)",
    },
    "flag_pneumonia": {
        "label": "Pneumonia flag",
        "definition": "Indicator for pneumonia ICD codes in ED or hospital diagnoses.",
        "source": "mimiciv_hosp.diagnoses_icd + mimiciv_ed.diagnosis",
        "derivation": "ICD prefixes J12–J18 (ED OR hospital)",
        "units": "binary (0/1)",
    },
}

rows = []
for col in dd_target.columns:
    info = META.get(col, {})
    series = dd_target[col]
    rows.append({
        "variable": col,
        "label": info.get("label", _readable(col)),
        "units": info.get("units", _infer_units(col)),
        "allowed_values": info.get("allowed_values", _allowed_values(series)),
        "definition": info.get("definition", "UNCONFIRMED"),
        "synonyms": info.get("synonyms", ""),
        "description": info.get("description", ""),
        "source": info.get("source", "UNCONFIRMED"),
        "derivation": info.get("derivation", "UNCONFIRMED"),
        "dtype": str(series.dtype),
        "example_value": series.dropna().iloc[0] if series.dropna().shape[0] else None,
    })

data_dict = pd.DataFrame(rows)

out_date = datetime.now().strftime("%Y-%m-%d")
prior_runs_dir = DATA_DIR / "prior runs"
prior_runs_dir.mkdir(parents=True, exist_ok=True)
out_xlsx = prior_runs_dir / f"{out_date} MIMICIV all with CC_data_dictionary.xlsx"
out_csv = prior_runs_dir / f"{out_date} MIMICIV all with CC_data_dictionary.csv"

data_dict.to_excel(out_xlsx, index=False)
data_dict.to_csv(out_csv, index=False)

print("Saved data dictionary:", out_xlsx)
print("Saved data dictionary:", out_csv)
print("Rows:", len(data_dict))

