In [None]:
# Step 0: Config
import os
from datetime import datetime
from pathlib import Path

# --- User-editable base path ---
# Every pipeline output is written beneath this folder (see PIPELINE_ROOT below).
SNOMED_BASE = Path("/PATH/TO/YOUR/SNOMED_BASE")  # e.g., "/Users/you/.../SnomedCT_files"

# --- Run directory ---
# RUN_ID is taken from the local clock at second resolution, so re-running this
# cell yields a new RUN_DIR and the later steps will then write to (and look for
# prior output in) that new folder.
RUN_ID = datetime.now().strftime("%Y%m%d_%H%M%S")
PIPELINE_ROOT = SNOMED_BASE / "_outputs_llm_eval"
RUN_DIR = PIPELINE_ROOT / f"run_{RUN_ID}"

# One sub-folder per pipeline step.
STEP1_DIR = RUN_DIR / "step1_llm_set1"
STEP2_DIR = RUN_DIR / "step2_llm_set2"
STEP3_DIR = RUN_DIR / "step3_snomed_ground_truth"
STEP4_DIR = RUN_DIR / "step4_comparison"

# Output files
STEP1_OUT = STEP1_DIR / "set1_llm_output.csv"
STEP2_OUT = STEP2_DIR / "set2_llm_output.csv"
STEP3_OUT = STEP3_DIR / "snomed_ground_truth.csv"

# Step 4 writes the same comparison in several layouts.
STEP4_OUT_MAIN = STEP4_DIR / "comparison_results.csv"
STEP4_OUT_LONG = STEP4_DIR / "comparison_results_long.csv"
STEP4_OUT_WIDE = STEP4_DIR / "comparison_results_wide.csv"
STEP4_OUT_TSV = STEP4_DIR / "comparison_results.tsv"

# Create directories
# parents=True also creates SNOMED_BASE itself if it does not exist yet, so a
# placeholder/mistyped base path is not detected here.
for d in [STEP1_DIR, STEP2_DIR, STEP3_DIR, STEP4_DIR]:
    d.mkdir(parents=True, exist_ok=True)

# Concept terms (replace with your level 3/4 concepts)
# The same list drives Steps 1-4.
CONCEPT_TERMS = [
    "Hypertensive disorder",
    "Type 2 diabetes mellitus",
    "Asthma",
]

print("Run directory:", RUN_DIR)


In [None]:
# Step 1: LLM querying - Set 1
import json
import os
import re
import time
from datetime import datetime
from pathlib import Path

import pandas as pd
from openai import OpenAI

# Fail early if the Step 1 output folder has not been created on disk.
assert STEP1_DIR.exists(), "Run the Config cell first (STEP1_DIR missing)."

# Per-concept status lines (OK / SKIP / ERROR) are appended to this file.
LOG_PATH = STEP1_DIR / "logs.txt"

MODEL_NAME = "gpt-4o-mini"  # Change if needed
PROMPT_SET = "set1"  # stored in the "prompt_set" column of the output CSV

# Prompt sent once per concept. It is filled in with str.format(), so the doubled
# braces are literal braces in the final text and {concept_term} is the only
# placeholder.
PROMPT_TEMPLATE = """
You are a SNOMED taxonomy assistant. For the concept term: "{concept_term}", return STRICT JSON only:
{{"sons": [...], "cousins": [...]}}

Definitions:
- "sons": direct is-a children of the concept.
- "cousins": children of siblings / nearby lateral concepts.

No explanations. JSON only.
""".strip()


def normalize_term(term: str) -> str:
    """Return *term* trimmed, lower-cased, with each whitespace run collapsed to one space."""
    trimmed = term.strip()
    lowered = trimmed.lower()
    return re.sub(r"\s+", " ", lowered)


def safe_json_extract(text: str) -> dict:
    """Return the first JSON object that can be decoded from *text*.

    The model is asked for bare JSON but may wrap it in prose or code fences.
    Decoding starts at each ``{`` in turn and stops at the end of the first
    complete object, so braces in any trailing text are ignored (a greedy
    ``{.*}`` match would swallow them and make the parse fail).

    Args:
        text: Raw model response.

    Returns:
        The decoded object as a ``dict``.

    Raises:
        ValueError: If no position in *text* starts a decodable JSON object.
    """
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            # raw_decode tolerates trailing characters after the object.
            parsed, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            # This "{" did not open valid JSON; try the next one.
            start = text.find("{", start + 1)
            continue
        return parsed
    raise ValueError("No JSON object found in response.")


def list_to_pipe(items):
    """Serialize *items* as one "|"-separated string.

    Each item is converted with ``str``; any "|" inside an item becomes a
    space so it cannot be mistaken for the separator. A falsy *items*
    (``None``, empty list) yields the empty string.
    """
    if items:
        return "|".join(str(entry).replace("|", " ") for entry in items)
    return ""


# Check the key before building the client so that a missing or empty
# OPENAI_API_KEY is always reported by the EnvironmentError below.
# NOTE(review): the OpenAI constructor is understood to raise its own error
# when given no key, which would pre-empt a check made after construction.
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
    raise EnvironmentError("OPENAI_API_KEY is not set in environment.")
client = OpenAI(api_key=api_key)

# Load existing output for resumability: terms already in the CSV are skipped.
if STEP1_OUT.exists():
    existing_df = pd.read_csv(STEP1_OUT, dtype=str).fillna("")
    done_terms = set(existing_df["concept_term"].astype(str).tolist())
else:
    existing_df = pd.DataFrame()
    done_terms = set()

rows = []

for concept_term in CONCEPT_TERMS:
    if concept_term in done_terms:
        with LOG_PATH.open("a", encoding="utf-8") as logf:
            logf.write(f"{datetime.now().isoformat()}\t{concept_term}\tSKIP (already processed)\n")
        continue

    prompt = PROMPT_TEMPLATE.format(concept_term=concept_term)
    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
        )
        raw_output = response.choices[0].message.content or ""
        parsed = safe_json_extract(raw_output)
        sons = parsed.get("sons", [])
        cousins = parsed.get("cousins", [])

        row = {
            "timestamp": datetime.now().isoformat(),
            "model": MODEL_NAME,
            "prompt_set": PROMPT_SET,
            "concept_term": concept_term,
            # Newlines are flattened so each record stays on one CSV line.
            "raw_output": raw_output.replace("\n", " ").strip(),
            "extracted_sons": list_to_pipe(sons),
            "extracted_cousins": list_to_pipe(cousins),
        }
        rows.append(row)

        # Checkpoint after every success: rewriting the CSV here means an
        # interrupt or crash later in the loop cannot lose completed API calls,
        # which is what makes the resume logic above useful.
        out_df = pd.DataFrame(rows)
        if existing_df.empty:
            combined = out_df
        else:
            combined = pd.concat([existing_df, out_df], ignore_index=True)
        combined.to_csv(STEP1_OUT, index=False)

        # A term repeated in CONCEPT_TERMS is not queried a second time.
        done_terms.add(concept_term)

        with LOG_PATH.open("a", encoding="utf-8") as logf:
            logf.write(f"{datetime.now().isoformat()}\t{concept_term}\tOK\n")

    except Exception as e:
        # Best effort by design: log the failure and move on to the next term.
        with LOG_PATH.open("a", encoding="utf-8") as logf:
            logf.write(f"{datetime.now().isoformat()}\t{concept_term}\tERROR\t{e}\n")

    # Short pause between requests, applied after successes and failures alike.
    time.sleep(0.2)

print("Step 1 complete. Output:", STEP1_OUT)


In [None]:
# Step 2: LLM querying - Set 2
import json
import os
import re
import time
from datetime import datetime
from pathlib import Path

import pandas as pd
from openai import OpenAI

# Fail early if the Step 2 output folder has not been created on disk.
assert STEP2_DIR.exists(), "Run the Config cell first (STEP2_DIR missing)."

# Per-concept status lines (OK / SKIP / ERROR) are appended to this file.
LOG_PATH = STEP2_DIR / "logs.txt"

MODEL_NAME = "gpt-4o-mini"  # Change if needed
PROMPT_SET = "set2"  # stored in the "prompt_set" column of the output CSV

# Same task as the Step 1 prompt plus a "Constraints" section (size cap and a
# preference for SNOMED-like terms). Filled in with str.format(): doubled braces
# are literal, {concept_term} is the only placeholder.
PROMPT_TEMPLATE = """
You are a SNOMED taxonomy assistant. For the concept term: "{concept_term}", return STRICT JSON only:
{{"sons": [...], "cousins": [...]}}

Constraints:
- Up to 30 sons and 30 cousins.
- Prefer SNOMED-like clinically common terms.

Definitions:
- "sons": direct is-a children of the concept.
- "cousins": children of siblings / nearby lateral concepts.

No explanations. JSON only.
""".strip()


def normalize_term(term: str) -> str:
    """Return *term* trimmed, lower-cased, with each whitespace run collapsed to one space."""
    trimmed = term.strip()
    lowered = trimmed.lower()
    return re.sub(r"\s+", " ", lowered)


def safe_json_extract(text: str) -> dict:
    """Return the first JSON object that can be decoded from *text*.

    The model is asked for bare JSON but may wrap it in prose or code fences.
    Decoding starts at each ``{`` in turn and stops at the end of the first
    complete object, so braces in any trailing text are ignored (a greedy
    ``{.*}`` match would swallow them and make the parse fail).

    Args:
        text: Raw model response.

    Returns:
        The decoded object as a ``dict``.

    Raises:
        ValueError: If no position in *text* starts a decodable JSON object.
    """
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            # raw_decode tolerates trailing characters after the object.
            parsed, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            # This "{" did not open valid JSON; try the next one.
            start = text.find("{", start + 1)
            continue
        return parsed
    raise ValueError("No JSON object found in response.")


def list_to_pipe(items):
    """Serialize *items* as one "|"-separated string.

    Each item is converted with ``str``; any "|" inside an item becomes a
    space so it cannot be mistaken for the separator. A falsy *items*
    (``None``, empty list) yields the empty string.
    """
    if items:
        return "|".join(str(entry).replace("|", " ") for entry in items)
    return ""


# Check the key before building the client so that a missing or empty
# OPENAI_API_KEY is always reported by the EnvironmentError below.
# NOTE(review): the OpenAI constructor is understood to raise its own error
# when given no key, which would pre-empt a check made after construction.
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
    raise EnvironmentError("OPENAI_API_KEY is not set in environment.")
client = OpenAI(api_key=api_key)

# Load existing output for resumability: terms already in the CSV are skipped.
if STEP2_OUT.exists():
    existing_df = pd.read_csv(STEP2_OUT, dtype=str).fillna("")
    done_terms = set(existing_df["concept_term"].astype(str).tolist())
else:
    existing_df = pd.DataFrame()
    done_terms = set()

rows = []

for concept_term in CONCEPT_TERMS:
    if concept_term in done_terms:
        with LOG_PATH.open("a", encoding="utf-8") as logf:
            logf.write(f"{datetime.now().isoformat()}\t{concept_term}\tSKIP (already processed)\n")
        continue

    prompt = PROMPT_TEMPLATE.format(concept_term=concept_term)
    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
        )
        raw_output = response.choices[0].message.content or ""
        parsed = safe_json_extract(raw_output)
        sons = parsed.get("sons", [])
        cousins = parsed.get("cousins", [])

        row = {
            "timestamp": datetime.now().isoformat(),
            "model": MODEL_NAME,
            "prompt_set": PROMPT_SET,
            "concept_term": concept_term,
            # Newlines are flattened so each record stays on one CSV line.
            "raw_output": raw_output.replace("\n", " ").strip(),
            "extracted_sons": list_to_pipe(sons),
            "extracted_cousins": list_to_pipe(cousins),
        }
        rows.append(row)

        # Checkpoint after every success: rewriting the CSV here means an
        # interrupt or crash later in the loop cannot lose completed API calls,
        # which is what makes the resume logic above useful.
        out_df = pd.DataFrame(rows)
        if existing_df.empty:
            combined = out_df
        else:
            combined = pd.concat([existing_df, out_df], ignore_index=True)
        combined.to_csv(STEP2_OUT, index=False)

        # A term repeated in CONCEPT_TERMS is not queried a second time.
        done_terms.add(concept_term)

        with LOG_PATH.open("a", encoding="utf-8") as logf:
            logf.write(f"{datetime.now().isoformat()}\t{concept_term}\tOK\n")

    except Exception as e:
        # Best effort by design: log the failure and move on to the next term.
        with LOG_PATH.open("a", encoding="utf-8") as logf:
            logf.write(f"{datetime.now().isoformat()}\t{concept_term}\tERROR\t{e}\n")

    # Short pause between requests, applied after successes and failures alike.
    time.sleep(0.2)

print("Step 2 complete. Output:", STEP2_OUT)


In [None]:
# Step 3: Build SNOMED ground truth
import csv
import re
from collections import defaultdict, deque
from pathlib import Path

import pandas as pd

# Input RF2 paths (US Edition snapshot)
# The file names embed a fixed release date (20250901); update them when
# pointing SNOMED_BASE at a different release.
DESC_PATH = SNOMED_BASE / "Snapshot" / "Terminology" / "sct2_Description_Snapshot-en_US1000124_20250901.txt"
REL_PATH = SNOMED_BASE / "Snapshot" / "Terminology" / "sct2_Relationship_Snapshot_US1000124_20250901.txt"  # inferred

# Stop before any parsing if an input file or the output folder is missing.
assert DESC_PATH.exists(), f"Missing description file: {DESC_PATH}"
assert REL_PATH.exists(), f"Missing relationship file: {REL_PATH}"
assert STEP3_DIR.exists(), "Run the Config cell first (STEP3_DIR missing)."

# Starting point of the breadth-first depth computation further down.
ROOT_CONCEPT_ID = "138875005"  # SNOMED CT root concept


def normalize_term(term: str) -> str:
    """Return *term* trimmed, lower-cased, with each whitespace run collapsed to one space."""
    return re.sub(r"\s+", " ", term.strip().lower())


# --- Load descriptions ---
# RF2 description fields: id, effectiveTime, active, moduleId, conceptId, languageCode, typeId, term, caseSignificanceId
# Only rows whose "active" column is "1" are indexed. Columns are located by
# header name, not by fixed position.

fsn_by_concept = {}                     # conceptId -> FSN term
terms_by_concept = defaultdict(list)    # conceptId -> every active term, in file order
conceptids_by_term = defaultdict(set)   # normalized term -> conceptIds carrying it

with DESC_PATH.open("r", encoding="utf-8") as f:
    header = f.readline().rstrip("\n").split("\t")
    idx = dict(zip(header, range(len(header))))

    for line in f:
        parts = line.rstrip("\n").split("\t")
        if parts[idx["active"]] == "1":
            concept_id = parts[idx["conceptId"]]
            term = parts[idx["term"]]
            type_id = parts[idx["typeId"]]

            terms_by_concept[concept_id].append(term)
            conceptids_by_term[normalize_term(term)].add(concept_id)

            # typeId 900000000000003001 marks the Fully Specified Name.
            if type_id == "900000000000003001":
                fsn_by_concept[concept_id] = term


# --- Load inferred is-a relationships ---
# RF2 relationship fields: id, effectiveTime, active, moduleId, sourceId, destinationId, relationshipGroup,
# typeId, characteristicTypeId, modifierId
# Only active rows of type IS_A_TYPE_ID are kept; sourceId is treated as the
# child and destinationId as the parent.

IS_A_TYPE_ID = "116680003"

parents_by_child = defaultdict(set)
children_by_parent = defaultdict(set)

with REL_PATH.open("r", encoding="utf-8") as f:
    header = f.readline().rstrip("\n").split("\t")
    idx = dict(zip(header, range(len(header))))

    for line in f:
        parts = line.rstrip("\n").split("\t")
        is_active = parts[idx["active"]] == "1"
        if is_active and parts[idx["typeId"]] == IS_A_TYPE_ID:
            child_id = parts[idx["sourceId"]]
            parent_id = parts[idx["destinationId"]]

            # Index the edge in both directions.
            children_by_parent[parent_id].add(child_id)
            parents_by_child[child_id].add(parent_id)


# --- Compute depth from root ---
# Breadth-first walk down children_by_parent starting at ROOT_CONCEPT_ID.
# A concept is enqueued only the first time it is reached, so its recorded
# depth is the length of a shortest is-a path from the root.

depth_by_concept = {}
visited = {ROOT_CONCEPT_ID}
queue = deque()
queue.append((ROOT_CONCEPT_ID, 0))

while len(queue) > 0:
    cid, d = queue.popleft()
    depth_by_concept[cid] = d

    # Children not reached before go one level deeper.
    unseen = [child for child in children_by_parent.get(cid, ()) if child not in visited]
    visited.update(unseen)
    queue.extend((child, d + 1) for child in unseen)


# --- Resolve concept term to best conceptId ---
def resolve_concept_id(concept_term: str):
    """Pick the concept id that best matches *concept_term*.

    The term is normalized and looked up in ``conceptids_by_term``. When
    several concepts share the term, the winner is chosen by, in order:
    having a computed depth, having an FSN, smaller depth, and finally the
    concept id itself. The last key makes the choice reproducible: the
    candidates come from a set, so without it ties would be settled by set
    iteration order, which can differ from one Python process to the next.

    Args:
        concept_term: Free-text concept name.

    Returns:
        ``(concept_id, None)`` on success, or
        ``(None, "No match in term index")`` when the term is unknown.
    """
    candidates = conceptids_by_term.get(normalize_term(concept_term))
    if not candidates:
        return None, "No match in term index"

    def score(cid):
        # Lower tuples win; 10**9 pushes concepts without a depth to the back.
        return (
            0 if cid in depth_by_concept else 1,
            0 if cid in fsn_by_concept else 1,
            depth_by_concept.get(cid, 10**9),
            cid,
        )

    return min(candidates, key=score), None


def ids_to_terms(ids):
    """Map each concept id in *ids* to a display term, preserving order.

    Preference per id: its FSN; otherwise the first term recorded in
    ``terms_by_concept``; otherwise the id itself.
    """
    labels = []
    for cid in ids:
        if cid in fsn_by_concept:
            labels.append(fsn_by_concept[cid])
            continue
        known_terms = terms_by_concept.get(cid)
        labels.append(known_terms[0] if known_terms else cid)
    return labels


# One ground-truth record per configured concept term. "Sons" are the direct
# is-a children; "cousins" are the children of the concept's siblings, where a
# sibling is any other child of any of the concept's parents.
rows = []
for concept_term in CONCEPT_TERMS:
    concept_id, note = resolve_concept_id(concept_term)

    if concept_id:
        sons_ids = sorted(children_by_parent.get(concept_id, []))

        # Siblings: other children of each parent
        siblings = set().union(
            *(children_by_parent.get(parent, ()) for parent in parents_by_child.get(concept_id, ()))
        )
        siblings.discard(concept_id)

        cousins_ids = {
            child for sibling_id in siblings for child in children_by_parent.get(sibling_id, ())
        }
        ordered_cousins = sorted(cousins_ids)

        sons_terms = ids_to_terms(sons_ids)
        cousins_terms = ids_to_terms(ordered_cousins)

        record = {
            "concept_term": concept_term,
            "snomed_id": concept_id,
            "fsn": fsn_by_concept.get(concept_id, ""),
            "depth": depth_by_concept.get(concept_id, ""),
            "sons_ids": "|".join(map(str, sons_ids)),
            "sons_terms": "|".join(t.replace("|", " ") for t in sons_terms),
            "sons_count": len(sons_ids),
            "cousins_ids": "|".join(map(str, ordered_cousins)),
            "cousins_terms": "|".join(t.replace("|", " ") for t in cousins_terms),
            "cousins_count": len(cousins_ids),
            "note": note or "",
        }
    else:
        # Unresolved term: keep a placeholder row carrying the reason in "note".
        record = {
            "concept_term": concept_term,
            "snomed_id": "",
            "fsn": "",
            "depth": "",
            "sons_ids": "",
            "sons_terms": "",
            "sons_count": 0,
            "cousins_ids": "",
            "cousins_terms": "",
            "cousins_count": 0,
            "note": note,
        }

    rows.append(record)

out_df = pd.DataFrame(rows)
out_df.to_csv(STEP3_OUT, index=False)

print("Step 3 complete. Output:", STEP3_OUT)


In [None]:
# Step 4: Compare LLM outputs vs SNOMED
import re
from pathlib import Path

import pandas as pd

# All three upstream CSVs must exist before comparing; each message names the
# missing file.
assert STEP1_OUT.exists(), f"Missing Step 1 output: {STEP1_OUT}"
assert STEP2_OUT.exists(), f"Missing Step 2 output: {STEP2_OUT}"
assert STEP3_OUT.exists(), f"Missing Step 3 output: {STEP3_OUT}"


def normalize_term(term: str) -> str:
    """Return *term* trimmed, lower-cased, with each whitespace run collapsed to one space."""
    return re.sub(r"\s+", " ", term.strip().lower())


def split_pipe(s: str):
    """Split a "|"-separated string into trimmed, non-empty pieces.

    Anything that is not a ``str`` (for example ``None`` or a NaN float), and
    any blank string, yields an empty list.
    """
    if isinstance(s, str):
        pieces = (chunk.strip() for chunk in s.split("|"))
        return [piece for piece in pieces if piece]
    return []


def list_to_pipe(items):
    """Serialize *items* as one "|"-separated string.

    Each item is converted with ``str``; any "|" inside an item becomes a
    space so it cannot be mistaken for the separator. A falsy *items*
    (``None``, empty list) yields the empty string.
    """
    if items:
        return "|".join(str(entry).replace("|", " ") for entry in items)
    return ""


# Load inputs
# Every column is read as text and missing cells become "" so later string
# handling never sees NaN.
set1, set2, truth = (
    pd.read_csv(csv_path, dtype=str).fillna("")
    for csv_path in (STEP1_OUT, STEP2_OUT, STEP3_OUT)
)

# concept_term -> ground-truth row; a later duplicate term replaces an earlier one.
truth_map = {}
for _, truth_record in truth.iterrows():
    truth_map[truth_record["concept_term"]] = truth_record


def compare_row(prompt_set: str, row: pd.Series, truth_row: pd.Series):
    """Score one LLM output row against the ground-truth row for the same concept.

    Terms are compared on a match key: ``normalize_term`` followed by removal
    of one trailing parenthesised group. The ground-truth terms written by
    Step 3 are FSNs whenever one exists, and an FSN ends in a semantic tag
    such as "(disorder)"; comparing those strings verbatim against plain LLM
    terms would make almost every correct answer count as both "missed" and
    "extra". The same stripping is applied to the LLM side so a model that
    echoes the tag still matches.

    Args:
        prompt_set: Label of the prompt set the LLM row came from.
        row: LLM output row; reads "concept_term", "extracted_sons" and
            "extracted_cousins" (pipe-separated).
        truth_row: Ground-truth row; reads "sons_terms" and "cousins_terms"
            (pipe-separated).

    Returns:
        A dict with the raw term lists, the intersection / missed / extra
        match keys (pipe-separated), their counts, recall and precision for
        sons and cousins, and ``missing_in_llm_csv=False``. Recall and
        precision are 0.0 when their denominator is zero.
    """
    def match_key(term):
        normalized = normalize_term(term)
        untagged = re.sub(r"\s*\([^()]*\)$", "", normalized)
        # Never reduce a term that is nothing but a parenthesised group to "".
        return untagged or normalized

    def safe_div(a, b):
        return (a / b) if b else 0.0

    def tally(llm_terms, snomed_terms):
        """Return (intersection, missed, extra, snomed_size, llm_size) on match keys."""
        llm_keys = {match_key(t) for t in llm_terms}
        snomed_keys = {match_key(t) for t in snomed_terms}
        return (
            sorted(snomed_keys & llm_keys),
            sorted(snomed_keys - llm_keys),
            sorted(llm_keys - snomed_keys),
            len(snomed_keys),
            len(llm_keys),
        )

    llm_sons = split_pipe(row.get("extracted_sons", ""))
    llm_cousins = split_pipe(row.get("extracted_cousins", ""))

    snomed_sons = split_pipe(truth_row.get("sons_terms", ""))
    snomed_cousins = split_pipe(truth_row.get("cousins_terms", ""))

    (sons_intersection, sons_missed, sons_extra,
     sons_count, sons_llm_count) = tally(llm_sons, snomed_sons)
    (cousins_intersection, cousins_missed, cousins_extra,
     cousins_count, cousins_llm_count) = tally(llm_cousins, snomed_cousins)

    sons_correct = len(sons_intersection)
    cousins_correct = len(cousins_intersection)

    return {
        "prompt_set": prompt_set,
        "concept_term": row["concept_term"],
        "llm_sons_raw": list_to_pipe(llm_sons),
        "llm_cousins_raw": list_to_pipe(llm_cousins),
        "snomed_sons_terms": list_to_pipe(snomed_sons),
        "snomed_cousins_terms": list_to_pipe(snomed_cousins),
        "sons_intersection": list_to_pipe(sons_intersection),
        "sons_missed": list_to_pipe(sons_missed),
        "sons_extra": list_to_pipe(sons_extra),
        "cousins_intersection": list_to_pipe(cousins_intersection),
        "cousins_missed": list_to_pipe(cousins_missed),
        "cousins_extra": list_to_pipe(cousins_extra),
        "snomed_sons_count": sons_count,
        "llm_sons_count": sons_llm_count,
        "sons_correct": sons_correct,
        "sons_missed_count": len(sons_missed),
        "sons_extra_count": len(sons_extra),
        "sons_recall": safe_div(sons_correct, sons_count),
        "sons_precision": safe_div(sons_correct, sons_llm_count),
        "snomed_cousins_count": cousins_count,
        "llm_cousins_count": cousins_llm_count,
        "cousins_correct": cousins_correct,
        "cousins_missed_count": len(cousins_missed),
        "cousins_extra_count": len(cousins_extra),
        "cousins_recall": safe_div(cousins_correct, cousins_count),
        "cousins_precision": safe_div(cousins_correct, cousins_llm_count),
        "missing_in_llm_csv": False,
    }


results = []
# (prompt_set, concept_term) pairs that have an LLM row; used below to find
# configured terms with no LLM output without rescanning `results` each time.
seen_pairs = set()

# Process both prompt sets
for df, prompt_set in [(set1, "set1"), (set2, "set2")]:
    for _, row in df.iterrows():
        concept_term = row["concept_term"]
        seen_pairs.add((prompt_set, concept_term))
        truth_row = truth_map.get(concept_term)
        if truth_row is None:
            # The LLM row exists but the ground truth has no entry for it, so
            # this is flagged as missing from the truth, not from the LLM CSV.
            results.append({
                "prompt_set": prompt_set,
                "concept_term": concept_term,
                "missing_in_llm_csv": False,
                "missing_in_truth": True,
            })
            continue
        results.append(compare_row(prompt_set, row, truth_row))

# Add missing rows for any concept_terms not in LLM outputs
for concept_term in CONCEPT_TERMS:
    for prompt_set in ["set1", "set2"]:
        if (prompt_set, concept_term) not in seen_pairs:
            seen_pairs.add((prompt_set, concept_term))
            results.append({
                "prompt_set": prompt_set,
                "concept_term": concept_term,
                "missing_in_llm_csv": True,
            })

# Give every record the flag so the column is present and boolean throughout.
for result in results:
    result.setdefault("missing_in_truth", False)

# Placeholder rows lack the metric columns; fillna("") leaves those cells blank.
comparison_df = pd.DataFrame(results).fillna("")
comparison_df.to_csv(STEP4_OUT_MAIN, index=False)
comparison_df.to_csv(STEP4_OUT_TSV, index=False, sep="\t")

# --- Long (tidy) format ---
# One output row per (prompt_set, concept_term, relation, status, item_term).
# Rows flagged as missing from the LLM CSV contribute nothing.
long_rows = []
for _, row in comparison_df.iterrows():
    # The flag may be a real bool or its text form depending on how the frame
    # was produced, so several spellings are accepted.
    if row.get("missing_in_llm_csv") in [True, "True", "true", "1"]:
        continue

    for relation in ["sons", "cousins"]:
        for status in ["intersection", "missed", "extra"]:
            col = f"{relation}_{status}"
            items = split_pipe(row.get(col, ""))
            for item in items:
                long_rows.append({
                    "prompt_set": row["prompt_set"],
                    "concept_term": row["concept_term"],
                    "relation": relation,
                    "status": "correct" if status == "intersection" else status,
                    "item_term": item,
                    # Left blank here; nothing in this step fills them in.
                    "snomed_id": "",
                    "depth": "",
                })

# Explicit columns keep the header in the CSV even when there are no rows
# (an empty list alone would produce a frame with no columns at all).
long_df = pd.DataFrame(
    long_rows,
    columns=[
        "prompt_set",
        "concept_term",
        "relation",
        "status",
        "item_term",
        "snomed_id",
        "depth",
    ],
)
long_df.to_csv(STEP4_OUT_LONG, index=False)

# --- Wide (Excel-friendly) format ---
# Each comparison row is copied as-is and every pipe-separated list column is
# additionally spread into numbered columns (<col>_1, <col>_2, ...).
wide_rows = []
for _, row in comparison_df.iterrows():
    wide_row = row.to_dict()
    for col in (
        "sons_intersection",
        "sons_missed",
        "sons_extra",
        "cousins_intersection",
        "cousins_missed",
        "cousins_extra",
    ):
        numbered = enumerate(split_pipe(row.get(col, "")), start=1)
        wide_row.update({f"{col}_{i}": item for i, item in numbered})
    wide_rows.append(wide_row)

wide_df = pd.DataFrame(wide_rows)
wide_df.to_csv(STEP4_OUT_WIDE, index=False)

print("Step 4 complete. Outputs:")
for written_path in (STEP4_OUT_MAIN, STEP4_OUT_LONG, STEP4_OUT_WIDE, STEP4_OUT_TSV):
    print("-", written_path)
