# In [1]:
"""
medical_coding_tool.py

Deterministic mapping of clinical concepts -> ICD-10 and CPT/HCPCS codes.

Usage:
    from medical_coding_tool import map_clinical_concepts_to_codes

    result = map_clinical_concepts_to_codes(
        clinical_concepts=["hypertension", "knee arthroscopy"],
        coding_reference=coding_reference_dict,
    )

The function returns a JSON-serializable dict with keys `icd_codes` and `cpt_codes`,
where each is a list of {"description": ..., "code": ...} objects.

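Design goals:
 - Deterministic: stable tie-breaking, a fixed fuzzy threshold, and results sorted by code.
 - Three-step mapping: exact match (description or alias), rule-based regex, and a deterministic
   fuzzy match that keeps the single best candidate at or above the threshold. All three steps
   run for every concept; the codes they find are merged and de-duplicated.
 - No LLM involved: the result is a plain dict that serializes directly to JSON (no extraneous text).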

The `coding_reference` should be a dict with this shape:
{
  "icd": [
    {"code": "I10", "description": "Essential (primary) hypertension", "aliases": ["hypertension", "high blood pressure"]},
    ...
  ],
  "cpt": [
    {"code": "29881", "description": "Arthroscopy, knee, surgical", "aliases": ["knee arthroscopy"]},
    ...
  ],
  "rules": [
    {"pattern": "\\bhypertensive\\b|\\bhypertension\\b", "icd": "I10"},
    {"pattern": "knee.*arthro", "cpt": "29881"},
    ...
  ]
}

This file includes a small example coding_reference and a __main__ block to demo behavior.
"""

from typing import List, Dict, Any, Tuple
import re
import json
from difflib import SequenceMatcher, get_close_matches

# ------------------------- Helpers -------------------------

def _normalize(text: str) -> str:
    """Lowercase and strip; collapse multiple spaces."""
    return re.sub(r"\s+", " ", (text or "").strip().lower())


def _similarity(a: str, b: str) -> float:
    return SequenceMatcher(None, a, b).ratio()


# ------------------------- Core mapper -------------------------

def _index_reference_entries(entries):
    """Build the lookup structures for one code system (the ``icd`` or ``cpt`` list).

    ``entries`` may be ``None`` or empty, in which case all structures are empty.

    Returns a 3-tuple ``(exact, descriptions, name_rows)``:
      * ``exact`` maps each normalized description/alias to the
        ``(code, description)`` pairs that carry it, in reference order.
      * ``descriptions`` maps a code to the description of the first
        reference entry declaring that code.
      * ``name_rows`` lists ``(normalized_name, code, description)`` sorted by
        ``(code, name)`` so the fuzzy scan visits names in a fixed order.

    Raises ``KeyError`` if an entry has no ``"code"`` key.
    """
    exact: Dict[str, List[Tuple[str, str]]] = {}
    descriptions: Dict[str, str] = {}
    name_rows: List[Tuple[str, str, str]] = []
    for entry in entries or []:
        code = entry["code"]
        description = entry.get("description", "")
        aliases = entry.get("aliases", []) or []
        # First entry wins, so rule lookups report the same description a
        # front-to-back scan of the reference list would find.
        descriptions.setdefault(code, description)
        for name in [description] + aliases:
            normalized = _normalize(name)
            exact.setdefault(normalized, []).append((code, description))
            name_rows.append((normalized, code, description))
    name_rows.sort(key=lambda row: (row[1], row[0]))
    return exact, descriptions, name_rows


def _compile_rules(rules):
    """Compile rule patterns into ``(regex, icd_code, cpt_code)`` tuples.

    Rules are skipped (never raised on) when their pattern is missing, empty,
    not a string, or not a valid regular expression. An empty pattern must be
    skipped because the empty regex matches every input, which would attach
    the rule's codes to every concept.
    """
    compiled = []
    for rule in rules or []:
        pattern = rule.get("pattern")
        if not isinstance(pattern, str) or not pattern:
            continue
        try:
            regex = re.compile(pattern, flags=re.IGNORECASE)
        except re.error:
            # Bad regex: skip the rule so one bad entry cannot abort the run.
            continue
        compiled.append((regex, rule.get("icd"), rule.get("cpt")))
    return compiled


def _best_fuzzy_match(term, name_rows, threshold):
    """Return the best ``(score, code, description)`` for *term*, or ``None``.

    Only names whose similarity is at least *threshold* qualify. Ties on score
    go to the smaller code; remaining ties go to the earlier row, i.e. the
    smaller name, because ``name_rows`` is pre-sorted by ``(code, name)``.
    """
    candidates = []
    for name, code, description in name_rows:
        score = _similarity(term, name)
        if score >= threshold:
            candidates.append((score, code, description))
    if not candidates:
        return None
    # min() returns the first minimal element, matching a stable sort's head.
    return min(candidates, key=lambda cand: (-cand[0], cand[1]))


def map_clinical_concepts_to_codes(
    clinical_concepts: List[str],
    coding_reference: Dict[str, Any],
    fuzzy_threshold: float = 0.75,
    max_fuzzy_candidates: int = 3,
) -> Dict[str, List[Dict[str, str]]]:
    """
    Map a list of clinical_concepts to ICD-10 and CPT codes using deterministic rules.

    Each concept is normalized and then run through three steps. Every step
    runs for every concept, and the codes found are merged and de-duplicated:

      1. exact match of the normalized concept against each entry's
         normalized description and aliases;
      2. every rule whose regex (compiled with IGNORECASE) is found in the
         normalized concept contributes its 'icd' and/or 'cpt' code; the
         description comes from the reference entry with that code, or is
         the code itself when the reference has no such entry;
      3. fuzzy match: per code system, the single best-scoring name with
         similarity >= fuzzy_threshold contributes its code.

    Parameters
    ----------
    clinical_concepts: list of free-text clinical phrases. None is treated as
        an empty list; concepts that normalize to an empty string are skipped.
    coding_reference: dict containing 'icd', 'cpt', and optional 'rules'.
        Rules with a missing, empty, non-string or invalid 'pattern' are ignored.
    fuzzy_threshold: minimum similarity for the fuzzy step (0-1).
    max_fuzzy_candidates: accepted for backward compatibility. The fuzzy step
        only ever keeps the single best candidate, so this value currently
        has no effect on the result.

    Returns
    -------
    dict with keys 'icd_codes' and 'cpt_codes' (each a list of
    {code, description} dicts, unique by code and sorted by code).

    Raises
    ------
    KeyError if an 'icd' or 'cpt' reference entry has no 'code' key.
    """
    if clinical_concepts is None:
        clinical_concepts = []

    icd_exact, icd_descriptions, icd_rows = _index_reference_entries(
        coding_reference.get("icd")
    )
    cpt_exact, cpt_descriptions, cpt_rows = _index_reference_entries(
        coding_reference.get("cpt")
    )
    compiled_rules = _compile_rules(coding_reference.get("rules"))

    # Keyed by code so each code is reported once; the first description
    # recorded for a code is the one that is kept.
    icd_results: Dict[str, Dict[str, str]] = {}
    cpt_results: Dict[str, Dict[str, str]] = {}

    def _add_result(container, code, description):
        if code and code not in container:
            container[code] = {"code": code, "description": description}

    for raw in clinical_concepts:
        term = _normalize(raw)
        if not term:
            continue

        # 1) exact match against descriptions and aliases
        for code, description in icd_exact.get(term, ()):
            _add_result(icd_results, code, description)
        for code, description in cpt_exact.get(term, ()):
            _add_result(cpt_results, code, description)

        # 2) rule-based regex match. Searching the normalized term keeps this
        #    step consistent with steps 1 and 3: extra spaces or line breaks
        #    in the raw text cannot make a rule miss.
        for regex, icd_code, cpt_code in compiled_rules:
            if not regex.search(term):
                continue
            if icd_code:
                _add_result(
                    icd_results, icd_code, icd_descriptions.get(icd_code, icd_code)
                )
            if cpt_code:
                _add_result(
                    cpt_results, cpt_code, cpt_descriptions.get(cpt_code, cpt_code)
                )

        # 3) deterministic fuzzy match, one best candidate per code system
        for rows, results in ((icd_rows, icd_results), (cpt_rows, cpt_results)):
            best = _best_fuzzy_match(term, rows, fuzzy_threshold)
            if best is not None:
                _add_result(results, best[1], best[2])

    # deterministic ordering of results: sort by code string
    return {
        "icd_codes": sorted(icd_results.values(), key=lambda item: item["code"]),
        "cpt_codes": sorted(cpt_results.values(), key=lambda item: item["code"]),
    }


# ------------------------- Minimal demo reference and CLI -------------------------

if __name__ == "__main__":
    # Demo: map three free-text phrases against a tiny in-memory reference.
    demo_icd_entries = [
        {
            "code": "I10",
            "description": "Essential (primary) hypertension",
            "aliases": ["hypertension", "high blood pressure"],
        },
        {
            "code": "E11.9",
            "description": "Type 2 diabetes mellitus without complications",
            "aliases": ["type 2 diabetes", "diabetes mellitus type 2", "dm type 2"],
        },
    ]
    demo_cpt_entries = [
        {
            "code": "29881",
            "description": "Arthroscopy, knee, surgical",
            "aliases": ["knee arthroscopy", "arthroscopic knee procedure"],
        },
        {
            "code": "90471",
            "description": "Immunization administration",
            "aliases": ["vaccine administration"],
        },
    ]
    demo_rules = [
        {"pattern": "\\bhypertension\\b|\\bhigh blood pressure\\b", "icd": "I10"},
        {"pattern": "knee.*arthro", "cpt": "29881"},
    ]
    coding_reference_example = {
        "icd": demo_icd_entries,
        "cpt": demo_cpt_entries,
        "rules": demo_rules,
    }

    examples = [
        "Patient with hypertension and type 2 diabetes",
        "Left knee arthroscopy performed",
        "Received vaccine - immunization admin",
    ]

    out = map_clinical_concepts_to_codes(examples, coding_reference_example)
    # Prints the dict repr; for JSON text use
    # json.dumps(out, indent=2, ensure_ascii=False) instead.
    print(out)


# Out[1]: {'icd_codes': [{'code': 'I10', 'description': 'Essential (primary) hypertension'}], 'cpt_codes': [{'code': '29881', 'description': 'Arthroscopy, knee, surgical'}]}
