# ICU Clinical Handoff Summary Pipeline

EMR 데이터 로드 → 소견 추출 → 해석 (중복 제거/충돌 해소) → 검증 → 인수인계 요약 생성

## 1. Setup

프로젝트 루트 또는 `notebooks/` 디렉터리에서 노트북을 실행하세요 (프로젝트 루트에 `src/` 디렉터리가 있어야 합니다). 인터넷 접속이 불가한 환경에서는 사전에 의존성을 설치해야 합니다.

In [None]:
import sys, os
from pathlib import Path

# Resolve the notebook's directory: first entry of `_dh` when that global exists,
# otherwise the current directory.
_nb_dir = Path(globals().get("_dh", ["."])[0]).resolve()

# A notebook living in a "notebooks" folder has the repository root one level up.
if _nb_dir.name == "notebooks":
    REPO_ROOT = _nb_dir.parent
else:
    REPO_ROOT = _nb_dir
assert (REPO_ROOT / "src").exists(), f"src/ not found in {REPO_ROOT}. 프로젝트 루트를 확인하세요."

# Work from the repository root and make the `src` package importable from there.
os.chdir(REPO_ROOT)
if sys.path.count(str(REPO_ROOT)) == 0:
    sys.path.insert(0, str(REPO_ROOT))
print(f"Working directory: {REPO_ROOT}")

import json
import pandas as pd
from IPython.display import Markdown, display
from src.config import (
    BASELINE_FILE, INPUT_DATA_FILE, OUTPUT_DIR, EXTRACTIONS_DIR,
    OUTPUT_FRAMEWORK, OUTPUT_SECTIONS, SHEET_NAMES, SHEET_NAME_TO_PROMPT,
)
from src.loader import load_emr_file
from src.extractors import extract_all, extract_sheet
from src.interpreter import interpret
from src.validator import validate
from src.generator import generate
from src.schemas import ExtractorOutput
print("All modules loaded.")

In [None]:
# Log in to Hugging Face only when a token is supplied through the environment.
hf_token = os.environ.get("HF_TOKEN", "")
if not hf_token:
    print("HF_TOKEN not set, skipping login")
else:
    # Imported lazily so the package is only needed when a token is present.
    from huggingface_hub import login

    login(token=hf_token)
    print("Logged in to Hugging Face")

## 2. Load EMR Data

아래 셀을 실행하면 파일 업로드 위젯이 나타납니다. (`ipywidgets`가 설치되어 있지 않으면 위젯 대신 다음 셀에서 파일 경로를 직접 입력합니다.)
2개의 Excel 파일(`.xlsx`)을 선택한 뒤, 다음 셀을 실행하세요:
- **Baseline** — 환자 기본 정보
- **Data** — EMR 데이터

In [None]:
import src.config as _cfg

# Uploaded spreadsheets are stored in ./data, relative to the current working directory.
DATA_DIR = Path("data")
DATA_DIR.mkdir(parents=True, exist_ok=True)

# Show one upload widget per input file; if ipywidgets cannot be imported, flag the
# next cell to ask for file paths instead.
try:
    import ipywidgets as widgets
    from IPython.display import display as ipy_display

    baseline_uploader = widgets.FileUpload(accept=".xlsx", multiple=False, description="Baseline")
    data_uploader = widgets.FileUpload(accept=".xlsx", multiple=False, description="Data")
    for _prompt, _uploader in (
        ("1/2) Input_Baseline 파일을 선택하세요 (환자 기본 정보):", baseline_uploader),
        ("2/2) Input_Data 파일을 선택하세요 (EMR 데이터):", data_uploader),
    ):
        print(_prompt)
        ipy_display(_uploader)
    _USE_WIDGET = True
except ImportError:
    _USE_WIDGET = False
    print("ipywidgets 미설치 — 다음 셀에서 파일 경로를 직접 입력합니다.")

In [None]:
def _uploaded_file(uploader, missing_message):
    """Return ``(file_name, content_bytes)`` for the file held by *uploader*.

    Raises:
        ValueError: with *missing_message* when nothing has been uploaded, or when
            the reported file name has no usable final path component.
    """
    if len(uploader.value) == 0:
        raise ValueError(missing_message)
    # If several entries are present, the last one is used (same outcome as looping over them).
    item = tuple(uploader.value)[-1]
    # Keep only the final path component so a name such as "../x.xlsx" cannot
    # place the file outside DATA_DIR.
    file_name = Path(item.name).name
    if file_name in ("", ".", ".."):
        raise ValueError(f"Uploaded file has no usable name: {item.name!r}")
    return file_name, item.content.tobytes()


if _USE_WIDGET:
    # Validate both uploads before writing anything to disk.
    _baseline_name, _baseline_bytes = _uploaded_file(baseline_uploader, "Baseline 파일을 먼저 업로드하세요")
    _data_name, _data_bytes = _uploaded_file(data_uploader, "Data 파일을 먼저 업로드하세요")
    if _baseline_name == _data_name:
        # Both files would land on the same path and the second write would
        # silently replace the first.
        raise ValueError(
            f"Baseline and Data uploads share the file name {_baseline_name!r}; "
            "rename one of them and upload again."
        )
    _cfg.BASELINE_FILE = DATA_DIR / _baseline_name
    _cfg.BASELINE_FILE.write_bytes(_baseline_bytes)
    _cfg.INPUT_DATA_FILE = DATA_DIR / _data_name
    _cfg.INPUT_DATA_FILE.write_bytes(_data_bytes)
else:
    baseline_path = input("Baseline 파일 경로: ").strip()
    data_path = input("Data 파일 경로: ").strip()
    # An empty answer would turn into Path("."), i.e. the current directory.
    if not baseline_path or not data_path:
        raise ValueError("Both the Baseline and the Data file path are required.")
    _cfg.BASELINE_FILE = Path(baseline_path)
    _cfg.INPUT_DATA_FILE = Path(data_path)

# Load both workbooks; the printing below treats each result as a mapping of
# sheet name -> table.
baseline_sheets = load_emr_file(_cfg.BASELINE_FILE)
data_sheets = load_emr_file(_cfg.INPUT_DATA_FILE)

print(f"\nBaseline sheets: {list(baseline_sheets.keys())}")
print(f"Data sheets: {list(data_sheets.keys())}")
for name, df in data_sheets.items():
    print(f"  {name}: {len(df)} rows")

## 3. Run Extractors

In [None]:
extractor_outputs = await extract_all(data_sheets)

print(f"Extracted from {len(extractor_outputs)} sheets:")
for eo in extractor_outputs:
    print(f"  {eo.sheet_name}: {len(eo.findings)} findings")

In [None]:
# Persist each extractor result as pretty-printed UTF-8 JSON, one file per sheet.
EXTRACTIONS_DIR.mkdir(parents=True, exist_ok=True)
for output in extractor_outputs:
    # File stem: sheet name with spaces replaced by underscores, lower-cased.
    file_stem = output.sheet_name.replace(" ", "_").lower()
    out_path = EXTRACTIONS_DIR / f"{file_stem}.json"
    payload = json.dumps(output.model_dump(), ensure_ascii=False, indent=2)
    out_path.write_text(payload, encoding="utf-8")
    print(f"Saved: {out_path.name}")

## 4. Extraction Inspection

In [None]:
def _md_cell(value, limit=None):
    """Render *value* as text that is safe inside a single Markdown table cell.

    Whitespace runs (including newlines) collapse to one space, the text is
    optionally cut to *limit* characters with a trailing ellipsis, and pipe
    characters are escaped so they are not read as column separators.
    """
    text = " ".join(str(value).split())
    if limit is not None and len(text) > limit:
        text = text[:limit] + "…"
    # Escape after truncating so an escape sequence is never cut in half.
    return text.replace("|", "\\|")


# Index extractor outputs by sheet name; the coverage cell reads `results` too.
results = {eo.sheet_name: eo for eo in extractor_outputs}

# Show one Markdown table of findings per sheet, followed by a horizontal rule.
for sheet_name, eo in results.items():
    # A sheet name missing from the loaded data counts as zero source rows
    # instead of raising KeyError.
    source_rows = len(data_sheets[sheet_name]) if sheet_name in data_sheets else 0
    findings_count = len(eo.findings)
    header = (
        f"### {sheet_name}\n\n"
        f"**Source rows:** {source_rows} | "
        f"**Findings extracted:** {findings_count}\n\n"
    )
    rows = [
        f"| {i} | {_md_cell(f.datetime or '—')} | {_md_cell(f.category)} | {_md_cell(f.content, limit=120)} |"
        for i, f in enumerate(eo.findings, 1)
    ]
    table = "| # | Datetime | Category | Content |\n|---|----------|----------|---------|\n" + "\n".join(rows)
    display(Markdown(header + table))
    display(Markdown("---"))

### Coverage Analysis

In [None]:
# Compare, per configured sheet, the number of extracted findings with the number
# of source rows. The percentage is findings / rows, so it can exceed 100.
coverage_rows = []
for sheet_name in SHEET_NAMES:
    sheet_df = data_sheets.get(sheet_name, pd.DataFrame())
    source_rows = len(sheet_df)
    if sheet_name in results:
        findings = len(results[sheet_name].findings)
    else:
        findings = 0
    # Sheets without rows report 0.0 rather than dividing by zero.
    if source_rows > 0:
        pct = findings / source_rows * 100
    else:
        pct = 0.0
    coverage_rows.append(
        {
            "Sheet": sheet_name,
            "Source Rows": source_rows,
            "Findings Extracted": findings,
            "Coverage %": round(pct, 1),
        }
    )

coverage_df = pd.DataFrame(coverage_rows)
display(coverage_df)

## 5. Run Interpreter

In [None]:
interpreter_output = await interpret(extractor_outputs, baseline_sheets)

print(f"Reconciled findings: {len(interpreter_output.reconciled_findings)}")
print(f"Conflicts resolved:  {len(interpreter_output.conflicts_resolved)}")
print(f"Duplicates removed:  {interpreter_output.duplicates_removed}")
print(f"Input findings:      {interpreter_output.metadata.total_input_findings}")
print(f"Output findings:     {interpreter_output.metadata.total_output_findings}")

In [None]:
# Write the interpreter result to <OUTPUT_DIR>/interpretation.json as UTF-8 JSON.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
interp_path = OUTPUT_DIR / "interpretation.json"
_interp_json = json.dumps(interpreter_output.model_dump(), ensure_ascii=False, indent=2)
interp_path.write_text(_interp_json, encoding="utf-8")
print(f"Saved: {interp_path}")

## 6. Interpretation Inspection

In [None]:
def _md_cell(value, limit=None):
    """Render *value* as text that is safe inside a single Markdown table cell.

    Whitespace runs (including newlines) collapse to one space, the text is
    optionally cut to *limit* characters with a trailing ellipsis, and pipe
    characters are escaped so they are not read as column separators.
    """
    text = " ".join(str(value).split())
    if limit is not None and len(text) > limit:
        text = text[:limit] + "…"
    # Escape after truncating so an escape sequence is never cut in half.
    return text.replace("|", "\\|")


# One table row per reconciled finding: timestamp, a preview of at most 100
# characters, the contributing sources, and the resolution note if there is one.
rows = []
for i, rf in enumerate(interpreter_output.reconciled_findings, 1):
    sources = _md_cell(", ".join(rf.sources))
    note = _md_cell(rf.resolution_note or "—")
    content_preview = _md_cell(rf.content, limit=100)
    rows.append(f"| {i} | {_md_cell(rf.datetime or '—')} | {content_preview} | {sources} | {note} |")

table = (
    "| # | Datetime | Content | Sources | Resolution Note |\n"
    "|---|----------|---------|---------|--------------------|\n"
    + "\n".join(rows)
)
display(Markdown(table))

### Conflict Resolutions

In [None]:
# Render each resolved conflict as its own Markdown section, or a placeholder when there are none.
if interpreter_output.conflicts_resolved:
    for idx, conflict in enumerate(interpreter_output.conflicts_resolved, 1):
        source_list = ", ".join(conflict.sources)
        md = (
            f"### Conflict {idx}\n\n"
            f"**Description:** {conflict.description}\n\n"
            f"**Sources:** {source_list}\n\n"
            f"**Resolution:** {conflict.resolution}"
        )
        display(Markdown(md))
else:
    display(Markdown("*No conflicts detected.*"))

## 7. Run Validator

In [None]:
validator_output = await validate(interpreter_output, baseline_sheets)

print(f"Validated findings:    {len(validator_output.validated_findings)}")
print(f"Missing findings:      {len(validator_output.missing_findings)}")
print(f"Unresolved conflicts:  {len(validator_output.unresolved_conflicts)}")

## 8. Validation Inspection

In [None]:
# Write the validator result to <OUTPUT_DIR>/validation.json as UTF-8 JSON.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
val_path = OUTPUT_DIR / "validation.json"
_val_json = json.dumps(validator_output.model_dump(), ensure_ascii=False, indent=2)
val_path.write_text(_val_json, encoding="utf-8")
print(f"Saved: {val_path}")

# List missing findings, one bullet per display call, or a placeholder when none.
if not validator_output.missing_findings:
    display(Markdown("*No missing findings detected.*"))
else:
    display(Markdown("### Missing Findings"))
    for missing in validator_output.missing_findings:
        display(Markdown(f"- {missing}"))

# Same layout for conflicts the validator reports as unresolved.
if not validator_output.unresolved_conflicts:
    display(Markdown("*No unresolved conflicts.*"))
else:
    display(Markdown("### Unresolved Conflicts"))
    for unresolved in validator_output.unresolved_conflicts:
        display(Markdown(f"- {unresolved}"))

## 9. Run Generator

In [None]:
generator_output = await generate(validator_output)
print(f"Summary length: {len(generator_output.summary)} chars")

In [None]:
# Check which configured section names occur in the generated summary
# (case-insensitive substring match).
expected_sections = list(dict.fromkeys(OUTPUT_SECTIONS))  # de-duplicate, keep first-seen order
summary_text = generator_output.summary.lower()
coverage_rows = [
    {"Section": section, "Found in Summary": section.lower() in summary_text}
    for section in expected_sections
]

# Name the columns explicitly so the lookup below also works when no sections are configured.
coverage_df = pd.DataFrame(coverage_rows, columns=["Section", "Found in Summary"])
found_count = int(coverage_df["Found in Summary"].sum())
total = len(coverage_df)
# With no sections configured, report 0% instead of dividing by zero.
coverage_pct = found_count / total * 100 if total else 0.0
print(f"Section coverage: {found_count}/{total} ({coverage_pct:.0f}%)\n")
display(coverage_df)

## 10. Final Summary

In [None]:
# Render the generated handoff summary as Markdown.
summary_markdown = Markdown(generator_output.summary)
display(summary_markdown)

In [None]:
# Write the generated summary to <OUTPUT_DIR>/summary.md as UTF-8.
# Create the directory here as well, like the other save cells, so this cell
# works even when the earlier save cells were not run.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
summary_path = OUTPUT_DIR / "summary.md"
summary_path.write_text(generator_output.summary, encoding="utf-8")
print(f"Saved: {summary_path}")