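# Data Analysis

## Table of Contents (Detailed)

### 0. Document Overview

0.1 Project goals and success criteria

0.2 Scope (training/inference/submission) and out-of-scope items

0.3 Change log (record of experiment and rule changes)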

---

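### 1. Understanding the Competition and Design Criteria

1.1 Problem definition: detect up to 4 objects per image

1.2 Evaluation metric explained: the "box precision" that **mAP@[0.75:0.95]** demands

1.3 Data characteristics: PNG + COCO (JSON), image/label structure

1.4 Class composition risk: why some classes **exist only in the train set**, and how to respond

1.5 Submission format summary: CSV columns/format, one row per object, image_id rule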

---

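### 2. Project Structure and Reproducibility Rules

2.1 Directory structure (data/experiments/models/logs/submissions)

2.2 Environment pinning (package versions, GPU/seed, run commands)

2.3 Experiment management rules (RUN_NAME, config saving, checkpoint policy)

2.4 Common logging items (results table, hyperparameters, data version, post-processing values)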

---

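### 3. Data Preparation Pipeline

3.1 Raw file layout: train_images / train_annotations / test_images

3.2 Key COCO JSON fields (images / annotations / categories)

3.3 **QC Gate 1: label existence/validity checks**

* bbox key present, length = 4
* negative or zero width/height, bbox outside the image bounds
* outlier detection on the area distribution

3.4 **QC Gate 2: data integrity checks**

* image_id ↔ file_name mapping consistency
* duplicate/missing image check

3.5 Data EDA report (required deliverable)

* class frequency, objects per image (1 to 4)
* bbox size/aspect-ratio/area distributions (check the share of "small pills")

3.6 Class mapping policy

* clean up categories, freeze the label map
* handling rule for train-only classes (suppression/weighting/filtering, etc.)

3.7 Split strategy (stability first)

* basic: fixed seed + stratify by objects per image
* advanced: multi-label stratification or K-fold (prevents missing classes)

3.8 Training-format conversion and verification

* COCO → training format conversion (e.g., YOLO format)
* conversion check: visualize samples to confirm the bboxes land exactly on the objects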
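
The QC Gate 1 checks in 3.3 could be sketched roughly as follows. This is a minimal illustration, not the notebook's actual implementation: the function name `check_bbox`, the problem labels, and the 1000×1000 image size are all hypothetical, and the bbox is assumed to be COCO-style `[x, y, w, h]` in absolute pixels.

```python
from typing import Any


def check_bbox(ann: dict[str, Any], img_w: int, img_h: int) -> list[str]:
    """Return the problems found in one annotation (empty list if clean)."""
    bbox = ann.get("bbox")
    if bbox is None:
        return ["missing_bbox"]
    if not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
        return ["bad_length"]

    x, y, w, h = bbox
    problems = []
    # Zero or negative width/height makes the box unusable for training.
    if w <= 0 or h <= 0:
        problems.append("non_positive_size")
    # Any edge outside the image counts as out of bounds.
    if x < 0 or y < 0 or x + w > img_w or y + h > img_h:
        problems.append("out_of_bounds")
    return problems


# Hypothetical annotations for a hypothetical 1000x1000 image.
sample = [
    {"id": 1, "bbox": [10, 20, 50, 40]},    # clean
    {"id": 2, "bbox": [10, 20, 0, 40]},     # zero width
    {"id": 3, "bbox": [950, 20, 100, 40]},  # right edge past the image
    {"id": 4},                              # no bbox key
]
report = {a["id"]: check_bbox(a, img_w=1000, img_h=1000) for a in sample}
print(report)
```

Returning a list of labels rather than raising keeps the gate usable as a report: all annotations can be scanned first and the decision to drop or fix them made afterwards.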

---

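### 4. Baseline Model Design

4.1 Candidate model families and selection criteria (speed/accuracy/small objects)

4.2 Input resolution strategy (including the resolution sweep plan)

4.3 Base training recipe (epochs, batch, optimizer, lr schedule)

4.4 Augmentation design (accounting for small objects and overlap)

* geometric / color / blur / cutout
* whether to use mosaic/mixup, and the risks

4.5 Stabilization options: checkpointing, early stopping, EMA, etc.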

---

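### 5. Training Experiment Plan (ordered by impact on score)

5.1 Experiment priority (cost-effective track → high-performance track)

5.2 Resolution sweep: 640 → 768 → 960 (example)

5.3 Augmentation ablation (add/remove one at a time and observe the effect)

5.4 Hyperparameter fine-tuning

* lr, weight decay, warmup, augmentation strength

5.5 Experiments to mitigate train-only class risk

* design class-wise thresholds/suppression rules
* defensive strategy for classes that produce many wrong predictions

5.6 Experiment result template (fixed table format)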

---

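### 6. Evaluation Protocol & Box Quality Diagnosis (required)

6.1 Common evaluation rules (same val set, same seed, same post-processing defaults)

6.2 **Performance breakdown by IoU range**

* what to watch at mAP@0.75 / 0.85 / 0.95

6.3 **BBox quality report**

* performance per bbox size bucket (small/medium/large)
* failure types in overlapping/crowded scenes

6.4 Error analysis framework

* FP/FN categorization (class confusion, localization error, duplicate detection, miss)
* automatic extraction and visualization of the worst-k samples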
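
To make the IoU breakdown in 6.2 concrete, the sketch below shows how quickly a small localization error costs IoU thresholds. It assumes `[x, y, w, h]` boxes and a threshold step of 0.05 between 0.75 and 0.95 (the COCO-style convention; the competition's exact step is an assumption here). The helper name `iou_xywh` and the 100×100 box are illustrative only.

```python
def iou_xywh(a, b):
    """Intersection over union of two [x, y, w, h] boxes."""
    ax, ay, aw, ah = a
    bx, by, bw, bh = b
    # Overlap along each axis, clamped at zero when the boxes are disjoint.
    ix = max(0.0, min(ax + aw, bx + bw) - max(ax, bx))
    iy = max(0.0, min(ay + ah, by + bh) - max(ay, by))
    inter = ix * iy
    union = aw * ah + bw * bh - inter
    return inter / union if union > 0 else 0.0


# Assumed threshold grid for mAP@[0.75:0.95].
THRESHOLDS = [0.75, 0.80, 0.85, 0.90, 0.95]

gt = (100, 100, 100, 100)  # hypothetical 100x100 ground-truth box
for shift in (0, 2, 5, 10):
    # Same size as the ground truth, shifted diagonally by `shift` pixels.
    pred = (100 + shift, 100 + shift, 100, 100)
    iou = iou_xywh(gt, pred)
    passed = [t for t in THRESHOLDS if iou >= t]
    print(f"shift={shift:>2}px  IoU={iou:.3f}  passes={passed}")
```

On this toy box, a 10-pixel diagonal shift already falls below 0.75, so the prediction would count as a miss at every threshold. This is why the plan treats box quality as a first-class diagnostic.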

---

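### 7. Inference & Post-processing (core part for the Top-4 constraint)

7.1 Basic inference pipeline (model loading, image preprocessing)

7.2 NMS strategy comparison (hard NMS / soft NMS / WBF as options)

7.3 **Top-4 constraint logic**

* sort by score per image → keep at most 4
* rules for cleaning up duplicate/nearby boxes

7.4 Class-wise threshold / score calibration

* suppression rules for "risk classes" (to prevent wrong predictions)

7.5 Coordinate post-processing

* clipping to image bounds
* prevent zero bbox_w/h; type/rounding rules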
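
The Top-4 and coordinate rules in 7.3 and 7.5 might look like the sketch below. It is an illustration under stated assumptions, not the notebook's final post-processing: detections are plain dicts with `category_id`, `score`, and an absolute-pixel `bbox` of `[x, y, w, h]`; the function name `postprocess` and the `min_size` parameter are hypothetical; duplicate suppression (NMS/WBF) is deliberately left out.

```python
def postprocess(dets, img_w, img_h, topk=4, min_size=1.0):
    """Clip boxes to the image, drop degenerate ones, keep the top-k by score."""
    cleaned = []
    for d in dets:
        x, y, w, h = map(float, d["bbox"])
        # Clip both corners to the image bounds.
        x1, y1 = max(0.0, x), max(0.0, y)
        x2, y2 = min(float(img_w), x + w), min(float(img_h), y + h)
        w2, h2 = x2 - x1, y2 - y1
        # Discard boxes that collapse after clipping (guards against zero w/h).
        if w2 < min_size or h2 < min_size:
            continue
        cleaned.append({**d, "bbox": [x1, y1, w2, h2]})
    # Highest score first, then enforce the per-image limit.
    cleaned.sort(key=lambda d: d["score"], reverse=True)
    return cleaned[:topk]


# Hypothetical detections for a hypothetical 200x200 image.
dets = [
    {"category_id": 1, "score": 0.9, "bbox": [-5, 10, 50, 40]},    # sticks out on the left
    {"category_id": 2, "score": 0.8, "bbox": [190, 190, 30, 30]},  # sticks out bottom-right
    {"category_id": 3, "score": 0.7, "bbox": [50, 50, 0, 20]},     # zero width
    {"category_id": 4, "score": 0.6, "bbox": [20, 20, 30, 30]},
    {"category_id": 5, "score": 0.5, "bbox": [60, 60, 30, 30]},
    {"category_id": 6, "score": 0.4, "bbox": [100, 100, 30, 30]},
]
kept = postprocess(dets, img_w=200, img_h=200)
for d in kept:
    print(d["category_id"], d["score"], d["bbox"])
```

Clipping before ranking means a box that degenerates after clipping never takes one of the four slots.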

---

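### 8. Submission File Generation & Validation (mistake-prevention gate)

8.1 Submission CSV spec (columns/dtypes/meaning)

8.2 annotation_id generation rule (guarantee a unique value per row)

8.3 **Submission Validator (automated checks)**

* missing columns/NaN/negative coordinates
* image_id format (numeric file name)
* bbox range and width/height validity

8.4 Visual check of a sample submission (render N random images)

8.5 Final submission checklist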
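
A Submission Validator along the lines of 8.3 could start from something like this. It is only a sketch: the column names match the `submission.columns` list used in this notebook's config, but the function name `validate_rows`, the error messages, and the sample rows are hypothetical, and rows are assumed to be already-typed dicts (values read through `csv.DictReader` would need converting from strings first).

```python
import math

REQUIRED = ["annotation_id", "image_id", "category_id",
            "bbox_x", "bbox_y", "bbox_w", "bbox_h", "score"]


def validate_rows(rows, max_per_image=4):
    """Return a list of human-readable problems (empty if the rows look valid)."""
    errors = []
    seen_ids = set()
    per_image = {}
    for i, row in enumerate(rows):
        missing = [c for c in REQUIRED if c not in row]
        if missing:
            errors.append(f"row {i}: missing columns {missing}")
            continue
        values = [row[c] for c in REQUIRED]
        if any(v is None or (isinstance(v, float) and math.isnan(v)) for v in values):
            errors.append(f"row {i}: NaN/None value")
            continue
        if row["annotation_id"] in seen_ids:
            errors.append(f"row {i}: duplicate annotation_id {row['annotation_id']}")
        seen_ids.add(row["annotation_id"])
        if not isinstance(row["image_id"], int):
            errors.append(f"row {i}: image_id is not an int")
        if row["bbox_x"] < 0 or row["bbox_y"] < 0:
            errors.append(f"row {i}: negative coordinate")
        if row["bbox_w"] <= 0 or row["bbox_h"] <= 0:
            errors.append(f"row {i}: non-positive width/height")
        per_image[row["image_id"]] = per_image.get(row["image_id"], 0) + 1
    # One row is one object, so the per-image row count is the object count.
    for image_id, n in per_image.items():
        if n > max_per_image:
            errors.append(f"image {image_id}: {n} rows exceed the limit of {max_per_image}")
    return errors


# Hypothetical rows: one valid row and three broken variants of it.
ok_row = {"annotation_id": 1, "image_id": 1, "category_id": 3,
          "bbox_x": 10.0, "bbox_y": 20.0, "bbox_w": 30.0, "bbox_h": 40.0, "score": 0.9}
rows = [
    ok_row,
    {**ok_row, "annotation_id": 2, "bbox_x": -1.0},          # negative coordinate
    {**ok_row, "annotation_id": 2, "bbox_w": 0},             # duplicate id and zero width
    {**ok_row, "annotation_id": 4, "score": float("nan")},   # NaN score
]
errors = validate_rows(rows)
for e in errors:
    print(e)
```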

---

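### 9. Final Model Selection & Leaderboard Strategy

9.1 Selection criteria (performance + stability + signs of overfitting)

9.2 Decision rules that account for the Public/Private gap

9.3 (Optional) When to apply ensembling/TTA, and cost versus benefit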

In [1]:
# [Code Cell] 2-1. Pin project directories/paths + check existence + create RUN folders

from pathlib import Path
from datetime import datetime
import os
import json

# 1) Project root (assumes the notebook's current working directory is the project root)
ROOT = Path(".").resolve()

# 2) Input data paths (folders expected to exist already)
INPUT = {
    "TRAIN_IMAGES": ROOT / "train_images",
    "TRAIN_ANN_DIR": ROOT / "train_annotations",
    "TEST_IMAGES": ROOT / "test_images",
}

# 3) Working dirs (folders created beforehand)
WORK = {
    "DATA": ROOT / "data",
    "RUNS": ROOT / "runs",
    "ARTIFACTS": ROOT / "artifacts",
}

# 4) Check that the required folders exist
all_paths = {**INPUT, **WORK}
missing = [k for k, p in all_paths.items() if not p.exists()]
if missing:
    raise FileNotFoundError(
        "Required folders are missing:\n" +
        "\n".join(f"- {k}: {all_paths[k]}" for k in missing)
    )

# 5) RUN name (changing only RUN_NAME below changes every derived path)
RUN_NAME = os.environ.get("RUN_NAME") or f"exp_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
RUN_DIR = WORK["RUNS"] / RUN_NAME
ART_DIR = WORK["ARTIFACTS"] / RUN_NAME

# 6) RUN sub-structure (experiments/logs/checkpoints/submissions/plots, etc.)
DIRS = {
    # run space
    "RUN_DIR": RUN_DIR,
    "CKPT": RUN_DIR / "checkpoints",
    "LOGS": RUN_DIR / "logs",
    "CONFIG": RUN_DIR / "config",
    # artifacts space
    "ART_DIR": ART_DIR,
    "SUBMISSIONS": ART_DIR / "submissions",
    "PLOTS": ART_DIR / "plots",
    "REPORTS": ART_DIR / "reports",
    # cached/processed data (optional)
    "CACHE": WORK["DATA"] / "cache" / RUN_NAME,
}

for p in DIRS.values():
    p.mkdir(parents=True, exist_ok=True)

# 7) Save a snapshot of the run settings (for reproducibility)
meta = {
    "run_name": RUN_NAME,
    "root": str(ROOT),
    "input": {k: str(v) for k, v in INPUT.items()},
    "work": {k: str(v) for k, v in WORK.items()},
    "dirs": {k: str(v) for k, v in DIRS.items()},
}

meta_path = DIRS["CONFIG"] / "paths_meta.json"
with open(meta_path, "w", encoding="utf-8") as f:
    json.dump(meta, f, indent=2, ensure_ascii=False)

# 8) Brief output
print(f"[OK] ROOT      : {ROOT}")
print(f"[OK] RUN_NAME  : {RUN_NAME}")
print(f"[OK] RUN_DIR   : {RUN_DIR}")
print(f"[OK] ART_DIR   : {ART_DIR}")
print(f"[OK] saved meta: {meta_path}")

# 9) Count files in the input folders (rough sanity check)
def count_files(folder: Path, exts=None):
    if exts is None:
        return sum(1 for p in folder.rglob("*") if p.is_file())
    exts = {e.lower() for e in exts}
    return sum(1 for p in folder.rglob("*") if p.is_file() and p.suffix.lower() in exts)

print("\n[INPUT COUNTS]")
print(f"- train_images      : {count_files(INPUT['TRAIN_IMAGES'], exts=['.png'])} png")
print(f"- train_annotations : {count_files(INPUT['TRAIN_ANN_DIR'], exts=['.json'])} json")
print(f"- test_images       : {count_files(INPUT['TEST_IMAGES'], exts=['.png'])} png")

[OK] ROOT      : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2
[OK] RUN_NAME  : exp_20260202_230604
[OK] RUN_DIR   : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\runs\exp_20260202_230604
[OK] ART_DIR   : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\artifacts\exp_20260202_230604
[OK] saved meta: C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\runs\exp_20260202_230604\config\paths_meta.json

[INPUT COUNTS]
- train_images      : 232 png
- train_annotations : 763 json
- test_images       : 842 png
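
The counts above show 763 JSON files for 232 training images, so the annotation-to-image mapping is evidently not one-to-one. One way to inspect it is sketched below. It assumes that each JSON file carries the stem of the image it annotates, which should be verified against the actual files; the helper names `group_by_stem` and `coverage` and the sample paths are hypothetical.

```python
from collections import defaultdict
from pathlib import PurePosixPath


def group_by_stem(json_paths):
    """Map file stem -> list of annotation paths sharing that stem."""
    groups = defaultdict(list)
    for p in json_paths:
        groups[PurePosixPath(p).stem].append(p)
    return dict(groups)


def coverage(image_stems, json_paths):
    """Compare image stems with annotation stems in both directions."""
    groups = group_by_stem(json_paths)
    images = set(image_stems)
    return {
        "images_without_json": sorted(images - set(groups)),
        "json_without_image": sorted(set(groups) - images),
        "json_per_image": {s: len(groups[s]) for s in sorted(images & set(groups))},
    }


# Hypothetical file names, using forward slashes.
image_stems = ["img_a", "img_b", "img_c"]
json_paths = [
    "ann/set1/cls1/img_a.json",
    "ann/set1/cls2/img_a.json",
    "ann/set1/cls1/img_b.json",
    "ann/set2/cls9/img_z.json",
]
result = coverage(image_stems, json_paths)
print(result)
```

In the notebook, the inputs could come from `[p.stem for p in INPUT["TRAIN_IMAGES"].rglob("*.png")]` and `[p.as_posix() for p in INPUT["TRAIN_ANN_DIR"].rglob("*.json")]`.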


In [2]:
# [Code Cell] 2-2. Reproducibility setup (fix seeds) + save a snapshot of the runtime environment/versions

import os
import sys
import platform
import json
import random
from datetime import datetime

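# 1) Seed / determinism settings
SEED = int(os.environ.get("SEED", "42"))
DETERMINISTIC = True  # may be set to False if needed

# Note: set at runtime, this only affects child processes; the running
# interpreter fixed its hash seed at startup.
os.environ["PYTHONHASHSEED"] = str(SEED)

# (Forcing determinism in CUDA/torch can cause slowdowns or errors in some ops)
# cuBLAS determinism (only effective in some environments)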
os.environ.setdefault("CUBLAS_WORKSPACE_CONFIG", ":4096:8")

random.seed(SEED)

# numpy seed
try:
    import numpy as np
    np.random.seed(SEED)
except Exception as e:
    np = None
    print(f"[WARN] numpy seed skip: {e}")

# torch seed + cudnn
torch_info = {}
try:
    import torch

    torch.manual_seed(SEED)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(SEED)

    if DETERMINISTIC:
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
        # strict determinism (optional, since some ops may raise errors)
        # torch.use_deterministic_algorithms(True)
    else:
        torch.backends.cudnn.deterministic = False
        torch.backends.cudnn.benchmark = True

    torch_info = {
        "torch_version": torch.__version__,
        "cuda_available": torch.cuda.is_available(),
        "cuda_version": getattr(torch.version, "cuda", None),
        "cudnn_version": torch.backends.cudnn.version() if torch.backends.cudnn.is_available() else None,
        "device_count": torch.cuda.device_count() if torch.cuda.is_available() else 0,
        "device_name": torch.cuda.get_device_name(0) if torch.cuda.is_available() and torch.cuda.device_count() > 0 else None,
    }
except Exception as e:
    torch = None
    print(f"[WARN] torch seed skip: {e}")

# 2) Collect versions of the main libraries (None if not installed)
def safe_version(pkg_name: str):
    try:
        from importlib.metadata import version
        return version(pkg_name)
    except Exception:
        return None

pkgs = [
    "numpy",
    "pandas",
    "opencv-python",
    "albumentations",
    "pycocotools",
    "torch",
    "torchvision",
    "ultralytics",
    "timm",
    "matplotlib",
    "scikit-learn",
]

pkg_versions = {p: safe_version(p) for p in pkgs}

# 3) Save environment metadata
env_meta = {
    "timestamp": datetime.now().isoformat(timespec="seconds"),
    "seed": SEED,
    "deterministic": DETERMINISTIC,
    "python": {
        "version": sys.version,
        "executable": sys.executable,
    },
    "platform": {
        "system": platform.system(),
        "release": platform.release(),
        "version": platform.version(),
        "machine": platform.machine(),
        "processor": platform.processor(),
    },
    "packages": pkg_versions,
    "torch": torch_info,
}

env_meta_path = DIRS["CONFIG"] / "env_meta.json"
with open(env_meta_path, "w", encoding="utf-8") as f:
    json.dump(env_meta, f, indent=2, ensure_ascii=False)

print(f"[OK] SEED={SEED}, DETERMINISTIC={DETERMINISTIC}")
print(f"[OK] saved env meta: {env_meta_path}")

# 4) Print only the essentials
print("\n[VERSIONS]")
for k in ["numpy", "pandas", "opencv-python", "albumentations", "pycocotools", "torch", "torchvision", "ultralytics"]:
    if k in pkg_versions:
        print(f"- {k}: {pkg_versions[k]}")
if torch_info:
    print("\n[TORCH]")
    for k, v in torch_info.items():
        print(f"- {k}: {v}")


[OK] SEED=42, DETERMINISTIC=True
[OK] saved env meta: C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\runs\exp_20260202_230604\config\env_meta.json

[VERSIONS]
- numpy: 2.4.2
- pandas: 3.0.0
- opencv-python: 4.13.0.90
- albumentations: 2.0.8
- pycocotools: None
- torch: 2.5.1+cu121
- torchvision: 0.20.1+cu121
- ultralytics: 8.4.9

[TORCH]
- torch_version: 2.5.1+cu121
- cuda_available: True
- cuda_version: 12.1
- cudnn_version: 90100
- device_count: 1
- device_name: NVIDIA GeForce RTX 3080


In [3]:
# [Code Cell] 2-3. Experiment management rules (RUN config/manifest) + logging/metric-saving utilities

from pathlib import Path
import json
import csv
import subprocess
from datetime import datetime

# -----------------------------
# 0) Small utils
# -----------------------------
def _json_dump(path: Path, obj: dict):
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, indent=2, ensure_ascii=False)

def _json_load(path: Path):
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)

def _append_jsonl(path: Path, obj: dict):
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(obj, ensure_ascii=False) + "\n")

def _run_cmd(cmd):
    try:
        out = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode("utf-8", errors="ignore").strip()
        return out
    except Exception:
        return None

# -----------------------------
# 1) Git snapshot (optional)
# -----------------------------
git_meta = {
    "git_head": _run_cmd(["git", "rev-parse", "HEAD"]),
    "git_branch": _run_cmd(["git", "rev-parse", "--abbrev-ref", "HEAD"]),
    "git_dirty": _run_cmd(["git", "status", "--porcelain"]),
}
git_meta["git_dirty"] = bool(git_meta["git_dirty"]) if git_meta["git_dirty"] is not None else None

# -----------------------------
# 2) Base config (project-level)
#    - the "single source of truth": later cells only update CFG to iterate on experiments
# -----------------------------
CFG_PATH = DIRS["CONFIG"] / "config.json"

DEFAULT_CFG = {
    "project": {
        "name": "ai07_pill_od",
        "run_name": RUN_NAME,
        "created_at": datetime.now().isoformat(timespec="seconds"),
    },
    "paths": {
        "root": str(ROOT),
        "train_images": str(INPUT["TRAIN_IMAGES"]),
        "train_ann_dir": str(INPUT["TRAIN_ANN_DIR"]),
        "test_images": str(INPUT["TEST_IMAGES"]),
        "run_dir": str(DIRS["RUN_DIR"]),
        "ckpt_dir": str(DIRS["CKPT"]),
        "logs_dir": str(DIRS["LOGS"]),
        "art_dir": str(DIRS["ART_DIR"]),
        "submissions_dir": str(DIRS["SUBMISSIONS"]),
        "reports_dir": str(DIRS["REPORTS"]),
        "plots_dir": str(DIRS["PLOTS"]),
        "cache_dir": str(DIRS["CACHE"]),
    },
    "reproducibility": {
        "seed": SEED,
        "deterministic": DETERMINISTIC,  # reuse the flag set in cell 2-2 instead of hard-coding True
    },
    "data": {
        "format": "coco_json_multi",
        "max_objects_per_image": 4,
        "num_classes": None,          # to be extracted/confirmed from categories later
        "class_whitelist": None,      # list of the 40 class ids present in test (to be set later)
    },
    "split": {
        "strategy": "stratify_by_num_objects",
        "seed": SEED,
        "ratios": {"train": 0.8, "valid": 0.2},
        "kfold": {"enabled": False, "n_splits": 5, "fold_idx": 0},
    },
    "train": {
        "framework": "ultralytics_yolo",   # can be switched to mmrotate/mmdet, etc.
        "model": {
            "name": "yolov8s",
            "imgsz": 768,
            "pretrained": True,
        },
        "hyperparams": {
            "epochs": 80,
            "batch": 8,
            "lr0": None,
            "weight_decay": None,
            "workers": 4,
        },
        "augment": {
            "enabled": True,
            "mosaic": True,
            "mixup": False,
            "hsv": True,
            "flip": True,
        },
        "checkpoint_policy": {
            "save_best_on": "val_map_75_95",  # the criterion we track (adjust to the metric name the tool provides)
            "save_last": True,
            "keep_top_k": 3,
        },
    },
    "infer": {
        "conf_thr": 0.001,      # keep a wide candidate pool; post-processing trims it to the Top-4
        "nms_iou_thr": 0.5,
        "max_det_per_image": 4, # fixed by the competition constraint
        "tta": {"enabled": False},
    },
    "postprocess": {
        "strategy": "topk_by_score",
        "topk": 4,
        "classwise_threshold": None,  # {class_id: thr}
        "clip_boxes": True,
    },
    "submission": {
        "columns": ["annotation_id", "image_id", "category_id", "bbox_x", "bbox_y", "bbox_w", "bbox_h", "score"],
        "image_id_rule": "file_stem_int",
        "annotation_id_rule": "unique_row_id",
        "bbox_format": "xywh_abs",
    },
    "notes": "",
}

if CFG_PATH.exists():
    CFG = _json_load(CFG_PATH)
else:
    CFG = DEFAULT_CFG
    _json_dump(CFG_PATH, CFG)

# -----------------------------
# 3) Run manifest (run-level snapshot)
# -----------------------------
MANIFEST_PATH = DIRS["CONFIG"] / "run_manifest.json"
manifest = {
    "run_name": RUN_NAME,
    "created_at": datetime.now().isoformat(timespec="seconds"),
    "paths_meta": str((DIRS["CONFIG"] / "paths_meta.json").resolve()),
    "env_meta": str((DIRS["CONFIG"] / "env_meta.json").resolve()),
    "config": str(CFG_PATH.resolve()),
    "git": git_meta,
}
_json_dump(MANIFEST_PATH, manifest)

# -----------------------------
# 4) Logging helpers (jsonl)
# -----------------------------
METRICS_JSONL = DIRS["LOGS"] / "metrics.jsonl"
EVENTS_JSONL = DIRS["LOGS"] / "events.jsonl"

def log_event(name: str, payload: dict | None = None):
    _append_jsonl(EVENTS_JSONL, {
        "ts": datetime.now().isoformat(timespec="seconds"),
        "event": name,
        "payload": payload or {},
        "run_name": RUN_NAME,
    })

def log_metrics(step: str, metrics: dict):
    _append_jsonl(METRICS_JSONL, {
        "ts": datetime.now().isoformat(timespec="seconds"),
        "step": step,
        "metrics": metrics,
        "run_name": RUN_NAME,
    })

def save_cfg():
    _json_dump(CFG_PATH, CFG)
    log_event("config_saved", {"path": str(CFG_PATH)})

# -----------------------------
# 5) Simple run registry (runs/_registry.csv)
# -----------------------------
REG_PATH = WORK["RUNS"] / "_registry.csv"
is_new = not REG_PATH.exists()

with open(REG_PATH, "a", newline="", encoding="utf-8") as f:
    w = csv.writer(f)
    if is_new:
        w.writerow(["run_name", "created_at", "run_dir", "config_path", "git_head"])
    w.writerow([RUN_NAME, manifest["created_at"], str(DIRS["RUN_DIR"]), str(CFG_PATH), git_meta.get("git_head")])

# -----------------------------
# 6) Print summary
# -----------------------------
print(f"[OK] CFG_PATH      : {CFG_PATH}")
print(f"[OK] MANIFEST_PATH : {MANIFEST_PATH}")
print(f"[OK] METRICS_JSONL  : {METRICS_JSONL}")
print(f"[OK] EVENTS_JSONL   : {EVENTS_JSONL}")
print(f"[OK] REGISTRY       : {REG_PATH}")
log_event("run_initialized", {"manifest": str(MANIFEST_PATH)})

[OK] CFG_PATH      : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\runs\exp_20260202_230604\config\config.json
[OK] MANIFEST_PATH : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\runs\exp_20260202_230604\config\run_manifest.json
[OK] METRICS_JSONL  : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\runs\exp_20260202_230604\logs\metrics.jsonl
[OK] EVENTS_JSONL   : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\runs\exp_20260202_230604\logs\events.jsonl
[OK] REGISTRY       : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\runs\_registry.csv
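
Because the config is nested, changing one setting for a new experiment is easiest with a recursive merge. The sketch below shows one possible helper; `deep_update` is a hypothetical name, not something defined in this notebook, and the small `cfg` dict stands in for the real `CFG`.

```python
import copy


def deep_update(base: dict, overrides: dict) -> dict:
    """Return a copy of `base` with `overrides` merged in, recursing into nested dicts."""
    out = copy.deepcopy(base)
    for k, v in overrides.items():
        if isinstance(v, dict) and isinstance(out.get(k), dict):
            # Merge nested sections key by key instead of replacing them wholesale.
            out[k] = deep_update(out[k], v)
        else:
            out[k] = copy.deepcopy(v)
    return out


# Stand-in for the notebook's CFG (subset of the same keys).
cfg = {
    "train": {
        "model": {"name": "yolov8s", "imgsz": 768},
        "hyperparams": {"epochs": 80},
    }
}
new_cfg = deep_update(cfg, {"train": {"model": {"imgsz": 960}}})
print(new_cfg["train"]["model"])
print(cfg["train"]["model"])
```

In the notebook this would pair with the existing helper, for example `CFG = deep_update(CFG, {...})` followed by `save_cfg()`. Returning a copy keeps the previous config available for comparison.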


In [4]:
# [Code Cell] 2-4. Fix the common logging schema + results table (results.csv) utility + summary report generation

from pathlib import Path
from datetime import datetime
import json
import csv

# Check required globals (created in the previous cells)
required_globals = ["DIRS", "WORK", "RUN_NAME", "CFG"]
missing_g = [g for g in required_globals if g not in globals()]
if missing_g:
    raise RuntimeError(f"Run the previous cells (2-1 to 2-3) first. missing: {missing_g}")

RESULTS_CSV = Path(DIRS["REPORTS"]) / "results.csv"
RESULTS_JSONL = Path(DIRS["REPORTS"]) / "results.jsonl"
LATEST_MD = Path(DIRS["REPORTS"]) / "latest_summary.md"

def _flatten_dict(d, parent_key="", sep="."):
    items = {}
    if isinstance(d, dict):
        for k, v in d.items():
            new_key = f"{parent_key}{sep}{k}" if parent_key else str(k)
            if isinstance(v, dict):
                items.update(_flatten_dict(v, new_key, sep=sep))
            else:
                items[new_key] = v
    return items

def _safe_scalar(x):
    # Convert to a form that is safe to put in CSV/JSON
    if x is None:
        return None
    if isinstance(x, (int, float, str, bool)):
        return x
    if isinstance(x, (list, tuple)):
        return json.dumps(x, ensure_ascii=False)
    if isinstance(x, dict):
        return json.dumps(x, ensure_ascii=False)
    return str(x)

def _pick_cfg_fields(cfg: dict):
    flat = _flatten_dict(cfg)

    # Core keys that are compared often (extend only this list if needed)
    keys = [
        "reproducibility.seed",
        "reproducibility.deterministic",

        "split.strategy",
        "split.ratios.train",
        "split.ratios.valid",
        "split.kfold.enabled",
        "split.kfold.n_splits",
        "split.kfold.fold_idx",

        "train.framework",
        "train.model.name",
        "train.model.imgsz",
        "train.model.pretrained",
        "train.hyperparams.epochs",
        "train.hyperparams.batch",
        "train.hyperparams.lr0",
        "train.hyperparams.weight_decay",
        "train.hyperparams.workers",

        "infer.conf_thr",
        "infer.nms_iou_thr",
        "infer.max_det_per_image",

        "postprocess.strategy",
        "postprocess.topk",
        "postprocess.clip_boxes",
    ]

    picked = {}
    for k in keys:
        picked[k] = _safe_scalar(flat.get(k, None))

    # Summarize the class-wise threshold only, since it can be large (the full dict goes to the jsonl)
    cwt = cfg.get("postprocess", {}).get("classwise_threshold", None)
    if isinstance(cwt, dict):
        picked["postprocess.classwise_threshold.n"] = len(cwt)
    else:
        picked["postprocess.classwise_threshold.n"] = 0 if cwt is None else 1

    return picked

def init_results_table():
    if RESULTS_CSV.exists():
        return

    base_cols = [
        "ts",
        "run_name",
        "result_name",   # e.g., "baseline_v1", "imgsz960_augA"
        "stage",         # e.g., "val", "oof", "public_lb", "private_lb"
        "notes",
    ]

    cfg_cols = list(_pick_cfg_fields(CFG).keys())

    metric_cols = [
        # Commonly useful metric keys (left empty when not available)
        "mAP_75_95",
        "mAP_50",
        "mAP_75",
        "mean_IoU_TP",
        "precision",
        "recall",
    ]

    extra_cols = [
        "cfg_path",
        "run_dir",
        "ckpt_dir",
        "submission_path",
    ]

    header = base_cols + cfg_cols + metric_cols + extra_cols

    RESULTS_CSV.parent.mkdir(parents=True, exist_ok=True)
    with open(RESULTS_CSV, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(header)

init_results_table()

def record_result(
    result_name: str,
    stage: str,
    metrics: dict | None = None,
    notes: str = "",
    submission_path: str | Path | None = None,
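):
    """
    - result_name: short alias for the experiment
    - stage: "val" / "oof" / "public_lb" / "private_lb", etc.
    - metrics: dict (free-form keys; only the representative keys are written to results.csv)
    - notes: one-line memo
    - submission_path: path to the submission csv (recorded if given)
    """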
    ts = datetime.now().isoformat(timespec="seconds")
    metrics = metrics or {}
    metrics = {str(k): _safe_scalar(v) for k, v in metrics.items()}

    cfg_fields = _pick_cfg_fields(CFG)

    # Only representative metrics go into the table; the full set is recorded separately in the jsonl
    def mget(key, default=None):
        return metrics.get(key, default)

    row = {
        "ts": ts,
        "run_name": RUN_NAME,
        "result_name": result_name,
        "stage": stage,
        "notes": notes,
        **cfg_fields,
        "mAP_75_95": mget("mAP_75_95", mget("mAP@[0.75:0.95]", None)),
        "mAP_50": mget("mAP_50", mget("mAP@0.5", None)),
        "mAP_75": mget("mAP_75", mget("mAP@0.75", None)),
        "mean_IoU_TP": mget("mean_IoU_TP", None),
        "precision": mget("precision", None),
        "recall": mget("recall", None),
        "cfg_path": str((Path(DIRS["CONFIG"]) / "config.json").resolve()),
        "run_dir": str(Path(DIRS["RUN_DIR"]).resolve()),
        "ckpt_dir": str(Path(DIRS["CKPT"]).resolve()),
        "submission_path": str(Path(submission_path).resolve()) if submission_path else "",
    }

    # Append to results.csv
    with open(RESULTS_CSV, "r", encoding="utf-8") as f:
        header = next(csv.reader(f))

    with open(RESULTS_CSV, "a", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=header)
        w.writerow({k: _safe_scalar(row.get(k, "")) for k in header})

    # Save the full record to the jsonl (preserves metrics, threshold dicts, etc.)
    full = {
        "ts": ts,
        "run_name": RUN_NAME,
        "result_name": result_name,
        "stage": stage,
        "notes": notes,
        "cfg": CFG,
        "metrics": metrics,
        "paths": {
            "cfg_path": row["cfg_path"],
            "run_dir": row["run_dir"],
            "ckpt_dir": row["ckpt_dir"],
            "submission_path": row["submission_path"],
        },
    }
    RESULTS_JSONL.parent.mkdir(parents=True, exist_ok=True)
    with open(RESULTS_JSONL, "a", encoding="utf-8") as f:
        f.write(json.dumps(full, ensure_ascii=False) + "\n")

    # Latest summary (md)
    md = []
    md.append(f"# Latest Summary — {RUN_NAME}")
    md.append(f"- ts: {ts}")
    md.append(f"- result_name: {result_name}")
    md.append(f"- stage: {stage}")
    if notes:
        md.append(f"- notes: {notes}")

    md.append("\n## Key Config")
    for k, v in cfg_fields.items():
        md.append(f"- {k}: {v}")

    md.append("\n## Metrics (raw)")
    for k, v in metrics.items():
        md.append(f"- {k}: {v}")

    if submission_path:
        md.append(f"\n## Submission\n- {row['submission_path']}")

    LATEST_MD.write_text("\n".join(md), encoding="utf-8")

    # Also log to events, if log_event was defined in 2-3
    if "log_event" in globals():
        log_event("result_recorded", {"result_name": result_name, "stage": stage, "results_csv": str(RESULTS_CSV)})

    print(f"[OK] recorded -> {RESULTS_CSV.name} | {result_name} ({stage})")
    print(f"[OK] jsonl    -> {RESULTS_JSONL.name}")
    print(f"[OK] summary  -> {LATEST_MD.name}")

# Print paths
print(f"[OK] RESULTS_CSV  : {RESULTS_CSV}")
print(f"[OK] RESULTS_JSONL: {RESULTS_JSONL}")
print(f"[OK] LATEST_MD    : {LATEST_MD}")

# Usage example (call once real metrics are available)
# record_result("baseline_v1", "val", metrics={"mAP_75_95": 0.1234, "mAP_50": 0.4567}, notes="imgsz=768")

[OK] RESULTS_CSV  : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\artifacts\exp_20260202_230604\reports\results.csv
[OK] RESULTS_JSONL: C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\artifacts\exp_20260202_230604\reports\results.jsonl
[OK] LATEST_MD    : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\artifacts\exp_20260202_230604\reports\latest_summary.md
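
Once a few results are recorded, the table can be ranked to pick candidates. The sketch below reads CSV text with the same `result_name`, `stage`, and `mAP_75_95` columns that `results.csv` uses. The helper name `best_rows` is hypothetical, and the numbers in `sample_csv` are placeholders, not real results.

```python
import csv
import io


def best_rows(csv_text, metric="mAP_75_95", stage="val", k=3):
    """Return the top-k (metric, result_name) pairs for one stage, best first."""
    rows = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        # Skip other stages and rows where the metric was left empty.
        if row.get("stage") != stage or not row.get(metric):
            continue
        rows.append((float(row[metric]), row["result_name"]))
    rows.sort(reverse=True)
    return rows[:k]


# Placeholder table: names and values are made up for illustration.
sample_csv = """result_name,stage,mAP_75_95
run_a,val,0.10
run_b,val,0.30
run_c,public_lb,0.90
run_d,val,
run_e,val,0.20
"""
top = best_rows(sample_csv)
print(top)
```

In the notebook, the text could come from `RESULTS_CSV.read_text(encoding="utf-8")`. Filtering by stage keeps validation scores and leaderboard scores from being ranked against each other.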


In [5]:
# [Code Cell] 3-1. Inspect the raw data layout (folders/file counts/samples/file-name rules) + save an inventory

from pathlib import Path
import json
from collections import Counter

# Use INPUT / DIRS from the previous cells if present; otherwise fall back to the current folder
ROOT_ = globals().get("ROOT", Path(".").resolve())
INPUT_ = globals().get("INPUT", {
    "TRAIN_IMAGES": ROOT_ / "train_images",
    "TRAIN_ANN_DIR": ROOT_ / "train_annotations",
    "TEST_IMAGES": ROOT_ / "test_images",
})
REPORT_DIR = Path(globals().get("DIRS", {}).get("REPORTS", ROOT_ / "artifacts" / "reports"))

def count_files(folder: Path, exts=None):
    if not folder.exists():
        return 0
    if exts is None:
        return sum(1 for p in folder.rglob("*") if p.is_file())
    exts = {e.lower() for e in exts}
    return sum(1 for p in folder.rglob("*") if p.is_file() and p.suffix.lower() in exts)

def sample_files(folder: Path, exts=None, k=5):
    if not folder.exists():
        return []
    if exts is None:
        files = [p for p in folder.rglob("*") if p.is_file()]
    else:
        exts = {e.lower() for e in exts}
        files = [p for p in folder.rglob("*") if p.is_file() and p.suffix.lower() in exts]
    files = sorted(files)
    return [str(p.relative_to(ROOT_)) for p in files[:k]]

def parse_int_stems(folder: Path, exts=(".png",)):
    """Check whether each file stem parses as an int (for the submission image_id rule)."""
    stems = []
    bad = []
    for p in sorted(folder.rglob("*")):  # recursive, so totals agree with count_files
        if p.is_file() and p.suffix.lower() in exts:
            s = p.stem
            try:
                stems.append(int(s))
            except ValueError:
                bad.append(s)
    return stems, bad

# 1) Check that the base paths exist
for k, p in INPUT_.items():
    if not Path(p).exists():
        raise FileNotFoundError(f"Required path is missing: {k} -> {p}")

train_img_dir = Path(INPUT_["TRAIN_IMAGES"])
test_img_dir = Path(INPUT_["TEST_IMAGES"])
train_ann_dir = Path(INPUT_["TRAIN_ANN_DIR"])

# 2) File count summary
summary = {
    "paths": {k: str(Path(v).resolve()) for k, v in INPUT_.items()},
    "counts": {
        "train_images_png": count_files(train_img_dir, exts=[".png"]),
        "test_images_png": count_files(test_img_dir, exts=[".png"]),
        "train_annotations_json": count_files(train_ann_dir, exts=[".json"]),
        "train_annotations_dirs": sum(1 for p in train_ann_dir.iterdir() if p.is_dir()),
    },
    "samples": {
        "train_images": sample_files(train_img_dir, exts=[".png"], k=5),
        "test_images": sample_files(test_img_dir, exts=[".png"], k=5),
        "train_annotations": sample_files(train_ann_dir, exts=[".json"], k=5),
    },
}

# 3) Distribution over train_annotations subfolders (json count per folder)
subdir_json_counts = {}
for d in sorted([p for p in train_ann_dir.iterdir() if p.is_dir()]):
    subdir_json_counts[d.name] = count_files(d, exts=[".json"])

if subdir_json_counts:
    vals = list(subdir_json_counts.values())
    summary["train_annotations_subdir_stats"] = {
        "n_subdirs": len(vals),
        "min_json": min(vals),
        "max_json": max(vals),
        "mean_json": sum(vals) / len(vals),
        "top5_subdirs_by_json": sorted(subdir_json_counts.items(), key=lambda x: x[1], reverse=True)[:5],
    }
else:
    summary["train_annotations_subdir_stats"] = {"n_subdirs": 0}

# 4) Whether image file names (stems) parse as integers (for the submission image_id rule)
train_stems, train_bad = parse_int_stems(train_img_dir, exts=(".png",))
test_stems, test_bad = parse_int_stems(test_img_dir, exts=(".png",))

def stem_stats(stems, bad):
    dup = [s for s, c in Counter(stems).items() if c > 1]
    return {
        "n_files": len(stems) + len(bad),
        "n_parseable_int_stems": len(stems),
        "n_unparseable_stems": len(bad),
        "unparseable_examples": bad[:10],
        "n_duplicate_int_stems": len(dup),
        "duplicate_examples": dup[:10],
    }

summary["image_id_stem_check"] = {
    "train_images": stem_stats(train_stems, train_bad),
    "test_images": stem_stats(test_stems, test_bad),
}

# 5) Output
print("[RAW DATA SUMMARY]")
print(f"- train_images      : {summary['counts']['train_images_png']} png")
print(f"- test_images       : {summary['counts']['test_images_png']} png")
print(f"- train_annotations : {summary['counts']['train_annotations_json']} json "
      f"(subdirs={summary['counts']['train_annotations_dirs']})")

print("\n[SAMPLES]")
print("- train_images:", *summary["samples"]["train_images"], sep="\n  ")
print("- test_images:", *summary["samples"]["test_images"], sep="\n  ")
print("- train_annotations:", *summary["samples"]["train_annotations"], sep="\n  ")

print("\n[IMAGE_ID STEM CHECK]")
for split in ["train_images", "test_images"]:
    st = summary["image_id_stem_check"][split]
    print(f"- {split}: parseable={st['n_parseable_int_stems']}/{st['n_files']}, "
          f"bad={st['n_unparseable_stems']}, dup={st['n_duplicate_int_stems']}")
    if st["n_unparseable_stems"] > 0:
        print(f"  - bad examples: {st['unparseable_examples'][:5]}")

# 6) Save the inventory
REPORT_DIR.mkdir(parents=True, exist_ok=True)
out_path = REPORT_DIR / "raw_data_inventory.json"
with open(out_path, "w", encoding="utf-8") as f:
    json.dump(summary, f, indent=2, ensure_ascii=False)

print(f"\n[OK] saved inventory -> {out_path}")

[RAW DATA SUMMARY]
- train_images      : 232 png
- test_images       : 842 png
- train_annotations : 763 json (subdirs=114)

[SAMPLES]
- train_images:
  train_images\K-001900-016548-019607-029451_0_2_0_2_70_000_200.png
  train_images\K-001900-016548-019607-029451_0_2_0_2_75_000_200.png
  train_images\K-001900-016548-019607-029451_0_2_0_2_90_000_200.png
  train_images\K-001900-016548-019607-033009_0_2_0_2_70_000_200.png
  train_images\K-001900-016548-019607-033009_0_2_0_2_75_000_200.png
- test_images:
  test_images\1.png
  test_images\10.png
  test_images\100.png
  test_images\1003.png
  test_images\1004.png
- train_annotations:
  train_annotations\K-001900-016548-019607-029451_json\K-001900\K-001900-016548-019607-029451_0_2_0_2_70_000_200.json
  train_annotations\K-001900-016548-019607-029451_json\K-001900\K-001900-016548-019607-029451_0_2_0_2_75_000_200.json
  train_annotations\K-001900-016548-019607-029451_json\K-001900\K-001900-016548-019607-029451_0_2_0_2_90_000_200.json
  train_an

In [6]:
# [Code Cell] 3-2. Scan the COCO JSON schema (top-level / images / annotations / categories) + save a summary report

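import os  # needed for os.environ below; do not rely on an earlier cell's import
from pathlib import Path
import json
from collections import Counter, defaultdict
from datetime import datetime
import statistics as stats

# --- paths (prefer variables from the previous cells) ---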
ROOT_ = globals().get("ROOT", Path(".").resolve())
INPUT_ = globals().get("INPUT", {
    "TRAIN_IMAGES": ROOT_ / "train_images",
    "TRAIN_ANN_DIR": ROOT_ / "train_annotations",
    "TEST_IMAGES": ROOT_ / "test_images",
})
REPORT_DIR = Path(globals().get("DIRS", {}).get("REPORTS", ROOT_ / "artifacts" / "reports"))
REPORT_DIR.mkdir(parents=True, exist_ok=True)

ann_root = Path(INPUT_["TRAIN_ANN_DIR"])
json_files = sorted(ann_root.rglob("*.json"))
if not json_files:
    raise FileNotFoundError(f"No json files found under train_annotations: {ann_root}")

# --- scan controls ---
MAX_SCAN = int(os.environ.get("COCO_SCHEMA_SCAN_N", "50"))  # scanning many files can be slow, so default to 50
scan_files = json_files[:min(MAX_SCAN, len(json_files))]

def _type_name(x):
    return type(x).__name__

def _safe_get_first(lst):
    return lst[0] if isinstance(lst, list) and len(lst) > 0 else None

def _summarize_counts(vals):
    if not vals:
        return {"n": 0}
    return {
        "n": len(vals),
        "min": min(vals),
        "max": max(vals),
        "mean": sum(vals) / len(vals),
        "median": stats.median(vals),
        "top5": Counter(vals).most_common(5),
    }

# --- collectors ---
top_keys = Counter()
images_keys = Counter()
ann_keys = Counter()
cat_keys = Counter()

images_count_list = []
ann_count_list = []
cat_count_list = []

bbox_len = Counter()
bbox_bad_examples = []
bbox_examples = []

image_extra_keys_examples = defaultdict(int)  # frequency of each key seen in image objects
cat_id_name = {}  # overwritten with the last name seen (harmless if the names agree)
cat_id_name_examples = []

warnings = []

# --- scanning ---
for fp in scan_files:
    try:
        with open(fp, "r", encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, dict):
            warnings.append({"file": str(fp), "warn": f"top-level is not dict: {type(data)}"})
            continue

        for k in data.keys():
            top_keys[k] += 1

        images = data.get("images", [])
        anns = data.get("annotations", [])
        cats = data.get("categories", [])

        if not isinstance(images, list):
            warnings.append({"file": str(fp), "warn": f"'images' is not list: {type(images)}"})
            images = []
        if not isinstance(anns, list):
            warnings.append({"file": str(fp), "warn": f"'annotations' is not list: {type(anns)}"})
            anns = []
        if not isinstance(cats, list):
            warnings.append({"file": str(fp), "warn": f"'categories' is not list: {type(cats)}"})
            cats = []

        images_count_list.append(len(images))
        ann_count_list.append(len(anns))
        cat_count_list.append(len(cats))

        # images keys
        img0 = _safe_get_first(images)
        if isinstance(img0, dict):
            for k in img0.keys():
                images_keys[k] += 1
                # tally every key (extra metadata included) so its distribution can be inspected
                image_extra_keys_examples[k] += 1

        # annotations keys + bbox sanity
        ann0 = _safe_get_first(anns)
        if isinstance(ann0, dict):
            for k in ann0.keys():
                ann_keys[k] += 1

        # bbox checks: only the first 20 annotations per file
        for a in anns[:20]:
            if not isinstance(a, dict):
                continue
            b = a.get("bbox", None)
            if b is None:
                bbox_len["None"] += 1
                continue
            if not isinstance(b, list):
                bbox_len["not_list"] += 1
                bbox_bad_examples.append({"file": str(fp), "bbox": str(b)[:200]})
                continue
            bbox_len[len(b)] += 1
            if len(b) == 4 and len(bbox_examples) < 5:
                bbox_examples.append({"file": str(fp), "bbox": b})
            if len(b) != 4 and len(bbox_bad_examples) < 10:
                bbox_bad_examples.append({"file": str(fp), "bbox": b})

        # categories keys + id/name mapping
        cat0 = _safe_get_first(cats)
        if isinstance(cat0, dict):
            for k in cat0.keys():
                cat_keys[k] += 1

        # id->name: read up to 20 categories per file, keep at most 10 examples overall
        for c in cats[:20]:
            if not isinstance(c, dict):
                continue
            cid = c.get("id", None)
            name = c.get("name", None)
            if cid is not None and name is not None:
                cat_id_name[cid] = name
                if len(cat_id_name_examples) < 10:
                    cat_id_name_examples.append({"id": cid, "name": name})

    except Exception as e:
        warnings.append({"file": str(fp), "warn": f"json load failed: {repr(e)}"})

# --- build summary ---
summary = {
    "timestamp": datetime.now().isoformat(timespec="seconds"),
    "ann_root": str(ann_root.resolve()),
    "n_json_files_total": len(json_files),
    "n_json_files_scanned": len(scan_files),
    "scan_limit": MAX_SCAN,
    "top_level_keys_presence": top_keys.most_common(),
    "counts": {
        "images_per_json": _summarize_counts(images_count_list),
        "annotations_per_json": _summarize_counts(ann_count_list),
        "categories_per_json": _summarize_counts(cat_count_list),
    },
    "images": {
        "common_keys_top30": images_keys.most_common(30),
    },
    "annotations": {
        "common_keys_top30": ann_keys.most_common(30),
        "bbox_len_distribution": dict(bbox_len),
        "bbox_examples": bbox_examples,
        "bbox_bad_examples": bbox_bad_examples[:10],
    },
    "categories": {
        "common_keys_top30": cat_keys.most_common(30),
        "id_name_examples": cat_id_name_examples,
        "unique_ids_seen_in_scanned": len(cat_id_name),
    },
    "warnings": warnings,
}

out_json = REPORT_DIR / "coco_schema_summary.json"
with open(out_json, "w", encoding="utf-8") as f:
    json.dump(summary, f, indent=2, ensure_ascii=False)

# --- print key points ---
print("[COCO SCHEMA SUMMARY]")
print(f"- ann_root            : {ann_root}")
print(f"- json total / scanned: {len(json_files)} / {len(scan_files)} (limit={MAX_SCAN})")

c_images = summary["counts"]["images_per_json"]
c_anns = summary["counts"]["annotations_per_json"]
c_cats = summary["counts"]["categories_per_json"]
print("\n[PER-JSON COUNTS]")
print(f"- images/json      : min={c_images.get('min')} max={c_images.get('max')} mean={c_images.get('mean'):.2f} median={c_images.get('median')}")
print(f"- annotations/json : min={c_anns.get('min')} max={c_anns.get('max')} mean={c_anns.get('mean'):.2f} median={c_anns.get('median')}")
print(f"- categories/json  : min={c_cats.get('min')} max={c_cats.get('max')} mean={c_cats.get('mean'):.2f} median={c_cats.get('median')}")

print("\n[TOP-LEVEL KEYS (presence count over scanned jsons)]")
for k, v in top_keys.most_common():
    print(f"- {k}: {v}")

print("\n[COMMON KEYS]")
print("- images keys (top10):", [k for k, _ in summary["images"]["common_keys_top30"][:10]])
print("- ann keys    (top10):", [k for k, _ in summary["annotations"]["common_keys_top30"][:10]])
print("- cat keys    (top10):", [k for k, _ in summary["categories"]["common_keys_top30"][:10]])

print("\n[BBOX FORMAT CHECK]")
print("- bbox length distribution:", summary["annotations"]["bbox_len_distribution"])
if summary["annotations"]["bbox_examples"]:
    print("- bbox examples:")
    for ex in summary["annotations"]["bbox_examples"]:
        print(f"  - {Path(ex['file']).name}: {ex['bbox']}")

print("\n[CATEGORIES SAMPLE]")
for ex in summary["categories"]["id_name_examples"][:10]:
    print(f"- id={ex['id']} name={ex['name']}")

print(f"\n[OK] saved -> {out_json}")
if warnings:
    print(f"[WARN] warnings={len(warnings)} (see JSON for details)")

[COCO SCHEMA SUMMARY]
- ann_root            : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\train_annotations
- json total / scanned: 763 / 50 (limit=50)

[PER-JSON COUNTS]
- images/json      : min=1 max=1 mean=1.00 median=1.0
- annotations/json : min=1 max=1 mean=1.00 median=1.0
- categories/json  : min=1 max=1 mean=1.00 median=1.0

[TOP-LEVEL KEYS (presence count over scanned jsons)]
- images: 50
- type: 50
- annotations: 50
- categories: 50

[COMMON KEYS]
- images keys (top10): ['file_name', 'width', 'height', 'imgfile', 'drug_N', 'drug_S', 'back_color', 'drug_dir', 'light_color', 'camera_la']
- ann keys    (top10): ['area', 'iscrowd', 'bbox', 'category_id', 'ignore', 'segmentation', 'id', 'image_id']
- cat keys    (top10): ['supercategory', 'id', 'name']

[BBOX FORMAT CHECK]
- bbox length distribution: {4: 50}
- bbox examples:
  - K-001900-016548-019607-029451_0_2_0_2_70_000_200.json: [644, 845, 189, 190]
  - K-001900-016548-019607-029451_0_2_0_2_75
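
The bbox examples in this output are read as COCO `[x, y, width, height]` with a top-left origin, which is how the later cells unpack them. As a minimal sketch (the helper names `xywh_to_xyxy` and `iou_xywh` are hypothetical and not part of the notebook), the snippet below converts the first example box to corner form and measures how a small offset affects IoU, which is the quantity the mAP@[0.75:0.95] metric thresholds on.

```python
def xywh_to_xyxy(bbox):
    """Convert COCO [x, y, w, h] to corner form (x1, y1, x2, y2)."""
    x, y, w, h = map(float, bbox)
    return (x, y, x + w, y + h)


def iou_xywh(a, b):
    """Intersection over union of two boxes given as [x, y, w, h]."""
    ax1, ay1, ax2, ay2 = xywh_to_xyxy(a)
    bx1, by1, bx2, by2 = xywh_to_xyxy(b)
    inter_w = max(0.0, min(ax2, bx2) - max(ax1, bx1))
    inter_h = max(0.0, min(ay2, by2) - max(ay1, by1))
    inter = inter_w * inter_h
    union = (ax2 - ax1) * (ay2 - ay1) + (bx2 - bx1) * (by2 - by1) - inter
    return inter / union if union > 0 else 0.0


gt = [644, 845, 189, 190]       # first bbox example from the scan output
shifted = [654, 855, 189, 190]  # same size, offset by 10 px in x and y (illustrative)

print(xywh_to_xyxy(gt))
print(round(iou_xywh(gt, shifted), 4))
```

For a box of this size, a 10-pixel offset in both axes gives an IoU of roughly 0.81, which passes the 0.75 threshold but fails 0.85 and above.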

In [7]:
# [Code Cell] 3-3. QC Gate 1: report on bbox validity / image matching / duplicates / object count (<=4) / category consistency

from pathlib import Path
import json
from collections import Counter, defaultdict
from datetime import datetime

# --- paths (prefer variables defined in earlier cells) ---
ROOT_ = globals().get("ROOT", Path(".").resolve())
INPUT_ = globals().get("INPUT", {
    "TRAIN_IMAGES": ROOT_ / "train_images",
    "TRAIN_ANN_DIR": ROOT_ / "train_annotations",
    "TEST_IMAGES": ROOT_ / "test_images",
})
REPORT_DIR = Path(globals().get("DIRS", {}).get("REPORTS", ROOT_ / "artifacts" / "reports"))
REPORT_DIR.mkdir(parents=True, exist_ok=True)

train_img_dir = Path(INPUT_["TRAIN_IMAGES"])
ann_root = Path(INPUT_["TRAIN_ANN_DIR"])

json_files = sorted(ann_root.rglob("*.json"))
if not json_files:
    raise FileNotFoundError(f"No JSON files found under train_annotations: {ann_root}")

# --- set of train_images file names, used for existence checks ---
train_image_files = {p.name for p in train_img_dir.glob("*.png")}

# --- collectors ---
records = []  # annotation-level records
image_meta = {}  # file_name -> {w,h,seen_paths}
missing_images = []
invalid_bbox = []
oob_bbox = []  # out-of-bounds
nonpos_bbox = []
dup_key_counter = Counter()

cat_id_to_names = defaultdict(set)  # id -> set(names)

def _is_number(x):
    return isinstance(x, (int, float))

def _round3(x):
    try:
        return round(float(x), 3)
    except Exception:
        return x

def _bbox_key(file_name, category_id, bbox):
    # key for duplicate detection (rounded, since coordinates may be fractional)
    x, y, w, h = bbox
    return (file_name, int(category_id), _round3(x), _round3(y), _round3(w), _round3(h))

for fp in json_files:
    try:
        with open(fp, "r", encoding="utf-8") as f:
            data = json.load(f)

        images = data.get("images", [])
        anns = data.get("annotations", [])
        cats = data.get("categories", [])

        # defensive handling: treat unexpected types as empty lists
        if not isinstance(images, list): images = []
        if not isinstance(anns, list): anns = []
        if not isinstance(cats, list): cats = []

        # categories: collect id -> name pairs
        for c in cats:
            if not isinstance(c, dict): 
                continue
            cid = c.get("id", None)
            nm = c.get("name", None)
            if cid is not None and nm is not None:
                try:
                    cid_int = int(cid)
                except Exception:
                    continue
                cat_id_to_names[cid_int].add(str(nm))

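        # images: normally one per JSON, but several are handled as well
        # annotations link via image_id; file_name is resolved first so results can be aggregated per file
        # (file_name matters because the competition's submission image_id is also based on the numeric file stem)
        if len(images) == 0:
            # with no images the annotations cannot be linked, so only a warning record is kept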
            invalid_bbox.append({"file": str(fp), "reason": "images_empty"})
            continue

        # build the images dict: image_id -> (file_name, w, h)
        img_by_id = {}
        for im in images:
            if not isinstance(im, dict):
                continue
            image_id = im.get("id", None)
            file_name = im.get("file_name", None)
            w = im.get("width", None)
            h = im.get("height", None)
            if image_id is None or file_name is None:
                continue
            img_by_id[image_id] = (str(file_name), w, h)

            # accumulate per-file_name metadata
            if str(file_name) not in image_meta:
                image_meta[str(file_name)] = {
                    "width": w,
                    "height": h,
                    "json_paths": set(),
                }
            image_meta[str(file_name)]["json_paths"].add(str(fp))

        # process annotations
        if len(anns) == 0:
            invalid_bbox.append({"file": str(fp), "reason": "annotations_empty"})
            continue

        for a in anns:
            if not isinstance(a, dict):
                continue

            image_id = a.get("image_id", None)
            category_id = a.get("category_id", None)
            bbox = a.get("bbox", None)

            # resolve file_name and image size
            file_name, w, h = None, None, None
            if image_id in img_by_id:
                file_name, w, h = img_by_id[image_id]
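            else:
                # image_id is not among the images entries: fall back to the first one
                # (safety net for data that always has exactly one image per JSON)
                only = next(iter(img_by_id.values()), None)
                if only is None:
                    # no usable image entry at all; record it instead of raising StopIteration
                    invalid_bbox.append({"file": str(fp), "reason": "no_usable_image_entry"})
                    continue
                file_name, w, h = only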

            # check that the image file exists
            if file_name not in train_image_files:
                missing_images.append({"file_name": file_name, "json": str(fp)})

            # bbox validity
            if bbox is None or (not isinstance(bbox, list)) or len(bbox) != 4:
                invalid_bbox.append({
                    "file_name": file_name,
                    "json": str(fp),
                    "reason": "bbox_missing_or_bad_format",
                    "bbox": bbox,
                })
                continue

            x, y, bw, bh = bbox
            if not all(_is_number(v) for v in [x, y, bw, bh]):
                invalid_bbox.append({
                    "file_name": file_name,
                    "json": str(fp),
                    "reason": "bbox_not_numeric",
                    "bbox": bbox,
                })
                continue

            x, y, bw, bh = float(x), float(y), float(bw), float(bh)

            if bw <= 0 or bh <= 0:
                nonpos_bbox.append({
                    "file_name": file_name,
                    "json": str(fp),
                    "bbox": [x, y, bw, bh],
                })

            # out-of-bounds check (only when the image width/height are available)
            oob = False
            if _is_number(w) and _is_number(h):
                W, H = float(w), float(h)
                if x < 0 or y < 0 or (x + bw) > W or (y + bh) > H:
                    oob = True
                    oob_bbox.append({
                        "file_name": file_name,
                        "json": str(fp),
                        "image_wh": [W, H],
                        "bbox": [x, y, bw, bh],
                    })

            # duplicate-check key
            if category_id is not None:
                try:
                    ck = _bbox_key(file_name, category_id, [x, y, bw, bh])
                    dup_key_counter[ck] += 1
                except Exception:
                    pass

            records.append({
                "json_path": str(fp),
                "file_name": file_name,
                "image_id": image_id,
                "category_id": int(category_id) if category_id is not None else None,
                "bbox_x": x, "bbox_y": y, "bbox_w": bw, "bbox_h": bh,
                "image_w": w, "image_h": h,
                "oob": oob,
            })

    except Exception as e:
        invalid_bbox.append({"file": str(fp), "reason": f"json_load_or_parse_failed: {repr(e)}"})

# --- image-wise aggregation ---
obj_count_by_image = Counter()
for r in records:
    obj_count_by_image[r["file_name"]] += 1

# distribution of objects per image
dist_obj_per_image = Counter(obj_count_by_image.values())
max_objs = max(obj_count_by_image.values()) if obj_count_by_image else 0
n_images = len(obj_count_by_image)

# images with more than 4 objects (checks for rule violations)
gt4_images = [fn for fn, c in obj_count_by_image.items() if c > 4]

# category id-name consistency (one id mapped to several names is a problem)
cat_inconsistency = {cid: sorted(list(names)) for cid, names in cat_id_to_names.items() if len(names) > 1}

# duplicate annotations (keys with count > 1)
dup_ann = [(k, c) for k, c in dup_key_counter.items() if c > 1]
dup_ann_sorted = sorted(dup_ann, key=lambda x: x[1], reverse=True)

summary = {
    "timestamp": datetime.now().isoformat(timespec="seconds"),
    "ann_root": str(ann_root.resolve()),
    "train_images_dir": str(train_img_dir.resolve()),
    "n_json_files": len(json_files),
    "n_ann_records": len(records),

    "n_unique_images_in_annotations": n_images,
    "objects_per_image_distribution": dict(dist_obj_per_image),
    "max_objects_per_image": max_objs,
    "n_images_gt4": len(gt4_images),

    "bbox_issues": {
        "n_invalid_bbox": len(invalid_bbox),
        "n_nonpos_bbox": len(nonpos_bbox),
        "n_oob_bbox": len(oob_bbox),
    },
    "missing_image_files": {
        "n_missing": len(missing_images),
    },

    "categories": {
        "n_unique_category_ids_seen": len(cat_id_to_names),
        "n_inconsistent_id_name": len(cat_inconsistency),
        "inconsistent_examples": dict(list(cat_inconsistency.items())[:10]),
    },

    "duplicates": {
        "n_duplicate_ann_keys": len(dup_ann_sorted),
        "top10_duplicates": [
            {
                "file_name": k[0],
                "category_id": k[1],
                "bbox": [k[2], k[3], k[4], k[5]],
                "count": c,
            }
            for (k, c) in dup_ann_sorted[:10]
        ],
    },
}

# --- save artifacts ---
out_summary = REPORT_DIR / "qc_gate1_summary.json"
with open(out_summary, "w", encoding="utf-8") as f:
    json.dump(summary, f, indent=2, ensure_ascii=False)

# save image-level counts as a simple CSV
out_counts = REPORT_DIR / "objects_per_image.csv"
with open(out_counts, "w", newline="", encoding="utf-8") as f:
    f.write("file_name,n_objects\n")
    for fn, c in sorted(obj_count_by_image.items(), key=lambda x: (-x[1], x[0])):
        f.write(f"{fn},{c}\n")

# save category map csv (id -> name(s))
out_cats = REPORT_DIR / "categories_id_to_name.csv"
with open(out_cats, "w", newline="", encoding="utf-8") as f:
    f.write("category_id,n_names,names\n")
    for cid in sorted(cat_id_to_names.keys()):
        names = sorted(list(cat_id_to_names[cid]))
        f.write(f"{cid},{len(names)},\"{'; '.join(names)}\"\n")

# save issue samples
def _save_samples(path: Path, items, k=50):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(items[:k], f, indent=2, ensure_ascii=False)

_save_samples(REPORT_DIR / "missing_images_samples.json", missing_images, k=50)
_save_samples(REPORT_DIR / "invalid_bbox_samples.json", invalid_bbox, k=50)
_save_samples(REPORT_DIR / "oob_bbox_samples.json", oob_bbox, k=50)
_save_samples(REPORT_DIR / "nonpos_bbox_samples.json", nonpos_bbox, k=50)

print("[QC GATE 1 SUMMARY]")
print(f"- json files                : {summary['n_json_files']}")
print(f"- ann records               : {summary['n_ann_records']}")
print(f"- unique images(annotated)  : {summary['n_unique_images_in_annotations']}")
print(f"- objects/image dist        : {summary['objects_per_image_distribution']}")
print(f"- max objects per image     : {summary['max_objects_per_image']} (gt4 images={summary['n_images_gt4']})")
print(f"- invalid bbox              : {summary['bbox_issues']['n_invalid_bbox']}")
print(f"- nonpositive bbox          : {summary['bbox_issues']['n_nonpos_bbox']}")
print(f"- out-of-bounds bbox        : {summary['bbox_issues']['n_oob_bbox']}")
print(f"- missing image files       : {summary['missing_image_files']['n_missing']}")
print(f"- unique category ids       : {summary['categories']['n_unique_category_ids_seen']}")
print(f"- inconsistent id->name     : {summary['categories']['n_inconsistent_id_name']}")
print(f"- duplicate ann keys        : {summary['duplicates']['n_duplicate_ann_keys']}")

print("\n[SAVED]")
print(f"- {out_summary}")
print(f"- {out_counts}")
print(f"- {out_cats}")
print(f"- {REPORT_DIR / 'missing_images_samples.json'}")
print(f"- {REPORT_DIR / 'invalid_bbox_samples.json'}")
print(f"- {REPORT_DIR / 'oob_bbox_samples.json'}")
print(f"- {REPORT_DIR / 'nonpos_bbox_samples.json'}")

[QC GATE 1 SUMMARY]
- json files                : 763
- ann records               : 763
- unique images(annotated)  : 232
- objects/image dist        : {4: 74, 3: 151, 2: 7}
- max objects per image     : 4 (gt4 images=0)
- invalid bbox              : 0
- nonpositive bbox          : 0
- out-of-bounds bbox        : 1
- missing image files       : 0
- unique category ids       : 56
- inconsistent id->name     : 0
- duplicate ann keys        : 0

[SAVED]
- C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\artifacts\exp_20260202_230604\reports\qc_gate1_summary.json
- C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\artifacts\exp_20260202_230604\reports\objects_per_image.csv
- C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\artifacts\exp_20260202_230604\reports\categories_id_to_name.csv
- C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\artifacts\exp_20260202_230604\reports
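
QC Gate 1 reports one out-of-bounds bbox. Before choosing between clipping and dropping such a box, it can help to know how much of it still lies inside the image. The sketch below is one way to measure that; `inside_fraction` is a hypothetical helper, and the image size and boxes are made-up values, not taken from the dataset.

```python
def inside_fraction(bbox, img_w, img_h):
    """Fraction of a COCO [x, y, w, h] box's area that lies inside the image."""
    x, y, w, h = map(float, bbox)
    if w <= 0 or h <= 0:
        return 0.0
    ix1, iy1 = max(0.0, x), max(0.0, y)
    ix2, iy2 = min(float(img_w), x + w), min(float(img_h), y + h)
    inter = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
    return inter / (w * h)


# Illustrative values only (hypothetical 100x100 image).
print(inside_fraction([10, 10, 20, 20], 100, 100))   # fully inside
print(inside_fraction([90, 50, 20, 10], 100, 100))   # half outside on the right
print(inside_fraction([120, 50, 20, 10], 100, 100))  # entirely outside
```

A fraction of 0 means clipping leaves nothing, so the annotation can only be dropped; a fraction close to 1 suggests a small overshoot that clipping can repair.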

In [8]:
# [Code Cell] 3-4. train_annotations (one object per JSON) -> build a merged per-image COCO file + clip bboxes + save mappings

from pathlib import Path
import json
from collections import defaultdict
from datetime import datetime

ROOT_ = globals().get("ROOT", Path(".").resolve())
INPUT_ = globals().get("INPUT", {
    "TRAIN_IMAGES": ROOT_ / "train_images",
    "TRAIN_ANN_DIR": ROOT_ / "train_annotations",
    "TEST_IMAGES": ROOT_ / "test_images",
})
DIRS_ = globals().get("DIRS", {})
CACHE_DIR = Path(DIRS_.get("CACHE", ROOT_ / "data" / "cache" / "merged"))
CACHE_DIR.mkdir(parents=True, exist_ok=True)

train_img_dir = Path(INPUT_["TRAIN_IMAGES"])
ann_root = Path(INPUT_["TRAIN_ANN_DIR"])
json_files = sorted(ann_root.rglob("*.json"))
if not json_files:
    raise FileNotFoundError(f"No JSON files found under train_annotations: {ann_root}")

# Option: if the file stem is numeric, use it as image_id (easier to tie to the submission rules)
USE_FILE_STEM_AS_IMAGE_ID = True

def _to_int_stem(file_name: str):
    try:
        return int(Path(file_name).stem)
    except Exception:
        return None

def _clip_bbox_xywh(bbox, W, H):
    """Clip bbox=[x,y,w,h] to the image bounds. Returns: (new_bbox, clipped_flag, valid_flag)"""
    x, y, w, h = map(float, bbox)
    x1, y1 = x, y
    x2, y2 = x + w, y + h

    # clip
    cx1 = max(0.0, min(x1, float(W)))
    cy1 = max(0.0, min(y1, float(H)))
    cx2 = max(0.0, min(x2, float(W)))
    cy2 = max(0.0, min(y2, float(H)))

    nw = cx2 - cx1
    nh = cy2 - cy1
    clipped = (cx1 != x1) or (cy1 != y1) or (cx2 != x2) or (cy2 != y2)
    valid = (nw > 0.0) and (nh > 0.0)
    return [cx1, cy1, nw, nh], clipped, valid

# ---- aggregate per image(file_name) ----
img_meta_by_name = {}
anns_by_name = defaultdict(list)
cat_id_to_name = {}

seen_json = 0
skipped_ann = 0
clipped_ann = 0
invalid_after_clip = 0

for fp in json_files:
    seen_json += 1
    with open(fp, "r", encoding="utf-8") as f:
        data = json.load(f)

    images = data.get("images", [])
    anns = data.get("annotations", [])
    cats = data.get("categories", [])

    if not images or not anns or not cats:
        skipped_ann += 1
        continue

    im = images[0]
    a = anns[0]
    c = cats[0]

    file_name = str(im.get("file_name"))
    W = im.get("width", None)
    H = im.get("height", None)

    # collect the categories map
    cid = c.get("id", None)
    cname = c.get("name", None)
    if cid is not None and cname is not None:
        cat_id_to_name[int(cid)] = str(cname)

    # fix the image metadata (the first occurrence wins)
    if file_name not in img_meta_by_name:
        img_meta_by_name[file_name] = {
            "file_name": file_name,
            "width": W,
            "height": H,
            # extra metadata can be kept if needed (keep only minimal fields if it gets too large)
        }

    bbox = a.get("bbox", None)
    category_id = a.get("category_id", None)

    if bbox is None or (not isinstance(bbox, list)) or len(bbox) != 4:
        skipped_ann += 1
        continue
    if category_id is None:
        skipped_ann += 1
        continue

    # clip the bbox (only when the image W/H are available)
    clipped = False
    valid = True
    new_bbox = list(map(float, bbox))
    if isinstance(W, (int, float)) and isinstance(H, (int, float)):
        new_bbox, clipped, valid = _clip_bbox_xywh(bbox, W, H)
        if clipped:
            clipped_ann += 1
        if not valid:
            invalid_after_clip += 1
            continue

    anns_by_name[file_name].append({
        "category_id": int(category_id),
        "bbox": new_bbox,
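        # minimal COCO fields (extend if needed)
        "iscrowd": int(a.get("iscrowd", 0) or 0),
        "ignore": int(a.get("ignore", 0) or 0),
        # recompute area for clipped boxes so it matches the stored bbox
        "area": new_bbox[2] * new_bbox[3] if clipped else float(a.get("area", new_bbox[2] * new_bbox[3])),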
        "segmentation": a.get("segmentation", []),
    })

# ---- build merged COCO ----
# image_id policy
file_names = sorted(img_meta_by_name.keys())
image_id_map = {}
images_out = []
annotations_out = []

next_img_id = 1
next_ann_id = 1

for fn in file_names:
    meta = img_meta_by_name[fn]
    W, H = meta.get("width"), meta.get("height")

    img_id = None
    if USE_FILE_STEM_AS_IMAGE_ID:
        img_id = _to_int_stem(fn)
    if img_id is None:
        img_id = next_img_id
        next_img_id += 1

    image_id_map[fn] = img_id

    images_out.append({
        "id": img_id,
        "file_name": fn,
        "width": W,
        "height": H,
    })

    # annotations (at most 4 per image expected)
    for ann in anns_by_name.get(fn, []):
        x, y, w, h = ann["bbox"]
        annotations_out.append({
            "id": next_ann_id,
            "image_id": img_id,
            "category_id": ann["category_id"],
            "bbox": [x, y, w, h],
            "area": float(ann.get("area", w * h)),
            "iscrowd": int(ann.get("iscrowd", 0)),
            "ignore": int(ann.get("ignore", 0)),
            "segmentation": ann.get("segmentation", []),
        })
        next_ann_id += 1

# assemble categories (sorted by id)
categories_out = []
for cid in sorted(cat_id_to_name.keys()):
    categories_out.append({
        "id": int(cid),
        "name": cat_id_to_name[cid],
        "supercategory": "pill",
    })

merged = {
    "info": {
        "description": "AI07 pill OD - merged coco (train)",
        "created_at": datetime.now().isoformat(timespec="seconds"),
        "source": str(ann_root.resolve()),
    },
    "images": images_out,
    "annotations": annotations_out,
    "categories": categories_out,
}

# ---- sanity checks ----
# distribution of object counts
from collections import Counter
cnt = Counter()
for a in annotations_out:
    cnt[a["image_id"]] += 1
dist = Counter(cnt.values())
max_objs = max(cnt.values()) if cnt else 0
gt4 = sum(1 for v in cnt.values() if v > 4)

# ---- save ----
out_coco = CACHE_DIR / "train_merged_coco.json"
out_map = CACHE_DIR / "image_id_map.json"
out_cat = CACHE_DIR / "category_id_to_name.json"

with open(out_coco, "w", encoding="utf-8") as f:
    json.dump(merged, f, indent=2, ensure_ascii=False)
with open(out_map, "w", encoding="utf-8") as f:
    json.dump(image_id_map, f, indent=2, ensure_ascii=False)
with open(out_cat, "w", encoding="utf-8") as f:
    json.dump(cat_id_to_name, f, indent=2, ensure_ascii=False)

print("[MERGE COCO DONE]")
print(f"- json files read         : {seen_json}")
print(f"- unique images           : {len(images_out)}")
print(f"- annotations             : {len(annotations_out)}")
print(f"- unique category ids     : {len(categories_out)}")

print("\n[CLIP/INVALID]")
print(f"- clipped bbox count      : {clipped_ann}")
print(f"- invalid after clip(drop): {invalid_after_clip}")
print(f"- skipped(etc)            : {skipped_ann}")

print("\n[OBJECTS PER IMAGE]")
print(f"- dist (n_obj -> n_images): {dict(dist)}")
print(f"- max objects per image   : {max_objs} (gt4 images={gt4})")

print("\n[SAVED]")
print(f"- merged coco  : {out_coco}")
print(f"- image_id_map : {out_map}")
print(f"- cat_id->name : {out_cat}")

[MERGE COCO DONE]
- json files read         : 763
- unique images           : 232
- annotations             : 762
- unique category ids     : 56

[CLIP/INVALID]
- clipped bbox count      : 1
- invalid after clip(drop): 1
- skipped(etc)            : 0

[OBJECTS PER IMAGE]
- dist (n_obj -> n_images): {4: 74, 3: 150, 2: 8}
- max objects per image   : 4 (gt4 images=0)

[SAVED]
- merged coco  : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\data\cache\exp_20260202_230604\train_merged_coco.json
- image_id_map : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\data\cache\exp_20260202_230604\image_id_map.json
- cat_id->name : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\data\cache\exp_20260202_230604\category_id_to_name.json
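
The merge output can be reconciled with the QC Gate 1 summary: QC counted 763 annotation records, while the merged file holds 762 because one box had no area left after clipping and was dropped. A small sketch of that cross-check, using the two objects-per-image distributions printed by those cells (`total_objects` and `dist_delta` are hypothetical helper names):

```python
def total_objects(dist):
    """Total annotation count implied by an {n_objects: n_images} distribution."""
    return sum(n_obj * n_img for n_obj, n_img in dist.items())


def dist_delta(before, after):
    """Per-bucket change in image count (after minus before), unchanged buckets omitted."""
    keys = sorted(set(before) | set(after))
    return {k: after.get(k, 0) - before.get(k, 0)
            for k in keys if after.get(k, 0) != before.get(k, 0)}


qc_dist = {4: 74, 3: 151, 2: 7}      # from the QC Gate 1 output
merged_dist = {4: 74, 3: 150, 2: 8}  # from the merge output

print(total_objects(qc_dist), total_objects(merged_dist))
print(dist_delta(qc_dist, merged_dist))
```

The delta shows a single image moving from three objects to two, which matches the one dropped annotation; the image count is 232 in both distributions.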


In [9]:
# [Code Cell] 3-5. category_id mapping (contiguous index) + class distribution + (optional) preparation for a test whitelist

from pathlib import Path
import json
from collections import Counter
from datetime import datetime

ROOT_ = globals().get("ROOT", Path(".").resolve())
DIRS_ = globals().get("DIRS", {})

# prefer the CACHE_DIR used in 3-4; otherwise fall back to DIRS["CACHE"]
CACHE_DIR = Path(globals().get("CACHE_DIR", DIRS_.get("CACHE", ROOT_ / "data" / "cache" / "merged")))

# locate the merged COCO file (default file name)
MERGED_COCO = CACHE_DIR / "train_merged_coco.json"
if not MERGED_COCO.exists():
    # fallback: search below CACHE_DIR in case the file was written to a subdirectory
    # (sorted so the choice is deterministic when several copies exist)
    cand = sorted(CACHE_DIR.rglob("train_merged_coco.json"))
    if cand:
        MERGED_COCO = cand[0]
    else:
        raise FileNotFoundError(f"train_merged_coco.json not found. CACHE_DIR={CACHE_DIR}")

REPORT_DIR = Path(DIRS_.get("REPORTS", ROOT_ / "artifacts" / "reports"))
REPORT_DIR.mkdir(parents=True, exist_ok=True)
CACHE_DIR.mkdir(parents=True, exist_ok=True)

# -----------------------------
# 1) load the merged COCO file
# -----------------------------
with open(MERGED_COCO, "r", encoding="utf-8") as f:
    coco = json.load(f)

images = coco.get("images", [])
anns = coco.get("annotations", [])
cats = coco.get("categories", [])

if not (isinstance(images, list) and isinstance(anns, list) and isinstance(cats, list)):
    raise ValueError("Unexpected merged COCO structure: check that images/annotations/categories are lists.")

# -----------------------------
# 2) category_id -> name, plus the sorted list of category_ids
# -----------------------------
cat_id_to_name = {}
for c in cats:
    if not isinstance(c, dict):
        continue
    cid = c.get("id", None)
    name = c.get("name", None)
    if cid is None or name is None:
        continue
    cat_id_to_name[int(cid)] = str(name)

cat_ids_sorted = sorted(cat_id_to_name.keys())
if not cat_ids_sorted:
    raise ValueError("No valid category id could be extracted from categories.")

# -----------------------------
# 3) class distribution (by original category_id)
# -----------------------------
cat_counts = Counter()
for a in anns:
    if not isinstance(a, dict):
        continue
    cid = a.get("category_id", None)
    if cid is None:
        continue
    cat_counts[int(cid)] += 1

# -----------------------------
# 4) (optional) try to load a test whitelist
#    - if found: also save a whitelist-based mapping
#    - if not: save only the full mapping and print a notice
# -----------------------------
def _load_whitelist():
    """
    Auto-discover a whitelist file (first match wins):
    - ./data/test_class_whitelist.json  (e.g. {"whitelist":[1,2,...]} or [1,2,...])
    - ./data/test_classes_40.json
    - ./data/whitelist_40.txt (one id per line)
    - ./data/whitelist.txt
    """
    candidates = [
        ROOT_ / "data" / "test_class_whitelist.json",
        ROOT_ / "data" / "test_classes_40.json",
        ROOT_ / "data" / "whitelist_40.txt",
        ROOT_ / "data" / "whitelist.txt",
        CACHE_DIR / "test_class_whitelist.json",
        CACHE_DIR / "test_classes_40.json",
    ]
    for p in candidates:
        if not p.exists():
            continue
        if p.suffix.lower() == ".txt":
            vals = []
            for line in p.read_text(encoding="utf-8").splitlines():
                line = line.strip()
                if not line:
                    continue
                vals.append(int(line))
            return sorted(set(vals)), str(p)
        if p.suffix.lower() == ".json":
            obj = json.loads(p.read_text(encoding="utf-8"))
            if isinstance(obj, list):
                return sorted(set(int(x) for x in obj)), str(p)
            if isinstance(obj, dict):
                key = "whitelist" if "whitelist" in obj else ("classes" if "classes" in obj else None)
                if key and isinstance(obj[key], list):
                    return sorted(set(int(x) for x in obj[key])), str(p)
    return None, None

whitelist_ids, whitelist_path = _load_whitelist()

# -----------------------------
# 5) build the mappings
#    - full mapping: every train category_id -> contiguous index (0..N-1)
#    - whitelist mapping (optional): only category_ids in the whitelist -> contiguous index
# -----------------------------
def build_mapping(category_ids, cat_id_to_name_dict):
    id2idx = {cid: i for i, cid in enumerate(category_ids)}
    idx2id = {i: cid for cid, i in id2idx.items()}
    names = [cat_id_to_name_dict[cid] for cid in category_ids]
    return {
        "category_ids": category_ids,
        "id2idx": id2idx,
        "idx2id": idx2id,
        "names": names,  # index->class name
        "num_classes": len(category_ids),
    }

full_map = build_mapping(cat_ids_sorted, cat_id_to_name)

whitelist_map = None
train_only_ids = []
if whitelist_ids:
    # Drop whitelist ids that do not exist in train (safe default).
    whitelist_ids_in_train = [cid for cid in whitelist_ids if cid in cat_id_to_name]
    whitelist_map = build_mapping(whitelist_ids_in_train, cat_id_to_name)
    whitelist_set = set(whitelist_ids_in_train)  # build the set once, not once per category
    train_only_ids = [cid for cid in cat_ids_sorted if cid not in whitelist_set]

# -----------------------------
# 6) 저장 (cache + reports)
# -----------------------------
out_full = CACHE_DIR / "label_map_full.json"
with open(out_full, "w", encoding="utf-8") as f:
    json.dump(full_map, f, indent=2, ensure_ascii=False)

import csv  # csv.writer escapes quotes and commas inside class names

out_counts = REPORT_DIR / "class_counts_by_category_id.csv"
with open(out_counts, "w", encoding="utf-8", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["category_id", "class_name", "n_annotations"])
    for cid in cat_ids_sorted:
        writer.writerow([cid, cat_id_to_name[cid], cat_counts.get(cid, 0)])

# whitelist가 있으면 추가 저장
out_whitelist = None
out_train_only = None
if whitelist_map is not None:
    out_whitelist = CACHE_DIR / "label_map_whitelist.json"
    with open(out_whitelist, "w", encoding="utf-8") as f:
        json.dump({
            **whitelist_map,
            "whitelist_source": whitelist_path,
        }, f, indent=2, ensure_ascii=False)

    out_train_only = REPORT_DIR / "train_only_category_ids.json"
    with open(out_train_only, "w", encoding="utf-8") as f:
        json.dump({
            "train_only_category_ids": train_only_ids,
            "n_train_only": len(train_only_ids),
            "whitelist_source": whitelist_path,
        }, f, indent=2, ensure_ascii=False)

# -----------------------------
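# 7) Update CFG (if available)
# -----------------------------
if "CFG" in globals():
    data_cfg_ = CFG.setdefault("data", {})  # avoid a KeyError when the "data" section is missing
    data_cfg_["num_classes"] = full_map["num_classes"]
    data_cfg_["class_whitelist"] = whitelist_ids if whitelist_ids else None
    # Call the save function when it is defined
    if "save_cfg" in globals():
        save_cfg()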

# -----------------------------
# 8) 출력 요약
# -----------------------------
print("[LABEL MAP BUILT]")
print(f"- merged coco           : {MERGED_COCO}")
print(f"- num train categories  : {full_map['num_classes']}")
print(f"- saved full map        : {out_full}")
print(f"- saved class counts    : {out_counts}")

# 클래스 분포 요약(상/하위)
most = cat_counts.most_common(5)
least = sorted(cat_counts.items(), key=lambda x: x[1])[:5]
print("\n[CLASS COUNT TOP5]")
for cid, n in most:
    print(f"- {cid} | {cat_id_to_name.get(cid,'?')} | n={n}")
print("\n[CLASS COUNT BOTTOM5]")
for cid, n in least:
    print(f"- {cid} | {cat_id_to_name.get(cid,'?')} | n={n}")

if whitelist_ids:
    print("\n[WHITELIST DETECTED]")
    print(f"- source              : {whitelist_path}")
    print(f"- whitelist ids (raw) : {len(whitelist_ids)}")
    print(f"- whitelist in train  : {whitelist_map['num_classes']}")
    print(f"- train-only ids      : {len(train_only_ids)}")
    print(f"- saved whitelist map : {out_whitelist}")
    print(f"- saved train-only    : {out_train_only}")
else:
    print("\n[WHITELIST NOT FOUND]")
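    print(f"- Saved only the full mapping based on train ({full_map['num_classes']} classes) for now.")
    print("- Once the list of the 40 test class ids is saved to a file (e.g. ./data/whitelist_40.txt), the whitelist mapping is also generated automatically.")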

[LABEL MAP BUILT]
- merged coco           : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\data\cache\exp_20260202_230604\train_merged_coco.json
- num train categories  : 56
- saved full map        : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\data\cache\exp_20260202_230604\label_map_full.json
- saved class counts    : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\artifacts\exp_20260202_230604\reports\class_counts_by_category_id.csv

[CLASS COUNT TOP5]
- 3351 | 일양하이트린정 2mg | n=153
- 3483 | 기넥신에프정(은행엽엑스)(수출용) | n=45
- 35206 | 아토젯정 10/40mg | n=37
- 16262 | 크레스토정 20mg | n=23
- 21325 | 아토르바정 10mg | n=22

[CLASS COUNT BOTTOM5]
- 29451 | 레일라정 | n=3
- 33009 | 신바로정 | n=3
- 21771 | 라비에트정 20mg | n=3
- 27926 | 울트라셋이알서방정 | n=3
- 24850 | 놀텍정 10mg | n=3

[WHITELIST NOT FOUND]
- Saved only the full mapping based on train (56 classes) for now.
- Once the list of the 40 test class ids is saved to a file (e.g. ./data/whitelist_40.txt), the whitelist mapping is also generated automatically.
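
One detail of the saved label map is easy to miss: JSON object keys are always strings, so `id2idx` and `idx2id` come back with string keys after `json.load`. The later cells cast `id2idx` back with `int(k)`, and `idx2id` needs the same treatment when predictions are mapped back to `category_id` for submission. The following is a minimal sketch of that round trip, assuming a trimmed copy of `build_mapping` (names omitted) and a hypothetical helper `restore_int_keys` that is not part of the notebook.

```python
import json


def build_mapping(category_ids):
    """Trimmed copy of the notebook's mapping (class names omitted for brevity)."""
    id2idx = {cid: i for i, cid in enumerate(category_ids)}
    idx2id = {i: cid for cid, i in id2idx.items()}
    return {"id2idx": id2idx, "idx2id": idx2id, "num_classes": len(category_ids)}


def restore_int_keys(label_map):
    """Hypothetical helper: cast the string keys produced by JSON back to int."""
    return {
        "id2idx": {int(k): int(v) for k, v in label_map["id2idx"].items()},
        "idx2id": {int(k): int(v) for k, v in label_map["idx2id"].items()},
        "num_classes": int(label_map["num_classes"]),
    }


# Category ids taken from the class-count printout above.
original = build_mapping([3351, 3483, 16262])

# Simulate saving to and loading from label_map_full.json.
reloaded = json.loads(json.dumps(original))
restored = restore_int_keys(reloaded)

print(sorted(reloaded["idx2id"].keys()))  # string keys after the JSON round trip
print(restored["idx2id"][0])              # class index 0 maps back to category_id 3351
```

Looking up `reloaded["idx2id"][0]` directly would raise a `KeyError`, which is why the cast is worth doing once, right after loading.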

In [10]:
# [Code Cell] 3-6. merged COCO -> build image-level table (multi-label / object count) + save distribution report

from pathlib import Path
import json
from collections import defaultdict, Counter
from datetime import datetime
import hashlib

ROOT_ = globals().get("ROOT", Path(".").resolve())
DIRS_ = globals().get("DIRS", {})

CACHE_DIR = Path(globals().get("CACHE_DIR", DIRS_.get("CACHE", ROOT_ / "data" / "cache" / "merged")))
MERGED_COCO = CACHE_DIR / "train_merged_coco.json"
LABEL_MAP = CACHE_DIR / "label_map_full.json"

if not MERGED_COCO.exists():
    cand = list(CACHE_DIR.rglob("train_merged_coco.json"))
    if cand:
        MERGED_COCO = cand[0]
    else:
        raise FileNotFoundError(f"train_merged_coco.json not found. CACHE_DIR={CACHE_DIR}")

REPORT_DIR = Path(DIRS_.get("REPORTS", ROOT_ / "artifacts" / "reports"))
REPORT_DIR.mkdir(parents=True, exist_ok=True)

with open(MERGED_COCO, "r", encoding="utf-8") as f:
    coco = json.load(f)

images = coco.get("images", [])
anns = coco.get("annotations", [])
cats = coco.get("categories", [])

# category_id -> name
cat_id_to_name = {}
for c in cats:
    if isinstance(c, dict) and c.get("id") is not None and c.get("name") is not None:
        cat_id_to_name[int(c["id"])] = str(c["name"])

# (있으면) label_map_full의 id2idx 사용(학습용 인덱스)
id2idx = None
if LABEL_MAP.exists():
    with open(LABEL_MAP, "r", encoding="utf-8") as f:
        lm = json.load(f)
    id2idx = {int(k): int(v) for k, v in lm.get("id2idx", {}).items()}

# image_id -> meta
img_by_id = {int(im["id"]): im for im in images if isinstance(im, dict) and im.get("id") is not None}

# image_id -> list of anns
anns_by_img = defaultdict(list)
for a in anns:
    if not isinstance(a, dict):
        continue
    iid = a.get("image_id", None)
    if iid is None:
        continue
    anns_by_img[int(iid)].append(a)

rows = []
n_missing_img_meta = 0

for iid, alist in anns_by_img.items():
    im = img_by_id.get(iid, None)
    if im is None:
        n_missing_img_meta += 1
        file_name = None
        W = None
        H = None
    else:
        file_name = im.get("file_name")
        W = im.get("width")
        H = im.get("height")

    cat_ids = [int(a["category_id"]) for a in alist if a.get("category_id") is not None]
    uniq_cat_ids = sorted(set(cat_ids))
    n_obj = len(alist)
    n_labels = len(uniq_cat_ids)

    # Multi-label signature (useful for the split)
    sig = ",".join(map(str, uniq_cat_ids))
    sig_hash = hashlib.md5(sig.encode("utf-8")).hexdigest()[:10]  # shortened to 10 hex chars

    # bbox 요약(작은 객체 분포/품질 진단용)
    areas = []
    for a in alist:
        b = a.get("bbox", None)
        if isinstance(b, list) and len(b) == 4:
            w = float(b[2]); h = float(b[3])
            if w > 0 and h > 0:
                areas.append(w * h)

    row = {
        "image_id": int(iid),
        "file_name": file_name,
        "width": W,
        "height": H,
        "n_objects": n_obj,
        "n_unique_labels": n_labels,
        "category_ids": uniq_cat_ids,  # 리스트 그대로(json 저장용)
        "category_names": [cat_id_to_name.get(cid, "?") for cid in uniq_cat_ids],
        "label_signature": sig,
        "label_sig_hash": sig_hash,
        "area_min": min(areas) if areas else None,
        "area_max": max(areas) if areas else None,
        "area_mean": (sum(areas) / len(areas)) if areas else None,
    }

    # 학습용 contiguous class index도 같이 저장(있을 때만)
    if id2idx is not None:
        row["class_indices"] = [id2idx.get(cid, None) for cid in uniq_cat_ids]
    rows.append(row)

# ---- distributions ----
dist_obj = Counter(r["n_objects"] for r in rows)
dist_labels = Counter(r["n_unique_labels"] for r in rows)

# 클래스 커버리지(이미지 단위 포함 빈도)
img_count_per_cat = Counter()
obj_count_per_cat = Counter()
for r in rows:
    uniq = r["category_ids"]
    for cid in uniq:
        img_count_per_cat[cid] += 1
for a in anns:
    if isinstance(a, dict) and a.get("category_id") is not None:
        obj_count_per_cat[int(a["category_id"])] += 1

summary = {
    "timestamp": datetime.now().isoformat(timespec="seconds"),
    "merged_coco": str(MERGED_COCO),
    "n_images": len(rows),
    "n_annotations": len(anns),
    "n_categories": len(cat_id_to_name),
    "n_missing_img_meta": n_missing_img_meta,
    "dist_n_objects": dict(dist_obj),
    "dist_n_unique_labels": dict(dist_labels),
    "top5_cats_by_obj": [
        {"category_id": cid, "name": cat_id_to_name.get(cid, "?"), "n_objects": n}
        for cid, n in obj_count_per_cat.most_common(5)
    ],
    "bottom5_cats_by_obj": [
        {"category_id": cid, "name": cat_id_to_name.get(cid, "?"), "n_objects": n}
        for cid, n in sorted(obj_count_per_cat.items(), key=lambda x: x[1])[:5]
    ],
}

# ---- save: jsonl (row-level) ----
out_jsonl = REPORT_DIR / "image_table.jsonl"
with open(out_jsonl, "w", encoding="utf-8") as f:
    for r in rows:
        f.write(json.dumps(r, ensure_ascii=False) + "\n")

# ---- save: csv (list columns serialized as strings) ----
import csv  # csv.writer quotes fields with commas/quotes and writes None as an empty cell

out_csv = REPORT_DIR / "image_table.csv"
with open(out_csv, "w", encoding="utf-8", newline="") as f:
    writer = csv.writer(f)
    writer.writerow([
        "image_id", "file_name", "width", "height", "n_objects", "n_unique_labels",
        "label_sig_hash", "label_signature", "category_ids", "area_min", "area_max", "area_mean",
    ])
    for r in sorted(rows, key=lambda x: x["image_id"]):
        writer.writerow([
            r["image_id"], r["file_name"], r["width"], r["height"],
            r["n_objects"], r["n_unique_labels"], r["label_sig_hash"],
            r["label_signature"], json.dumps(r["category_ids"]),
            r["area_min"], r["area_max"], r["area_mean"],
        ])

# ---- save: summary ----
out_summary = REPORT_DIR / "image_table_summary.json"
with open(out_summary, "w", encoding="utf-8") as f:
    json.dump(summary, f, indent=2, ensure_ascii=False)

# ---- prints ----
print("[IMAGE TABLE BUILT]")
print(f"- n_images        : {summary['n_images']}")
print(f"- n_annotations   : {summary['n_annotations']}")
print(f"- n_categories    : {summary['n_categories']}")
print(f"- dist n_objects  : {summary['dist_n_objects']}")
print(f"- dist n_labels   : {summary['dist_n_unique_labels']}")
print(f"- missing img meta: {summary['n_missing_img_meta']}")

print("\n[SAVED]")
print(f"- {out_csv}")
print(f"- {out_jsonl}")
print(f"- {out_summary}")

[IMAGE TABLE BUILT]
- n_images        : 232
- n_annotations   : 762
- n_categories    : 56
- dist n_objects  : {4: 74, 3: 150, 2: 8}
- dist n_labels   : {4: 74, 3: 150, 2: 8}
- missing img meta: 0

[SAVED]
- C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\artifacts\exp_20260202_230604\reports\image_table.csv
- C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\artifacts\exp_20260202_230604\reports\image_table.jsonl
- C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\artifacts\exp_20260202_230604\reports\image_table_summary.json
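
The image table stores per-image area statistics, but the box-quality report planned for later needs a per-box size bucket. The sketch below groups boxes with the COCO size convention (small below 32² px, medium below 96² px, large otherwise). Applying those thresholds to this dataset is an assumption: they are defined on original-resolution pixel areas, so on high-resolution pill photos most boxes may land in "large" and the cut-offs might need rescaling to the training resolution. The toy annotations are made up for illustration.

```python
from collections import Counter

# COCO size convention (areas in square pixels). Using it here is an assumption.
SMALL_MAX = 32 ** 2
MEDIUM_MAX = 96 ** 2


def size_bucket(area):
    """Return 'small', 'medium' or 'large' for a box area in pixels."""
    if area < SMALL_MAX:
        return "small"
    if area < MEDIUM_MAX:
        return "medium"
    return "large"


def bucket_counts(annotations):
    """Count boxes per size bucket, skipping malformed or degenerate boxes."""
    counts = Counter()
    for a in annotations:
        b = a.get("bbox")
        if not (isinstance(b, list) and len(b) == 4):
            continue
        w, h = float(b[2]), float(b[3])
        if w <= 0 or h <= 0:
            continue
        counts[size_bucket(w * h)] += 1
    return counts


# Toy annotations (not taken from the dataset).
toy_anns = [
    {"bbox": [0, 0, 20, 20]},     # area 400   -> small
    {"bbox": [0, 0, 50, 50]},     # area 2500  -> medium
    {"bbox": [0, 0, 200, 150]},   # area 30000 -> large
    {"bbox": [0, 0, 0, 10]},      # zero width -> skipped
    {"bbox": None},               # malformed  -> skipped
]
toy_counts = bucket_counts(toy_anns)
print(dict(toy_counts))
```

In the notebook, `bucket_counts(anns)` could be called on the merged COCO annotations and the result stored next to `image_table_summary.json`.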


In [11]:
# [Code Cell] 3-7. Fix the train/val split + stratify (object count / label signature) + save split files

from pathlib import Path
import json
import random
from collections import Counter, defaultdict
from datetime import datetime

ROOT_ = globals().get("ROOT", Path(".").resolve())
DIRS_ = globals().get("DIRS", {})
CACHE_DIR = Path(globals().get("CACHE_DIR", DIRS_.get("CACHE", ROOT_ / "data" / "cache" / "merged")))
REPORT_DIR = Path(DIRS_.get("REPORTS", ROOT_ / "artifacts" / "reports"))
REPORT_DIR.mkdir(parents=True, exist_ok=True)
CACHE_DIR.mkdir(parents=True, exist_ok=True)

# inputs from previous steps
MERGED_COCO = CACHE_DIR / "train_merged_coco.json"
IMAGE_TABLE_JSONL = REPORT_DIR / "image_table.jsonl"

if not MERGED_COCO.exists():
    cand = list(CACHE_DIR.rglob("train_merged_coco.json"))
    if cand:
        MERGED_COCO = cand[0]
    else:
        raise FileNotFoundError(f"train_merged_coco.json not found. CACHE_DIR={CACHE_DIR}")

if not IMAGE_TABLE_JSONL.exists():
    cand = list(REPORT_DIR.rglob("image_table.jsonl"))
    if cand:
        IMAGE_TABLE_JSONL = cand[0]
    else:
        raise FileNotFoundError(f"image_table.jsonl not found. REPORT_DIR={REPORT_DIR}")

# -----------------------------
# 0) split config
# -----------------------------
SEED_ = int(globals().get("SEED", 42))
RATIOS = globals().get("CFG", {}).get("split", {}).get("ratios", {"train": 0.8, "valid": 0.2})
train_ratio = float(RATIOS.get("train", 0.8))
valid_ratio = float(RATIOS.get("valid", 0.2))

# 안전: 합이 1이 아니면 valid를 보정
s = train_ratio + valid_ratio
if abs(s - 1.0) > 1e-6:
    valid_ratio = 1.0 - train_ratio

# Stratify mode:
# - "n_objects": number of objects per image (2/3/4)
# - "signature": label_sig_hash (multi-label signature)
# - "hybrid": n_objects + signature (the safest option when the strata are large enough)
STRATIFY_MODE = "hybrid"  # "n_objects" | "signature" | "hybrid"

# Fall back when the hybrid key fragments the data into undersized strata
MIN_PER_STRATUM = 2  # minimum samples per stratum (fall back if any stratum is smaller)

# -----------------------------
# 1) image_table 로드
# -----------------------------
rows = []
with open(IMAGE_TABLE_JSONL, "r", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        rows.append(json.loads(line))

if not rows:
    raise RuntimeError("image_table.jsonl is empty. Check cell 3-6.")

# -----------------------------
# 2) stratify key 생성
# -----------------------------
def make_key(r):
    nobj = int(r["n_objects"])
    sig = r.get("label_sig_hash", "")
    if STRATIFY_MODE == "n_objects":
        return f"nobj={nobj}"
    if STRATIFY_MODE == "signature":
        return f"sig={sig}"
    # hybrid
    return f"nobj={nobj}|sig={sig}"

keys = [make_key(r) for r in rows]
key_counts = Counter(keys)

# Check whether the strata are too fragmented: count the strata that hold fewer
# than MIN_PER_STRATUM samples. singleton_ratio is informational only and does
# not drive the fallback decision.
n_singletons = sum(1 for k, c in key_counts.items() if c < MIN_PER_STRATUM)
singleton_ratio = n_singletons / max(1, len(key_counts))

FALLBACK_USED = False
if STRATIFY_MODE == "hybrid" and (n_singletons > 0):
    # Fall back to n_objects as soon as any hybrid stratum is undersized
    STRATIFY_MODE = "n_objects"
    FALLBACK_USED = True
    keys = [make_key(r) for r in rows]
    key_counts = Counter(keys)

# -----------------------------
# 3) stratified split
# -----------------------------
rng = random.Random(SEED_)
by_key = defaultdict(list)
for r, k in zip(rows, keys):
    by_key[k].append(r)

# 각 strata 내부 shuffle
for k in by_key:
    rng.shuffle(by_key[k])

train_ids = []
valid_ids = []

for k, items in by_key.items():
    n = len(items)
    n_train = int(round(n * train_ratio))
    # With n >= 2, keep at least one sample on each side so the stratum appears in both train and valid
    if n >= 2:
        n_train = min(max(1, n_train), n - 1)  # train: [1, n-1]
    else:
        n_train = n  # a single sample goes to train
    train_part = items[:n_train]
    valid_part = items[n_train:]
    train_ids.extend([int(x["image_id"]) for x in train_part])
    valid_ids.extend([int(x["image_id"]) for x in valid_part])

# 전역 shuffle(고정 seed)
rng.shuffle(train_ids)
rng.shuffle(valid_ids)

# -----------------------------
# 4) split 산출물 저장
# -----------------------------
RUN_NAME_ = globals().get("RUN_NAME", "run")
SPLIT_DIR = CACHE_DIR / "splits"
SPLIT_DIR.mkdir(parents=True, exist_ok=True)

split_obj = {
    "created_at": datetime.now().isoformat(timespec="seconds"),
    "run_name": RUN_NAME_,
    "seed": SEED_,
    "train_ratio": train_ratio,
    "valid_ratio": valid_ratio,
    "stratify_mode": "n_objects" if FALLBACK_USED else STRATIFY_MODE,
    "fallback_used": FALLBACK_USED,
    "counts": {
        "total": len(rows),
        "train": len(train_ids),
        "valid": len(valid_ids),
    },
    "train_image_ids": train_ids,
    "valid_image_ids": valid_ids,
}

out_split_json = SPLIT_DIR / "split_train_valid.json"
with open(out_split_json, "w", encoding="utf-8") as f:
    json.dump(split_obj, f, indent=2, ensure_ascii=False)

# 텍스트 버전도 저장(간편용)
out_train_txt = SPLIT_DIR / "train_ids.txt"
out_valid_txt = SPLIT_DIR / "valid_ids.txt"
out_train_txt.write_text("\n".join(map(str, train_ids)) + "\n", encoding="utf-8")
out_valid_txt.write_text("\n".join(map(str, valid_ids)) + "\n", encoding="utf-8")

# -----------------------------
# 5) split 품질 점검(분포 비교)
# -----------------------------
id_to_row = {int(r["image_id"]): r for r in rows}

def dist(ids, key):
    c = Counter()
    for iid in ids:
        r = id_to_row.get(int(iid))
        if r is None:
            continue
        c[int(r[key])] += 1
    return dict(c)

train_dist_obj = dist(train_ids, "n_objects")
valid_dist_obj = dist(valid_ids, "n_objects")

print("[SPLIT DONE]")
print(f"- seed           : {SEED_}")
print(f"- stratify_mode  : {split_obj['stratify_mode']} (fallback={FALLBACK_USED})")
print(f"- counts         : total={split_obj['counts']['total']} train={split_obj['counts']['train']} valid={split_obj['counts']['valid']}")

print("\n[DISTRIBUTION CHECK]")
print(f"- train n_objects: {train_dist_obj}")
print(f"- valid n_objects: {valid_dist_obj}")

print("\n[SAVED]")
print(f"- split json : {out_split_json}")
print(f"- train txt  : {out_train_txt}")
print(f"- valid txt  : {out_valid_txt}")

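# Update CFG (if available)
if "CFG" in globals():
    # The ratios lookup above tolerates a missing "split" section, so do the same here.
    split_cfg = CFG.setdefault("split", {})
    split_cfg["seed"] = SEED_
    split_cfg["ratios"] = {"train": train_ratio, "valid": valid_ratio}
    split_cfg["strategy"] = split_obj["stratify_mode"]
    if "save_cfg" in globals():
        save_cfg()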

[SPLIT DONE]
- seed           : 42
- stratify_mode  : n_objects (fallback=True)
- counts         : total=232 train=185 valid=47

[DISTRIBUTION CHECK]
- train n_objects: {3: 120, 4: 59, 2: 6}
- valid n_objects: {3: 30, 4: 15, 2: 2}

[SAVED]
- split json : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\data\cache\exp_20260202_230604\splits\split_train_valid.json
- train txt  : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\data\cache\exp_20260202_230604\splits\train_ids.txt
- valid txt  : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\data\cache\exp_20260202_230604\splits\valid_ids.txt
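
Because the split fell back to `n_objects`, it balances object counts but gives no guarantee about class coverage, and several classes have only 3 annotations. A class that is absent from valid cannot contribute to the validation mAP, so it seems worth listing such classes before training. The sketch below is one way to do that, assuming rows shaped like `image_table.jsonl` (with `image_id` and `category_ids`); `class_coverage` is a hypothetical helper and the toy rows are made up.

```python
def class_coverage(rows, train_ids, valid_ids):
    """Report the category ids that never appear in train or in valid."""
    train_set, valid_set = set(train_ids), set(valid_ids)
    train_classes, valid_classes = set(), set()
    for r in rows:
        iid = int(r["image_id"])
        cids = set(r["category_ids"])
        if iid in train_set:
            train_classes |= cids
        elif iid in valid_set:
            valid_classes |= cids
    all_classes = train_classes | valid_classes
    return {
        "missing_in_train": sorted(all_classes - train_classes),
        "missing_in_valid": sorted(all_classes - valid_classes),
    }


# Toy rows in the image_table.jsonl layout (image ids are made up).
toy_rows = [
    {"image_id": 1, "category_ids": [3351, 3483]},
    {"image_id": 2, "category_ids": [3351]},
    {"image_id": 3, "category_ids": [24850]},
]
coverage = class_coverage(toy_rows, train_ids=[1, 3], valid_ids=[2])
print(coverage)
```

With the notebook's variables this would be `class_coverage(rows, train_ids, valid_ids)`. A non-empty `missing_in_train` is the more serious case, since the model would never see that class.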


In [12]:
# [Code Cell] 3-8. (Ultralytics YOLO) merged COCO + split -> build YOLO dataset (images/labels) + save data.yaml + validate

from pathlib import Path
import json
import shutil
from collections import defaultdict, Counter
from datetime import datetime

ROOT_ = globals().get("ROOT", Path(".").resolve())
INPUT_ = globals().get("INPUT", {
    "TRAIN_IMAGES": ROOT_ / "train_images",
    "TRAIN_ANN_DIR": ROOT_ / "train_annotations",
    "TEST_IMAGES": ROOT_ / "test_images",
})
WORK_ = globals().get("WORK", {"DATA": ROOT_ / "data"})
DIRS_ = globals().get("DIRS", {})
RUN_NAME_ = globals().get("RUN_NAME", "run")

CACHE_DIR = Path(globals().get("CACHE_DIR", DIRS_.get("CACHE", ROOT_ / "data" / "cache" / "merged")))
MERGED_COCO = CACHE_DIR / "train_merged_coco.json"
LABEL_MAP = CACHE_DIR / "label_map_full.json"
SPLIT_JSON = CACHE_DIR / "splits" / "split_train_valid.json"

if not MERGED_COCO.exists():
    cand = list(CACHE_DIR.rglob("train_merged_coco.json"))
    if cand: MERGED_COCO = cand[0]
    else: raise FileNotFoundError(f"train_merged_coco.json not found in {CACHE_DIR}")

if not LABEL_MAP.exists():
    cand = list(CACHE_DIR.rglob("label_map_full.json"))
    if cand: LABEL_MAP = cand[0]
    else: raise FileNotFoundError(f"label_map_full.json not found in {CACHE_DIR}")

if not SPLIT_JSON.exists():
    cand = list(CACHE_DIR.rglob("split_train_valid.json"))
    if cand: SPLIT_JSON = cand[0]
    else: raise FileNotFoundError(f"split_train_valid.json not found in {CACHE_DIR}")

train_img_src = Path(INPUT_["TRAIN_IMAGES"])
if not train_img_src.exists():
    raise FileNotFoundError(f"train_images dir not found: {train_img_src}")

# -------------------------
# 1) Load inputs
# -------------------------
with open(MERGED_COCO, "r", encoding="utf-8") as f:
    coco = json.load(f)

with open(LABEL_MAP, "r", encoding="utf-8") as f:
    lm = json.load(f)

with open(SPLIT_JSON, "r", encoding="utf-8") as f:
    split = json.load(f)

images = coco.get("images", [])
anns = coco.get("annotations", [])
cats = coco.get("categories", [])

# id2idx: category_id -> class_index(0..nc-1)
id2idx = {int(k): int(v) for k, v in lm.get("id2idx", {}).items()}
names = lm.get("names", None)
nc = int(lm.get("num_classes", len(id2idx)))

if names is None or len(names) != nc:
    # fallback: categories에서 name을 id2idx 순으로 구성
    cat_id_to_name = {int(c["id"]): str(c["name"]) for c in cats if isinstance(c, dict) and c.get("id") is not None and c.get("name") is not None}
    names = [cat_id_to_name.get(cid, str(cid)) for cid in sorted(id2idx, key=lambda x: id2idx[x])]

train_ids = [int(x) for x in split.get("train_image_ids", [])]
val_ids = [int(x) for x in split.get("valid_image_ids", [])]
train_set = set(train_ids)
val_set = set(val_ids)

# image_id -> meta
img_by_id = {int(im["id"]): im for im in images if isinstance(im, dict) and im.get("id") is not None}

# image_id -> annotations
anns_by_img = defaultdict(list)
for a in anns:
    if not isinstance(a, dict): 
        continue
    iid = a.get("image_id", None)
    if iid is None:
        continue
    anns_by_img[int(iid)].append(a)

# -------------------------
# 2) YOLO dataset dirs
# -------------------------
DATASET_ROOT = Path(WORK_["DATA"]) / "datasets" / f"pill_od_yolo_{RUN_NAME_}"
IMG_TRAIN_DIR = DATASET_ROOT / "images" / "train"
IMG_VAL_DIR = DATASET_ROOT / "images" / "val"
LBL_TRAIN_DIR = DATASET_ROOT / "labels" / "train"
LBL_VAL_DIR = DATASET_ROOT / "labels" / "val"

for p in [IMG_TRAIN_DIR, IMG_VAL_DIR, LBL_TRAIN_DIR, LBL_VAL_DIR]:
    p.mkdir(parents=True, exist_ok=True)

# copy mode: 'copy' 기본 (232장이라 부담 적음). 필요하면 'symlink'로 바꿔도 됨.
COPY_MODE = "copy"  # "copy" | "symlink"

def put_image(src: Path, dst: Path):
    if dst.exists():
        return
    if COPY_MODE == "symlink":
        try:
            dst.symlink_to(src.resolve())
            return
        except Exception:
            # symlink 권한/환경 이슈면 copy로 fallback
            shutil.copy2(src, dst)
            return
    shutil.copy2(src, dst)

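def xywh_to_yolo(bbox_xywh, W, H):
    x, y, w, h = map(float, bbox_xywh)
    W = float(W)
    H = float(H)
    # Clip the box corners to the image first. Clamping the center and the size
    # independently would distort a box that extends past the image border.
    x1 = max(0.0, min(W, x))
    y1 = max(0.0, min(H, y))
    x2 = max(0.0, min(W, x + w))
    y2 = max(0.0, min(H, y + h))
    # Normalized center and size of the clipped box (all values fall in [0, 1])
    xc = (x1 + x2) / 2.0 / W
    yc = (y1 + y2) / 2.0 / H
    ww = (x2 - x1) / W
    hh = (y2 - y1) / H
    return xc, yc, ww, hh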

# -------------------------
# 3) Build split datasets
# -------------------------
stats = {
    "n_train_images": 0,
    "n_val_images": 0,
    "n_train_labels": 0,
    "n_val_labels": 0,
    "n_train_objects": 0,
    "n_val_objects": 0,
    "n_missing_images": 0,
    "n_skipped_boxes": 0,
    "n_empty_labels": 0,  # 이 대회는 보통 0이 정상
}

def process_split(image_ids, img_dir, lbl_dir, split_name):
    for iid in image_ids:
        im = img_by_id.get(int(iid))
        if im is None:
            continue

        file_name = str(im.get("file_name"))
        W = im.get("width", None)
        H = im.get("height", None)
        if W is None or H is None:
            # width/height 없으면 변환 불가 -> skip
            stats["n_skipped_boxes"] += len(anns_by_img.get(int(iid), []))
            continue

        src_img = train_img_src / Path(file_name).name
        if not src_img.exists():
            stats["n_missing_images"] += 1
            continue

        dst_img = img_dir / Path(file_name).name
        put_image(src_img, dst_img)

        # label file path: 이미지와 동일 stem
        label_path = lbl_dir / (Path(file_name).stem + ".txt")

        lines = []
        for a in anns_by_img.get(int(iid), []):
            cid = a.get("category_id", None)
            bbox = a.get("bbox", None)
            if cid is None or bbox is None or (not isinstance(bbox, list)) or len(bbox) != 4:
                stats["n_skipped_boxes"] += 1
                continue
            cid = int(cid)
            if cid not in id2idx:
                stats["n_skipped_boxes"] += 1
                continue
            cls = id2idx[cid]

            x, y, w, h = map(float, bbox)
            if w <= 0 or h <= 0:
                stats["n_skipped_boxes"] += 1
                continue

            xc, yc, ww, hh = xywh_to_yolo([x, y, w, h], W, H)

            # Skip boxes whose normalized width or height is not positive
            if ww <= 0 or hh <= 0:
                stats["n_skipped_boxes"] += 1
                continue

            lines.append(f"{cls} {xc:.6f} {yc:.6f} {ww:.6f} {hh:.6f}")

        if not lines:
            stats["n_empty_labels"] += 1
            label_path.write_text("", encoding="utf-8")
        else:
            label_path.write_text("\n".join(lines) + "\n", encoding="utf-8")

        # stats
        if split_name == "train":
            stats["n_train_images"] += 1
            stats["n_train_labels"] += 1
            stats["n_train_objects"] += len(lines)
        else:
            stats["n_val_images"] += 1
            stats["n_val_labels"] += 1
            stats["n_val_objects"] += len(lines)

process_split(train_ids, IMG_TRAIN_DIR, LBL_TRAIN_DIR, "train")
process_split(val_ids, IMG_VAL_DIR, LBL_VAL_DIR, "val")

# -------------------------
# 4) data.yaml 생성 (Ultralytics)
# -------------------------
data_yaml = DATASET_ROOT / "data.yaml"
yaml_text = []
yaml_text.append(f"path: {DATASET_ROOT.as_posix()}")
yaml_text.append("train: images/train")
yaml_text.append("val: images/val")
yaml_text.append(f"nc: {nc}")
yaml_text.append("names:")
for i, n in enumerate(names):
    # JSON string escaping yields a valid YAML double-quoted scalar
    # (covers backslashes as well as double quotes)
    yaml_text.append(f"  {i}: {json.dumps(str(n), ensure_ascii=False)}")

data_yaml.write_text("\n".join(yaml_text) + "\n", encoding="utf-8")

# -------------------------
# 5) Sanity check (라벨 포맷 검사)
# -------------------------
def check_labels(lbl_dir: Path, nc: int, max_report=10):
    bad = []
    for p in sorted(lbl_dir.glob("*.txt")):
        txt = p.read_text(encoding="utf-8").strip()
        if not txt:
            continue
        for ln in txt.splitlines():
            parts = ln.strip().split()
            if len(parts) != 5:
                bad.append((p.name, ln, "len!=5"))
                continue
            try:
                c = int(parts[0])
                vals = list(map(float, parts[1:]))
            except Exception:
                bad.append((p.name, ln, "parse_error"))
                continue
            if not (0 <= c < nc):
                bad.append((p.name, ln, "class_out_of_range"))
                continue
            if any(v < 0 or v > 1 for v in vals):
                bad.append((p.name, ln, "val_out_of_0_1"))
                continue
    return bad[:max_report], len(bad)

bad_train, n_bad_train = check_labels(LBL_TRAIN_DIR, nc)
bad_val, n_bad_val = check_labels(LBL_VAL_DIR, nc)

# -------------------------
# 6) Save conversion manifest
# -------------------------
manifest = {
    "created_at": datetime.now().isoformat(timespec="seconds"),
    "dataset_root": str(DATASET_ROOT),
    "copy_mode": COPY_MODE,
    "merged_coco": str(MERGED_COCO),
    "split_json": str(SPLIT_JSON),
    "label_map": str(LABEL_MAP),
    "stats": stats,
    "sanity": {
        "n_bad_train_lines": n_bad_train,
        "n_bad_val_lines": n_bad_val,
        "bad_train_examples": bad_train,
        "bad_val_examples": bad_val,
    },
}
out_manifest = DATASET_ROOT / "convert_manifest.json"
out_manifest.write_text(json.dumps(manifest, indent=2, ensure_ascii=False), encoding="utf-8")

# -------------------------
# 7) Print summary
# -------------------------
print("[YOLO DATASET READY]")
print(f"- dataset_root : {DATASET_ROOT}")
print(f"- data.yaml    : {data_yaml}")
print(f"- nc           : {nc}")
print(f"- copy_mode    : {COPY_MODE}")

print("\n[COUNTS]")
print(f"- train images : {stats['n_train_images']} | labels: {stats['n_train_labels']} | objects: {stats['n_train_objects']}")
print(f"- val images   : {stats['n_val_images']}   | labels: {stats['n_val_labels']}   | objects: {stats['n_val_objects']}")
print(f"- missing imgs : {stats['n_missing_images']}")
print(f"- skipped box  : {stats['n_skipped_boxes']}")
print(f"- empty labels : {stats['n_empty_labels']}")

print("\n[SANITY]")
print(f"- bad train label lines: {n_bad_train}")
if bad_train:
    for ex in bad_train:
        print("  -", ex)
print(f"- bad val label lines  : {n_bad_val}")
if bad_val:
    for ex in bad_val:
        print("  -", ex)

print("\n[SAVED]")
print(f"- {out_manifest}")

[YOLO DATASET READY]
- dataset_root : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\data\datasets\pill_od_yolo_exp_20260202_230604
- data.yaml    : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\data\datasets\pill_od_yolo_exp_20260202_230604\data.yaml
- nc           : 56
- copy_mode    : copy

[COUNTS]
- train images : 185 | labels: 185 | objects: 608
- val images   : 47   | labels: 47   | objects: 154
- missing imgs : 0
- skipped box  : 0
- empty labels : 0

[SANITY]
- bad train label lines: 0
- bad val label lines  : 0

[SAVED]
- C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\data\datasets\pill_od_yolo_exp_20260202_230604\convert_manifest.json
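
The sanity check above validates the label format, not the geometry. Since the metric is mAP@[0.75:0.95], even a small systematic shift in the converted boxes would cost points, so a numeric round trip is a cheap complement to visual inspection. The following sketch converts a COCO box to a YOLO line and back, assuming the same 6-decimal formatting as the conversion cell. The helper names, the class index, and the 1000x800 image size are hypothetical.

```python
def xywh_to_yolo_line(cls, bbox_xywh, W, H):
    """COCO [x, y, w, h] in pixels -> 'cls xc yc w h' normalized, 6 decimals."""
    x, y, w, h = map(float, bbox_xywh)
    xc = (x + w / 2.0) / W
    yc = (y + h / 2.0) / H
    return f"{cls} {xc:.6f} {yc:.6f} {w / W:.6f} {h / H:.6f}"


def yolo_line_to_xywh(line, W, H):
    """Inverse conversion: YOLO label line -> (cls, [x, y, w, h]) in pixels."""
    parts = line.split()
    cls = int(parts[0])
    xc, yc, ww, hh = map(float, parts[1:5])
    w = ww * W
    h = hh * H
    return cls, [xc * W - w / 2.0, yc * H - h / 2.0, w, h]


def max_abs_error(box_a, box_b):
    """Largest per-coordinate difference between two boxes, in pixels."""
    return max(abs(a - b) for a, b in zip(box_a, box_b))


# Hypothetical example: class index 7, a 300x150 box in a 1000x800 image.
W, H = 1000, 800
src_box = [100.0, 200.0, 300.0, 150.0]
line = xywh_to_yolo_line(7, src_box, W, H)
cls_back, box_back = yolo_line_to_xywh(line, W, H)
err = max_abs_error(src_box, box_back)
print(line)
print(f"round-trip error: {err:.6f} px")
```

Running this over every annotation and asserting that the error stays well below one pixel would catch axis swaps or a wrong width/height pairing, though it cannot detect errors already present in the source JSON.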


In [13]:
# [Code Cell] 4-1. Fix the baseline training plan (Ultralytics YOLO) + check environment + save train_plan

from pathlib import Path
import json
from datetime import datetime

# 1) dataset yaml 경로 확보 (3-8에서 만든 DATASET_ROOT/data.yaml 우선)
data_yaml = None
if "DATASET_ROOT" in globals():
    p = Path(globals()["DATASET_ROOT"]) / "data.yaml"
    if p.exists():
        data_yaml = p

# fallback: WORK["DATA"]/datasets 아래에서 현재 RUN_NAME 포함한 폴더 탐색
if data_yaml is None:
    ROOT_ = globals().get("ROOT", Path(".").resolve())
    WORK_ = globals().get("WORK", {"DATA": ROOT_ / "data"})
    RUN_NAME_ = globals().get("RUN_NAME", "")
    ds_root = Path(WORK_["DATA"]) / "datasets"
    cands = sorted(ds_root.glob(f"*{RUN_NAME_}*/data.yaml")) if RUN_NAME_ else sorted(ds_root.rglob("data.yaml"))
    if not cands:
        raise FileNotFoundError(f"data.yaml을 찾지 못했습니다. datasets dir: {ds_root}")
    data_yaml = cands[0]

print(f"[OK] data.yaml: {data_yaml}")

# 2) torch/ultralytics 환경 점검
torch_info = {}
ultra_info = {}
try:
    import torch
    torch_info = {
        "torch": torch.__version__,
        "cuda_available": torch.cuda.is_available(),
        "cuda_version": getattr(torch.version, "cuda", None),
        "device_count": torch.cuda.device_count() if torch.cuda.is_available() else 0,
        "device_name": torch.cuda.get_device_name(0) if torch.cuda.is_available() and torch.cuda.device_count() > 0 else None,
    }
except Exception as e:
    print(f"[WARN] torch import failed: {e}")

try:
    import ultralytics
    ultra_info = {
        "ultralytics": getattr(ultralytics, "__version__", None),
    }
except Exception as e:
    print(f"[WARN] ultralytics import failed: {e}")

print("\n[ENV]")
for k, v in torch_info.items():
    print(f"- {k}: {v}")
for k, v in ultra_info.items():
    print(f"- {k}: {v}")

# 3) data.yaml 로드해서 nc/names 확인
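def load_yaml_like(path: Path):
    # Parse with PyYAML when it is installed. Return None so the caller can
    # skip validation when PyYAML is missing or the file fails to parse.
    try:
        import yaml
    except ImportError:
        return None
    try:
        return yaml.safe_load(path.read_text(encoding="utf-8"))
    except yaml.YAMLError as e:
        print(f"[WARN] failed to parse {path}: {e}")
        return None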

data_cfg = load_yaml_like(data_yaml)
if data_cfg:
    nc = int(data_cfg.get("nc", -1))
    names = data_cfg.get("names", None)
    n_names = len(names) if isinstance(names, dict) else (len(names) if isinstance(names, list) else None)
    print("\n[DATA.YAML CHECK]")
    print(f"- nc: {nc}")
    print(f"- names count: {n_names}")
else:
    print("\n[WARN] Skipping data.yaml validation: PyYAML is not installed or the file could not be parsed.")

# 4) Fix the baseline training plan
#   - The dataset is small, so start with yolov8s + imgsz=768 (move up to m if needed)
BASELINE = {
    "framework": "ultralytics",
    "task": "detect",
    "model": "yolov8s.pt",
    "data": str(data_yaml),
    "imgsz": 768,
    "epochs": 100,
    "batch": 8,
    "patience": 30,
    "optimizer": "auto",
    "lr0": None,
    "lrf": None,
    "weight_decay": None,
    "workers": 4,
    "device": 0 if torch_info.get("cuda_available") else "cpu",
    "seed": int(globals().get("SEED", 42)),
    "deterministic": True,
    "cache": False,
    "amp": True,          # mixed precision (when a GPU is available)
    "cos_lr": False,
    "close_mosaic": 10,   # turn mosaic off for the last 10 epochs (often helps box precision)
    "save": True,
    "save_period": -1,
    "plots": True,
    "val": True,
}

# 5) Apply to CFG and save
if "CFG" in globals():
    CFG["train"]["framework"] = "ultralytics_yolo"
    CFG["train"]["model"]["name"] = "yolov8s"
    CFG["train"]["model"]["imgsz"] = BASELINE["imgsz"]
    CFG["train"]["hyperparams"]["epochs"] = BASELINE["epochs"]
    CFG["train"]["hyperparams"]["batch"] = BASELINE["batch"]
    CFG["train"]["hyperparams"]["workers"] = BASELINE["workers"]
    CFG["infer"]["max_det_per_image"] = 4
    if "save_cfg" in globals():
        save_cfg()

# Save the train plan
DIRS_ = globals().get("DIRS", {})
plan_dir = Path(DIRS_.get("CONFIG", Path(".") / "runs" / "config"))
plan_dir.mkdir(parents=True, exist_ok=True)
train_plan_path = plan_dir / "train_plan_ultralytics_baseline.json"
payload = {
    "created_at": datetime.now().isoformat(timespec="seconds"),
    "run_name": globals().get("RUN_NAME", ""),
    "data_yaml": str(data_yaml),
    "baseline": BASELINE,
    "torch": torch_info,
    "ultralytics": ultra_info,
}
train_plan_path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8")

print("\n[BASELINE PLAN]")
for k in ["model", "imgsz", "epochs", "batch", "patience", "device", "close_mosaic", "amp"]:
    print(f"- {k}: {BASELINE[k]}")
print(f"\n[OK] saved train plan -> {train_plan_path}")

[OK] data.yaml: C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\data\datasets\pill_od_yolo_exp_20260202_230604\data.yaml

[ENV]
- torch: 2.5.1+cu121
- cuda_available: True
- cuda_version: 12.1
- device_count: 1
- device_name: NVIDIA GeForce RTX 3080
- ultralytics: 8.4.9

[DATA.YAML CHECK]
- nc: 56
- names count: 56

[BASELINE PLAN]
- model: yolov8s.pt
- imgsz: 768
- epochs: 100
- batch: 8
- patience: 30
- device: 0
- close_mosaic: 10
- amp: True

[OK] saved train plan -> C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\runs\exp_20260202_230604\config\train_plan_ultralytics_baseline.json
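
The `[DATA.YAML CHECK]` step above prints `nc` and the number of names but leaves the comparison to the reader. A minimal sketch of an explicit consistency check follows; `check_data_cfg` is a hypothetical helper (not part of the notebook or of Ultralytics) and assumes `names` is either a list or an index-to-name dict, as handled in the cell above.

```python
from typing import Any, Dict, List


def check_data_cfg(cfg: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems found in a data.yaml-style dict."""
    problems: List[str] = []
    nc = cfg.get("nc")
    names = cfg.get("names")

    if isinstance(names, dict):
        # dict form: {0: "class_a", 1: "class_b", ...}
        keys = sorted(int(k) for k in names)
        if keys != list(range(len(keys))):
            problems.append("names keys are not contiguous 0..N-1")
        n_names = len(keys)
    elif isinstance(names, list):
        n_names = len(names)
    else:
        problems.append("names is missing or has an unexpected type")
        n_names = None

    if nc is not None and n_names is not None and int(nc) != n_names:
        problems.append(f"nc={nc} does not match names count={n_names}")
    return problems


# Toy configs (made up for illustration)
ok_cfg = {"nc": 3, "names": {0: "a", 1: "b", 2: "c"}}
bad_cfg = {"nc": 4, "names": ["a", "b", "c"]}
print(check_data_cfg(ok_cfg))
print(check_data_cfg(bad_cfg))
```

Returning a list of problems instead of raising lets the cell print every issue at once and decide separately whether to stop before training.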


In [14]:
# [Code Cell] 4-2. (Ultralytics) Run baseline training + collect best/last paths + record results (results.csv/jsonl)

from pathlib import Path
import json
from datetime import datetime

# --- Load required paths/settings ---
DIRS_ = globals().get("DIRS", {})
RUN_DIR = Path(DIRS_["RUN_DIR"])
CKPT_DIR = Path(DIRS_["CKPT"])
LOGS_DIR = Path(DIRS_["LOGS"])
CONFIG_DIR = Path(DIRS_["CONFIG"])

TRAIN_PLAN_PATH = CONFIG_DIR / "train_plan_ultralytics_baseline.json"
if not TRAIN_PLAN_PATH.exists():
    raise FileNotFoundError(f"train plan not found: {TRAIN_PLAN_PATH}")

train_plan = json.loads(TRAIN_PLAN_PATH.read_text(encoding="utf-8"))
BASELINE = train_plan["baseline"]

# Path to data.yaml
DATA_YAML = Path(train_plan["data_yaml"])
if not DATA_YAML.exists():
    raise FileNotFoundError(f"data.yaml not found: {DATA_YAML}")

# --- ultralytics import ---
from ultralytics import YOLO

# --- Force training outputs into this RUN folder ---
# Ultralytics writes runs to project/name, so pin project=RUN_DIR/"ultralytics" and name="train_baseline"
PROJECT_DIR = RUN_DIR / "ultralytics"
EXP_NAME = "train_baseline"

# Build the training parameters
train_kwargs = dict(
    data=str(DATA_YAML),
    imgsz=int(BASELINE["imgsz"]),
    epochs=int(BASELINE["epochs"]),
    batch=int(BASELINE["batch"]),
    patience=int(BASELINE["patience"]),
    workers=int(BASELINE["workers"]),
    device=BASELINE["device"],
    seed=int(BASELINE["seed"]),
    deterministic=bool(BASELINE["deterministic"]),
    amp=bool(BASELINE["amp"]),
    close_mosaic=int(BASELINE["close_mosaic"]),
    plots=bool(BASELINE["plots"]),
    val=bool(BASELINE["val"]),
    save=bool(BASELINE["save"]),
    save_period=int(BASELINE["save_period"]),
    cache=bool(BASELINE["cache"]),
    project=str(PROJECT_DIR),
    name=str(EXP_NAME),
)

# Do not pass unset (None) options to Ultralytics (avoids errors)
for k in ["lr0", "lrf", "weight_decay", "optimizer", "cos_lr"]:
    v = BASELINE.get(k, None)
    if v is not None:
        train_kwargs[k] = v

# --- Event log ---
if "log_event" in globals():
    log_event("train_start", {"exp": EXP_NAME, "project": str(PROJECT_DIR), "kwargs": train_kwargs})

print("[TRAIN START]")
print("- project:", PROJECT_DIR)
print("- name   :", EXP_NAME)
print("- data   :", DATA_YAML)
print("- model  :", BASELINE["model"])
print("- imgsz/epochs/batch:", BASELINE["imgsz"], BASELINE["epochs"], BASELINE["batch"])

# --- run ---
model = YOLO(BASELINE["model"])
results = model.train(**train_kwargs)

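# --- Locate the result directory ---
# Prefer the directory the trainer actually wrote to: if PROJECT_DIR/EXP_NAME already
# existed from an earlier run, Ultralytics appends a suffix (e.g. train_baseline2).
save_dir = getattr(getattr(model, "trainer", None), "save_dir", None)
exp_dir = Path(save_dir) if save_dir else PROJECT_DIR / EXP_NAME
if not exp_dir.exists():
    # Fallback: pick the most recently modified matching directory
    cands = sorted(PROJECT_DIR.glob(f"{EXP_NAME}*"), key=lambda p: p.stat().st_mtime)
    if not cands:
        raise FileNotFoundError(f"ultralytics exp dir not found under {PROJECT_DIR}")
    exp_dir = cands[-1]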

weights_dir = exp_dir / "weights"
best_pt = weights_dir / "best.pt"
last_pt = weights_dir / "last.pt"

# Copy best/last into our CKPT_DIR (overwriting existing files)
CKPT_DIR.mkdir(parents=True, exist_ok=True)

def safe_copy(src: Path, dst: Path):
    if not src.exists():
        return False
    dst.parent.mkdir(parents=True, exist_ok=True)
    dst.write_bytes(src.read_bytes())
    return True

best_dst = CKPT_DIR / "best.pt"
last_dst = CKPT_DIR / "last.pt"
copied_best = safe_copy(best_pt, best_dst)
copied_last = safe_copy(last_pt, last_dst)

# --- Collect a metrics summary ---
# The structure of `results` varies across Ultralytics versions, so also read from files
metrics = {}
results_csv = exp_dir / "results.csv"
if results_csv.exists():
    # Read the last epoch row and extract the key metrics
    # (this is the final epoch, which is not necessarily the epoch saved as best.pt)
    import pandas as pd
    df = pd.read_csv(results_csv)
    df.columns = df.columns.str.strip()  # some versions pad column names with spaces
    if len(df) > 0:
        last = df.iloc[-1].to_dict()
        # Candidate keys (may differ by version)
        # Try column names such as mAP50 and mAP50-95 first
        for k in ["metrics/mAP50-95(B)", "metrics/mAP50(B)", "metrics/mAP75(B)",
                  "metrics/mAP50-95", "metrics/mAP50", "metrics/mAP75",
                  "val/box_map", "val/box_map50", "val/box_map75"]:
            if k in last:
                metrics[k] = float(last[k])
        # Keep the losses too (when present)
        for k in ["train/box_loss", "train/cls_loss", "train/dfl_loss",
                  "val/box_loss", "val/cls_loss", "val/dfl_loss"]:
            if k in last:
                metrics[k] = float(last[k])

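# Also store under standardized keys (when present)
# - The competition metric is mAP@[0.75:0.95]; results.csv only provides mAP50-95 (IoU 0.50:0.95),
#   so mAP_75_95 below holds mAP50-95 as a proxy, not the true competition metric.
mAP_75_95 = None
mAP_50 = None
mAP_75 = None
for k, v in metrics.items():
    if "mAP50-95" in k:
        mAP_75_95 = v  # proxy: mAP50-95
    elif "mAP50" in k:
        mAP_50 = v
    elif "mAP75" in k:
        mAP_75 = v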

std_metrics = {
    "mAP_75_95": mAP_75_95,
    "mAP_50": mAP_50,
    "mAP_75": mAP_75,
    "exp_dir": str(exp_dir),
    "best_pt": str(best_pt) if best_pt.exists() else None,
    "last_pt": str(last_pt) if last_pt.exists() else None,
    "best_copied_to": str(best_dst) if copied_best else None,
    "last_copied_to": str(last_dst) if copied_last else None,
    "results_csv": str(results_csv) if results_csv.exists() else None,
}

# --- artifacts 저장 ---
train_out = {
    "finished_at": datetime.now().isoformat(timespec="seconds"),
    "exp_dir": str(exp_dir),
    "train_kwargs": train_kwargs,
    "std_metrics": std_metrics,
    "raw_metrics": metrics,
    "weights": {
        "best_pt": str(best_pt) if best_pt.exists() else None,
        "last_pt": str(last_pt) if last_pt.exists() else None,
    },
}

out_path = LOGS_DIR / "train_baseline_out.json"
out_path.write_text(json.dumps(train_out, indent=2, ensure_ascii=False), encoding="utf-8")

print("\n[TRAIN DONE]")
print("- exp_dir :", exp_dir)
print("- best.pt :", best_pt, "| copied ->", best_dst if copied_best else "(missing)")
print("- last.pt :", last_pt, "| copied ->", last_dst if copied_last else "(missing)")
print("- mAP_75_95:", mAP_75_95)
print("- mAP_50   :", mAP_50)
print("- mAP_75   :", mAP_75)
print("- saved    :", out_path)

# --- Record in the results table (uses record_result defined in 2-4) ---
if "record_result" in globals():
    record_result(
        result_name="baseline_yolov8s_img768",
        stage="val",
        metrics={
            "mAP_75_95": mAP_75_95,
            "mAP_50": mAP_50,
            "mAP_75": mAP_75,
            **{f"ultra.{k}": v for k, v in metrics.items()},
        },
        notes="Ultralytics YOLOv8s baseline (imgsz=768, close_mosaic=10, amp=True)",
        submission_path=None,
    )

# --- Event log ---
if "log_event" in globals():
    log_event("train_end", {"exp_dir": str(exp_dir), "std_metrics": std_metrics})

[TRAIN START]
- project: C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\runs\exp_20260202_230604\ultralytics
- name   : train_baseline
- data   : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\data\datasets\pill_od_yolo_exp_20260202_230604\data.yaml
- model  : yolov8s.pt
- imgsz/epochs/batch: 768 100 8
Downloading https://github.com/ultralytics/assets/releases/download/v8.4.0/yolov8s.pt to 'yolov8s.pt': 100% ━━━━━━━━━━━━ 21.5MB 58.4MB/s 0.4s
Ultralytics 8.4.9  Python-3.11.9 torch-2.5.1+cu121 CUDA:0 (NVIDIA GeForce RTX 3080, 10240MiB)
[34m[1mengine\trainer: [0magnostic_nms=False, amp=True, angle=1.0, augment=False, auto_augment=randaugment, batch=8, bgr=0.0, box=7.5, cache=False, cfg=None, classes=None, close_mosaic=10, cls=0.5, compile=False, conf=None, copy_paste=0.0, copy_paste_mode=flip, cos_lr=False, cutmix=0.0, data=C:\Users\amy\Desktop\sprint\ \pjt-sprint_ai07_healthcare\experiments\DM2\data\datasets\p

                   all         47        154       0.85       0.97      0.968      0.955
             5mg          6          6      0.958          1      0.995      0.995
            100mg          1          1       0.83          1      0.995      0.995
            2mg         32         32      0.966      0.882      0.989      0.989
    ()()          6          6      0.957          1      0.995      0.969
     ()()          2          2      0.888          1      0.995      0.995
                            1          1      0.834          1      0.995      0.995
          ()          4          4      0.934          1      0.995      0.995
          ()          2          2      0.944          1      0.995      0.995
          10mg/          1          1      0.838          1      0.995      0.895
     8 650mg          1          1      0.965          1      0.995      0.995
              20mg          5          5       0.95          1      0.995      0.995
             20mg     
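
The validation table above reports Ultralytics' mAP50-95, which averages AP over IoU thresholds 0.50 to 0.95, whereas the competition metric mAP@[0.75:0.95] averages only the stricter half. The sketch below shows how the two relate, assuming a per-class AP array with one column per threshold in steps of 0.05 (the step size for the competition metric is an assumption). In Ultralytics such an array is believed to be exposed as `metrics.box.all_ap` after `model.val()`; that attribute should be verified against the installed version. The numbers in `toy_ap` are made up.

```python
import numpy as np

# IoU thresholds used by COCO-style mAP50-95: 0.50, 0.55, ..., 0.95 (10 values).
IOU_THRESHOLDS = np.linspace(0.50, 0.95, 10)


def map_over_iou_range(all_ap, lo: float = 0.75, hi: float = 0.95) -> float:
    """Mean AP over classes and over the IoU thresholds in [lo, hi].

    all_ap: array of shape (n_classes, 10), one column per value in IOU_THRESHOLDS.
    """
    all_ap = np.asarray(all_ap, dtype=float)
    if all_ap.ndim != 2 or all_ap.shape[1] != len(IOU_THRESHOLDS):
        raise ValueError(f"expected shape (n_classes, 10), got {all_ap.shape}")
    # Small tolerance so that 0.75 and 0.95 are included despite float rounding
    mask = (IOU_THRESHOLDS >= lo - 1e-9) & (IOU_THRESHOLDS <= hi + 1e-9)
    return float(all_ap[:, mask].mean())


# Toy input (made-up numbers): AP falls as the IoU threshold gets stricter.
toy_ap = np.array([
    [0.99, 0.99, 0.98, 0.98, 0.97, 0.95, 0.92, 0.85, 0.70, 0.40],
    [0.95, 0.95, 0.94, 0.93, 0.90, 0.88, 0.80, 0.70, 0.50, 0.20],
])
print("mAP50-95     :", round(map_over_iou_range(toy_ap, 0.50, 0.95), 4))
print("mAP@[.75:.95]:", round(map_over_iou_range(toy_ap, 0.75, 0.95), 4))
```

Because the stricter thresholds dominate the competition metric, a model can look strong on mAP50-95 and still score noticeably lower on mAP@[0.75:0.95].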

In [15]:
# [Code Cell] 5-1. Test inference (best.pt) + Top-4 post-processing + create/validate submission.csv

from pathlib import Path
import json
import csv
from datetime import datetime

# -----------------------------
# 0) Paths / config
# -----------------------------
ROOT_ = globals().get("ROOT", Path(".").resolve())
INPUT_ = globals().get("INPUT", {
    "TRAIN_IMAGES": ROOT_ / "train_images",
    "TRAIN_ANN_DIR": ROOT_ / "train_annotations",
    "TEST_IMAGES": ROOT_ / "test_images",
})
DIRS_ = globals().get("DIRS", {})
RUN_NAME_ = globals().get("RUN_NAME", "run")
CFG_ = globals().get("CFG", {})

TEST_DIR = Path(INPUT_["TEST_IMAGES"])
if not TEST_DIR.exists():
    raise FileNotFoundError(f"test_images not found: {TEST_DIR}")

CKPT_DIR = Path(DIRS_["CKPT"])
BEST_PT = CKPT_DIR / "best.pt"
if not BEST_PT.exists():
    raise FileNotFoundError(f"best.pt not found: {BEST_PT}")

SUB_DIR = Path(DIRS_["SUBMISSIONS"])
SUB_DIR.mkdir(parents=True, exist_ok=True)

CACHE_DIR = Path(globals().get("CACHE_DIR", DIRS_.get("CACHE", ROOT_ / "data" / "cache" / "merged")))
LABEL_MAP = CACHE_DIR / "label_map_full.json"
if not LABEL_MAP.exists():
    cand = list(CACHE_DIR.rglob("label_map_full.json"))
    if cand:
        LABEL_MAP = cand[0]
    else:
        raise FileNotFoundError(f"label_map_full.json not found under {CACHE_DIR}")

# infer settings
CONF_THR = float(CFG_.get("infer", {}).get("conf_thr", 0.001))
NMS_IOU = float(CFG_.get("infer", {}).get("nms_iou_thr", 0.5))
TOPK = int(CFG_.get("postprocess", {}).get("topk", 4))
MAX_DET = max(50, TOPK)  # request a generous raw set, then cut to TopK
IMGSZ = int(CFG_.get("train", {}).get("model", {}).get("imgsz", 768))

# -----------------------------
# 1) label map: class_index -> category_id
# -----------------------------
with open(LABEL_MAP, "r", encoding="utf-8") as f:
    lm = json.load(f)

raw_idx2id = lm.get("idx2id", {})
idx2id = {int(k): int(v) for k, v in raw_idx2id.items()} if isinstance(raw_idx2id, dict) else {}
nc = int(lm.get("num_classes", len(idx2id)))
if len(idx2id) != nc:
    # idx2id may be missing or not a dict; rebuild it from category_ids
    idx2id = {int(i): int(cid) for i, cid in enumerate(lm.get("category_ids", []))}
    nc = len(idx2id)

# -----------------------------
# 2) (optional) Load whitelist: keep only the 40 test classes
# -----------------------------
def _load_whitelist():
    candidates = [
        ROOT_ / "data" / "test_class_whitelist.json",
        ROOT_ / "data" / "test_classes_40.json",
        ROOT_ / "data" / "whitelist_40.txt",
        ROOT_ / "data" / "whitelist.txt",
        CACHE_DIR / "test_class_whitelist.json",
        CACHE_DIR / "test_classes_40.json",
    ]
    for p in candidates:
        if not p.exists():
            continue
        if p.suffix.lower() == ".txt":
            vals = []
            for line in p.read_text(encoding="utf-8").splitlines():
                line = line.strip()
                if line:
                    vals.append(int(line))
            return sorted(set(vals)), str(p)
        if p.suffix.lower() == ".json":
            obj = json.loads(p.read_text(encoding="utf-8"))
            if isinstance(obj, list):
                return sorted(set(int(x) for x in obj)), str(p)
            if isinstance(obj, dict):
                key = "whitelist" if "whitelist" in obj else ("classes" if "classes" in obj else None)
                if key and isinstance(obj[key], list):
                    return sorted(set(int(x) for x in obj[key])), str(p)
    return None, None

WHITELIST_IDS, WHITELIST_PATH = _load_whitelist()
WHITELIST_SET = set(WHITELIST_IDS) if WHITELIST_IDS else None

if WHITELIST_SET:
    print(f"[WHITELIST] loaded {len(WHITELIST_SET)} ids from: {WHITELIST_PATH}")
else:
    print(f"[WHITELIST] not found -> will submit across all trained classes ({nc}).")

# -----------------------------
# 3) helper: filename -> image_id(int)
# -----------------------------
def parse_image_id(path: Path) -> int:
    s = path.stem
    try:
        return int(s)
    except Exception:
        raise ValueError(f"image file stem must be int for submission. got: {path.name}")

# -----------------------------
# 4) Inference (Ultralytics)
# -----------------------------
from ultralytics import YOLO

model = YOLO(str(BEST_PT))

raw_jsonl = SUB_DIR / f"pred_test_raw_{RUN_NAME_}.jsonl"
sub_csv = SUB_DIR / f"submission_{RUN_NAME_}_best.csv"

# Kaggle submission format header
header = ["annotation_id", "image_id", "category_id", "bbox_x", "bbox_y", "bbox_w", "bbox_h", "score"]

ann_id = 1
n_images = 0
n_det_raw = 0
n_det_after = 0
n_filtered_whitelist = 0
bad_class_idx = 0

# Write the prediction log (jsonl) and the submission (csv) in one pass
with open(raw_jsonl, "w", encoding="utf-8") as f_raw, open(sub_csv, "w", newline="", encoding="utf-8") as f_csv:
    w = csv.writer(f_csv)
    w.writerow(header)

    results_stream = model.predict(
        source=str(TEST_DIR),
        conf=CONF_THR,
        iou=NMS_IOU,
        imgsz=IMGSZ,
        max_det=MAX_DET,
        stream=True,
        verbose=False,
        device=globals().get("BASELINE", {}).get("device", 0),  # device chosen in 4-1; defaults to GPU 0
    )

    for r in results_stream:
        # r.path: image path
        img_path = Path(r.path)
        image_id = parse_image_id(img_path)
        n_images += 1

        # boxes: xyxy + conf + cls
        boxes = r.boxes
        dets = []
        if boxes is not None and len(boxes) > 0:
            xyxy = boxes.xyxy.cpu().numpy()
            conf = boxes.conf.cpu().numpy()
            cls = boxes.cls.cpu().numpy()

            for (x1, y1, x2, y2), sc, ci in zip(xyxy, conf, cls):
                ci = int(ci)
                if ci not in idx2id:
                    bad_class_idx += 1
                    continue
                cat_id = idx2id[ci]

                # whitelist filter (optional)
                if WHITELIST_SET is not None and cat_id not in WHITELIST_SET:
                    n_filtered_whitelist += 1
                    continue

                x1 = float(x1); y1 = float(y1); x2 = float(x2); y2 = float(y2)
                bw = max(0.0, x2 - x1)
                bh = max(0.0, y2 - y1)

                dets.append({
                    "category_id": int(cat_id),
                    "bbox_x": x1,
                    "bbox_y": y1,
                    "bbox_w": bw,
                    "bbox_h": bh,
                    "score": float(sc),
                })

        n_before_topk = len(dets)
        n_det_raw += n_before_topk

        # Top-K by score (the competition allows at most 4 objects per image)
        dets = sorted(dets, key=lambda d: d["score"], reverse=True)[:TOPK]
        n_det_after += len(dets)

        # Save per-image predictions to jsonl (only the Top-K detections are stored)
        f_raw.write(json.dumps({
            "image_id": image_id,
            "file_name": img_path.name,
            "n_dets_raw_after_filter": n_before_topk,
            "n_dets_after_topk": len(dets),
            "dets": dets,
        }, ensure_ascii=False) + "\n")

        # submission rows
        for d in dets:
            w.writerow([
                ann_id,
                image_id,
                d["category_id"],
                int(round(d["bbox_x"])),
                int(round(d["bbox_y"])),
                int(round(d["bbox_w"])),
                int(round(d["bbox_h"])),
                float(d["score"]),
            ])
            ann_id += 1

# -----------------------------
# 5) Submission sanity check
# -----------------------------
# - rows per image_id <= 4
# - header/column count is as expected
# - bbox_w/h >= 0
from collections import Counter

per_img = Counter()
n_rows = 0
bad_rows = 0

with open(sub_csv, "r", encoding="utf-8") as f:
    rdr = csv.DictReader(f)
    if rdr.fieldnames != header:
        raise ValueError(f"submission header mismatch.\nexpected={header}\ngot={rdr.fieldnames}")

    for row in rdr:
        n_rows += 1
        iid = int(row["image_id"])
        per_img[iid] += 1

        bw = float(row["bbox_w"])
        bh = float(row["bbox_h"])
        sc = float(row["score"])
        if bw < 0 or bh < 0 or sc < 0:
            bad_rows += 1

too_many = sum(1 for iid, c in per_img.items() if c > TOPK)

print("\n[INFER+SUBMISSION DONE]")
print(f"- test images processed : {n_images}")
print(f"- det count (after whitelist filter, before topk): {n_det_raw}")
print(f"- det count (after topk={TOPK})                : {n_det_after}")
print(f"- rows in submission   : {n_rows}")
print(f"- whitelist filtered   : {n_filtered_whitelist}")
print(f"- bad class idx skipped: {bad_class_idx}")
print(f"- per-image >{TOPK} rows: {too_many}")
print(f"- bad rows(bw/bh/score): {bad_rows}")

print("\n[SAVED]")
print(f"- raw preds jsonl: {raw_jsonl}")
print(f"- submission csv : {sub_csv}")

# Record results (if available)
if "record_result" in globals():
    record_result(
        result_name="submission_best_top4",
        stage="kaggle_submit",
        metrics={
            "conf_thr": CONF_THR,
            "nms_iou_thr": NMS_IOU,
            "topk": TOPK,
            "n_images": n_images,
            "n_rows": n_rows,
            "whitelist_used": bool(WHITELIST_SET),
            "whitelist_n": len(WHITELIST_SET) if WHITELIST_SET else 0,
        },
        notes="best.pt inference on test_images -> top4 -> submission csv",
        submission_path=str(sub_csv),
    )


[WHITELIST] not found -> will submit across all trained classes (56).

[INFER+SUBMISSION DONE]
- test images processed : 842
- det count (after whitelist filter, before topk): 8088
- det count (after topk=4)                : 3367
- rows in submission   : 3367
- whitelist filtered   : 0
- bad class idx skipped: 0
- per-image >4 rows: 0
- bad rows(bw/bh/score): 0

[SAVED]
- raw preds jsonl: C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\artifacts\exp_20260202_230604\submissions\pred_test_raw_exp_20260202_230604.jsonl
- submission csv : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\artifacts\exp_20260202_230604\submissions\submission_exp_20260202_230604_best.csv
[OK] recorded -> results.csv | submission_best_top4 (kaggle_submit)
[OK] jsonl    -> results.jsonl
[OK] summary  -> latest_summary.md
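
The submission writer above rounds `bbox_x`, `bbox_y`, `bbox_w`, and `bbox_h` to integers independently. Under a metric that counts matches at IoU thresholds up to 0.95, that rounding alone can cost some IoU, more so for small boxes. The sketch below measures the effect on two hypothetical float boxes (not taken from this run). Whether the competition accepts float coordinates is not stated in this notebook, so it only quantifies the loss and does not recommend a format change.

```python
def iou_xywh(a, b):
    """IoU of two boxes given as (x, y, w, h) with a top-left origin."""
    ax1, ay1, ax2, ay2 = a[0], a[1], a[0] + a[2], a[1] + a[3]
    bx1, by1, bx2, by2 = b[0], b[1], b[0] + b[2], b[1] + b[3]
    iw = max(0.0, min(ax2, bx2) - max(ax1, bx1))
    ih = max(0.0, min(ay2, by2) - max(ay1, by1))
    inter = iw * ih
    union = a[2] * a[3] + b[2] * b[3] - inter
    return inter / union if union > 0 else 0.0


def round_box(box):
    """Round each of x, y, w, h independently, as the submission writer does."""
    return tuple(int(round(v)) for v in box)


# Hypothetical float predictions: one pill-sized box and one much smaller box.
large_box = (403.4, 834.4, 190.4, 190.4)
small_box = (10.5, 20.5, 24.4, 24.4)
for box in (large_box, small_box):
    rounded = round_box(box)
    print(box, "->", rounded, "IoU with unrounded box =", round(iou_xywh(box, rounded), 4))
```

The same function can be applied to the cached float predictions to see how many boxes lose more than, say, 0.01 IoU from rounding.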


In [16]:
# [Code Cell] 5-2. (no whitelist) Cache test predictions + generate multiple submissions via a conf_thr sweep

from pathlib import Path
import json
import csv
from collections import Counter

ROOT_ = globals().get("ROOT", Path(".").resolve())
INPUT_ = globals().get("INPUT", {
    "TRAIN_IMAGES": ROOT_ / "train_images",
    "TRAIN_ANN_DIR": ROOT_ / "train_annotations",
    "TEST_IMAGES": ROOT_ / "test_images",
})
DIRS_ = globals().get("DIRS", {})
RUN_NAME_ = globals().get("RUN_NAME", "run")
CFG_ = globals().get("CFG", {})

TEST_DIR = Path(INPUT_["TEST_IMAGES"])
if not TEST_DIR.exists():
    raise FileNotFoundError(f"test_images not found: {TEST_DIR}")

CKPT_DIR = Path(DIRS_["CKPT"])
BEST_PT = CKPT_DIR / "best.pt"
if not BEST_PT.exists():
    raise FileNotFoundError(f"best.pt not found: {BEST_PT}")

SUB_DIR = Path(DIRS_["SUBMISSIONS"])
SUB_DIR.mkdir(parents=True, exist_ok=True)

CACHE_DIR = Path(globals().get("CACHE_DIR", DIRS_.get("CACHE", ROOT_ / "data" / "cache" / "merged")))
LABEL_MAP = CACHE_DIR / "label_map_full.json"
if not LABEL_MAP.exists():
    cand = list(CACHE_DIR.rglob("label_map_full.json"))
    if cand:
        LABEL_MAP = cand[0]
    else:
        raise FileNotFoundError(f"label_map_full.json not found under {CACHE_DIR}")

# infer base settings (NMS/IMGSZ only affect cache generation)
NMS_IOU = float(CFG_.get("infer", {}).get("nms_iou_thr", 0.5))
IMGSZ = int(CFG_.get("train", {}).get("model", {}).get("imgsz", 768))

# Build the cache with a low conf and a large max_det
CACHE_CONF = 0.001
CACHE_MAX_DET = 200

# conf candidates used in the sweep below
CONF_CANDIDATES = [0.001, 0.01, 0.02, 0.03, 0.05]

TOPK = int(CFG_.get("postprocess", {}).get("topk", 4))

# -----------------------------
# 1) label map: class_index -> category_id
# -----------------------------
lm = json.loads(LABEL_MAP.read_text(encoding="utf-8"))
idx2id = {int(k): int(v) for k, v in lm.get("idx2id", {}).items()}
if not idx2id:
    idx2id = {int(i): int(cid) for i, cid in enumerate(lm.get("category_ids", []))}

# -----------------------------
# 2) helpers
# -----------------------------
def parse_image_id(path: Path) -> int:
    try:
        return int(path.stem)
    except Exception:
        raise ValueError(f"image file stem must be int for submission. got: {path.name}")

def write_submission_from_cache(cache_jsonl: Path, conf_thr: float, topk: int, out_csv: Path):
    header = ["annotation_id", "image_id", "category_id", "bbox_x", "bbox_y", "bbox_w", "bbox_h", "score"]

    ann_id = 1
    per_img = Counter()
    n_rows = 0

    with open(cache_jsonl, "r", encoding="utf-8") as f_in, open(out_csv, "w", newline="", encoding="utf-8") as f_out:
        w = csv.writer(f_out)
        w.writerow(header)

        for line in f_in:
            obj = json.loads(line)
            image_id = int(obj["image_id"])
            dets = obj.get("dets", [])

            # conf filter + topk
            dets2 = [d for d in dets if float(d["score"]) >= conf_thr]
            dets2 = sorted(dets2, key=lambda d: d["score"], reverse=True)[:topk]

            for d in dets2:
                w.writerow([
                    ann_id,
                    image_id,
                    int(d["category_id"]),
                    int(round(float(d["bbox_x"]))),
                    int(round(float(d["bbox_y"]))),
                    int(round(float(d["bbox_w"]))),
                    int(round(float(d["bbox_h"]))),
                    float(d["score"]),
                ])
                ann_id += 1
                n_rows += 1
                per_img[image_id] += 1

    too_many = sum(1 for _, c in per_img.items() if c > topk)
    return {"rows": n_rows, "too_many": too_many}

# -----------------------------
# 3) Build the cache (once)
# -----------------------------
from ultralytics import YOLO

cache_jsonl = SUB_DIR / f"pred_test_cache_full_{RUN_NAME_}.jsonl"
# Reuse an existing cache; delete the file to rebuild it after best.pt, NMS_IOU, or IMGSZ changes
need_build_cache = not cache_jsonl.exists()

if need_build_cache:
    print("[CACHE BUILD] start")
    model = YOLO(str(BEST_PT))

    n_images = 0
    n_dets = 0
    bad_class_idx = 0

    with open(cache_jsonl, "w", encoding="utf-8") as f:
        stream = model.predict(
            source=str(TEST_DIR),
            conf=CACHE_CONF,
            iou=NMS_IOU,
            imgsz=IMGSZ,
            max_det=CACHE_MAX_DET,
            stream=True,
            verbose=False,
            device=globals().get("BASELINE", {}).get("device", 0),  # device chosen in 4-1; defaults to GPU 0
        )

        for r in stream:
            img_path = Path(r.path)
            image_id = parse_image_id(img_path)
            n_images += 1

            dets = []
            boxes = r.boxes
            if boxes is not None and len(boxes) > 0:
                xyxy = boxes.xyxy.cpu().numpy()
                conf = boxes.conf.cpu().numpy()
                cls = boxes.cls.cpu().numpy()

                for (x1, y1, x2, y2), sc, ci in zip(xyxy, conf, cls):
                    ci = int(ci)
                    if ci not in idx2id:
                        bad_class_idx += 1
                        continue
                    cat_id = idx2id[ci]
                    x1 = float(x1); y1 = float(y1); x2 = float(x2); y2 = float(y2)
                    bw = max(0.0, x2 - x1)
                    bh = max(0.0, y2 - y1)
                    dets.append({
                        "category_id": int(cat_id),
                        "bbox_x": x1,
                        "bbox_y": y1,
                        "bbox_w": bw,
                        "bbox_h": bh,
                        "score": float(sc),
                    })

            # Store detections sorted by descending score (makes the offline sweep cheaper)
            dets = sorted(dets, key=lambda d: d["score"], reverse=True)
            n_dets += len(dets)

            f.write(json.dumps({
                "image_id": image_id,
                "file_name": img_path.name,
                "n_dets": len(dets),
                "dets": dets,
            }, ensure_ascii=False) + "\n")

    print("[CACHE BUILD] done")
    print(f"- cache_jsonl      : {cache_jsonl}")
    print(f"- test images      : {n_images}")
    print(f"- total dets(saved): {n_dets}")
    print(f"- bad class idx    : {bad_class_idx}")
else:
    print("[CACHE BUILD] skipped (already exists)")
    print(f"- cache_jsonl: {cache_jsonl}")

# -----------------------------
# 4) conf sweep -> submissions
# -----------------------------
print("\n[CONF SWEEP] generate submissions")
made = []
for conf_thr in CONF_CANDIDATES:
    out_csv = SUB_DIR / f"submission_{RUN_NAME_}_best_top{TOPK}_conf{conf_thr:.3f}.csv"
    stat = write_submission_from_cache(cache_jsonl, conf_thr=conf_thr, topk=TOPK, out_csv=out_csv)
    made.append((conf_thr, out_csv, stat["rows"], stat["too_many"]))
    print(f"- conf={conf_thr:.3f} | rows={stat['rows']} | per-image>{TOPK}={stat['too_many']} | {out_csv.name}")

# Record results (if available); keeping only the default (conf=0.02) would also be enough
if "record_result" in globals():
    for conf_thr, out_csv, n_rows, too_many in made:
        record_result(
            result_name=f"submission_best_top{TOPK}_conf{conf_thr:.3f}",
            stage="kaggle_submit",
            metrics={
                "conf_thr": conf_thr,
                "nms_iou_thr": NMS_IOU,
                "topk": TOPK,
                "rows": n_rows,
                "per_image_too_many": too_many,
                "cache_conf": CACHE_CONF,
                "cache_max_det": CACHE_MAX_DET,
            },
            notes="offline conf sweep from cached test predictions (no whitelist)",
            submission_path=str(out_csv),
        )

print("\n[DONE] Pick one of the submission_* files generated above and submit it to Kaggle.")

[CACHE BUILD] start
[CACHE BUILD] done
- cache_jsonl      : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\artifacts\exp_20260202_230604\submissions\pred_test_cache_full_exp_20260202_230604.jsonl
- test images      : 842
- total dets(saved): 8088
- bad class idx    : 0

[CONF SWEEP] generate submissions
- conf=0.001 | rows=3367 | per-image>4=0 | submission_exp_20260202_230604_best_top4_conf0.001.csv
- conf=0.010 | rows=3254 | per-image>4=0 | submission_exp_20260202_230604_best_top4_conf0.010.csv
- conf=0.020 | rows=3247 | per-image>4=0 | submission_exp_20260202_230604_best_top4_conf0.020.csv
- conf=0.030 | rows=3244 | per-image>4=0 | submission_exp_20260202_230604_best_top4_conf0.030.csv
- conf=0.050 | rows=3235 | per-image>4=0 | submission_exp_20260202_230604_best_top4_conf0.050.csv
[OK] recorded -> results.csv | submission_best_top4_conf0.001 (kaggle_submit)
[OK] jsonl    -> results.jsonl
[OK] summary  -> latest_summary.md
[OK] recorded -> results.csv 
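
The sweep above changes the row count only slightly (3367 rows at conf=0.001 versus 3235 at conf=0.050). One way to see why: with Top-K applied per image, raising the threshold removes rows only from images that have fewer than K detections above it. A minimal sketch with made-up scores follows; `filter_topk` mirrors the filter-then-sort step of `write_submission_from_cache`, and `toy_cache` is hypothetical.

```python
from typing import Dict, List


def filter_topk(dets: List[Dict], conf_thr: float, topk: int) -> List[Dict]:
    """Keep detections with score >= conf_thr, then the topk highest scores."""
    kept = [d for d in dets if d["score"] >= conf_thr]
    return sorted(kept, key=lambda d: d["score"], reverse=True)[:topk]


# Toy cache (made-up scores): image 1 has four confident boxes, image 2 only two.
toy_cache = {
    1: [{"score": s} for s in (0.99, 0.95, 0.90, 0.80, 0.04, 0.002)],
    2: [{"score": s} for s in (0.97, 0.60, 0.015, 0.004)],
}

for thr in (0.001, 0.01, 0.02, 0.05):
    rows = sum(len(filter_topk(d, thr, topk=4)) for d in toy_cache.values())
    print(f"conf={thr:.3f} -> rows={rows}")
```

In this toy case image 1 always contributes four rows, so only image 2 reacts to the threshold. The rows that disappear are low-score fourth or third boxes, which is the same pattern the real sweep shows.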

In [17]:
# [Code Cell] 5-2b. Diff two submission files (conf 0.020 vs 0.030)

from pathlib import Path
import pandas as pd

DIRS_ = globals().get("DIRS", {})
SUB_DIR = Path(DIRS_["SUBMISSIONS"])
f020 = SUB_DIR / "submission_exp_20260202_230604_best_top4_conf0.020.csv"
f030 = SUB_DIR / "submission_exp_20260202_230604_best_top4_conf0.030.csv"

assert f020.exists(), f"not found: {f020}"
assert f030.exists(), f"not found: {f030}"

def load_norm(p: Path):
    df = pd.read_csv(p)
    # Sort rows and normalize types so the comparison is stable
    df = df.sort_values(["image_id", "score", "category_id", "bbox_x", "bbox_y", "bbox_w", "bbox_h"], ascending=[True, False, True, True, True, True, True]).reset_index(drop=True)
    # annotation_id carries no meaning here, so drop it
    df = df.drop(columns=["annotation_id"])
    return df

a = load_norm(f020)
b = load_norm(f030)

print("[FILES]")
print("- 0.020:", f020.name, "rows=", len(a))
print("- 0.030:", f030.name, "rows=", len(b))

# Are the two files identical?
same = a.equals(b)
print("\n[EXACT SAME?]", same)

# If they differ, find where (per image)
if not same:
    # Pack the rows of each image_id into one string and compare (simple and reliable)
    def pack(df):
        g = df.groupby("image_id").apply(lambda x: "|".join(x.astype(str).agg(",".join, axis=1))).to_dict()
        return g
    pa = pack(a)
    pb = pack(b)

    all_ids = sorted(set(pa.keys()) | set(pb.keys()))
    diff_ids = [iid for iid in all_ids if pa.get(iid, "") != pb.get(iid, "")]
    print("\n[DIFF SUMMARY]")
    print("- diff image_ids:", len(diff_ids), "/", len(all_ids))
    print("- first 20 diff ids:", diff_ids[:20])

    # Print a few examples
    for iid in diff_ids[:5]:
        print(f"\n--- image_id={iid} ---")
        print("[0.020]")
        print(a[a["image_id"] == iid].to_string(index=False))
        print("[0.030]")
        print(b[b["image_id"] == iid].to_string(index=False))

[FILES]
- 0.020: submission_exp_20260202_230604_best_top4_conf0.020.csv rows= 3247
- 0.030: submission_exp_20260202_230604_best_top4_conf0.030.csv rows= 3244

[EXACT SAME?] False

[DIFF SUMMARY]
- diff image_ids: 3 / 842
- first 20 diff ids: [238, 857, 1421]

--- image_id=238 ---
[0.020]
 image_id  category_id  bbox_x  bbox_y  bbox_w  bbox_h    score
      238        16262     626     240     244     235 0.991336
      238         3351     403     834     190     190 0.918899
      238        18357      68     262     302     297 0.529229
      238        33880      66     259     303     300 0.021516
[0.030]
 image_id  category_id  bbox_x  bbox_y  bbox_w  bbox_h    score
      238        16262     626     240     244     235 0.991336
      238         3351     403     834     190     190 0.918899
      238        18357      68     262     302     297 0.529229

--- image_id=857 ---
[0.020]
 image_id  category_id  bbox_x  bbox_y  bbox_w  bbox_h    score
      857        31863     576   
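The per-image printout above shows that, for image 238, raising the confidence threshold from 0.020 to 0.030 drops exactly one low-score box. A row-level view of the same difference can be sketched with an outer merge. This is a minimal sketch, not the notebook's method: `diff_rows` and `KEY_COLS` are hypothetical names, rows are matched on class and box only (so a row whose score alone changed is not flagged), and the toy frames reuse the image 238 rows printed above.

```python
import pandas as pd

# Assumption: a row is identified by its image, class, and integer box.
KEY_COLS = ["image_id", "category_id", "bbox_x", "bbox_y", "bbox_w", "bbox_h"]

def diff_rows(a: pd.DataFrame, b: pd.DataFrame) -> pd.DataFrame:
    """Return rows present in only one of the two frames, tagged by side."""
    merged = a[KEY_COLS + ["score"]].merge(
        b[KEY_COLS], on=KEY_COLS, how="outer", indicator=True
    )
    only = merged[merged["_merge"] != "both"].copy()
    # Convert the categorical indicator to plain strings before relabeling
    only["side"] = only["_merge"].astype(str).map(
        {"left_only": "a_only", "right_only": "b_only"}
    )
    return only.drop(columns="_merge").reset_index(drop=True)

# Toy data: the image 238 rows from the output above
a = pd.DataFrame(
    [
        [238, 16262, 626, 240, 244, 235, 0.991336],
        [238, 3351, 403, 834, 190, 190, 0.918899],
        [238, 18357, 68, 262, 302, 297, 0.529229],
        [238, 33880, 66, 259, 303, 300, 0.021516],
    ],
    columns=KEY_COLS + ["score"],
)
b = a.iloc[:3].copy()  # the conf=0.030 file lacks the last row

d = diff_rows(a, b)
print(d)
```

With real files, `a` and `b` would be the two frames returned by `load_norm`; the `score` column in the result is taken from the first frame, so rows that exist only in the second frame show `NaN` there.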

In [18]:
# [Code Cell] 5-3. NMS IoU / imgsz sweep (best.pt) -> generate multiple submissions

from pathlib import Path
import json, csv
from collections import Counter
from ultralytics import YOLO

ROOT_ = globals().get("ROOT", Path(".").resolve())
INPUT_ = globals().get("INPUT", {
    "TEST_IMAGES": ROOT_ / "test_images",
})
DIRS_ = globals().get("DIRS", {})
RUN_NAME_ = globals().get("RUN_NAME", "run")

TEST_DIR = Path(INPUT_["TEST_IMAGES"])
SUB_DIR = Path(DIRS_["SUBMISSIONS"])
SUB_DIR.mkdir(parents=True, exist_ok=True)

BEST_PT = Path(DIRS_["CKPT"]) / "best.pt"
CACHE_DIR = Path(globals().get("CACHE_DIR", DIRS_.get("CACHE", ROOT_ / "data" / "cache" / "merged")))
LABEL_MAP = CACHE_DIR / "label_map_full.json"

lm = json.loads(LABEL_MAP.read_text(encoding="utf-8"))
idx2id = {int(k): int(v) for k, v in lm.get("idx2id", {}).items()}
if not idx2id:
    idx2id = {int(i): int(cid) for i, cid in enumerate(lm.get("category_ids", []))}

def parse_image_id(p: Path) -> int:
    return int(p.stem)

TOPK = 4
CONF = 0.02          # conf is fixed (value already validated)
MAX_DET = 200        # keep plenty of candidates, then cut to TopK

# sweep candidates (cost-effective combinations)
IOU_LIST = [0.4, 0.5, 0.6]
IMGSZ_LIST = [768, 896, 1024]

model = YOLO(str(BEST_PT))

header = ["annotation_id", "image_id", "category_id", "bbox_x", "bbox_y", "bbox_w", "bbox_h", "score"]

def run_one(iou, imgsz):
    out_csv = SUB_DIR / f"submission_{RUN_NAME_}_best_top4_conf{CONF:.3f}_iou{iou:.2f}_img{imgsz}.csv"

    ann_id = 1
    per_img = Counter()
    n_images = 0
    n_rows = 0

    with open(out_csv, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(header)

        stream = model.predict(
            source=str(TEST_DIR),
            conf=CONF,
            iou=float(iou),
            imgsz=int(imgsz),
            max_det=MAX_DET,
            stream=True,
            verbose=False,
            device=0,
        )

        for r in stream:
            img_path = Path(r.path)
            image_id = parse_image_id(img_path)
            n_images += 1

            dets = []
            boxes = r.boxes
            if boxes is not None and len(boxes) > 0:
                xyxy = boxes.xyxy.cpu().numpy()
                confs = boxes.conf.cpu().numpy()
                clss = boxes.cls.cpu().numpy()
                for (x1, y1, x2, y2), sc, ci in zip(xyxy, confs, clss):
                    ci = int(ci)
                    cat_id = idx2id.get(ci, None)
                    if cat_id is None:
                        continue
                    bw = max(0.0, float(x2) - float(x1))
                    bh = max(0.0, float(y2) - float(y1))
                    dets.append((float(sc), int(cat_id), float(x1), float(y1), bw, bh))

            dets.sort(reverse=True, key=lambda x: x[0])
            dets = dets[:TOPK]

            for sc, cat_id, x, y, bw, bh in dets:
                w.writerow([ann_id, image_id, cat_id, int(round(x)), int(round(y)), int(round(bw)), int(round(bh)), float(sc)])
                ann_id += 1
                n_rows += 1
                per_img[image_id] += 1

    too_many = sum(1 for _, c in per_img.items() if c > TOPK)
    print(f"- iou={iou:.2f} imgsz={imgsz} | rows={n_rows} | per-image>4={too_many} | {out_csv.name}")
    return out_csv

print("[SWEEP START] (conf fixed at 0.02)")
made = []
for iou in IOU_LIST:
    for imgsz in IMGSZ_LIST:
        made.append(run_one(iou, imgsz))

print("\n[DONE] Submit the submission_*.csv files generated above one at a time and compare their scores.")

[SWEEP START] (conf fixed at 0.02)
- iou=0.40 imgsz=768 | rows=3247 | per-image>4=0 | submission_exp_20260202_230604_best_top4_conf0.020_iou0.40_img768.csv
- iou=0.40 imgsz=896 | rows=3242 | per-image>4=0 | submission_exp_20260202_230604_best_top4_conf0.020_iou0.40_img896.csv
- iou=0.40 imgsz=1024 | rows=3248 | per-image>4=0 | submission_exp_20260202_230604_best_top4_conf0.020_iou0.40_img1024.csv
- iou=0.50 imgsz=768 | rows=3247 | per-image>4=0 | submission_exp_20260202_230604_best_top4_conf0.020_iou0.50_img768.csv
- iou=0.50 imgsz=896 | rows=3242 | per-image>4=0 | submission_exp_20260202_230604_best_top4_conf0.020_iou0.50_img896.csv
- iou=0.50 imgsz=1024 | rows=3248 | per-image>4=0 | submission_exp_20260202_230604_best_top4_conf0.020_iou0.50_img1024.csv
- iou=0.60 imgsz=768 | rows=3247 | per-image>4=0 | submission_exp_20260202_230604_best_top4_conf0.020_iou0.60_img768.csv
- iou=0.60 imgsz=896 | rows=3242 | per-image>4=0 | submission_exp_20260202_230604_best_top4_conf0.020_iou0.60_img8
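In the sweep lines shown above, the row count changes with `imgsz` (3247 at 768, 3242 at 896, 3248 at 1024) but is the same for every NMS IoU value. Equal row counts do not prove that the files are the same, so before spending submissions on each one it may be worth checking which files are byte-identical. The sketch below groups files by content hash; `group_identical` is a hypothetical helper, and the demo uses toy files with made-up names and contents rather than the real submissions.

```python
import hashlib
import tempfile
from collections import defaultdict
from pathlib import Path

def group_identical(paths):
    """Group files by SHA-256 of their bytes; each group holds byte-identical files."""
    groups = defaultdict(list)
    for p in paths:
        digest = hashlib.sha256(Path(p).read_bytes()).hexdigest()
        groups[digest].append(Path(p).name)
    return sorted(sorted(names) for names in groups.values())

# Toy demo with hypothetical file names and contents (not the real submissions)
with tempfile.TemporaryDirectory() as tmp:
    tmp = Path(tmp)
    (tmp / "iou0.40.csv").write_text("image_id,score\n1,0.9\n", encoding="utf-8")
    (tmp / "iou0.50.csv").write_text("image_id,score\n1,0.9\n", encoding="utf-8")
    (tmp / "iou0.60.csv").write_text("image_id,score\n1,0.8\n", encoding="utf-8")
    groups = group_identical(sorted(tmp.glob("*.csv")))

print(groups)
```

Byte equality is stricter than row equality: two files with the same rows in a different order would land in different groups, in which case a normalized comparison like `load_norm` from the earlier cell is the better tool.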

In [19]:
# [Code Cell] 5-4. best.pt TTA(augment=True) inference -> top4 -> generate submission

from pathlib import Path
import json
import csv
from collections import Counter

ROOT_ = globals().get("ROOT", Path(".").resolve())
INPUT_ = globals().get("INPUT", {
    "TEST_IMAGES": ROOT_ / "test_images",
})
DIRS_ = globals().get("DIRS", {})
RUN_NAME_ = globals().get("RUN_NAME", "run")
CFG_ = globals().get("CFG", {})

TEST_DIR = Path(INPUT_["TEST_IMAGES"])
SUB_DIR = Path(DIRS_["SUBMISSIONS"])
SUB_DIR.mkdir(parents=True, exist_ok=True)

BEST_PT = Path(DIRS_["CKPT"]) / "best.pt"
if not BEST_PT.exists():
    raise FileNotFoundError(f"best.pt not found: {BEST_PT}")

CACHE_DIR = Path(globals().get("CACHE_DIR", DIRS_.get("CACHE", ROOT_ / "data" / "cache" / "merged")))
LABEL_MAP = CACHE_DIR / "label_map_full.json"
if not LABEL_MAP.exists():
    cand = list(CACHE_DIR.rglob("label_map_full.json"))
    if cand:
        LABEL_MAP = cand[0]
    else:
        raise FileNotFoundError(f"label_map_full.json not found under {CACHE_DIR}")

lm = json.loads(LABEL_MAP.read_text(encoding="utf-8"))
idx2id = {int(k): int(v) for k, v in lm.get("idx2id", {}).items()}
if not idx2id:
    idx2id = {int(i): int(cid) for i, cid in enumerate(lm.get("category_ids", []))}

def parse_image_id(p: Path) -> int:
    return int(p.stem)

# --- Settings (keep the baseline, add only TTA) ---
CONF = 0.02
IOU = 0.50
TOPK = 4
MAX_DET = 200
IMGSZ = 1024  # change only this value to also generate a 768 version

out_csv = SUB_DIR / f"submission_{RUN_NAME_}_best_top{TOPK}_conf{CONF:.3f}_iou{IOU:.2f}_img{IMGSZ}_tta.csv"
header = ["annotation_id", "image_id", "category_id", "bbox_x", "bbox_y", "bbox_w", "bbox_h", "score"]

from ultralytics import YOLO
model = YOLO(str(BEST_PT))

ann_id = 1
n_images = 0
n_rows = 0
per_img = Counter()

with open(out_csv, "w", newline="", encoding="utf-8") as f:
    w = csv.writer(f)
    w.writerow(header)

    stream = model.predict(
        source=str(TEST_DIR),
        conf=CONF,
        iou=IOU,
        imgsz=IMGSZ,
        max_det=MAX_DET,
        augment=True,     # <-- this is what enables TTA
        stream=True,
        verbose=False,
        device=0,
    )

    for r in stream:
        img_path = Path(r.path)
        image_id = parse_image_id(img_path)
        n_images += 1

        dets = []
        boxes = r.boxes
        if boxes is not None and len(boxes) > 0:
            xyxy = boxes.xyxy.cpu().numpy()
            confs = boxes.conf.cpu().numpy()
            clss = boxes.cls.cpu().numpy()

            for (x1, y1, x2, y2), sc, ci in zip(xyxy, confs, clss):
                ci = int(ci)
                cat_id = idx2id.get(ci, None)
                if cat_id is None:
                    continue
                bw = max(0.0, float(x2) - float(x1))
                bh = max(0.0, float(y2) - float(y1))
                dets.append((float(sc), int(cat_id), float(x1), float(y1), bw, bh))

        dets.sort(reverse=True, key=lambda x: x[0])
        dets = dets[:TOPK]

        for sc, cat_id, x, y, bw, bh in dets:
            w.writerow([ann_id, image_id, cat_id,
                        int(round(x)), int(round(y)), int(round(bw)), int(round(bh)), float(sc)])
            ann_id += 1
            n_rows += 1
            per_img[image_id] += 1

too_many = sum(1 for _, c in per_img.items() if c > TOPK)

print("[TTA SUBMISSION READY]")
print(f"- images processed : {n_images}")
print(f"- rows            : {n_rows}")
print(f"- per-image >{TOPK}: {too_many}")
print(f"- saved           : {out_csv}")

if "record_result" in globals():
    record_result(
        result_name=f"submission_tta_img{IMGSZ}",
        stage="kaggle_submit",
        metrics={"conf_thr": CONF, "nms_iou_thr": IOU, "topk": TOPK, "imgsz": IMGSZ, "tta": True, "rows": n_rows},
        notes="best.pt inference with augment=True (TTA)",
        submission_path=str(out_csv),
    )

[TTA SUBMISSION READY]
- images processed : 842
- rows            : 3252
- per-image >4: 0
- saved           : C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\artifacts\exp_20260202_230604\submissions\submission_exp_20260202_230604_best_top4_conf0.020_iou0.50_img1024_tta.csv
[OK] recorded -> results.csv | submission_tta_img1024 (kaggle_submit)
[OK] jsonl    -> results.jsonl
[OK] summary  -> latest_summary.md
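The cell above only checks that no image has more than `TOPK` rows, which always holds because the list is sliced to `TOPK` before writing. A slightly broader pre-submission check can be sketched as follows. The column order comes from the `header` list in the cell; the individual rules (unique `annotation_id`, positive width and height, score in [0, 1]) are assumptions about what a valid file should look like, not rules confirmed by the competition, and `check_submission` is a hypothetical name.

```python
import pandas as pd

# Column order taken from the `header` list used when writing the CSV
EXPECTED_COLS = ["annotation_id", "image_id", "category_id",
                 "bbox_x", "bbox_y", "bbox_w", "bbox_h", "score"]

def check_submission(df: pd.DataFrame, topk: int = 4) -> list:
    """Return a list of human-readable problems; an empty list means no issue found."""
    if list(df.columns) != EXPECTED_COLS:
        return ["unexpected columns"]
    problems = []
    if df["annotation_id"].duplicated().any():
        problems.append("duplicate annotation_id")
    if (df.groupby("image_id").size() > topk).any():
        problems.append(f"more than {topk} rows for some image_id")
    # int(round(...)) can turn a very thin box into width or height 0
    if ((df["bbox_w"] <= 0) | (df["bbox_h"] <= 0)).any():
        problems.append("non-positive bbox width/height")
    if not df["score"].between(0.0, 1.0).all():
        problems.append("score outside [0, 1]")
    return problems

# Toy rows (values borrowed from the image 238 output earlier in the notebook)
good = pd.DataFrame(
    [[1, 238, 16262, 626, 240, 244, 235, 0.991336],
     [2, 238, 3351, 403, 834, 190, 190, 0.918899]],
    columns=EXPECTED_COLS,
)
bad = good.copy()
bad.loc[1, "bbox_w"] = 0  # simulate a sliver box that rounded down to zero width

print(check_submission(good))
print(check_submission(bad))
```

On a real file this would be called as `check_submission(pd.read_csv(out_csv))` right after the cell finishes writing.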


In [20]:
# [Code Cell] 5-6. exp_20260202_230604(best.pt) prediction visualization (in the style of the attached image) + save to file

from pathlib import Path
import json
import math

# --- Base paths (uses DIRS/INPUT/RUN_NAME/CACHE_DIR created in earlier cells) ---
ROOT_ = globals().get("ROOT", Path(".").resolve())
DIRS_ = globals().get("DIRS", {})
INPUT_ = globals().get("INPUT", {
    "TRAIN_IMAGES": ROOT_ / "train_images",
    "TEST_IMAGES": ROOT_ / "test_images",
})
RUN_NAME_ = globals().get("RUN_NAME", "run")
CACHE_DIR = Path(globals().get("CACHE_DIR", DIRS_.get("CACHE", ROOT_ / "data" / "cache" / "merged")))

RUN_DIR = Path(DIRS_["RUN_DIR"])
CKPT_BEST = Path(DIRS_["CKPT"]) / "best.pt"
SUB_DIR = Path(DIRS_["SUBMISSIONS"])

TRAIN_DIR = Path(INPUT_["TRAIN_IMAGES"])
TEST_DIR = Path(INPUT_["TEST_IMAGES"])

assert CKPT_BEST.exists(), f"best.pt not found: {CKPT_BEST}"

# --- Folder for saved visualizations ---
VIS_DIR = RUN_DIR / "vis" / "preds_best"
VIS_DIR.mkdir(parents=True, exist_ok=True)

# --- Load label maps: class_index -> category_id, category_id -> name ---
label_map_path = CACHE_DIR / "label_map_full.json"
if not label_map_path.exists():
    cands = list(CACHE_DIR.rglob("label_map_full.json"))
    if cands:
        label_map_path = cands[0]
    else:
        raise FileNotFoundError(f"label_map_full.json not found under {CACHE_DIR}")

lm = json.loads(label_map_path.read_text(encoding="utf-8"))
idx2id = {int(k): int(v) for k, v in lm.get("idx2id", {}).items()}
if not idx2id:
    # fallback
    idx2id = {int(i): int(cid) for i, cid in enumerate(lm.get("category_ids", []))}

cat_name_path = CACHE_DIR / "category_id_to_name.json"
if not cat_name_path.exists():
    cands = list(CACHE_DIR.rglob("category_id_to_name.json"))
    if cands:
        cat_name_path = cands[0]
    else:
        # Last resort: names could be rebuilt from a CSV under reports, but for now just show ids
        cat_name_path = None

catid2name = {}
if cat_name_path:
    obj = json.loads(cat_name_path.read_text(encoding="utf-8"))
    # Assumes the file stores a dict of cat_id -> name
    catid2name = {int(k): str(v) for k, v in obj.items()}

def cat_name(cat_id: int) -> str:
    return catid2name.get(int(cat_id), f"class_{cat_id}")

# --- (Optional) Read dets from the cached jsonl: speeds up visualization when available ---
def find_pred_cache_jsonl():
    # Priority: raw(top4) -> cache(full)
    patterns = [
        f"pred_test_raw_{RUN_NAME_}.jsonl",
        f"pred_test_cache_full_{RUN_NAME_}.jsonl",
        f"pred_test_raw_*{RUN_NAME_}*.jsonl",
        f"pred_test_cache_full_*{RUN_NAME_}*.jsonl",
    ]
    for pat in patterns:
        cands = sorted(SUB_DIR.glob(pat))
        if cands:
            return cands[-1]
    # Not found in SUB_DIR: fall back to the last match anywhere under the run directory
    cands = sorted(RUN_DIR.rglob(f"pred_test_*{RUN_NAME_}*.jsonl"))
    return cands[-1] if cands else None

PRED_CACHE_JSONL = find_pred_cache_jsonl()

cache_map = None
if PRED_CACHE_JSONL and PRED_CACHE_JSONL.exists():
    cache_map = {}
    with open(PRED_CACHE_JSONL, "r", encoding="utf-8") as f:
        for line in f:
            o = json.loads(line)
            cache_map[int(o["image_id"])] = o.get("dets", [])
    print(f"[OK] loaded pred cache: {PRED_CACHE_JSONL.name} (images={len(cache_map)})")
else:
    print("[INFO] pred cache jsonl not found -> will run inference for selected images.")

# --- image_ids to visualize (edit as needed) ---
IMAGE_IDS = [1143, 1265, 627]  # <- replace with any image_ids you want

# --- Inference settings (top4, as in the attached image) ---
CONF = 0.02
IOU = 0.50
IMGSZ = 768
TOPK = 4
MAX_DET = 200

# --- Locate the image file (test_images first, then train_images) ---
def find_image_path(image_id: int) -> Path:
    p_test = TEST_DIR / f"{image_id}.png"
    if p_test.exists():
        return p_test
    p_train = TRAIN_DIR / f"{image_id}.png"
    if p_train.exists():
        return p_train
    raise FileNotFoundError(f"{image_id}.png not found in test/train dirs.")

# --- Get dets: use the cache when available, otherwise run the model on just those images ---
def get_dets_for_ids(image_ids):
    dets_by_id = {}
    missing = []
    for iid in image_ids:
        if cache_map is not None and iid in cache_map:
            # cache det format: {category_id,bbox_x,bbox_y,bbox_w,bbox_h,score}
            dets = sorted(cache_map[iid], key=lambda d: float(d["score"]), reverse=True)[:TOPK]
            dets_by_id[iid] = dets
        else:
            missing.append(iid)

    if missing:
        from ultralytics import YOLO
        model = YOLO(str(CKPT_BEST))
        paths = [str(find_image_path(i)) for i in missing]

        # Without stream=True, predict returns the full list at once; match iid per result
        results = model.predict(
            source=paths,
            conf=CONF,
            iou=IOU,
            imgsz=IMGSZ,
            max_det=MAX_DET,
            verbose=False,
            device=0,
        )

        for r in results:
            img_path = Path(r.path)
            iid = int(img_path.stem)
            dets = []
            boxes = r.boxes
            if boxes is not None and len(boxes) > 0:
                xyxy = boxes.xyxy.cpu().numpy()
                confs = boxes.conf.cpu().numpy()
                clss = boxes.cls.cpu().numpy()
                for (x1, y1, x2, y2), sc, ci in zip(xyxy, confs, clss):
                    ci = int(ci)
                    cat_id = idx2id.get(ci, None)
                    if cat_id is None:
                        continue
                    bw = max(0.0, float(x2) - float(x1))
                    bh = max(0.0, float(y2) - float(y1))
                    dets.append({
                        "category_id": int(cat_id),
                        "bbox_x": float(x1),
                        "bbox_y": float(y1),
                        "bbox_w": float(bw),
                        "bbox_h": float(bh),
                        "score": float(sc),
                    })
            dets = sorted(dets, key=lambda d: float(d["score"]), reverse=True)[:TOPK]
            dets_by_id[iid] = dets

    return dets_by_id

dets_by_id = get_dets_for_ids(IMAGE_IDS)

# --- Drawing ---
from PIL import Image, ImageDraw, ImageFont
import matplotlib.pyplot as plt

# Font (try Malgun Gothic on Windows, otherwise fall back to the default)
def load_font(size=18):
    candidates = [
        "C:/Windows/Fonts/malgun.ttf",
        "C:/Windows/Fonts/malgunbd.ttf",
        "/usr/share/fonts/truetype/nanum/NanumGothic.ttf",
    ]
    for fp in candidates:
        try:
            if Path(fp).exists():
                return ImageFont.truetype(fp, size=size)
        except Exception:
            pass
    return ImageFont.load_default()

font = load_font(18)

def draw_one(image_path: Path, dets, out_path: Path):
    im = Image.open(image_path).convert("RGB")
    W, H = im.size
    draw = ImageDraw.Draw(im)

    # Title line (info at the top, as in the attached image)
    title = f"image_id={image_path.stem} | {image_path.name} | {W}x{H}"

    # Box/label style
    box_color = (255, 0, 0)
    text_color = (255, 255, 255)
    fill_color = (255, 0, 0)

    # Title area at the top
    pad = 6
    tw, th = draw.textbbox((0, 0), title, font=font)[2:]
    draw.rectangle([0, 0, tw + pad*2, th + pad*2], fill=(255, 255, 255))
    draw.text((pad, pad), title, fill=(0, 0, 0), font=font)

    for d in dets:
        x = float(d["bbox_x"])
        y = float(d["bbox_y"])
        w = float(d["bbox_w"])
        h = float(d["bbox_h"])
        sc = float(d["score"])
        cid = int(d["category_id"])
        name = cat_name(cid)

        x1, y1 = int(round(x)), int(round(y))
        x2, y2 = int(round(x + w)), int(round(y + h))

        # box
        draw.rectangle([x1, y1, x2, y2], outline=box_color, width=3)

        label = f"{name} ({cid}) {sc:.3f}"
        lb = draw.textbbox((0, 0), label, font=font)
        lw, lh = lb[2]-lb[0], lb[3]-lb[1]

        # Label position: above the box (inside it if that would overflow the top)
        tx = x1
        ty = y1 - (lh + 6)
        if ty < 0:
            ty = y1 + 2

        # label background
        draw.rectangle([tx, ty, tx + lw + 10, ty + lh + 6], fill=fill_color)
        draw.text((tx + 5, ty + 3), label, fill=text_color, font=font)

    im.save(out_path)
    return im

# Display (grid)
cols = 2
rows = math.ceil(len(IMAGE_IDS) / cols)
plt.figure(figsize=(cols * 7, rows * 7))

for i, iid in enumerate(IMAGE_IDS, 1):
    img_path = find_image_path(iid)
    dets = dets_by_id.get(iid, [])
    out_path = VIS_DIR / f"vis_{iid}.png"
    vis_im = draw_one(img_path, dets, out_path)

    ax = plt.subplot(rows, cols, i)
    ax.imshow(vis_im)
    ax.axis("off")
    ax.set_title(f"saved: {out_path.name}", fontsize=10)

plt.tight_layout()
print(f"[SAVED] visualizations -> {VIS_DIR}")

[OK] loaded pred cache: pred_test_raw_exp_20260202_230604.jsonl (images=842)
[SAVED] visualizations -> C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\runs\exp_20260202_230604\vis\preds_best


In [24]:
# [Code Cell] 5-7. Val failure cases (false positives/misses): automatic Top-N sampling + save visualizations

from pathlib import Path
import json, math
from collections import defaultdict
import numpy as np

# -----------------------------
# 0) Paths / load basics
# -----------------------------
ROOT_ = globals().get("ROOT", Path(".").resolve())
DIRS_ = globals().get("DIRS", {})
INPUT_ = globals().get("INPUT", {
    "TRAIN_IMAGES": ROOT_ / "train_images",
    "TEST_IMAGES": ROOT_ / "test_images",
})
CACHE_DIR = Path(globals().get("CACHE_DIR", DIRS_.get("CACHE", ROOT_ / "data" / "cache" / "merged")))

RUN_DIR = Path(DIRS_["RUN_DIR"])
CKPT_BEST = Path(DIRS_["CKPT"]) / "best.pt"
TRAIN_DIR = Path(INPUT_["TRAIN_IMAGES"])

assert CKPT_BEST.exists(), f"best.pt not found: {CKPT_BEST}"
assert TRAIN_DIR.exists(), f"train_images not found: {TRAIN_DIR}"

# merged coco / split / label map
MERGED_COCO = CACHE_DIR / "train_merged_coco.json"
SPLIT_JSON  = CACHE_DIR / "splits" / "split_train_valid.json"
LABEL_MAP   = CACHE_DIR / "label_map_full.json"
CATID2NAME  = CACHE_DIR / "category_id_to_name.json"

if not MERGED_COCO.exists():
    cands = list(CACHE_DIR.rglob("train_merged_coco.json"))
    if cands: MERGED_COCO = cands[0]
    else: raise FileNotFoundError(f"train_merged_coco.json not found under {CACHE_DIR}")

if not SPLIT_JSON.exists():
    cands = list(CACHE_DIR.rglob("split_train_valid.json"))
    if cands: SPLIT_JSON = cands[0]
    else: raise FileNotFoundError(f"split_train_valid.json not found under {CACHE_DIR}")

if not LABEL_MAP.exists():
    cands = list(CACHE_DIR.rglob("label_map_full.json"))
    if cands: LABEL_MAP = cands[0]
    else: raise FileNotFoundError(f"label_map_full.json not found under {CACHE_DIR}")

catid2name = {}
if CATID2NAME.exists():
    catid2name = {int(k): str(v) for k, v in json.loads(CATID2NAME.read_text(encoding="utf-8")).items()}
def cname(cid: int) -> str:
    return catid2name.get(int(cid), f"class_{cid}")

# -----------------------------
# 1) Hyperparams (tweakable)
# -----------------------------
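# IoU used to match predictions to GT when flagging failures (0.5 is a common starting point)
IOU_MATCH = 0.50

# Inference parameters (recommended: same as the baseline)
CONF = 0.02
NMS_IOU = 0.50
IMGSZ = 768
MAX_DET = 200

# Number of Top-N samples
TOP_N_FP = 12
TOP_N_FN = 12

# Folder for saved visualizations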
VIS_DIR = RUN_DIR / "vis" / "failcases_val"
VIS_DIR.mkdir(parents=True, exist_ok=True)

print("[CONFIG]")
print(f"- IOU_MATCH={IOU_MATCH} | CONF={CONF} | NMS_IOU={NMS_IOU} | IMGSZ={IMGSZ} | MAX_DET={MAX_DET}")
print(f"- TOP_N_FP={TOP_N_FP} | TOP_N_FN={TOP_N_FN}")
print(f"- VIS_DIR={VIS_DIR}")

# -----------------------------
# 2) Load COCO + split (val ids)
# -----------------------------
coco = json.loads(MERGED_COCO.read_text(encoding="utf-8"))
split = json.loads(SPLIT_JSON.read_text(encoding="utf-8"))
val_ids = [int(x) for x in split.get("valid_image_ids", [])]
val_set = set(val_ids)

images = coco.get("images", [])
anns = coco.get("annotations", [])

img_by_id = {int(im["id"]): im for im in images if isinstance(im, dict) and im.get("id") is not None}

gt_by_img = defaultdict(list)
for a in anns:
    if not isinstance(a, dict): 
        continue
    iid = a.get("image_id", None)
    bbox = a.get("bbox", None)
    cid = a.get("category_id", None)
    if iid is None or bbox is None or cid is None:
        continue
    if not isinstance(bbox, list) or len(bbox) != 4:
        continue
    iid = int(iid)
    if iid not in val_set:
        continue
    x, y, w, h = map(float, bbox)
    if w <= 0 or h <= 0:
        continue
    gt_by_img[iid].append({
        "category_id": int(cid),
        "bbox_x": x,
        "bbox_y": y,
        "bbox_w": w,
        "bbox_h": h
    })

# -----------------------------
# 3) Label map (yolo cls idx -> category_id)
# -----------------------------
lm = json.loads(LABEL_MAP.read_text(encoding="utf-8"))
idx2id = {int(k): int(v) for k, v in lm.get("idx2id", {}).items()}
if not idx2id:
    idx2id = {int(i): int(cid) for i, cid in enumerate(lm.get("category_ids", []))}

# -----------------------------
# 4) Inference on val images
# -----------------------------

# Build a file_name -> image_id mapping from the COCO images (val only)
fname2id = {}
for _iid in val_ids:
    im = img_by_id.get(int(_iid), None)
    if im is None:
        continue
    fn = Path(str(im["file_name"])).name
    fname2id[fn] = int(_iid)

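# Run inference on the val images.
# NOTE: `results` was never assigned in this cell (it presumably came from an earlier cell).
# This call assumes the val images sit directly under TRAIN_DIR with the COCO file names.
from ultralytics import YOLO
model = YOLO(str(CKPT_BEST))
val_paths = [str(TRAIN_DIR / fn) for fn in fname2id]
results = model.predict(
    source=val_paths,
    conf=CONF,
    iou=NMS_IOU,
    imgsz=IMGSZ,
    max_det=MAX_DET,
    stream=True,
    verbose=False,
    device=0,
)

# Collect predictions per image, resolving image_id from the file name
pred_by_img = defaultdict(list)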

for r in results:
    f = Path(r.path).name  # e.g. K-003351-..._200.png
    iid = fname2id.get(f, None)

    # If the mapping fails (edge case), try a numeric stem
    if iid is None:
        try:
            iid = int(Path(r.path).stem)
        except Exception:
            # Still unresolved: skip this result
            continue

    boxes = r.boxes
    if boxes is None or len(boxes) == 0:
        continue

    xyxy = boxes.xyxy.cpu().numpy()
    confs = boxes.conf.cpu().numpy()
    clss  = boxes.cls.cpu().numpy()

    for (x1, y1, x2, y2), sc, ci in zip(xyxy, confs, clss):
        ci = int(ci)
        cat_id = idx2id.get(ci, None)
        if cat_id is None:
            continue

        x1 = float(x1); y1 = float(y1); x2 = float(x2); y2 = float(y2)
        w = max(0.0, x2 - x1)
        h = max(0.0, y2 - y1)

        pred_by_img[iid].append({
            "category_id": int(cat_id),
            "bbox_x": x1,
            "bbox_y": y1,
            "bbox_w": w,
            "bbox_h": h,
            "score": float(sc),
        })

print(f"[OK] pred_by_img filled: {len(pred_by_img)} val images")


# -----------------------------
# 5) Matching (class-aware greedy)
# -----------------------------
def to_xyxy(b):
    x1 = float(b["bbox_x"])
    y1 = float(b["bbox_y"])
    x2 = x1 + float(b["bbox_w"])
    y2 = y1 + float(b["bbox_h"])
    return x1, y1, x2, y2

def iou_xyxy(a, b):
    ax1, ay1, ax2, ay2 = a
    bx1, by1, bx2, by2 = b
    ix1 = max(ax1, bx1)
    iy1 = max(ay1, by1)
    ix2 = min(ax2, bx2)
    iy2 = min(ay2, by2)
    iw = max(0.0, ix2 - ix1)
    ih = max(0.0, iy2 - iy1)
    inter = iw * ih
    area_a = max(0.0, ax2 - ax1) * max(0.0, ay2 - ay1)
    area_b = max(0.0, bx2 - bx1) * max(0.0, by2 - by1)
    union = area_a + area_b - inter + 1e-9
    return inter / union

fail_stats = {}  # image_id -> dict
fp_examples = []
fn_examples = []

for iid in val_ids:
    gts = gt_by_img.get(iid, [])
    preds = pred_by_img.get(iid, [])

    # sort preds by score desc
    preds = sorted(preds, key=lambda d: d["score"], reverse=True)

    gt_used = [False] * len(gts)
    pred_used = [False] * len(preds)

    tp_pairs = []  # (pred_idx, gt_idx, iou)

    for pi, p in enumerate(preds):
        best_j = -1
        best_iou = -1.0
        p_xyxy = to_xyxy(p)
        for gi, g in enumerate(gts):
            if gt_used[gi]:
                continue
            if int(g["category_id"]) != int(p["category_id"]):
                continue
            iou = iou_xyxy(p_xyxy, to_xyxy(g))
            if iou > best_iou:
                best_iou = iou
                best_j = gi
        if best_j >= 0 and best_iou >= IOU_MATCH:
            gt_used[best_j] = True
            pred_used[pi] = True
            tp_pairs.append((pi, best_j, best_iou))

    fps = [preds[i] for i, u in enumerate(pred_used) if not u]
    fns = [gts[i] for i, u in enumerate(gt_used) if not u]

    # fp/fn scoring for ranking
    max_fp_score = max([d["score"] for d in fps], default=0.0)
    fn_area_sum = sum([float(d["bbox_w"]) * float(d["bbox_h"]) for d in fns]) if fns else 0.0

    stat = {
        "image_id": iid,
        "n_gt": len(gts),
        "n_pred": len(preds),
        "n_tp": len(tp_pairs),
        "n_fp": len(fps),
        "n_fn": len(fns),
        "max_fp_score": float(max_fp_score),
        "fn_area_sum": float(fn_area_sum),
        "tp_pairs": tp_pairs,
        "fps": fps,
        "fns": fns,
        "gts": gts,
        "preds": preds,
    }
    fail_stats[iid] = stat

# -----------------------------
# 6) Top-N sampling
# -----------------------------
# FP Top: rank by FP count first, then by max FP score
fp_rank = sorted(
    fail_stats.values(),
    key=lambda s: (s["n_fp"], s["max_fp_score"]),
    reverse=True
)
fp_pick = [s for s in fp_rank if s["n_fp"] > 0][:TOP_N_FP]

# FN Top: rank by FN count first, then by fn_area_sum
fn_rank = sorted(
    fail_stats.values(),
    key=lambda s: (s["n_fn"], s["fn_area_sum"]),
    reverse=True
)
fn_pick = [s for s in fn_rank if s["n_fn"] > 0][:TOP_N_FN]

print("\n[SAMPLING]")
print(f"- FP candidates picked: {len(fp_pick)}")
print(f"- FN candidates picked: {len(fn_pick)}")

# -----------------------------
# 7) Visualization (GT=green, Pred=red)
# -----------------------------
from PIL import Image, ImageDraw, ImageFont
import matplotlib.pyplot as plt

def load_font(size=18):
    candidates = [
        "C:/Windows/Fonts/malgun.ttf",
        "C:/Windows/Fonts/malgunbd.ttf",
        "/usr/share/fonts/truetype/nanum/NanumGothic.ttf",
    ]
    for fp in candidates:
        try:
            if Path(fp).exists():
                return ImageFont.truetype(fp, size=size)
        except Exception:
            pass
    return ImageFont.load_default()

font = load_font(18)

def draw_label(draw, x, y, text, color, font):
    pad = 4
    bb = draw.textbbox((0, 0), text, font=font)
    tw, th = bb[2]-bb[0], bb[3]-bb[1]
    # Keep the label from going off the top of the image
    y = max(0, y)
    draw.rectangle([x, y, x + tw + pad*2, y + th + pad*2], fill=color)
    draw.text((x + pad, y + pad), text, fill=(255, 255, 255), font=font)

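def render_one(iid: int, stat: dict, out_path: Path, mode: str):
    """
    mode: "fp" or "fn" (controls which boxes are emphasized; GT and Pred are drawn in both modes)
    """
    # find_img_path is not defined in this cell, so resolve the path from the COCO file_name
    # (assumes val images sit directly under TRAIN_DIR)
    img_path = TRAIN_DIR / Path(str(img_by_id[iid]["file_name"])).name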
    im = Image.open(img_path).convert("RGB")
    W, H = im.size
    draw = ImageDraw.Draw(im)

    # title
    title = f"image_id={iid} | {img_path.name} | {W}x{H} | TP={stat['n_tp']} FP={stat['n_fp']} FN={stat['n_fn']}"
    draw.rectangle([0, 0, 10 + len(title)*10, 36], fill=(255, 255, 255))
    draw.text((6, 6), title, fill=(0, 0, 0), font=font)

    # GT boxes (green)
    gt_color = (0, 180, 0)
    for g in stat["gts"]:
        x1 = int(round(g["bbox_x"]))
        y1 = int(round(g["bbox_y"]))
        x2 = int(round(g["bbox_x"] + g["bbox_w"]))
        y2 = int(round(g["bbox_y"] + g["bbox_h"]))
        draw.rectangle([x1, y1, x2, y2], outline=gt_color, width=3)
        label = f"GT {cname(g['category_id'])} ({int(g['category_id'])})"
        draw_label(draw, x1, y1-24, label, gt_color, font)

    # Pred boxes (red)
    pred_color = (255, 0, 0)
    for p in stat["preds"]:
        x1 = int(round(p["bbox_x"]))
        y1 = int(round(p["bbox_y"]))
        x2 = int(round(p["bbox_x"] + p["bbox_w"]))
        y2 = int(round(p["bbox_y"] + p["bbox_h"]))
        draw.rectangle([x1, y1, x2, y2], outline=pred_color, width=3)
        label = f"{cname(p['category_id'])} ({int(p['category_id'])}) {p['score']:.3f}"
        draw_label(draw, x1, y1-24, label, pred_color, font)

    # Emphasize failures: thicker outline for FP boxes in "fp" mode, for missed GT boxes in "fn" mode
    if mode == "fp" and stat["fps"]:
        # Thicken only the FP boxes so they stand out when boxes overlap
        for p in stat["fps"]:
            x1 = int(round(p["bbox_x"]))
            y1 = int(round(p["bbox_y"]))
            x2 = int(round(p["bbox_x"] + p["bbox_w"]))
            y2 = int(round(p["bbox_y"] + p["bbox_h"]))
            draw.rectangle([x1, y1, x2, y2], outline=(255, 50, 50), width=5)
    if mode == "fn" and stat["fns"]:
        for g in stat["fns"]:
            x1 = int(round(g["bbox_x"]))
            y1 = int(round(g["bbox_y"]))
            x2 = int(round(g["bbox_x"] + g["bbox_w"]))
            y2 = int(round(g["bbox_y"] + g["bbox_h"]))
            draw.rectangle([x1, y1, x2, y2], outline=(0, 255, 80), width=6)

    im.save(out_path)
    return im

# save + show grids
def show_grid(picks, tag):
    if not picks:
        print(f"[{tag}] no samples")
        return
    cols = 2
    rows = math.ceil(len(picks) / cols)
    plt.figure(figsize=(cols * 7, rows * 7))
    for i, s in enumerate(picks, 1):
        iid = int(s["image_id"])
        out_path = VIS_DIR / f"{tag}_{iid}.png"
        im = render_one(iid, s, out_path, mode=tag)

        ax = plt.subplot(rows, cols, i)
        ax.imshow(im)
        ax.axis("off")
        ax.set_title(out_path.name, fontsize=10)

    plt.tight_layout()
    print(f"[SAVED] {tag} visuals -> {VIS_DIR}")

print("\n[VISUALIZE] FP Top-N")
show_grid(fp_pick, "fp")

print("\n[VISUALIZE] FN Top-N")
show_grid(fn_pick, "fn")

# -----------------------------
# 8) Save summary report
# -----------------------------
report = {
    "config": {
        "IOU_MATCH": IOU_MATCH,
        "CONF": CONF,
        "NMS_IOU": NMS_IOU,
        "IMGSZ": IMGSZ,
        "MAX_DET": MAX_DET,
        "TOP_N_FP": TOP_N_FP,
        "TOP_N_FN": TOP_N_FN,
    },
    "fp_top": [{"image_id": s["image_id"], "n_fp": s["n_fp"], "max_fp_score": s["max_fp_score"]} for s in fp_pick],
    "fn_top": [{"image_id": s["image_id"], "n_fn": s["n_fn"], "fn_area_sum": s["fn_area_sum"]} for s in fn_pick],
}

out_json = VIS_DIR / "failcases_val_summary.json"
out_json.write_text(json.dumps(report, indent=2, ensure_ascii=False), encoding="utf-8")

print("\n[REPORT]")
print(f"- saved -> {out_json}")
print("- fp_top image_ids:", [x["image_id"] for x in report["fp_top"]])
print("- fn_top image_ids:", [x["image_id"] for x in report["fn_top"]])

[CONFIG]
- IOU_MATCH=0.5 | CONF=0.02 | NMS_IOU=0.5 | IMGSZ=768 | MAX_DET=200
- TOP_N_FP=12 | TOP_N_FN=12
- VIS_DIR=C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\runs\exp_20260202_230604\vis\failcases_val
[OK] pred_by_img filled: 47 val images

[SAMPLING]
- FP candidates picked: 12
- FN candidates picked: 3

[VISUALIZE] FP Top-N
[SAVED] fp visuals -> C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\runs\exp_20260202_230604\vis\failcases_val

[VISUALIZE] FN Top-N
[SAVED] fn visuals -> C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\runs\exp_20260202_230604\vis\failcases_val

[REPORT]
- saved -> C:\Users\amy\Desktop\sprint\초급 프로젝트\pjt-sprint_ai07_healthcare\experiments\DM2\runs\exp_20260202_230604\vis\failcases_val\failcases_val_summary.json
- fp_top image_ids: [4, 6, 5, 127, 144, 164, 157, 78, 181, 90, 15, 18]
- fn_top image_ids: [4, 5, 6]
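The failure sampling above matches predictions to ground truth at `IOU_MATCH=0.50`, while the competition metric described earlier in this document is mAP@[0.75:0.95]. A box that counts as a true positive at 0.50 can therefore still hurt the score. The sketch below reruns the same class-aware greedy matching at several thresholds to show how the TP count shrinks. It is a simplified illustration on toy boxes, assuming `(x, y, w, h)` boxes; `iou_xywh` and `count_tp` are hypothetical names and the coordinates are invented.

```python
def iou_xywh(a, b):
    """IoU of two (x, y, w, h) boxes."""
    ax, ay, aw, ah = a
    bx, by, bw, bh = b
    iw = max(0.0, min(ax + aw, bx + bw) - max(ax, bx))
    ih = max(0.0, min(ay + ah, by + bh) - max(ay, by))
    inter = iw * ih
    union = aw * ah + bw * bh - inter
    return inter / union if union > 0 else 0.0

def count_tp(gts, preds, thr):
    """Class-aware greedy matching in score order; returns the number of TPs.

    gts:   list of (category_id, box)
    preds: list of (category_id, box, score)
    """
    used = [False] * len(gts)
    tp = 0
    for cat, box, _score in sorted(preds, key=lambda p: p[2], reverse=True):
        best_j, best_iou = -1, -1.0
        for j, (gcat, gbox) in enumerate(gts):
            if used[j] or gcat != cat:
                continue
            v = iou_xywh(box, gbox)
            if v > best_iou:
                best_iou, best_j = v, j
        if best_j >= 0 and best_iou >= thr:
            used[best_j] = True
            tp += 1
    return tp

# Toy example: one prediction is 5 px off (IoU about 0.90), the other 10 px off
# in both directions (IoU about 0.68)
gts = [(1, (100, 100, 100, 100)), (2, (300, 300, 100, 100))]
preds = [(1, (105, 100, 100, 100), 0.95), (2, (310, 310, 100, 100), 0.80)]

tp_by_thr = {thr: count_tp(gts, preds, thr) for thr in (0.50, 0.75, 0.85, 0.95)}
print(tp_by_thr)
```

Applied to the notebook's data, the same loop could be run over `gt_by_img` and `pred_by_img` with `IOU_MATCH` raised to 0.75 or higher, which would likely surface more FN candidates than the three found at 0.50.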
