# Metadata Identification and Cleanings

#### Step 1: Build a metadata file catalog
1. What files exist per study in the /raw folder? Which are metadata files?
2. For the likely metadata files, what's inside? Sheet names, column headers, row/col counts



In [1]:
from pathlib import Path
import pandas as pd
import re
from datetime import datetime


### Step 1. Identify Files and Which are Metadata 

In [2]:
# set the raw data directory to the "raw" folder

RAW_DIR = Path("raw")

In [3]:
# enumerate all Excel files under /raw (recursive)

excel_files = sorted(RAW_DIR.rglob("*.xls*"))


In [4]:
# convert the paths into a structured table 

file_index = pd.DataFrame([
    {
        "study_id": p.relative_to(RAW_DIR).parts[0],
        "rel_path": str(p.relative_to(RAW_DIR)),
        "filename": p.name,
        "ext": p.suffix.lower(),
    }
    for p in excel_files
])

In [5]:
# early sanity check

print(f"Found {len(file_index)} Excel files across {file_index['study_id'].nunique()} studies")
file_index.head()


Found 7 Excel files across 4 studies


Unnamed: 0,study_id,rel_path,filename,ext
0,Brooklyn,Brooklyn/251028 Ritual_REPORT.xlsx,251028 Ritual_REPORT.xlsx,.xlsx
1,Brooklyn,Brooklyn/Brooklyn College Metadata.xlsx,Brooklyn College Metadata.xlsx,.xlsx
2,DHM Pooled,DHM Pooled/251028 DHM_Pooling_Monica_REPORT.xlsx,251028 DHM_Pooling_Monica_REPORT.xlsx,.xlsx
3,DHM Pooled,DHM Pooled/Copy of Copy of Pool Optimization S...,Copy of Copy of Pool Optimization Study- UCH+A...,.xlsx
4,NeoBANK,NeoBANK/NeoBANK AC REPORT.xlsx,NeoBANK AC REPORT.xlsx,.xlsx


In [6]:
# define filename keyword rules 

# 'postiive' signals
META_KWS = [
    "meta", "metadata", "character", "clinical", "demograph", "demo",
    "participant", "subject", "phenotype", "intake", "enrollment",
    "questionnaire", "survey", "crf", "case"
]

# 'negative' signals
HMO_KWS = [
    "hmo", "oligo", "oligosacchaeride", "hmos"
]

In [7]:
# helper function to find keyword hits within the filenames 
# converts text to lowercase, returns a list of keywords that appear as substrings 

def keyword_hits(text: str, keywords: list[str]) -> list[str]:
    text = text.lower()
    return [kw for kw in keywords if kw in text]


In [8]:
# build your file index and classify "metadata-likey by name"
rows = []

for p in excel_files:
    rel = p.relative_to(RAW_DIR)                  # e.g. "Oxford/metadata.xlsx"
    study_id = rel.parts[0]                       # first folder name = StudyID
    filename = p.name.lower()

    meta_hits = keyword_hits(filename, META_KWS)  # positive hits
    hmo_hits = keyword_hits(filename, HMO_KWS)    # negative hits

    # "metadata-like" if it has meta hits AND does NOT have hmo hits
    filename_metadata_likely = (len(meta_hits) > 0) and (len(hmo_hits) == 0)

    rows.append({
        "study_id": study_id,
        "rel_path": str(rel),
        "filename": p.name,
        "meta_hits": ",".join(meta_hits),
        "hmo_hits": ",".join(hmo_hits),
        "filename_metadata_likely": filename_metadata_likely
    })

file_index = pd.DataFrame(rows)


In [9]:
# inspect what got flagged 

print("Total Excel files found:", len(file_index))
print("Files flagged as metadata-like by filename:", file_index["filename_metadata_likely"].sum())

file_index[file_index["filename_metadata_likely"]].head(10)


Total Excel files found: 7
Files flagged as metadata-like by filename: 2


Unnamed: 0,study_id,rel_path,filename,meta_hits,hmo_hits,filename_metadata_likely
1,Brooklyn,Brooklyn/Brooklyn College Metadata.xlsx,Brooklyn College Metadata.xlsx,"meta,metadata",,True
6,Oxford,Oxford/metadata_overview_oxford.xlsx,metadata_overview_oxford.xlsx,"meta,metadata",,True


In [10]:
CATALOG_DIR = Path("catalog")
CATALOG_DIR.mkdir(exist_ok=True)

file_index.to_csv(CATALOG_DIR / "metadata_detection_file.csv", index=False)


#### 2. Clean and Standardize Identified Metadata Files

In [11]:
# set up directories

RAW_DIR = Path("raw")
STAGING_DIR = Path("staging")
STAGING_DIR.mkdir(exist_ok=True)

CATALOG_DIR = Path("catalog")

In [12]:
det = pd.read_csv(CATALOG_DIR / "metadata_detection_file.csv")
meta_files = det.loc[det["filename_metadata_likely"] == True, "rel_path"].tolist()

In [13]:
# define a column-name normalizer 

def normalize_col(c: str) -> str:
    c = str(c).strip().lower()
    c = re.sub(r"\s+", "_", c)          # spaces -> underscores
    c = re.sub(r"[^a-z0-9_]+", "", c)   # remove weird symbols
    c = re.sub(r"_+", "_", c)           # collapse repeated underscores
    return c.strip("_")


In [14]:
# stage one file (read + clean structure + save)
# loads the workbook, picks a sheet, reads the data, removes truly empty rows/cols, normalizes headers, writes a staged CSV under staging/metadata, returns a log row

def stage_metadata_file(rel_path: str) -> dict:
    src = RAW_DIR / rel_path
    study_id = Path(rel_path).parts[0]

    xl = pd.ExcelFile(src)                 # optionally: pd.ExcelFile(src, engine="openpyxl")
    sheet = xl.sheet_names[0]              # ok for prototype

    # Read as strings to preserve IDs exactly
    df = xl.parse(sheet_name=sheet, dtype=str)

    # Drop fully empty rows/cols
    df = df.dropna(axis=0, how="all").dropna(axis=1, how="all")

    # Normalize column names
    df.columns = [normalize_col(c) for c in df.columns]

    # Strip whitespace in all string cells (prevents join failures)
    for c in df.columns:
        df[c] = df[c].astype(str).str.strip()

    out_dir = STAGING_DIR / study_id
    out_dir.mkdir(parents=True, exist_ok=True)

    out_path = out_dir / f"metadata__{Path(rel_path).stem}__{normalize_col(sheet)}.csv"
    df.to_csv(out_path, index=False)

    return {
        "study_id": study_id,
        "raw_rel_path": rel_path,
        "sheet_used": sheet,
        "n_sheets": len(xl.sheet_names),
        "sheet_names": "|".join(xl.sheet_names),
        "rows": df.shape[0],
        "cols": df.shape[1],
        "staged_csv_rel_path": str(out_path),
        "status": "success",
        "error": ""
    }


In [15]:
# loops through all metadata files, stages each one, captures failures without killing the run, wrotes catalog/metadata_staging_log.csv.

log_rows = []

for rel_path in meta_files:
    try:
        log_rows.append(stage_metadata_file(rel_path))
    except Exception as e:
        log_rows.append({
            "study_id": Path(rel_path).parts[0],
            "raw_rel_path": rel_path,
            "sheet_used": "",
            "rows": None,
            "cols": None,
            "staged_csv_rel_path": "",
            "status": "failed",
            "error": str(e)
        })

stage_log = pd.DataFrame(log_rows)
stage_log.to_csv(CATALOG_DIR / "metadata_staging_log.csv", index=False)

print("Staged metadata files:", (stage_log["status"] == "success").sum())
print("Failed:", (stage_log["status"] == "failed").sum())


Staged metadata files: 2
Failed: 0


#### Step 3. Identify Canidate IDs and Priority Columns


In [16]:
# define keyword sets 

ID_SAMPLE_KWS = [
    "sample_id", "sampleid", "sample_name", "samplename",
    "specimen", "aliquot", "barcode", "tube", "vial", "label"
]

ID_SUBJECT_KWS = [
    "subject", "participant", "patient"
]

PRIORITY_META_KWS = {
    "study_week": ["study_week", "studyweek", "visit_week", "timepoint", "wk"],
    "maternal_age": ["maternal_age", "mat_age", "mother_age", "mom_age", "age_mom"],
    "gestational_age_weeks": ["gestational", "ga", "gest_age", "gestation"],
    "lactation_week_postpartum": ["postpartum", "pp", "lactation", "weeks_postpartum", "week_postpartum"]
}

In [17]:
# identify canidate columns in one staged metadata file 


# defines a function that takes a df and returns a dictionary of results
def identify_candidate_columns(df: pd.DataFrame) -> dict:
    
    # grabs the df column names and turns into a list
    cols = df.columns.tolist()

    # loops through eevery column name and keeps if any sample-keyword fro ID_SAMPLE_KWS is found as a substring
    sample_id_candidates = [
        c for c in cols if any(k in c for k in ID_SAMPLE_KWS)
    ]

    subject_id_candidates = [
        c for c in cols if any(k in c for k in ID_SUBJECT_KWS)
    ]

    # builds a dictionary where each canonical priority field (e.g maternal age) maps to the list of coluns whose name contain any of the field's keywords 
    priority_hits = {
        canon: [c for c in cols if any(k in c for k in kws)]
        for canon, kws in PRIORITY_META_KWS.items()
    }

    # bundles three outputs into one dictionary and returns it
    return {
        "sample_id_candidates": sample_id_candidates,
        "subject_id_candidates": subject_id_candidates,
        "priority_metadata_hits": priority_hits
    }


In [18]:
# creates an empty list that will store one dictionary per staged metadata file
candidate_logs = []

# loops through each row of stage_log (see code above, this is the log of all staged metadata files)
# r is a series representing one staged file's log entry
# _ is the row index (not used)

for _, r in stage_log.iterrows():
    if r["status"] != "success":           #skips any file that failed staging
        continue

    # converts the staged CSV path string into a Path object so it's easier to work with
    staged_path = Path(r["staged_csv_rel_path"])
    # reads the staged CSV into a dataframe, all columns as strings (preserves IDs exactly)
    df = pd.read_csv(staged_path, dtype=str)

    # run your fxn above to detect columns and store results in a dictionary canidates 
    candidates = identify_candidate_columns(df)

    # dictionary that will become one row in output log -> study id and which CSV it came from
    row = {
        "study_id": r["study_id"],
        "staged_csv": r["staged_csv_rel_path"],
        "sample_id_candidates": ",".join(candidates["sample_id_candidates"]) or "NA",     # converts a list into a comma-separated string, or "NA" if empty
        "subject_id_candidates": ",".join(candidates["subject_id_candidates"]) or "NA",
    }

    # one column per priority metadata field, e.g "maternal_age",hits=['mat_age', 'mother_age']
    for canon, hits in candidates["priority_metadata_hits"].items():
        row[canon] = ",".join(hits) if hits else "NA"

    # adds the completed dictionary to the canidate_logs list 
    candidate_logs.append(row)

# saves the df to catalog/metadata_candidate_columns_log.csv
candidate_log_df = pd.DataFrame(candidate_logs)
candidate_log_df.to_csv(
    CATALOG_DIR / "metadata_candidate_columns_log.csv",
    index=False
)


In [19]:
pd.read_csv("catalog/metadata_candidate_columns_log.csv").head()


Unnamed: 0,study_id,staged_csv,sample_id_candidates,subject_id_candidates,study_week,maternal_age,gestational_age_weeks,lactation_week_postpartum
0,Brooklyn,staging/Brooklyn/metadata__Brooklyn College Me...,sample_name,subject_id,study_week,maternal_age,gestational_age_week,lactation_stage_week_postpartum
1,Oxford,staging/Oxford/metadata__metadata_overview_oxf...,,participant_id,,mat_age,"sga,gest_age_birth,gest_age_baseline,gestation...",


#### Helper Functions: 1) If multiple hits for the columns, pick...

In [20]:
GOOD_TOKENS = {
    "gestational_age_weeks": ["birth", "delivery"],
    "lactation_week_postpartum": ["postpartum", "pp"],
}

BAD_TOKENS = {
    "gestational_age_weeks": ["baseline", "followup", "outcome", "score"],
}

def pick_best_hit(canon, hits):
    # returns best hit, alternates, best_score
    # if empty -> None, []

    if not hits:
        return None, []

    def score(col):
        s = 0
        col = col.lower()
        for t in GOOD_TOKENS.get(canon, []):
            if t in col: s += 3
        for t in BAD_TOKENS.get(canon, []):
            if t in col: s -= 3
        return s

    ranked = sorted(hits, key=score, reverse=True)
    best = ranked[0]
    best_score = score(best)
    alts = ranked[1:]
    return best, alts, best_score


In [21]:
all_resolution_logs = []
all_core_outputs = []

for _, r in stage_log.iterrows():
    if r["status"] != "success":
        continue

    study_id = r["study_id"]
    staged_csv = Path(r["staged_csv_rel_path"])

    # load staged metadata
    df = pd.read_csv(staged_csv, dtype=str)
    candidates = identify_candidate_columns(df)

    core_row = {}
    resolution_log = {}

    # ---- YOUR EXISTING LOGIC (unchanged) ----
    for canon, hits in candidates["priority_metadata_hits"].items():
        if len(hits) == 1:
            core_row[canon] = df[hits[0]]
            resolution_log[canon] = {
                "selected": hits[0],
                "reason": "single_hit"
            }

        elif len(hits) > 1:
            best, alts, best_score = pick_best_hit(canon, hits)
            core_row[canon] = df[best] if best else pd.NA
            resolution_log[canon] = {
                "selected": best,
                "alternates": alts,
                "reason": "token_scoring"
            }

        else:
            core_row[canon] = pd.NA
            resolution_log[canon] = {
                "selected": None,
                "reason": "not_found"
            }

# ---------- SECTION (ID COLUMNS) ----------
    sub_hits = candidates.get("subject_id_candidates", [])
    samp_hits = candidates.get("sample_id_candidates", [])

    subject_col = sub_hits[0] if len(sub_hits) > 0 else None
    sample_col  = samp_hits[0] if len(samp_hits) > 0 else None

    core_row["subject_id"] = df[subject_col] if subject_col else pd.NA
    core_row["hmo_sample_name"] = df[sample_col] if sample_col else pd.NA

    resolution_log["subject_id"] = {
        "selected": subject_col,
        "alternates": sub_hits[1:] if len(sub_hits) > 1 else [],
        "reason": "id_candidate_first"
    }

    resolution_log["hmo_sample_name"] = {
        "selected": sample_col,
        "alternates": samp_hits[1:] if len(samp_hits) > 1 else [],
        "reason": "id_candidate_first"
    }


    # ----------------------------------------

    # build core dataframe
    core_df = pd.DataFrame(core_row)

    # save per-study core metadata
    out_path = Path("staging") / study_id / f"metadata__core_cleaned_{study_id}.csv"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    core_df.to_csv(out_path, index=False)

    # store logs
    for field, info in resolution_log.items():
        all_resolution_logs.append({
            "study_id": study_id,
            "staged_csv": str(staged_csv),
            "canonical_field": field,
            "selected_column": info.get("selected"),
            "alternates": ",".join(info.get("alternates", [])),
            "reason": info.get("reason")
        })

    all_core_outputs.append({
        "study_id": study_id,
        "core_output": str(out_path)
    })

# write catalog logs
pd.DataFrame(all_resolution_logs).to_csv(
    "catalog/metadata_core_resolution_log.csv", index=False
)
pd.DataFrame(all_core_outputs).to_csv(
    "catalog/metadata_core_outputs.csv", index=False
)

print("Done. Core metadata built for:", len(all_core_outputs), "studies")

Done. Core metadata built for: 2 studies


In [22]:
STAGING_DIR = Path("staging")

for study_id in stage_log.loc[stage_log["status"]=="success", "study_id"].unique():

    hmo_path = STAGING_DIR / study_id / f"hmo_staged_{study_id}.csv"
    meta_path = STAGING_DIR / study_id / f"metadata__core_cleaned_{study_id}.csv"

    if not hmo_path.exists() or not meta_path.exists():
        continue

    hmo = pd.read_csv(hmo_path, dtype=str)
    meta = pd.read_csv(meta_path, dtype=str)

    # decide merge key
    if "hmo_sample_name" in meta.columns and meta["hmo_sample_name"].notna().any():
        merged = hmo.merge(
            meta,
            left_on="sample_name",          # HMO column
            right_on="hmo_sample_name",     # metadata column
            how="left"
        )
        merge_key = "sample_name"

    else:
        merged = hmo.merge(
            meta,
            on="subject_id",
            how="left"
        )
        merge_key = "subject_id"

    out_path = STAGING_DIR / study_id / f"hmo_plus_metadata_{study_id}.csv"
    merged.to_csv(out_path, index=False)

    print(f"{study_id}: merged on {merge_key}")


In [None]:
# 

### Merge the Cleaned Metadata and hmo_mereged

In [23]:
# Discover fles by searching metadata__core_cleaned_*.csv files 
# infer StudyID from the filename (metadata__core_cleaned_{study_id}.csv)
# load each file into a dictionary
# for each study, auto-pick the join column -> try hmo_sammple_name vs subject_id by coverage, match rate against HMO within that StudyID, uniqueness 
# build sample_key = StudyID + "_" + <chosen_id>


def discover_core_cleaned_metadata(root: str | Path) -> dict[str, Path]:
    """
    Finds metadata__core_cleaned_*.csv recursively and returns {StudyID: filepath}.
    Guardrails:
      - errors if multiple files map to same StudyID
      - errors if StudyID cannot be parsed
    """
    root = Path(root)
    paths = list(root.rglob("metadata__core_cleaned_*.csv"))

    if not paths:
        raise FileNotFoundError(f"No files found matching metadata__core_cleaned_*.csv under: {root}")

    by_study: dict[str, Path] = {}
    collisions: dict[str, list[Path]] = {}

    pat = re.compile(r"metadata__core_cleaned_(.+)\.csv$", re.IGNORECASE)

    for p in paths:
        m = pat.search(p.name)
        if not m:
            continue
        study_id = m.group(1).strip()

        # Guard: empty or weird study_id
        if not study_id:
            raise ValueError(f"Could not parse StudyID from filename: {p.name}")

        if study_id in by_study:
            collisions.setdefault(study_id, [by_study[study_id]]).append(p)
        else:
            by_study[study_id] = p

    if collisions:
        msg = "\n".join([f"{sid}: " + ", ".join(str(x) for x in ps) for sid, ps in collisions.items()])
        raise ValueError(f"Multiple metadata files found for same StudyID. Resolve duplicates:\n{msg}")

    return by_study


# Example use:
meta_files = discover_core_cleaned_metadata(root=".")
meta_by_study = {sid: pd.read_csv(path, dtype=str) for sid, path in meta_files.items()}
print("Discovered studies:", sorted(meta_by_study.keys()))

Discovered studies: ['Brooklyn', 'Oxford']


### Merge Cleaned Metadata and hmo_merged
- Load hmo_merged dataset: each row represents one milk sample identified by StudyID, SampleName

- Discover cleaned metadata files automatically: searches for the project dictonary for the files nameed metadata__core_cleaned_<StudyID>.csv
- Evaluate possible join identifiers per study by scoring using coverage, match rate, and uniqueness 
- Create a unified join key: sample_key = StudyID + "__" + SampleName
- Merge metadata onto the HMO dataset Study Id and SampleName -> left join, all HMO samples are preserved, metadata columns added where available, samples without metadata remain present within empty fields 
- Sanity check: row count is unchanged, metadata attachment rate per study, all merge decisions are written to a log for transparency and debugging

In [None]:
# ---------- 1) Load HMO merged ----------

PROJECT_ROOT = Path.cwd()

print("Project root:", PROJECT_ROOT)

hmo_path = PROJECT_ROOT / "staging" / "_merged" / "hmo_merged.csv"
if not hmo_path.exists():
    raise FileNotFoundError(f"HMO merged file not found at: {hmo_path}")

hmo = pd.read_csv(hmo_path, dtype=str)

Project root: /Users/kspann/Desktop/HMO Power Bi


In [None]:
# Basic hygiene
# standardize join columns and strip whitespace to avoid space mismatches
hmo["StudyID"] = hmo["StudyID"].astype(str).str.strip()
hmo["SampleName"] = hmo["SampleName"].astype(str).str.strip()

# We'll use these per study for matching
# builds a dictionary like {Brooklyn": {1_wk0, 1_wk10,....}} to score metadata cnanidate columns within each study
hmo_names_by_study = {
    sid: set(hmo.loc[hmo["StudyID"] == sid, "SampleName"].dropna().astype(str).str.strip().unique())
    for sid in hmo["StudyID"].dropna().unique()
}

# ---------- 2) Helper: score candidate id columns ----------
def score_id_column(meta_df: pd.DataFrame, hmo_sample_set: set[str], col: str) -> dict:
    
    # if column doesn't exist in metadata file, return early error message
    if col not in meta_df.columns:
        return {"col": col, "usable": False}

    # pull the column out as a pandas series - s contains all values in that column (including missing values)
    s = meta_df[col]
    # "what fraction of rows are NOT missing in this column?"
    coverage = s.notna().mean()

    # remove missing values, strip whitespace, removes leading/trailing spaces; return clean series
    s_clean = s.dropna().astype(str).str.strip()
    if s_clean.empty:
        return {"col": col, "usable": False, "coverage": float(coverage)}

    # gets unique values and turns it into a set to make checks faster
    meta_unique = set(s_clean.unique())

    # counts how many unique metadata IDs are present in the HMO sample IDs -> get the proportion
    n_match = sum(v in hmo_sample_set for v in meta_unique)
    match_rate = n_match / max(len(meta_unique), 1)

    # measures how unique that column is relative to the number of rows, if close to 1 -> very unique (sample-level), if closer to 0 -> less unique (participant level)
    uniqueness_ratio = s_clean.nunique() / max(len(s_clean), 1)

    # Score: prioritize match_rate heavily; coverage helps as tie-breaker
    score = 0.85 * match_rate + 0.15 * coverage

    return {
        "col": col,
        "usable": True,
        "coverage": float(coverage),
        "meta_unique": int(len(meta_unique)),
        "hmo_unique": int(len(hmo_sample_set)),
        "n_match": int(n_match),
        "match_rate": float(match_rate),
        "uniqueness_ratio": float(uniqueness_ratio),
        "score": float(score),
    }

# DECISION LEVEL 
    # tests multiple possible ID columns (hmo_sample_name vs subject_id) and chooses the best using scoring_id^
def choose_best_id_column(meta_df: pd.DataFrame, hmo_sample_set: set[str], candidates: list[str]) -> dict:
    scored = [score_id_column(meta_df, hmo_sample_set, c) for c in candidates]
    scored = [d for d in scored if d.get("usable")]

    if not scored:
        return {"chosen": None, "scored": []}

    scored_sorted = sorted(scored, key=lambda d: d["score"], reverse=True)
    return {"chosen": scored_sorted[0]["col"], "scored": scored_sorted}





# ---------- 3) Load metadata (from your discovery step) ----------
# starting with the project root, find all cleaned metadata files, label them by StudyID, load each into a DF, store them in a dict keyed by study
# You should already have meta_files = {StudyID: Path(...)} from Option B discovery
meta_files = discover_core_cleaned_metadata(root=".")  # root . means the current working dict

meta_by_study = {sid: pd.read_csv(path, dtype=str) for sid, path in meta_files.items()}





# ---------- 4) Pick join column per study + build metadata_master ----------
# list of metadata columns we're willing to try as the join identifier 
ID_CANDIDATES = ["hmo_sample_name", "subject_id", "SampleName", "sample_name", "sample_id"]

# start a list that we will collect each study's metadata after we add a sample_key and then what we choose and why
meta_parts = []
merge_log = []

# loop through metadata dictionary 
# sid = study name string; meta = dataframe for that study's metadata 
for sid, meta in meta_by_study.items():
    sid_str = str(sid).strip()
    meta.columns = [c.strip() for c in meta.columns]
    # add StudyID column to metadata - for creating sample_key and preventing cross-study collisions
    meta["StudyID"] = sid_str

    hmo_set = hmo_names_by_study.get(sid_str, set())

    # runs the scoring logic across all canidate columns and pulls the winning column name or None if nothing was usable
    choice = choose_best_id_column(meta, hmo_set, ID_CANDIDATES)
    chosen = choice["chosen"]

    merge_log.append({
        "StudyID": sid_str,
        "chosen_id_col": chosen,
        "scores": choice["scored"][:3],  # keep top 3 for readability
    })

    if chosen is None:
        print(f"[WARN] {sid_str}: No usable ID column found. Skipping this study for metadata merge.")
        continue

    # Build sample_key
    meta["_id_"] = meta[chosen].astype(str).str.strip()
    meta["sample_key"] = meta["StudyID"].astype(str).str.strip() + "__" + meta["_id_"]

    # IMPORTANT: ensure 1 row per sample_key
    dup_ct = meta.duplicated("sample_key").sum()
    if dup_ct > 0:
        print(f"[WARN] {sid_str}: {dup_ct} duplicate sample_key rows in metadata. Keeping first occurrence.")
        meta = meta.drop_duplicates("sample_key")

    meta_parts.append(meta.drop(columns=["_id_"], errors="ignore"))

metadata_master = pd.concat(meta_parts, ignore_index=True) if meta_parts else pd.DataFrame(columns=["sample_key"])

# ---------- 5) Build sample_key in HMO + left merge ----------
hmo["sample_key"] = hmo["StudyID"].astype(str).str.strip() + "__" + hmo["SampleName"].astype(str).str.strip()

hmo_with_meta = hmo.merge(
    metadata_master.drop(columns=["StudyID"], errors="ignore"),
    on="sample_key",
    how="left",
    suffixes=("", "_meta")
)

# ---------- 6) Sanity checks ----------
print("\n=== SANITY CHECKS ===")
print("HMO rows (before):", len(hmo))
print("HMO rows (after) :", len(hmo_with_meta))

# Per-study match rate: "did we attach *any* metadata column?"
meta_cols = [c for c in hmo_with_meta.columns if c not in hmo.columns]
if meta_cols:
    has_meta = hmo_with_meta[meta_cols].notna().any(axis=1)
    print("\nMetadata attached rate by StudyID:")
    print(hmo_with_meta.assign(has_meta=has_meta).groupby("StudyID")["has_meta"].mean().sort_values(ascending=False))
else:
    print("[WARN] No metadata columns were merged in.")

# ---------- 7) Write outputs ----------
Path("derived").mkdir(exist_ok=True)
metadata_master.to_csv("derived/metadata_master.csv", index=False)
hmo_with_meta.to_csv("derived/hmo_merged_with_metadata.csv", index=False)
pd.DataFrame(merge_log).to_json("derived/metadata_merge_log.json", orient="records", indent=2)

print("\nWrote:")
print(" - derived/metadata_master.csv")
print(" - derived/hmo_merged_with_metadata.csv")
print(" - derived/metadata_merge_log.json")



=== SANITY CHECKS ===
HMO rows (before): 1200
HMO rows (after) : 1200

Metadata attached rate by StudyID:
StudyID
Oxford        0.990385
Brooklyn      0.946154
DHM Pooled    0.000000
NeoBANK       0.000000
Name: has_meta, dtype: float64

Wrote:
 - derived/metadata_master.csv
 - derived/hmo_merged_with_metadata.csv
 - derived/metadata_merge_log.json
