In [1]:
!pip install -q transformers torch pandas tqdm

In [2]:
# Cell 2 — NER processing script (includes parser that recognizes
# headers like "CASE 01 — Title" followed by optional underline)
import json
import logging
import os
import re
from dataclasses import dataclass
from typing import List, Dict, Any, Iterable

import pandas as pd
import torch
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline

# ----------------------------
# File parsing helpers
# ----------------------------
def read_text(path: str) -> str:
    """Return the full contents of ``path`` decoded as UTF-8, dropping undecodable bytes."""
    with open(path, mode="r", encoding="utf-8", errors="ignore") as handle:
        contents = handle.read()
    return contents

def load_cases_from_txt(path: str) -> List[Dict[str, Any]]:
    """
    Split a .txt file into cases using "CASE <n>" header lines.

    Recognized header forms (matched case-insensitively, leading zeros ignored):
      CASE 01 — Title
      =================================
    or
      CASE 2 - Title

    Any line that begins with ``CASE <number>`` is treated as a header. The
    separator (``-``, ``—`` or ``–``) and the title are optional, and a line
    of ``=`` characters directly under the header is consumed as an underline.

    Args:
        path: Path of the text file to parse.

    Returns:
        A list of case dicts {"case_id", "title", "text"}. "title" falls back
        to "Case <n>" when the header carries no title. Text that precedes
        the first header is not part of any case. If no headers are found,
        the whole file is returned as a single case titled with the file name.
    """
    text = read_text(path)
    # Every piece is confined to a single line ([ \t] and [^\r\n] instead of
    # \s and .), and the pattern must end at an end-of-line. Without that
    # anchor the lazy title group is satisfied by the empty string, so the
    # title is never captured and leaks into the case body instead.
    header_pat = re.compile(
        r'^[ \t]*CASE[ \t]*0*(\d+)[ \t]*(?:[-—–][ \t]*)?([^\r\n]*?)[ \t]*'
        r'(?:\r?\n[ \t]*=+[ \t]*)?(?=\r?\n|\Z)',
        re.IGNORECASE | re.MULTILINE,
    )
    matches = list(header_pat.finditer(text))

    if not matches:
        # Fallback: whole file as one case
        return [{"case_id": 1, "title": os.path.basename(path), "text": text.strip()}]

    cases: List[Dict[str, Any]] = []
    for i, m in enumerate(matches):
        case_num = int(m.group(1))
        title = (m.group(2) or "").strip()
        # The body runs from the end of this header (underline included)
        # up to the start of the next header, or to the end of the file.
        body_end = matches[i + 1].start() if (i + 1) < len(matches) else len(text)
        cases.append({
            "case_id": case_num,
            "title": title or f"Case {case_num}",
            "text": text[m.end():body_end].strip(),
        })
    return cases

def flatten_text(obj: Any) -> str:
    """
    Collapse a JSON-like value into one space-joined string.

    Strings are returned unchanged and numbers/booleans are converted with
    ``str``. Lists and dicts are flattened recursively; dict keys are ignored
    and dict values that are ``None`` are skipped. Anything else, including
    ``None`` itself, becomes the empty string.
    """
    if isinstance(obj, str):
        return obj
    if isinstance(obj, (int, float, bool)):
        return str(obj)
    if isinstance(obj, list):
        return " ".join(map(flatten_text, obj))
    if isinstance(obj, dict):
        pieces = [flatten_text(value) for value in obj.values() if value is not None]
        return " ".join(pieces)
    return ""

def load_cases_from_json(path: str) -> List[Dict[str, Any]]:
    """
    Load cases from a JSON file.

    A top-level list yields one case per item, numbered from 1. The title is
    the first string value found under "title", "case", "name" or "id" (in
    that order) when the item is a dict, otherwise "json_item_<n>". Any other
    top-level value yields a single case titled with the file name. Case text
    is produced by ``flatten_text``.
    """
    with open(path, "r", encoding="utf-8", errors="ignore") as handle:
        payload = json.load(handle)

    if not isinstance(payload, list):
        return [{"case_id": 1, "title": os.path.basename(path), "text": flatten_text(payload)}]

    title_keys = ("title", "case", "name", "id")
    records: List[Dict[str, Any]] = []
    for position, entry in enumerate(payload, start=1):
        label = None
        if isinstance(entry, dict):
            # First candidate key whose value is a string wins.
            label = next(
                (entry[k] for k in title_keys if k in entry and isinstance(entry[k], str)),
                None,
            )
        flattened = flatten_text(entry)
        records.append({
            "case_id": position,
            "title": label or f"json_item_{position}",
            "text": flattened,
        })
    return records

def discover_inputs(inputs: List[str]) -> List[str]:
    """
    Expand the given paths into a sorted list of .txt / .json files.

    Directories are walked recursively. A plain path is kept only if it has
    one of the two extensions (compared case-insensitively) and exists.
    """
    wanted = (".txt", ".json")
    found: List[str] = []
    for entry in inputs:
        if not os.path.isdir(entry):
            if entry.lower().endswith(wanted) and os.path.exists(entry):
                found.append(entry)
            continue
        for dirpath, _dirs, names in os.walk(entry):
            found.extend(
                os.path.join(dirpath, name)
                for name in names
                if name.lower().endswith(wanted)
            )
    found.sort()
    return found

# ----------------------------
# NER helpers
# ----------------------------
def chunk_text(text: str, max_chars: int = 3000, overlap: int = 200) -> Iterable[str]:
    """
    Yield ``text`` in character windows of at most ``max_chars``.

    Consecutive windows share ``overlap`` characters so that content cut at
    the end of one chunk appears in full at the start of the next. Text that
    fits in a single window is yielded unchanged as the only chunk.

    Args:
        text: The text to split.
        max_chars: Maximum chunk length in characters; must be positive.
        overlap: Characters shared by neighbouring chunks; must satisfy
            ``0 <= overlap < max_chars``.

    Raises:
        ValueError: If ``max_chars`` or ``overlap`` is out of range. Because
            this is a generator, the error surfaces on first iteration.
    """
    if max_chars <= 0:
        raise ValueError(f"max_chars must be positive, got {max_chars}")
    # overlap >= max_chars would keep `start` from ever advancing (endless
    # loop); a negative overlap would silently skip text between chunks.
    if not 0 <= overlap < max_chars:
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < max_chars, got overlap={overlap}, max_chars={max_chars}"
        )

    n = len(text)
    if n <= max_chars:
        yield text
        return

    start = 0
    while start < n:
        end = min(n, start + max_chars)
        yield text[start:end]
        if end >= n:
            break
        # Step back by `overlap` so the next window re-reads the boundary.
        start = end - overlap

def build_ner_pipeline(model_name: str = "d4data/biomedical-ner-all"):
    """
    Create a Hugging Face "ner" pipeline for ``model_name``.

    The pipeline is placed on device 0 when CUDA is available and on -1
    otherwise. It is first requested with ``aggregation_strategy="simple"``;
    if that call raises ``TypeError`` the pipeline is built again without
    the argument.
    """
    target_device = 0 if torch.cuda.is_available() else -1
    common = {
        "tokenizer": AutoTokenizer.from_pretrained(model_name),
        "model": AutoModelForTokenClassification.from_pretrained(model_name),
        "device": target_device,
    }
    try:
        return pipeline("ner", aggregation_strategy="simple", **common)
    except TypeError:
        # Retry without the aggregation argument (the original code attributes
        # this failure to older transformers versions).
        return pipeline("ner", **common)

def clean_entity_token(s: str) -> str:
    """Lower-case ``s``, collapse whitespace runs to one space and trim surrounding punctuation."""
    lowered = s.strip().lower()
    collapsed = re.sub(r"[\s]+", " ", lowered)
    return collapsed.strip(' ,.;:()[]{}"\'')

def filter_entities(ents: List[str]) -> List[str]:
    """
    Drop noise from a list of entity strings and return the rest sorted and de-duplicated.

    Each entry is stripped, then discarded if it is empty, is one of the
    junk words, or is shorter than three characters and not in ``keep_short``.
    Comparisons are exact (case-sensitive).

    NOTE(review): every ``keep_short`` entry is already three or more
    characters long, so the exemption currently has no effect.
    """
    keep_short = {"amd", "cme", "rop", "ptosis"}
    junk = {"left", "right", "week", "weeks", "time", "eye", "central", "loss", "gain", "solution", "agents"}
    stripped = (raw.strip() for raw in ents)
    kept = {
        token
        for token in stripped
        if token and token not in junk and (len(token) >= 3 or token in keep_short)
    }
    return sorted(kept)

def run_ner_on_text(nlp, text: str) -> List[str]:
    """
    Run the NER pipeline over ``text`` and return its cleaned entity strings.

    The text is split with ``chunk_text``; each chunk is passed to ``nlp``,
    and the "word" of every result is normalized with ``clean_entity_token``.
    The collected words are then passed through ``filter_entities``.

    Args:
        nlp: Callable NER pipeline returning an iterable of result dicts.
        text: The text to analyze.

    Returns:
        The list produced by ``filter_entities`` for the collected words.
    """
    log = logging.getLogger(__name__)
    entities: List[str] = []
    for idx, chunk in enumerate(chunk_text(text)):
        if not chunk.strip():
            # Nothing to tag; skip the model call.
            continue
        try:
            results = nlp(chunk)
        except Exception as exc:
            # Best-effort: one failing chunk must not abort the whole case,
            # but the failure is reported instead of being dropped silently.
            log.warning("NER failed on chunk %d (%d chars); skipping it: %s", idx, len(chunk), exc)
            continue
        for r in results:
            # Only "word" carries the matched text. "entity_group" / "entity"
            # hold the predicted label, so they must not be used as a
            # fallback or labels would be reported as entities.
            word = clean_entity_token(r.get("word") or "")
            if word:
                entities.append(word)
    return filter_entities(entities)

# ----------------------------
# Main runner
# ----------------------------
@dataclass
class CaseRecord:
    """One parsed case together with the entities extracted from its text."""
    # Path of the file the case was loaded from.
    source_file: str
    # Case number as reported by the file loader.
    case_id: int
    # Case title; may be an empty string.
    title: str
    # Body text of the case that NER was run on.
    text: str
    # Entity strings extracted from `text`.
    entities: List[str]

def process_files(input_paths: List[str], model_name: str = "d4data/biomedical-ner-all") -> pd.DataFrame:
    """
    Run NER over every case found in the given files or folders.

    Inputs are expanded with ``discover_inputs``. Files ending in ".txt" are
    parsed with ``load_cases_from_txt`` and all others with
    ``load_cases_from_json``. The result has one row per case with the
    columns source_file, case_id, title, num_chars, entities (comma-joined)
    and unique_entity_count.

    Raises:
        FileNotFoundError: If no .txt or .json file is found.
    """
    sources = discover_inputs(input_paths)
    if len(sources) == 0:
        raise FileNotFoundError("No .txt or .json files found in provided inputs.")
    ner = build_ner_pipeline(model_name=model_name)

    records: List[CaseRecord] = []
    for src in sources:
        loader = load_cases_from_txt if src.lower().endswith(".txt") else load_cases_from_json
        parsed = loader(src)

        progress = tqdm(parsed, desc=f"NER {os.path.basename(src)}", leave=False)
        for case in progress:
            found = run_ner_on_text(ner, case["text"])
            record = CaseRecord(
                source_file=os.path.abspath(src),
                case_id=case["case_id"],
                title=case.get("title") or "",
                text=case["text"],
                entities=found,
            )
            records.append(record)

    table = [
        {
            "source_file": rec.source_file,
            "case_id": rec.case_id,
            "title": rec.title,
            "num_chars": len(rec.text),
            "entities": ", ".join(rec.entities),
            "unique_entity_count": len(rec.entities),
        }
        for rec in records
    ]
    return pd.DataFrame(table)

def run_and_save(input_paths: List[str], output_csv: str = "ner_extracted_results.csv", model_name: str = "d4data/biomedical-ner-all"):
    """
    Process the inputs with ``process_files``, write the table to ``output_csv`` and return it.

    The CSV is written without the index column, and a one-line summary with
    the output path and row count is printed.
    """
    results = process_files(input_paths, model_name=model_name)
    results.to_csv(output_csv, index=False)
    row_count = len(results)
    print(f"Saved NER results to {output_csv} (rows: {row_count})")
    return results

In [None]:
# Cell: run NER over the local note files and save the results as CSV
import os

# Folder that holds the note .txt files
NOTES_DIR = '../data'

# Gather every .txt file that sits directly inside NOTES_DIR
input_paths = [
    os.path.join(NOTES_DIR, name)
    for name in os.listdir(NOTES_DIR)
    if name.lower().endswith('.txt')
]
print(f'Found {len(input_paths)} note files')

# Destination CSV inside the results folder
output_csv = '../results/local_ner_results.csv'

# Create the results folder before the CSV is written
os.makedirs('../results', exist_ok=True)

# Extract entities from all notes and write the CSV
df = run_and_save(input_paths, output_csv=output_csv)

# Show the first rows of the result table
df.head()
