In [2]:
# stenosis_label_local_v2.py
# ------------------------------------------------------------
# MRI 판독문에서 요추 협착증 여부를 규칙 기반으로 판정
# 입력: STEN_cohort_SMC_preprocess sheet2.xlsx (Sheet1, '검사결과' 열)
# 출력: stenosis_results_local.xlsx, stenosis_results_local.csv
# - 기존에 L1/2~L5/S1, need_check 열이 있으면 자동 삭제 후 결과를 붙임(중복 방지)
# ------------------------------------------------------------

import re
import pandas as pd

# ===== 0) 파일 경로/설정 =====
# 같은 폴더에 있으면 파일명만 작성
INPUT_XLSX  = "STEN_cohort_SMC_preprocess sheet2.xlsx"
SHEET_NAME  = "Sheet1"
TEXT_COL    = "검사결과"

OUT_XLSX    = "stenosis_results_local.xlsx"
OUT_CSV     = "stenosis_results_local.csv"

# ===== 1) 규칙 정의 =====
CUT_MARKER = "영상의학과 전공의 응급판독입니다"  # 이후 텍스트 무시

LEVEL_PATTERNS = {
    "L1/2":  r"\b(?:l1\s*[-_/~ ]\s*2|l12|l1-2|l1/2)\b",
    "L2/3":  r"\b(?:l2\s*[-_/~ ]\s*3|l23|l2-3|l2/3)\b",
    "L3/4":  r"\b(?:l3\s*[-_/~ ]\s*4|l34|l3-4|l3/4)\b",
    "L4/5":  r"\b(?:l4\s*[-_/~ ]\s*5|l45|l4-5|l4/5)\b",
    "L5/S1": r"\b(?:l5\s*[-_/~ ]\s*s1|l5s1|l5-s1|l5/s1)\b",
}

STENOSIS_CORE = r"(?:central(?:\s+canal)?\s+stenosis|neural\s+foraminal\s+stenosis|foraminal\s+stenosis|neural\s+foramen\s+(?:narrowing|stenosis)|lateral\s+recess\s+stenosis|subarticular(?:\s+recess)?\s+stenosis|협착)"
GLOBAL_STENOSIS_PHRASE = r"(?:lumbar\s+spinal\s+stenosis|협착)"

SEVERITY_TRUE  = r"(?:moderate|severe|degenerative|grade\s*(?:2|3)|중등도|중증|고도)"
SEVERITY_FALSE = r"(?:mild|grade\s*(?:0|1)|경도)"

DISC_ONLY       = r"(?:disc\s+(?:protrusion|extrusion|herniation|bulg(?:e|ing))|h(?:np))"
CAUSES_STENOSIS = r"(?:resulting\s+in|causing|with|동반한|유발|초래)\s+(?:central\s+canal\s+)?(?:stenosis|협착)"
NEGATION        = r"(?:no|without|absent|없음|없다)"

WINDOW = 140  # 레벨 주변 문맥 범위


def _precut(text: str) -> str:
    """응급판독 이후 내용 제거"""
    if not isinstance(text, str):
        return ""
    idx = text.find(CUT_MARKER)
    return text if idx < 0 else text[:idx]


def _norm(text: str) -> str:
    """소문자 변환 및 공백 정리"""
    t = text.lower()
    t = re.sub(r"\s+", " ", t)
    return t.strip()


def assess_window(win: str) -> str:
    """윈도우(레벨 주변 문맥) 판정"""
    if re.search(NEGATION + r".{0,20}(stenosis|협착)", win):
        return "false"
    if re.search(DISC_ONLY, win) and not re.search(CAUSES_STENOSIS, win):
        if not re.search(STENOSIS_CORE, win):
            return "false"
    if re.search(STENOSIS_CORE, win) and re.search(SEVERITY_FALSE, win):
        return "false"
    if re.search(STENOSIS_CORE, win) and re.search(SEVERITY_TRUE, win):
        return "true"
    if re.search(DISC_ONLY, win) and re.search(CAUSES_STENOSIS, win):
        return "true"
    if re.search(STENOSIS_CORE, win):
        return "none"
    return "none"


def classify_report(text: str) -> dict:
    """한 건의 판독문 분석"""
    t = _norm(_precut(text))
    per_level = {k: False for k in LEVEL_PATTERNS}
    level_seen = {k: False for k in LEVEL_PATTERNS}
    level_info = {k: False for k in LEVEL_PATTERNS}

    for lvl, pat in LEVEL_PATTERNS.items():
        for m in re.finditer(pat, t):
            level_seen[lvl] = True
            start = max(0, m.start() - WINDOW)
            end   = min(len(t), m.end() + WINDOW)
            win = t[start:end]
            verdict = assess_window(win)
            if verdict == "true":
                per_level[lvl] = True
                level_info[lvl] = True
            elif verdict == "false":
                level_info[lvl] = True

    global_stenosis = bool(re.search(GLOBAL_STENOSIS_PHRASE, t) or re.search(STENOSIS_CORE, t))
    has_true = any(per_level.values())
    any_level_mentioned = any(level_seen.values())
    any_level_info = any(level_info.values())

    need_check = False
    if global_stenosis and not has_true:
        if not any_level_mentioned or (any_level_mentioned and not any_level_info):
            need_check = True

    return {**per_level, "need_check": need_check}


def main():
    # 데이터 로드
    df = pd.read_excel(INPUT_XLSX, sheet_name=SHEET_NAME)
    if TEXT_COL not in df.columns:
        raise KeyError(f"'{TEXT_COL}' 컬럼이 없습니다. 실제 컬럼: {list(df.columns)}")

    # (핵심) 기존 결과 컬럼이 있으면 먼저 삭제 → 중복 방지
    result_cols = ["L1/2","L2/3","L3/4","L4/5","L5/S1","need_check"]
    df = df.drop(columns=[c for c in result_cols if c in df.columns], errors="ignore")

    # 분류 실행
    results = df[TEXT_COL].astype(str).apply(classify_report)
    res_df = pd.DataFrame(list(results))

    # 결과 병합
    final = pd.concat([df, res_df], axis=1)

    # 저장
    final.to_excel(OUT_XLSX, index=False)
    final.to_csv(OUT_CSV, index=False, encoding="utf-8-sig")

    # 요약(중복 없이 단일 결과 컬럼만 집계)
    counts = final[result_cols].sum(numeric_only=True)
    ratios = (counts / len(final)).round(4)

    print("\n=== 저장 완료(Local Rules) ===")
    print(f"Excel : {OUT_XLSX}")
    print(f"CSV   : {OUT_CSV}")
    print("\n=== 요약(개수) ===")
    print(counts.to_string())
    print("\n=== 요약(비율) ===")
    print(ratios.to_string())


if __name__ == "__main__":
    main()



=== 저장 완료(Local Rules) ===
Excel : stenosis_results_local.xlsx
CSV   : stenosis_results_local.csv

=== 요약(개수) ===
L1/2           213
L2/3           409
L3/4           590
L4/5           738
L5/S1          529
need_check    1019

=== 요약(비율) ===
L1/2          0.0507
L2/3          0.0974
L3/4          0.1405
L4/5          0.1758
L5/S1         0.1260
need_check    0.2427
