# Histopathology Pipeline Prototype (2024년 biopsy matches)이 노트북은 2024년 biopsy matching 데이터셋을 탐색하고 histopathology classifier 학습을 위한 준비 과정을 다룹니다. 여기서는 raw diagnostic reports를 machine-readable labels로 변환하고 `.svs` whole-slide images (WSI)를 위한 patch-extraction workflow를 개괄하는 데 초점을 맞춥니다.

## 목표1. Excel/CSV에서 export된 biopsy metadata를 점검하고 정제합니다.2. 이질적인 diagnostic 텍스트를 일관된 disease labels로 통합합니다.3. 이용 가능한 annotations로 구현 가능한 modelling target을 정의합니다.4. downstream modelling을 위해 stratified train/validation/test splits를 생성합니다.5. `.svs` slides에 대한 patch extraction workflow를 개괄하고, 누락된 파일을 확인하며 기본 tissue filtering heuristic을 서술합니다.> **참고:** 현재 레포지토리에는 metadata CSV만 포함되어 있습니다. 실제 WSI `.svs` 파일은 patch extractor 셀을 실행하기 전에 별도로 마운트해야 합니다.

In [None]:

from __future__ import annotations

import json
import re
from collections import Counter
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, List, Optional

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

DATA_PATH = Path('Data') / '조직검사 결과 매칭(2024)_utf8_pruned.csv'
assert DATA_PATH.exists(), f"Missing dataset: {DATA_PATH}"

raw_df = pd.read_csv(DATA_PATH)
print(f"Loaded {len(raw_df):,} biopsy records with {raw_df.shape[1]} columns")
raw_df.head()


### 결측값 개요

In [None]:

missing_summary = (
    raw_df.isna()
    .mean()
    .sort_values(ascending=False)
    .to_frame(name='missing_ratio')
)
missing_summary.head(10)


## Diagnosis 정규화`DIAGNOSIS` 컬럼에는 장문의 free-text 문자열이 포함되어 있습니다. 이를 machine learning target으로 사용하기 위해서는 다음과 같이 처리합니다:1. 첫 번째 쉼표 이전 부분만 유지하여 staging이나 margin 코멘트를 제거합니다.2. 괄호를 제거하고 공백을 정규화합니다.3. 소문자로 변환하고 자주 등장하는 동의어를 canonical form으로 매핑합니다.

In [None]:

NORMALISATION_RULES = [
    (r'^mast cell tumor', 'mast cell tumor'),
    (r'^cutaneous mast cell tumor', 'mast cell tumor'),
    (r'^subcutaneous mast cell tumor', 'mast cell tumor'),
    (r'^mammary gland adenoma', 'mammary adenoma'),
    (r'^mammary complex adenoma', 'mammary complex adenoma'),
    (r'^mammary benign mixed tumor', 'mammary benign mixed tumor'),
    (r'^mammary carcinoma', 'mammary carcinoma'),
    (r'^mammary duct carcinoma', 'mammary carcinoma'),
    (r'^mammary adenoma', 'mammary adenoma'),
    (r'^lipoma', 'lipoma'),
    (r'^subcutaneous lipoma', 'lipoma'),
    (r'^hepatic lipoma', 'lipoma'),
    (r'^sebaceous adenoma', 'sebaceous adenoma'),
    (r'^sebaceous epithelioma', 'sebaceous epithelioma'),
    (r'^soft tissue sarcoma', 'soft tissue sarcoma'),
    (r'^trichoblastoma', 'trichoblastoma'),
]


def normalise_diagnosis(value: str) -> str:
    if not isinstance(value, str) or not value.strip():
        return 'unknown'

    base = value.split(',')[0]
    base = re.sub(r'\([^)]*\)', '', base)
    base = re.sub(r'[^a-zA-Z0-9\s]', ' ', base)
    base = re.sub(r'\s+', ' ', base).strip().lower()

    for pattern, canonical in NORMALISATION_RULES:
        if re.match(pattern, base):
            return canonical

    return base


raw_df['disease_family'] = raw_df['DIAGNOSIS'].map(normalise_diagnosis)
print('Unique disease families:', raw_df['disease_family'].nunique())
raw_df['disease_family'].value_counts().head(20)


### 모델링 target 선택이 데이터셋에는 수천 개의 diagnostic 문구가 포함되어 있습니다. 5,000개가 넘는 라벨 전체를 대상으로 multi-class classifier를 학습하면 극도로 희소해집니다. 따라서 빈도가 낮은 클래스는 `other` 버킷으로 모으고, 가장 흔한 disease families에 집중합니다. 클래스 균형을 조정하려면 `MIN_CASES` 값을 수정하세요.

In [None]:

MIN_CASES = 250
value_counts = raw_df['disease_family'].value_counts()
major_labels = value_counts[value_counts >= MIN_CASES].index.tolist()
print(f"Keeping {len(major_labels)} frequent labels (≥{MIN_CASES} cases)")

raw_df['target_label'] = np.where(
    raw_df['disease_family'].isin(major_labels),
    raw_df['disease_family'],
    'other'
)

raw_df['target_label'].value_counts()


### Train/validation/test split 메타데이터통합된 target labels를 기준으로 stratified splits를 생성합니다. 이후 추출된 patch-level 데이터와 조인하여 사용할 수 있습니다.

In [None]:

META_COLUMNS = ['INSP_RQST_NO', 'FOLDER', 'FILE_NAME', 'target_label']
meta_df = raw_df[META_COLUMNS].drop_duplicates().reset_index(drop=True)

train_df, temp_df = train_test_split(
    meta_df,
    test_size=0.3,
    random_state=42,
    stratify=meta_df['target_label']
)
valid_df, test_df = train_test_split(
    temp_df,
    test_size=0.5,
    random_state=42,
    stratify=temp_df['target_label']
)

print('Train size:', len(train_df))
print('Valid size:', len(valid_df))
print('Test size :', len(test_df))

split_summary = {
    'train': train_df['target_label'].value_counts(normalize=True).to_dict(),
    'valid': valid_df['target_label'].value_counts(normalize=True).to_dict(),
    'test': test_df['target_label'].value_counts(normalize=True).to_dict(),
}
json.dumps(split_summary, indent=2)


## WSI 가용성 확인metadata는 요청별 `FOLDER`에 저장된 `.svs` 파일을 참조합니다. slides가 마운트된 디렉터리 경로를 `WSI_ROOT`에 맞게 수정하세요.

In [None]:

WSI_ROOT = Path('Data/WSI')  # TODO: update to actual location

if not WSI_ROOT.exists():
    print(f"WSI root missing at {WSI_ROOT.resolve()}. Patch extraction will be skipped.")
else:
    sample_rows = raw_df.head(3)
    for _, row in sample_rows.iterrows():
        candidate = WSI_ROOT / row['FOLDER'] / row['FILE_NAME']
        print(candidate, 'exists' if candidate.exists() else 'missing')


## Patch extraction 유틸리티고해상도 패치를 스트리밍하기 위해 [OpenSlide](https://openslide.org/)를 사용합니다. 다음 셀을 실행하기 전에 prerequisites를 설치하세요:```bashsudo apt-get install openslide-toolspip install openslide-python```OpenSlide을 사용할 수 없는 경우 아래 코드는 우회 동작하도록 작성되어 있습니다.

In [None]:

try:
    import openslide
    from PIL import Image
    HAS_OPENSLIDE = True
    print('OpenSlide version:', openslide.__library_version__)
except Exception as exc:  # noqa: BLE001
    HAS_OPENSLIDE = False
    print('OpenSlide not available:', exc)


In [None]:

@dataclass
class PatchSpec:
    slide_id: str
    x: int
    y: int
    level: int
    size: int
    label: str
    source_path: Path


def resolve_slide_path(row: pd.Series, root: Path) -> Optional[Path]:
    """Resolve the absolute slide path using folder/name hints."""
    candidates = [root / row['FOLDER'] / row['FILE_NAME'], root / row['FILE_NAME']]
    for cand in candidates:
        if cand.exists():
            return cand
    return None


def tissue_fraction(tile: Image.Image) -> float:
    arr = np.asarray(tile.convert('L'))
    norm = (arr - arr.min()) / (arr.ptp() + 1e-6)
    return float((norm < 0.85).mean())


def extract_patches_for_slide(
    slide_path: Path,
    label: str,
    level: int = 0,
    patch_size: int = 256,
    stride: Optional[int] = None,
    max_patches: int = 200,
    tissue_threshold: float = 0.2,
) -> List[PatchSpec]:
    if not HAS_OPENSLIDE:
        raise RuntimeError('OpenSlide support is required to extract patches.')

    slide = openslide.OpenSlide(str(slide_path))
    stride = stride or patch_size
    width, height = slide.level_dimensions[level]

    patches: List[PatchSpec] = []
    for y in range(0, height - patch_size + 1, stride):
        for x in range(0, width - patch_size + 1, stride):
            region = slide.read_region((x, y), level, (patch_size, patch_size))
            if tissue_fraction(region) < tissue_threshold:
                continue
            patches.append(PatchSpec(
                slide_id=slide_path.stem,
                x=x,
                y=y,
                level=level,
                size=patch_size,
                label=label,
                source_path=slide_path,
            ))
            if len(patches) >= max_patches:
                break
        if len(patches) >= max_patches:
            break
    slide.close()
    return patches


def build_patch_index(
    split_df: pd.DataFrame,
    root: Path,
    level: int = 0,
    patch_size: int = 256,
    stride: Optional[int] = None,
    max_patches_per_slide: int = 200,
    tissue_threshold: float = 0.2,
) -> pd.DataFrame:
    records = []
    for _, row in split_df.iterrows():
        slide_path = resolve_slide_path(row, root)
        if slide_path is None:
            continue
        try:
            patches = extract_patches_for_slide(
                slide_path=slide_path,
                label=row['target_label'],
                level=level,
                patch_size=patch_size,
                stride=stride,
                max_patches=max_patches_per_slide,
                tissue_threshold=tissue_threshold,
            )
        except Exception as exc:  # noqa: BLE001
            print(f"Skipping {row['FILE_NAME']}: {exc}")
            continue

        for patch in patches:
            records.append({
                'slide_id': patch.slide_id,
                'x': patch.x,
                'y': patch.y,
                'level': patch.level,
                'size': patch.size,
                'label': patch.label,
                'source_path': str(patch.source_path),
            })

    return pd.DataFrame.from_records(records)


### Patch extraction 데모다음 셀은 training split을 기반으로 작은 patch index를 구축하려고 시도합니다. slides가 없으면 필요한 prerequisites가 누락되었음을 보고합니다.

In [None]:

if not HAS_OPENSLIDE:
    print('OpenSlide missing — skipping patch extraction demo.')
elif not WSI_ROOT.exists():
    print('WSI directory not found — mount slides and re-run.')
else:
    demo_df = train_df.head(2)
    patch_index = build_patch_index(
        split_df=demo_df,
        root=WSI_ROOT,
        level=0,
        patch_size=256,
        stride=256,
        max_patches_per_slide=16,
        tissue_threshold=0.25,
    )
    print('Extracted', len(patch_index), 'patches')
    patch_index.head()


## 다음 단계* 추가 diagnostic variant가 발견되면 normalisation 규칙을 보강합니다.* downstream training scripts에서 사용할 수 있도록 split metadata(`train.csv`, `valid.csv`, `test.csv`)와 생성된 `patch_index.parquet`를 저장합니다.* 모델 입력 전에 patch-level filtering(예: blur detection, stain normalisation)을 구현합니다.* 반복적인 patch extraction을 방지하기 위해 slide-level cache를 연결합니다.