In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!unzip -q '/content/drive/MyDrive/AI활용 소프트웨어 개발/10. 파이썬 웹 서비스/Bee/data/Bee (1).zip'

In [None]:
# ============================
# [STEP 1-2] 임포트 & 하드웨어 점검 & 시드 고정
# ============================
import os, random, json, math, time                     # 표준 라이브러리
from pathlib import Path                                # 경로 다루기
import numpy as np                                      # 수치 연산
import torch                                            # PyTorch 메인
import torch.backends.cudnn as cudnn                    # CUDNN 옵션
from torch.cuda.amp import GradScaler, autocast         # AMP(Mixed Precision)
import math, random
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import cv2
from pathlib import Path

In [None]:
# GPU 사용 가능 여부 출력 (A100이 보이는지 확인)
print('[INFO] CUDA available:', torch.cuda.is_available())          # True면 GPU 사용 가능
if torch.cuda.is_available():                                       # GPU가 있으면
    print('[INFO] CUDA device:', torch.cuda.get_device_name(0))     # GPU 장치명 출력
    !nvidia-smi                                                      # 드라이버/메모리 상태 출력
else:
    print('[WARN] GPU가 보이지 않습니다. 런타임 -> 런타임 유형 변경 -> 하드웨어 가속기: GPU(A100) 선택 필요')

# 재현성(결과 일관성) 위해 시드 고정
SEED = 42                                                           # 고정 시드 값
random.seed(SEED)                                                   # 파이썬 랜덤 시드
np.random.seed(SEED)                                                # 넘파이 시드
torch.manual_seed(SEED)                                             # 파이토치 CPU 시드
torch.cuda.manual_seed_all(SEED) if torch.cuda.is_available() else None  # 파이토치 GPU 시드

# 연산 일관성/속도 설정 (deterministic은 True면 매우 일관, 다만 약간 느릴 수 있음)
cudnn.deterministic = True                                          # 결정적 알고리즘 사용
cudnn.benchmark    = False

In [None]:
# 프로젝트 루트 경로 지정
ROOT = Path('/content/bee_project')                   # 프로젝트 루트

# 데이터/출력 폴더 구조 정의
DIRS = [
    ROOT / 'images' / 'train',                                      # 원본 이미지(train)
    ROOT / 'images' / 'val',                                        # 원본 이미지(val)
    ROOT / 'images' / 'test',                                       # 원본 이미지(test) - 선택
    ROOT / 'labels' / 'train',                                      # 원본 라벨(JSON 등, train)
    ROOT / 'labels' / 'val',                                        # 원본 라벨(JSON 등, val)
    ROOT / 'labels' / 'test',                                       # 원본 라벨(JSON 등, test)
    ROOT / 'yolo_labels' / 'train',                                 # 변환된 YOLO 형식 라벨(txt, train)
    ROOT / 'yolo_labels' / 'val',                                   # 변환된 YOLO 형식 라벨(txt, val)
    ROOT / 'crops' / 'train',                                       # 탐지/GT 크롭(train)
    ROOT / 'crops' / 'val',                                         # 탐지/GT 크롭(val)
    ROOT / 'outputs' / 'weights',                                   # 체크포인트 저장 경로
    ROOT / 'outputs' / 'reports',                                   # 리포트/CSV/그래프 저장 경로
    ROOT / 'outputs' / 'viz',                                       # 시각화 결과(박스/크롭 등)
]

# 폴더 생성 (이미 있으면 건너뜀)
for d in DIRS:
    d.mkdir(parents=True, exist_ok=True)                             # 상위 폴더까지 생성
print('[INFO] Project root:', ROOT)                                  # 루트 경로 출력


In [None]:
# Robust Split + Rename: "__" 뒤 토큰/JSON 이미지명 활용해서 매칭 → 7:2:1 분할 후 강제 베이스네임으로 저장
import re, json, random, shutil, collections
from pathlib import Path

# 1) SOURCE 경로(원천)
SRC_IMG_BASE = Path('/content/images')
SRC_LBL_BASE = Path('/content/jsons')

# 2) DEST 경로(저장)
DEST_IMG_BASE = Path('/content/bee_project/images')
DEST_LBL_BASE = Path('/content/bee_project/labels')

# 3) 옵션
RATIOS      = (0.7, 0.2, 0.1)     # train, val, test
MOVE_FILES  = True                # 원본 이동(True) / 복사(False)
SEED        = 42
IMG_EXTS    = {'.jpg','.jpeg','.png','.bmp','.webp','.JPG','.JPEG','.PNG'}
LBL_EXTS    = {'.json','.JSON'}

# ------------ 유틸 ------------
def under_split_dir(p: Path, base: Path) -> bool:
    try:
        rel = p.relative_to(base)
    except ValueError:
        return False
    return len(rel.parts) > 0 and rel.parts[0] in {'train','val','test'}

def rscan(base: Path, exts: set):
    return sorted([p for p in base.rglob('*')
                   if p.is_file() and p.suffix in exts and not under_split_dir(p, base)])

def tail_after_double_underscore(name: str) -> str:
    """'...__01_1_D_AB_AP_20220812_01_0011.jpg' -> '01_1_D_AB_AP_20220812_01_0011'"""
    stem = Path(name).stem
    return stem.split('__')[-1] if '__' in stem else stem

def read_image_name_from_json(jp: Path):
    try:
        d = json.loads(jp.read_text(encoding='utf-8'))
    except Exception:
        return None
    if isinstance(d, dict) and isinstance(d.get('IMAGE'), dict):
        for k in ('IMAGE_FILE_NAME','FILE_NAME','filename'):
            v = d['IMAGE'].get(k)
            if isinstance(v, str) and v.strip():
                return Path(v).name
    for k in ('imagePath','filename','file_name','image'):
        v = d.get(k)
        if isinstance(v, str) and v.strip():
            return Path(v).name
    return None

def update_json_image_name(jp: Path, new_img_name: str):
    try:
        data = json.loads(jp.read_text(encoding='utf-8'))
    except Exception:
        return
    if isinstance(data, dict):
        if isinstance(data.get('IMAGE'), dict):
            for k in ('IMAGE_FILE_NAME','FILE_NAME','filename'):
                if k in data['IMAGE']:
                    data['IMAGE'][k] = new_img_name
        for k in ('imagePath','filename','file_name','image'):
            if k in data:
                data[k] = new_img_name
    jp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding='utf-8')

def transfer(src: Path, dst: Path, move=True):
    dst.parent.mkdir(parents=True, exist_ok=True)
    if move:
        shutil.move(str(src), str(dst))
    else:
        shutil.copy2(str(src), str(dst))

def unique_basename(dst_img_dir: Path, dst_lbl_dir: Path, base: str, img_ext: str) -> str:
    """이미지/라벨 모두와 충돌 없도록 베이스네임 고유화"""
    b = base
    k = 1
    while (dst_img_dir / f"{b}{img_ext}").exists() or (dst_lbl_dir / f"{b}.json").exists():
        b = f"{base}_{k}"
        k += 1
    return b

# ------------ 수집 ------------
imgs = rscan(SRC_IMG_BASE, IMG_EXTS)
lbls = rscan(SRC_LBL_BASE, LBL_EXTS)

print(f"[DEBUG] found images={len(imgs)}, labels={len(lbls)}")
if not imgs: print(f"[HINT] 이미지가 없어요: {SRC_IMG_BASE}")
if not lbls: print(f"[HINT] 라벨이 없어요: {SRC_LBL_BASE}")

# 이미지 인덱스: 여러 키로 접근 가능하게 구성
img_index = {}  # key(lower) -> Path
def add_img_key(key: str, p: Path):
    key = key.strip().lower()
    if key and key not in img_index:
        img_index[key] = p

for ip in imgs:
    stem = ip.stem
    tail = tail_after_double_underscore(ip.name)
    add_img_key(stem, ip)
    add_img_key(tail, ip)

# 라벨→이미지 매칭
pairs = []
unmatched = []
for jp in lbls:
    # 1) JSON 내부 이미지명 기반 후보
    nm = read_image_name_from_json(jp)
    cand_keys = []
    if nm:
        cand_keys += [Path(nm).stem, tail_after_double_underscore(nm)]
    # 2) 라벨 파일명 기반 후보
    cand_keys += [jp.stem, tail_after_double_underscore(jp.name)]
    # 매칭
    ip = None
    for k in cand_keys:
        ip = img_index.get(k.strip().lower())
        if ip:
            break
    if ip:
        pairs.append((ip, jp))
    else:
        unmatched.append(jp)

print(f"[DEBUG] paired={len(pairs)}  unmatched_labels={len(unmatched)}")
if unmatched[:5]:
    print("[DEBUG] sample unmatched label stems:", [p.stem for p in unmatched[:5]])

# 쌍이 0이면 여기서 종료(원인 로그 확인)
if len(pairs) == 0:
    print("[ERROR] 이미지-라벨 매칭 0건입니다. 위 [DEBUG] 출력으로 stem/tail/JSON 이미지명을 확인하세요.")
else:
    # ------------ 분할 ------------
    random.seed(SEED)
    random.shuffle(pairs)
    N = len(pairs)
    n_tr = int(N*RATIOS[0]); n_va = int(N*RATIOS[1]); n_te = N - n_tr - n_va
    splits = {'train': pairs[:n_tr], 'val': pairs[n_tr:n_tr+n_va], 'test': pairs[n_tr+n_va:]}

    for sp in splits:
        (DEST_IMG_BASE/sp).mkdir(parents=True, exist_ok=True)
        (DEST_LBL_BASE/sp).mkdir(parents=True, exist_ok=True)

    # 같은 이미지가 여러 라벨과 매칭되었는지 체크(있으면 이미지만 copy로 강제)
    img_counts = collections.Counter([ip for ip, _ in pairs])
    dup_imgs_exist = any(c > 1 for c in img_counts.values())
    MOVE_IMG = MOVE_FILES and not dup_imgs_exist
    if dup_imgs_exist:
        print("[WARN] 동일 이미지에 여러 라벨이 연결됨 → 이미지 파일은 copy 모드로 전환(MOVE_IMG=False)")

    # ------------ 저장(이름 강제 규칙) ------------
    for sp, items in splits.items():
        dst_idir = DEST_IMG_BASE/sp
        dst_ldr  = DEST_LBL_BASE/sp
        for ip, jp in items:
            # 최종 베이스네임: "__" 뒤 토큰 우선, 없으면 stem
            nm_json = read_image_name_from_json(jp)
            if nm_json:
                base = tail_after_double_underscore(nm_json)
            else:
                base = tail_after_double_underscore(jp.name)

            img_ext = ip.suffix.lower()
            base = unique_basename(dst_idir, dst_ldr, base, img_ext)

            dst_img = dst_idir / f"{base}{img_ext}"
            dst_lbl = dst_ldr  / f"{base}.json"

            # 이미지/라벨 저장
            transfer(ip, dst_img, move=MOVE_IMG)          # 이미지: 중복 시 copy
            transfer(jp, dst_lbl, move=MOVE_FILES)        # 라벨: 설정 그대로

            # JSON 내부 이미지 파일명 동기화
            update_json_image_name(dst_lbl, dst_img.name)

    print(f"[DONE] total pairs={N}  train={len(splits['train'])}, val={len(splits['val'])}, test={len(splits['test'])}")
    print(f"[DEST] images -> {DEST_IMG_BASE}")
    print(f"[DEST] labels -> {DEST_LBL_BASE}")


In [None]:
# 원본 폴더 삭제해주기
!rm -rf /content/images
!rm -rf /content/jsons

In [None]:
# ============================
# [STEP 2-1] 경로/설정 준비
#  - 1단계에서 만든 CONFIG가 없어도 안전하게 동작하도록 방어 코드를 포함
# ============================
import os, json, csv, math
from pathlib import Path
import cv2
import numpy as np
from tqdm import tqdm

# 1) 프로젝트 루트 기본값 (1단계와 동일)
DEFAULT_ROOT = Path('/content/bee_project')

# 2) 실제 데이터가 있는 원본 루트(사용자 제공 구조)
#    sample_2400/
#      ├─ images/{train,val,test}/*.jpg
#      └─ labels/{train,val,test}/*.json

# RAW_ROOT = Path('/content/drive/MyDrive/AI활용 소프트웨어 개발/10. 파이썬 웹 서비스/Bee/data/sample_2400') # 드라이브 버전
RAW_ROOT = Path('/content/bee_project')


# 3) 프로젝트 루트 결정: 1단계 CONFIG가 있으면 그걸, 없으면 기본값
CONFIG_PATH = DEFAULT_ROOT / 'project_config.yaml'
if CONFIG_PATH.exists():
    import yaml
    with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
        CFG = yaml.safe_load(f)
    ROOT = Path(CFG.get('project_root', str(DEFAULT_ROOT)))
else:
    ROOT = DEFAULT_ROOT

# 4) 출력 폴더들 보장 생성
( ROOT / 'yolo_labels' / 'train' ).mkdir(parents=True, exist_ok=True)
( ROOT / 'yolo_labels' / 'val'   ).mkdir(parents=True, exist_ok=True)
( ROOT / 'yolo_labels' / 'test'  ).mkdir(parents=True, exist_ok=True)
( ROOT / 'outputs' / 'reports'   ).mkdir(parents=True, exist_ok=True)
( ROOT / 'outputs' / 'viz'       ).mkdir(parents=True, exist_ok=True)

print('[INFO] PROJECT ROOT =', ROOT)
print('[INFO] RAW ROOT     =', RAW_ROOT)



In [None]:
# ============================
# [STEP 2-2] 유틸: YOLO 포맷 변환
# ============================
def _clamp(v, lo, hi):
    """값 v를 [lo, hi] 범위로 자르기(좌표 안정화)"""
    return max(lo, min(hi, v))

def bbox_to_yolo_line(x1, y1, x2, y2, W, H, cls_id=0):
    # 1) 좌표 정리
    x1, x2 = min(x1, x2), max(x1, x2)
    y1, y2 = min(y1, y2), max(y1, y2)

    # 2) 이미지 경계로 클램프(음수/초과 방지)
    x1 = _clamp(x1, 0, W - 1)
    x2 = _clamp(x2, 0, W - 1)
    y1 = _clamp(y1, 0, H - 1)
    y2 = _clamp(y2, 0, H - 1)

    # 3) 중심/크기 계산
    cx = (x1 + x2) / 2.0
    cy = (y1 + y2) / 2.0
    bw = (x2 - x1)
    bh = (y2 - y1)

    # 4) 극단값 방지
    if bw <= 0 or bh <= 0:
        return None

    # 5) [0,1] 정규화
    cx_n = cx / W
    cy_n = cy / H
    bw_n = bw / W
    bh_n = bh / H

    # ======== ★ 여기 추가: 미세 오차 제거 (정확한 삽입 위치) ========
    import numpy as np
    eps = 1e-6
    cx_n = float(np.clip(cx_n, eps, 1.0 - eps))
    cy_n = float(np.clip(cy_n, eps, 1.0 - eps))
    bw_n = float(np.clip(bw_n, eps, 1.0 - eps))
    bh_n = float(np.clip(bh_n, eps, 1.0 - eps))
    # ============================================================

    # 6) YOLO txt 한 줄 문자열
    return f"{cls_id} {cx_n:.6f} {cy_n:.6f} {bw_n:.6f} {bh_n:.6f}"


def read_image_size(img_path, fallback_W=None, fallback_H=None):
    """
    이미지 크기 읽기. 실패 시 폴백 사용 (JSON WIDTH/HEIGHT가 있으면 폴백으로 넣기)
    """
    img = cv2.imread(str(img_path))
    if img is not None:
        H, W = img.shape[:2]
        return W, H
    # 이미지 읽기 실패하면 폴백
    return fallback_W, fallback_H

# ============================
# [STEP 2-3] 변환기 본체
#  - JSON → YOLO .txt
#  - 리포트 CSV 생성
#  - 샘플 시각화 저장
# ============================
def convert_split(split='train', visualize_n=10):
    """
    주어진 split(train/val/test)에 대해:
      - labels/*.json 읽어 bbox → yolo_labels/*.txt 저장
      - 무결성 리포트 CSV 저장
      - 첫 N개 샘플 시각화 이미지 저장
    """
    json_dir = RAW_ROOT / 'labels' / split
    img_dir  = RAW_ROOT / 'images' / split
    out_dir  = ROOT / 'yolo_labels' / split
    viz_dir  = ROOT / 'outputs' / 'viz' / split
    viz_dir.mkdir(parents=True, exist_ok=True)

    # 리포트 파일 경로
    report_csv = ROOT / 'outputs' / 'reports' / f'yolo_convert_report_{split}.csv'

    json_files = sorted(list(json_dir.glob('*.json')))
    if len(json_files) == 0:
        print(f'[WARN] No JSONs in {json_dir}')
        return

    rows = []  # 리포트 누적
    vcount = 0 # 시각화 저장 개수

    for jp in tqdm(json_files, desc=f'[{split}] converting', ncols=80):
        # 1) JSON 로드
        with open(jp, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # 2) 파일명/이미지 경로 결정
        #    - JSON 내부 파일명이 있으면 우선 사용, 없으면 json 스템으로 유추
        jname = data.get('IMAGE', {}).get('IMAGE_FILE_NAME', jp.stem + '.jpg')
        ipath = img_dir / jname

        # 3) 이미지 크기(W,H) 확보: JSON → 실패 시 실제 이미지에서 읽기
        W_json = data.get('IMAGE', {}).get('WIDTH', None)
        H_json = data.get('IMAGE', {}).get('HEIGHT', None)
        W, H   = read_image_size(ipath, fallback_W=W_json, fallback_H=H_json)

        # 4) 이미지/치수 유효성 체크
        if W is None or H is None:
            rows.append([jp.name, jname, 'FAIL', 'image_size_missing', 0])
            continue
        if not ipath.exists():
            rows.append([jp.name, jname, 'FAIL', 'image_missing', 0])
            continue

        # 5) 어노테이션 목록
        anns = data.get('ANNOTATION_INFO', []) or []
        yolo_lines = []
        valid_boxes = 0

        for ann in anns:
            # JSON 키: XTL, YTL, XBR, YBR (업로드 예시와 동일)  :contentReference[oaicite:1]{index=1}
            x1 = float(ann.get('XTL', 0))
            y1 = float(ann.get('YTL', 0))
            x2 = float(ann.get('XBR', 0))
            y2 = float(ann.get('YBR', 0))

            line = bbox_to_yolo_line(x1, y1, x2, y2, W, H, cls_id=0)
            if line is None:
                continue
            yolo_lines.append(line)
            valid_boxes += 1

        # 6) YOLO 라벨 저장 (박스가 0개여도 빈 파일로 생성하는 쪽이 로더 호환성 ↑)
        out_txt = out_dir / f"{Path(jname).stem}.txt"
        with open(out_txt, 'w', encoding='utf-8') as f:
            f.write('\n'.join(yolo_lines))

        # 7) 리포트 누적
        rows.append([jp.name, jname, 'OK', 'saved', valid_boxes])

        # 8) 시각화 샘플 저장 (선택)
        if vcount < visualize_n:
            img = cv2.imread(str(ipath))
            if img is not None:
                for line in yolo_lines:
                    _, cx, cy, bw, bh = line.split()
                    cx = float(cx) * W
                    cy = float(cy) * H
                    bw = float(bw) * W
                    bh = float(bh) * H

                    x1 = cx - bw/2
                    y1 = cy - bh/2
                    x2 = cx + bw/2
                    y2 = cy + bh/2

                    # ======== ★ 여기 추가: 최종 픽셀 경계 clip (정확한 삽입 위치) ========
                    x1 = int(max(0, min(W-1, x1)))
                    y1 = int(max(0, min(H-1, y1)))
                    x2 = int(max(0, min(W-1, x2)))
                    y2 = int(max(0, min(H-1, y2)))
                    if x2 <= x1 or y2 <= y1:
                        continue  # 면적 없는 박스는 스킵
                    # ============================================================

                    cv2.rectangle(img, (x1, y1), (x2, y2), (0,255,0), 2)

                out_img = viz_dir / f"{Path(jname).stem}_viz.jpg"
                cv2.imwrite(str(out_img), img)
                vcount += 1


    # 9) 리포트 CSV 저장
    with open(report_csv, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['json_name','image_name','status','note','num_boxes'])
        writer.writerows(rows)

    # 10) 결과 요약 출력
    ok = sum(1 for r in rows if r[2]=='OK')
    fail = len(rows) - ok
    total_boxes = sum(int(r[4]) for r in rows)
    print(f'[SUMMARY][{split}] files: {len(rows)} | OK: {ok} | FAIL: {fail} | boxes: {total_boxes}')
    print(f'[REPORT] {report_csv}')
    print(f'[VIZ]    {viz_dir} (saved {vcount} preview images)')


In [None]:
# ============================
# [STEP 2-4] 변환 실행 (train/val/test)
# ============================
for sp in ['train', 'val', 'test']:
    convert_split(sp, visualize_n=10)

In [None]:
# ============================
# [STEP 1-4] 프로젝트 설정(config) 저장
#  - YOLOv3 탐지: 단일 클래스 'bee' (id=0)
#  - 분류(ResNet-18): 클래스 이름은 추후 확정/수정 가능 (임시 예시)
#  - 입력 해상도, 학습 하이퍼파라미터를 한 곳에서 관리
# ============================
import yaml                                                          # YAML 포맷 저장

config = {
    'project_root': str(ROOT),                                       # 프로젝트 루트 경로
    'random_seed': SEED,                                             # 고정 시드 값
    'device': 'cuda' if torch.cuda.is_available() else 'cpu',        # 디바이스 기본값
    'detection': {                                                   # 탐지(YOLOv3) 설정
        'classes': ['bee'],                                          # 단일 클래스: bee
        'num_classes': 1,                                            # 클래스 수
        'img_size': 416,                                             # 입력 해상도(권장 640, 416도 가능)
        'epochs': 30,                                               # 학습 에폭
        'batch_size': 16,                                            # 배치 크기 (A100이면 16~32 권장)
        'learning_rate': 1e-3,                                       # 초기 학습률(코사인/워밍업과 함께 조정)
        'weight_decay': 5e-4,                                        # 가중치 감쇠
        'warmup_epochs': 3,                                          # 워밍업 에폭
        'amp': True,                                                 # AMP(Mixed Precision) 사용
        'train_images': str(ROOT / 'images' / 'train'),              # 학습 이미지 경로
        'val_images':   str(ROOT / 'images' / 'val'),                # 검증 이미지 경로
        'train_labels': str(ROOT / 'yolo_labels' / 'train'),         # YOLO txt 라벨(train)
        'val_labels':   str(ROOT / 'yolo_labels' / 'val'),           # YOLO txt 라벨(val)
        'optimizer': 'sgd',                                          # 'sgd' 또는 'adamw' 등
        'conf_threshold': 0.6,                                      # 추론 시 confidence 임계값
        'iou_threshold': 0.4,                                        # NMS IoU 임계값
        'box_margin_ratio': 0.08,                                    # 크롭 시 여백 비율(8%)
        'augment': {                                                 # 탐지용 증강(필요시 2단계에서 조정)
            'hflip': True,                                           # 좌우 뒤집기
            'hsv': True,                                             # 색상(H,S,V) 변형
            'mosaic': False                                          # Mosaic는 안정성상 기본 False (원하면 True로)
        }
    },
    'classification': {                                              # 분류(ResNet-18) 설정
        # 아래 클래스 이름은 임시 예시입니다. 2단계에서 확정 후 수정 예정
        'classes': [
            '수일벌-이탈리안','수일벌-카니올란','수일벌-한봉','수일벌-호박벌',
            '여왕벌-이탈리안','여왕벌-카니올란','여왕벌-한봉','여왕벌-호박벌'
        ],
        'img_size': 224,                                             # ResNet-18 입력 크기
        'epochs': 20,                                                # 분류 학습 에폭
        'batch_size': 64,                                            # 분류 배치 (A100이면 넉넉)
        'learning_rate': 1e-3,                                       # 초기 학습률
        'weight_decay': 1e-4,                                        # 가중치 감쇠
        'amp': True,                                                 # AMP 사용
        'augment': {                                                 # 분류용 증강
            'randaugment': True,                                     # 강한 증강(필요시 강도 조절)
            'color_jitter': True,                                    # 색상 조절
            'hflip': True                                            # 좌우 뒤집기
        }
    },
    'paths': {                                                       # 자주 쓰는 경로 모음
        'json_labels_train': str(ROOT / 'labels' / 'train'),         # 원본 JSON 라벨(train)
        'json_labels_val':   str(ROOT / 'labels' / 'val'),           # 원본 JSON 라벨(val)
        'json_labels_test':  str(ROOT / 'labels' / 'test'),          # 원본 JSON 라벨(test)
        'yolo_labels_train': str(ROOT / 'yolo_labels' / 'train'),    # 변환된 YOLO 라벨(train)
        'yolo_labels_val':   str(ROOT / 'yolo_labels' / 'val'),      # 변환된 YOLO 라벨(val)
        'crops_train':       str(ROOT / 'crops' / 'train'),          # 크롭 이미지(train)
        'crops_val':         str(ROOT / 'crops' / 'val'),            # 크롭 이미지(val)
        'weights':           str(ROOT / 'outputs' / 'weights'),      # 모델 가중치 저장
        'reports':           str(ROOT / 'outputs' / 'reports'),      # CSV/리포트
        'viz':               str(ROOT / 'outputs' / 'viz')           # 시각화 결과
    }
}

# 설정을 YAML로 저장 (나중에 로더가 이 파일만 읽으면 전체 파이프라인 재현 가능)
CONFIG_PATH = ROOT / 'project_config.yaml'                           # 설정 파일 경로
with open(CONFIG_PATH, 'w', encoding='utf-8') as f:                  # 파일 열기(UTF-8)
    yaml.safe_dump(config, f, allow_unicode=True, sort_keys=False)   # 한글 보존하며 저장
print('[INFO] Config saved to:', CONFIG_PATH)                         # 저장 경로 출력


In [None]:
# ============================
# [STEP 1-5] 유틸 함수: 설정 로드 & AMP 스칼라 생성
# ============================
def load_config(cfg_path: Path) -> dict:
    """YAML 설정 파일을 읽어 dict로 반환"""
    with open(cfg_path, 'r', encoding='utf-8') as f:   # 설정 파일 열기
        data = yaml.safe_load(f)                       # YAML -> dict 변환
    return data                                        # 설정 딕셔너리 반환

def build_amp_scaler(enabled: bool = True) -> GradScaler:
    """AMP 사용 플래그에 따라 GradScaler 생성"""
    return GradScaler(enabled=enabled)                 # enabled=False면 FP32로 동작

# 설정 로드/프린트 테스트
CFG = load_config(CONFIG_PATH)                         # 방금 저장한 설정 불러오기
print('[INFO] Loaded config keys:', list(CFG.keys()))  # 최상위 키 확인
print('[INFO] Detection img_size =', CFG['detection']['img_size'])  # 예시 출력
SCALER_DET = build_amp_scaler(CFG['detection']['amp']) # 탐지용 AMP 스칼라
SCALER_CLS = build_amp_scaler(CFG['classification']['amp']) # 분류용 AMP 스칼라


In [None]:
# 1) 충돌 가능성 있는 패키지 제거
!pip uninstall -y albumentations albucore opencv-python opencv-python-headless numpy

# 2) 호환성 검증된 조합으로 재설치
#  - albumentations 1.4.16 : albucore 사용 버전 (HF 이슈 스레드에서 OK 보고)
#  - albucore 0.0.16       : preserve_channel_dim 제공
#  - numpy 1.26.4           : PyTorch/Colab와 안정 조합
#  - opencv 4.9.0.80        : 알부/넘파이와 궁합 양호
!pip install albumentations==1.4.16 albucore==0.0.16 numpy==1.26.4 opencv-python==4.9.0.80


In [None]:
# (이미 언인스톨했으니) 호환 버전 '한 번에' 설치
!pip install --no-cache-dir \
  numpy==1.26.4 \
  albucore==0.0.17 \
  albumentations==1.4.16 \
  opencv-python-headless==4.9.0.80


In [None]:
# ============================================
# [STEP 3-1] YOLO 데이터로더 (설치 없이 동작하는 버전)
# ============================================
import os, glob, math, random, json
from pathlib import Path
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

# OpenCV 대신 PIL 사용 (기본 설치됨)
try:
    import cv2
    USE_CV2 = True
except ImportError:
    from PIL import Image
    USE_CV2 = False
    print("[INFO] OpenCV not found, using PIL instead")

# 설정 파일 로드 (yaml 없이 처리)
try:
    import yaml
    CFG_PATH = Path('/content/bee_project/project_config.yaml')
    if CFG_PATH.exists():
        with open(CFG_PATH, 'r', encoding='utf-8') as f:
            CFG = yaml.safe_load(f)
    else:
        CFG = None
except ImportError:
    print("[INFO] PyYAML not found, using default config")
    CFG = None

# 기본 설정
if CFG is None:
    CFG = {
        'detection': {
            'img_size': 416, 'batch_size': 16, 'amp': True,
            'train_images': '/content/bee_project/images/train',
            'val_images':   '/content/bee_project/images/val',
            'train_labels': '/content/bee_project/yolo_labels/train',
            'val_labels':   '/content/bee_project/yolo_labels/val',
        }
    }

# 경로/파라미터 가져오기
DET = CFG['detection']
IMG_SIZE = int(DET['img_size'])
TRAIN_IMG_DIR = Path('/content/bee_project/images/train')
VAL_IMG_DIR   = Path('/content/bee_project/images/val')
TRAIN_LBL_DIR = Path('/content/bee_project/yolo_labels/train')
VAL_LBL_DIR   = Path('/content/bee_project/yolo_labels/val')

# ---------------- 이미지 변환 함수들 (Albumentations 대체) ----------------
def resize_with_padding(image, target_size, fill_value=114):
    """이미지를 비율 유지하며 리사이즈하고 패딩 추가"""
    if USE_CV2:
        h, w = image.shape[:2]
        scale = min(target_size / h, target_size / w)
        new_h, new_w = int(h * scale), int(w * scale)

        # 리사이즈
        resized = cv2.resize(image, (new_w, new_h))

        # 패딩 계산
        pad_h = target_size - new_h
        pad_w = target_size - new_w
        top = pad_h // 2
        bottom = pad_h - top
        left = pad_w // 2
        right = pad_w - left

        # 패딩 적용
        padded = cv2.copyMakeBorder(resized, top, bottom, left, right,
                                   cv2.BORDER_CONSTANT, value=(fill_value, fill_value, fill_value))
        return padded, scale, (left, top)
    else:
        # PIL 버전
        w, h = image.size
        scale = min(target_size / h, target_size / w)
        new_h, new_w = int(h * scale), int(w * scale)

        # 리사이즈
        resized = image.resize((new_w, new_h), Image.Resampling.LANCZOS)

        # 새 이미지 생성 (패딩)
        new_image = Image.new('RGB', (target_size, target_size), (fill_value, fill_value, fill_value))

        # 패딩 계산
        pad_h = target_size - new_h
        pad_w = target_size - new_w
        top = pad_h // 2
        left = pad_w // 2

        # 붙여넣기
        new_image.paste(resized, (left, top))

        return np.array(new_image), scale, (left, top)

def transform_bboxes(bboxes, scale, pad_offset):
    """바운딩 박스를 변환된 이미지에 맞게 조정"""
    if len(bboxes) == 0:
        return bboxes

    transformed = []
    pad_x, pad_y = pad_offset

    for bbox in bboxes:
        x1, y1, x2, y2 = bbox
        # 스케일 적용
        x1 *= scale
        y1 *= scale
        x2 *= scale
        y2 *= scale
        # 패딩 오프셋 적용
        x1 += pad_x
        y1 += pad_y
        x2 += pad_x
        y2 += pad_y
        transformed.append([x1, y1, x2, y2])

    return transformed

# ---------------- 좌표 변환/클립 유틸 ----------------
EPS = 1e-6

def _clip_yolo_norm(boxes, eps=EPS, min_wh=EPS):
    """
    YOLO 정규화 [[cx,cy,w,h], ...]를 [eps,1-eps]로 강제 + 최소 w/h 보장.
    (미세 음수/초과를 전부 흡수)
    """
    out=[]
    for cx, cy, w, h in boxes:
        cx = float(min(1.0 - eps, max(eps, float(cx))))
        cy = float(min(1.0 - eps, max(eps, float(cy))))
        w  = float(min(1.0 - eps, max(min_wh, float(w))))
        h  = float(min(1.0 - eps, max(min_wh, float(h))))
        out.append([cx, cy, w, h])
    return out

def yolo_norm_to_voc_abs(boxes_yolo, W, H):
    """[cx,cy,w,h] (0~1) -> [x1,y1,x2,y2] (pixels)"""
    out = []
    for cx, cy, w, h in boxes_yolo:
        bw, bh = float(w)*W, float(h)*H
        x1 = float(cx)*W - bw/2.0
        y1 = float(cy)*H - bh/2.0
        x2 = x1 + bw
        y2 = y1 + bh
        out.append([x1, y1, x2, y2])
    return out

def _clip_voc_abs(boxes, W, H, eps=EPS):
    """
    픽셀 절대좌표를 이미지 경계 [0,W-1],[0,H-1]로 교차-클립.
    무면적(<=eps)은 제거.
    """
    out=[]
    for x1,y1,x2,y2 in boxes:
        x1 = float(np.clip(x1, 0, W-1)); y1 = float(np.clip(y1, 0, H-1))
        x2 = float(np.clip(x2, 0, W-1)); y2 = float(np.clip(y2, 0, H-1))
        if x2 <= x1 + eps or y2 <= y1 + eps:
            continue
        out.append([x1,y1,x2,y2])
    return out

def voc_abs_to_yolo_norm(boxes_voc, W, H, eps=EPS):
    """[x1,y1,x2,y2] (pixels) -> [cx,cy,w,h] (0~1) with final eps-clip"""
    out = []
    for x1, y1, x2, y2 in boxes_voc:
        # 경계 clip + 면적 0 방지 (재확인)
        x1 = float(np.clip(x1, 0, W-1)); y1 = float(np.clip(y1, 0, H-1))
        x2 = float(np.clip(x2, 0, W-1)); y2 = float(np.clip(y2, 0, H-1))
        if x2 <= x1 + eps or y2 <= y1 + eps:
            continue
        bw = x2 - x1; bh = y2 - y1
        cx = x1 + bw/2.0; cy = y1 + bh/2.0
        cx_n = cx / W; cy_n = cy / H; bw_n = bw / W; bh_n = bh / H
        # 마지막 한 번 더 클립 (부동소수점 마감)
        cx_n = float(np.clip(cx_n, eps, 1.0 - eps))
        cy_n = float(np.clip(cy_n, eps, 1.0 - eps))
        bw_n = float(np.clip(bw_n, eps, 1.0 - eps))
        bh_n = float(np.clip(bh_n, eps, 1.0 - eps))
        out.append([cx_n, cy_n, bw_n, bh_n])
    return out

# ---------------- Dataset ----------------
class YoloDetectionDataset(Dataset):
    """YOLO txt 라벨을 읽어 오는 커스텀 데이터셋 (음수좌표 방지 패치 포함)"""
    def __init__(self, img_dir: Path, lbl_dir: Path, transforms=None, num_classes=1, debug=False):
        self.img_paths = sorted([p for p in img_dir.glob('*.jpg')] + [p for p in img_dir.glob('*.png')])
        self.lbl_dir   = lbl_dir
        self.transforms = transforms
        self.num_classes = num_classes
        self.debug_print_count = 0
        self.debug = debug  # 디버그 모드 제어

    def __len__(self):
        return len(self.img_paths)

    def _read_yolo_txt(self, txt_path: Path):
        boxes, labels = [], []
        if not txt_path.exists():
            return boxes, labels
        with open(txt_path, 'r', encoding='utf-8') as f:
            for ln in f.read().strip().splitlines():
                if not ln.strip():
                    continue
                parts = ln.split()
                if len(parts) < 5:
                    continue
                cls = int(float(parts[0]))
                cx, cy, w, h = map(float, parts[1:5])
                boxes.append([cx, cy, w, h])
                labels.append(cls)
        return boxes, labels

    def __getitem__(self, idx):
        img_path = self.img_paths[idx]

        # 이미지 로드
        if USE_CV2:
            img_bgr = cv2.imread(str(img_path))
            assert img_bgr is not None, f'이미지 로드 실패: {img_path}'
            img = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
        else:
            img_pil = Image.open(str(img_path)).convert('RGB')
            img = np.array(img_pil)

        H0, W0 = img.shape[:2]

        txt_path = self.lbl_dir / f'{img_path.stem}.txt'
        boxes_yolo, labels = self._read_yolo_txt(txt_path)

        # (A) YOLO 정규화 좌표를 먼저 eps-clip (미세 음수/초과 흡수)
        boxes_yolo = _clip_yolo_norm(boxes_yolo, eps=EPS, min_wh=EPS)

        # (B) YOLO -> VOC(픽셀) 변환
        boxes_voc_abs = yolo_norm_to_voc_abs(boxes_yolo, W0, H0)

        # (C) 변환 '직전' 픽셀 좌표 교차-클립
        boxes_voc_abs = _clip_voc_abs(boxes_voc_abs, W0, H0, eps=EPS)

        # 이미지 변환 (Albumentations 대체)
        transformed_img, scale, pad_offset = resize_with_padding(img, IMG_SIZE)

        # 바운딩 박스 변환
        if len(boxes_voc_abs) > 0:
            bxs_voc = transform_bboxes(boxes_voc_abs, scale, pad_offset)
            clss = labels[:len(bxs_voc)]  # 길이 맞추기
        else:
            bxs_voc = []
            clss = []

        # 이미지를 텐서로 변환
        if USE_CV2:
            timg = torch.from_numpy(transformed_img).permute(2, 0, 1).float() / 255.0
        else:
            timg = torch.from_numpy(transformed_img).permute(2, 0, 1).float() / 255.0

        # (D) 변환 '직후' 픽셀 좌표 재-클립
        H1, W1 = timg.shape[1], timg.shape[2]
        bxs_voc = _clip_voc_abs(bxs_voc, W1, H1, eps=EPS)

        # (E) 최종: VOC(픽셀) -> YOLO(정규화) with eps-clip
        boxes_yolo_final = voc_abs_to_yolo_norm(bxs_voc, W1, H1, eps=EPS)

        # (F) 텐서 구성 (라벨 길이와 매칭)
        if len(boxes_yolo_final) > 0:
            # clss 길이가 박스보다 길 수 있으니 zip으로 안전 매칭
            targets = torch.tensor([list(b)+[c] for b, c in zip(boxes_yolo_final, clss)], dtype=torch.float32)
        else:
            targets = torch.zeros((0,5), dtype=torch.float32)

        # 디버그 출력 제거 (깔끔한 학습을 위해)
        if self.debug and self.debug_print_count < 3:
            print(f"[DEBUG] {img_path.name}  H0xW0={H0}x{W0} -> H1xW1={H1}x{W1}")
            print("  yolo_in :", boxes_yolo[:3])
            print("  voc_pre :", boxes_voc_abs[:3])
            print("  voc_post:", bxs_voc[:3])
            print("  yolo_out:", boxes_yolo_final[:3])
            self.debug_print_count += 1

        return timg, targets, img_path.name

def collate_fn(batch):
    imgs, targets, names = [], [], []
    for i, (img, t, name) in enumerate(batch):
        imgs.append(img)
        if t.numel() > 0:
            ti = torch.zeros((t.size(0), 6), dtype=torch.float32)
            ti[:,0] = i
            ti[:,1:] = t
            targets.append(ti)
        names.append(name)
    targets = torch.cat(targets, dim=0) if len(targets)>0 else torch.zeros((0,6), dtype=torch.float32)
    return torch.stack(imgs, 0), targets, names

# ---------------- DataLoader ----------------
train_ds = YoloDetectionDataset(TRAIN_IMG_DIR, TRAIN_LBL_DIR, num_classes=1, debug=True)   # 훈련 시에만 디버그
val_ds   = YoloDetectionDataset(VAL_IMG_DIR,   VAL_LBL_DIR,   num_classes=1, debug=False)  # validation 시 디버그 끄기

train_loader = DataLoader(train_ds, batch_size=int(DET['batch_size']), shuffle=True,
                          num_workers=2, pin_memory=True, collate_fn=collate_fn, drop_last=True)
val_loader   = DataLoader(val_ds, batch_size=int(DET['batch_size']), shuffle=False,
                          num_workers=2, pin_memory=True, collate_fn=collate_fn, drop_last=False)

print('[INFO] train samples:', len(train_ds), '| val samples:', len(val_ds))

In [None]:
imgs, targets, names = next(iter(train_loader))
assert imgs.dim() == 4 and imgs.shape[1] == 3
if targets.numel() > 0:
    assert torch.all((targets[:,1:5] > 0) & (targets[:,1:5] < 1)), "좌표 0~1 범위를 벗어났습니다."
print("OK: dataloader pipeline (pascal_voc → yolo) works.")


In [None]:
print("IMG DIRS:", TRAIN_IMG_DIR, VAL_IMG_DIR)
print("LBL DIRS:", TRAIN_LBL_DIR, VAL_LBL_DIR)
print("EXIST?  :", TRAIN_IMG_DIR.exists(), VAL_IMG_DIR.exists(), TRAIN_LBL_DIR.exists(), VAL_LBL_DIR.exists())

def count_imgs(root: Path):
    exts = ['*.jpg','*.jpeg','*.png','*.JPG','*.JPEG','*.PNG']
    return sum(len(list(root.rglob(e))) for e in exts)

print("Found(train imgs):", count_imgs(TRAIN_IMG_DIR))
print("Found(val   imgs):", count_imgs(VAL_IMG_DIR))


In [None]:
# ============================================
# [STEP 3-2] YOLOv3 네트워크 (Darknet-53 백본 + 3-스케일 헤드)
#  - 출력: 3개 스케일의 예측 텐서 [B, A*(5+C), S, S]
#  - 앵커: YOLOv3 COCO 기본 앵커 사용(스케일별 3개), stride로 자동 스케일링
# ============================================
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import List, Tuple

# 기본 Conv-BN-Leaky 블록 정의
class Conv(nn.Module):
    def __init__(self, c1, c2, k=1, s=1, p=None, act=True):
        super().__init__()                                                                                # 부모 초기화
        p = k // 2 if p is None else p                                                                    # 패딩 자동값
        self.conv = nn.Conv2d(c1, c2, k, s, p, bias=False)                                                # 컨볼루션
        self.bn   = nn.BatchNorm2d(c2)                                                                    # 배치정규화
        self.act  = nn.LeakyReLU(0.1, inplace=True) if act else nn.Identity()                             # 활성화
    def forward(self, x):
        return self.act(self.bn(self.conv(x)))                                                            # 순전파

# 다크넷 Residual 블록 (1x1 -> 3x3)
class ResBlock(nn.Module):
    def __init__(self, c):
        super().__init__()                                                                                # 부모 초기화
        self.cv1 = Conv(c, c//2, k=1, s=1)                                                                # 1x1 축소
        self.cv2 = Conv(c//2, c, k=3, s=1)                                                                # 3x3 확장
    def forward(self, x):
        return x + self.cv2(self.cv1(x))                                                                  # 스킵연결

# Darknet-53 백본 구성
class Darknet53(nn.Module):
    def __init__(self):
        super().__init__()                                                                                # 부모 초기화
        self.cv1 = Conv(3, 32, 3, 1)                                                                      # stem conv
        self.cv2 = Conv(32, 64, 3, 2)                                                                     # downsample
        self.res1 = nn.Sequential(*[ResBlock(64) for _ in range(1)])                                      # 1 residual
        self.cv3 = Conv(64, 128, 3, 2)                                                                    # downsample
        self.res2 = nn.Sequential(*[ResBlock(128) for _ in range(2)])                                     # 2 residual
        self.cv4 = Conv(128, 256, 3, 2)                                                                   # downsample
        self.res3 = nn.Sequential(*[ResBlock(256) for _ in range(8)])                                     # 8 residual
        self.cv5 = Conv(256, 512, 3, 2)                                                                   # downsample
        self.res4 = nn.Sequential(*[ResBlock(512) for _ in range(8)])                                     # 8 residual
        self.cv6 = Conv(512, 1024, 3, 2)                                                                  # downsample
        self.res5 = nn.Sequential(*[ResBlock(1024) for _ in range(4)])                                    # 4 residual
    def forward(self, x):
        x = self.cv1(x)                                                                                   # 32x
        x = self.cv2(x) ; x = self.res1(x)                                                                # 64x
        x = self.cv3(x) ; x = self.res2(x)                                                                # 128x
        x = self.cv4(x) ; x3 = self.res3(x)                                                               # 256x (route)
        x = self.cv5(x) ; x2 = self.res4(x)                                                               # 512x (route)
        x = self.cv6(x) ; x1 = self.res5(x)                                                               # 1024x (top)
        return x1, x2, x3                                                                                 # 3 스케일 피처 반환

# YOLOv3 Neck/Head(3-스케일 예측) 구성
class YOLOv3(nn.Module):
    def __init__(self, num_classes=1, anchors=None, img_size=416):
        super().__init__()                                                                                # 부모 초기화
        self.nc = num_classes                                                                             # 클래스 수
        self.na = 3                                                                                        # 스케일당 앵커 수
        # COCO 기본 앵커 (픽셀 기준) - YOLOv3 논문 설정
        self.anchors = anchors or [                                                                       # 3스케일*3개
            [(116,90), (156,198), (373,326)],    # stride 32
            [(30,61),  (62,45),   (59,119)],     # stride 16
            [(10,13),  (16,30),   (33,23)]       # stride 8
        ]
        self.img_size = img_size                                                                          # 입력 해상도
        # 백본 생성
        self.backbone = Darknet53()                                                                        # Darknet-53
        # 헤드 1 (최상단: 1024 → 512 → 1024 → pred)
        self.head1 = nn.Sequential(Conv(1024, 512, 1, 1), Conv(512, 1024, 3, 1),
                                   Conv(1024, 512, 1, 1), Conv(512, 1024, 3, 1),
                                   Conv(1024, 512, 1, 1))                                                 # 출력 512
        self.pred1 = nn.Conv2d(512, self.na*(5+self.nc), 1, 1, 0)                                         # 예측 conv
        # 업샘플 + 합치기 (512→256)
        self.up1   = nn.Upsample(scale_factor=2, mode='nearest')                                          # 2배 업샘플
        self.reduce1 = Conv(512, 256, 1, 1)                                                               # 채널 축소
        # 헤드 2 (합쳐진 피처: 256 + 512 = 768 → ... → pred)
        self.head2 = nn.Sequential(Conv(768, 256, 1, 1), Conv(256, 512, 3, 1),
                                   Conv(512, 256, 1, 1), Conv(256, 512, 3, 1),
                                   Conv(512, 256, 1, 1))                                                  # 출력 256
        self.pred2 = nn.Conv2d(256, self.na*(5+self.nc), 1, 1, 0)                                         # 예측 conv
        # 또 업샘플 + 합치기 (256→128)
        self.up2   = nn.Upsample(scale_factor=2, mode='nearest')                                          # 2배 업샘플
        self.reduce2 = Conv(256, 128, 1, 1)                                                               # 채널 축소
        # 헤드 3 (합쳐진 피처: 128 + 256 = 384 → ... → pred)
        self.head3 = nn.Sequential(Conv(384, 128, 1, 1), Conv(128, 256, 3, 1),
                                   Conv(256, 128, 1, 1), Conv(128, 256, 3, 1),
                                   Conv(256, 128, 1, 1))                                                  # 출력 128
        self.pred3 = nn.Conv2d(128, self.na*(5+self.nc), 1, 1, 0)                                         # 예측 conv
        # 스트라이드 값 (입력크기/출력 맵 크기) - forward에서 동적으로 계산
        self.strides = None                                                                                # 나중 설정
    def _make_grid(self, nx, ny, device):
        """셀 좌표 그리드 생성 (각 위치의 (x,y) 인덱스)"""
        yv, xv = torch.meshgrid(torch.arange(ny, device=device), torch.arange(nx, device=device), indexing='ij')  # 그리드
        return xv, yv                                                                                     # x,y 그리드
    def forward(self, x):
        x1, x2, x3 = self.backbone(x)                                                                     # 백본 출력
        p1 = self.head1(x1)                                                                               # 헤드1
        out1 = self.pred1(p1)                                                                             # 예측1 (stride 32)
        u1 = self.up1(self.reduce1(p1))                                                                   # 512→256 업샘플
        f2 = torch.cat([u1, x2], dim=1)                                                                   # 상하위 피처 결합
        p2 = self.head2(f2)                                                                               # 헤드2
        out2 = self.pred2(p2)                                                                             # 예측2 (stride 16)
        u2 = self.up2(self.reduce2(p2))                                                                   # 256→128 업샘플
        f3 = torch.cat([u2, x3], dim=1)                                                                   # 결합
        p3 = self.head3(f3)                                                                               # 헤드3
        out3 = self.pred3(p3)                                                                             # 예측3 (stride 8)
        return [out1, out2, out3]                                                                         # 3 스케일 반환

# 디코더 유틸 (raw pred -> 박스/obj/cls)
def yolo_decode(outputs: List[torch.Tensor], img_size: int, num_classes: int,
                anchors: List[List[Tuple[int,int]]]):
    """3-스케일 raw 출력을 디코딩해 [B, N, 4], [B, N], [B, N, C]로 반환"""
    decoded = []                                                                                          # 스케일별 결과
    device = outputs[0].device                                                                            # 디바이스
    strides = [32, 16, 8]                                                                                 # YOLOv3 표준 스트라이드
    for i, out in enumerate(outputs):                                                                     # 각 스케일 순회
        bs, ch, ny, nx = out.shape                                                                        # 배치/채널/크기
        na = 3                                                                                            # 앵커 수
        no = 5 + num_classes                                                                              # (x,y,w,h,obj+cls)
        out = out.view(bs, na, no, ny, nx).permute(0,1,3,4,2).contiguous()                                # [B,A,ny,nx,no]
        # 앵커/그리드/스트라이드 설정
        stride = strides[i]                                                                               # 현재 스케일 stride
        anc = torch.tensor(anchors[i], device=device).float() / stride                                    # 앵커를 셀 단위로
        xv, yv = torch.meshgrid(torch.arange(nx, device=device), torch.arange(ny, device=device), indexing='xy')  # 그리드
        # 시그모이드/지수 적용하여 실제 좌표로 변환
        x = (out[..., 0].sigmoid() + xv) * stride                                                         # cx 픽셀
        y = (out[..., 1].sigmoid() + yv) * stride                                                         # cy 픽셀
        w = (out[..., 2].exp() * anc[:,0].view(na,1,1)) * stride                                          # w  픽셀
        h = (out[..., 3].exp() * anc[:,1].view(na,1,1)) * stride                                          # h  픽셀
        obj = out[..., 4].sigmoid()                                                                       # objectness
        cls = out[..., 5:].sigmoid()                                                                      # class prob
        # [B, A*ny*nx, ...] 형태로 펴기
        boxes = torch.stack([x - w/2, y - h/2, x + w/2, y + h/2], dim=-1)                                 # x1y1x2y2
        boxes = boxes.view(bs, -1, 4)                                                                     # 박스 전개
        obj   = obj.view(bs, -1)                                                                          # obj 전개
        cls   = cls.view(bs, -1, num_classes)                                                             # cls 전개
        decoded.append((boxes, obj, cls))                                                                 # 누적
    # 스케일 합치기
    boxes = torch.cat([d[0] for d in decoded], dim=1)                                                     # 박스 합침
    obj   = torch.cat([d[1] for d in decoded], dim=1)                                                     # obj 합침
    cls   = torch.cat([d[2] for d in decoded], dim=1)                                                     # cls 합침
    return boxes, obj, cls                                                                                # 최종 반환

In [None]:
# ============================
# [K-means 앵커 계산] - 꿀벌 데이터셋 최적화
# ============================
import numpy as np
from pathlib import Path
from tqdm import tqdm

def iou_width_height(boxes1, boxes2):
    """너비와 높이만으로 IoU 계산"""
    boxes1 = boxes1[:, np.newaxis, :]
    boxes2 = boxes2[np.newaxis, :, :]
    intersection = np.minimum(boxes1, boxes2).prod(axis=2)
    area1 = boxes1.prod(axis=2)
    area2 = boxes2.prod(axis=2)
    union = area1 + area2 - intersection
    return intersection / (union + 1e-16)

def kmeans_anchors(boxes, k=9, max_iter=300):
    """K-means clustering for anchor boxes"""
    n = boxes.shape[0]
    centers = boxes[np.random.choice(n, 1)]

    # K-means++ initialization
    for i in range(1, k):
        dist_to_centers = 1 - iou_width_height(boxes,
centers).max(axis=1)
        probs = dist_to_centers / dist_to_centers.sum()
        cumprobs = probs.cumsum()
        r = np.random.rand()
        centers = np.vstack([centers,
boxes[np.searchsorted(cumprobs, r)]])

    # Main loop
    last_clusters = np.zeros((n,))
    for iteration in range(max_iter):
        ious = iou_width_height(boxes, centers)
        clusters = ious.argmax(axis=1)
        if (clusters == last_clusters).all():
            break
        for i in range(k):
            if np.sum(clusters == i) > 0:
                centers[i] = boxes[clusters == i].mean(axis=0)
        last_clusters = clusters.copy()

    avg_iou = ious.max(axis=1).mean()
    areas = centers[:, 0] * centers[:, 1]
    centers = centers[np.argsort(areas)[::-1]]

    return centers, avg_iou

# 꿀벌 데이터셋에서 박스 크기 수집
print("Loading bee dataset boxes...")
all_boxes = []
for split in ['train', 'val']:
    label_dir = Path(f'/content/bee_project/yolo_labels/{split}')
    for txt_file in tqdm(list(label_dir.glob('*.txt')),
desc=f'Loading {split}'):
        with open(txt_file, 'r') as f:
            for line in f:
                parts = line.strip().split()
                if len(parts) >= 5:
                    w, h = float(parts[3]), float(parts[4])
                    all_boxes.append([w, h])

boxes = np.array(all_boxes)
print(f"Loaded {len(boxes)} boxes")

# K-means 실행
anchors, avg_iou = kmeans_anchors(boxes, k=9)
print(f"\nAverage IoU: {avg_iou:.4f}")

# 픽셀 값으로 변환 (416x416 기준)
pixel_anchors = (anchors * 416).astype(int)
BEE_ANCHORS = [
    pixel_anchors[0:3].tolist(),  # Large
    pixel_anchors[3:6].tolist(),  # Medium
    pixel_anchors[6:9].tolist()   # Small
]

print("\n꿀벌 최적화 앵커 (416x416):")
print(f"BEE_ANCHORS = {BEE_ANCHORS}")

In [None]:
# ============================================
# [STEP 3-3] 손실/타겟 빌드 + NMS + mAP@0.5 평가 유틸
#  - 앵커-기반 매칭: GT box와 가장 IoU가 높은 앵커(스케일 내) 1개에 할당
#  - 손실: BCE(obj/cls) + MSE(tx,ty,tw,th) (안정성을 위해 가중치 부여)
#  - 평가: NMS 후 VOC 방식으로 AP 계산(클래스 1개)
# ============================================
import math
from torchvision.ops import nms

# IoU 계산 함수 (x1y1x2y2)
def box_iou(box1, box2):
    """box1:[N,4], box2:[M,4] → IoU:[N,M]"""
    area1 = (box1[:,2]-box1[:,0]).clamp(0) * (box1[:,3]-box1[:,1]).clamp(0)                               # 면적1
    area2 = (box2[:,2]-box2[:,0]).clamp(0) * (box2[:,3]-box2[:,1]).clamp(0)                               # 면적2
    lt = torch.max(box1[:,None,:2], box2[:,:2])                                                           # 좌상단
    rb = torch.min(box1[:,None,2:], box2[:,2:])                                                           # 우하단
    wh = (rb - lt).clamp(min=0)                                                                           # 교집합 너비/높이
    inter = wh[:,:,0] * wh[:,:,1]                                                                         # 교집합 면적
    return inter / (area1[:,None] + area2 - inter + 1e-6)                                                 # IoU 반환

# 앵커-기반 타깃 빌더
class TargetBuilder:
    """GT를 3개 스케일/3앵커 그리드에 할당하는 도우미"""
    def __init__(self, img_size, num_classes, anchors):
        self.img_size = img_size                                                                          # 입력 해상도
        self.nc = num_classes                                                                             # 클래스 수
        self.anchors = anchors                                                                            # 앵커 목록
        self.strides = [32, 16, 8]                                                                        # 스트라이드
    def build(self, targets, outputs):
        """
        targets:[M,6](b, cx,cy,w,h,cls) 0~1 정규화, outputs: raw 출력 리스트
        반환: (obj_t, cls_t, box_t, indices, anchors_scaled)
        """
        na = 3                                                                                             # 스케일당 앵커 수
        obj_t, cls_t, box_t, idxs, anchs = [], [], [], [], []                                              # 빈 리스트
        device = outputs[0].device                                                                         # 디바이스
        for i, out in enumerate(outputs):                                                                  # 스케일 순회
            bs, ch, ny, nx = out.shape                                                                     # 맵 크기
            stride = self.strides[i]                                                                       # 현재 스트라이드
            scale = torch.tensor(self.anchors[i], device=device).float() / stride                          # 앵커(셀단위)
            # 빈 타깃 텐서 생성
            obj_map = torch.zeros(bs, na, ny, nx, device=device)                                           # objectness
            cls_map = torch.zeros(bs, na, ny, nx, self.nc, device=device)                                  # 클래스원핫
            box_map = torch.zeros(bs, na, ny, nx, 4, device=device)                                        # (tx,ty,tw,th)
            # 현재 스케일에 들어올 타깃만 처리
            if targets.numel() > 0:                                                                        # 타깃이 있다면
                t = targets.clone()                                                                        # 복사
                # 픽셀 좌표로 변환
                gx = t[:,1] * nx                                                                           # 셀 좌표 x
                gy = t[:,2] * ny                                                                           # 셀 좌표 y
                gw =  t[:,3] * self.img_size / stride                                                      # 셀 스케일 w
                gh =  t[:,4] * self.img_size / stride                                                      # 셀 스케일 h
                gi = gx.long().clamp(0, nx-1)                                                              # 셀 인덱스 i
                gj = gy.long().clamp(0, ny-1)                                                              # 셀 인덱스 j
                # 앵커 매칭(wh IoU 기반) - 가장 유사한 앵커 하나 선택
                gt_wh = torch.stack([gw, gh], dim=1)                                                       # [M,2]
                anc_wh = scale                                                                             # [3,2]
                # IoU를 너비/높이만으로 근사 계산
                inter = torch.min(gt_wh[:,None,:], anc_wh[None,:,:]).prod(2)                               # 교집합(근사)
                union = (gt_wh[:,None,:].prod(2) + anc_wh[None,:,:].prod(2) - inter + 1e-6)               # 합집합
                ious = inter / union                                                                       # IoU 근사
                best = ious.argmax(1)                                                                      # 최적 앵커 id
                b = t[:,0].long()                                                                          # 배치 인덱스
                c = t[:,5].long()                                                                          # 클래스 id
                # 지도 값 계산 (tx,ty,tw,th)
                tx = gx - gi.float()                                                                       # 셀 내 offset x
                ty = gy - gj.float()                                                                       # 셀 내 offset y
                tw = (gw / scale[best,0]).clamp(min=1e-6).log()                                            # 로그폭
                th = (gh / scale[best,1]).clamp(min=1e-6).log()                                            # 로그높이
                # 맵에 쓰기
                obj_map[b, best, gj, gi] = 1.0                                                             # objectness=1
                cls_map[b, best, gj, gi, c] = 1.0                                                          # 원핫 클래스
                box_map[b, best, gj, gi] = torch.stack([tx,ty,tw,th], dim=1)                               # 박스 타겟
            obj_t.append(obj_map)                                                                          # 누적
            cls_t.append(cls_map)                                                                          # 누적
            box_t.append(box_map)                                                                          # 누적
            idxs.append((ny, nx))                                                                          # 크기 정보 저장
            anchs.append(scale)                                                                            # 앵커 저장
        return obj_t, cls_t, box_t, idxs, anchs                                                            # 결과 반환

# 손실 함수 (YOLOv3 스타일의 간단 가중합)
class YoloLoss(nn.Module):
    def __init__(self, img_size, num_classes, anchors,
                 lambda_box=0.05, lambda_obj=1.0, lambda_cls=0.5,
                 neg_obj_weight=0.05):  # 음성 오브젝트 가중치(작게)
        super().__init__()                                                  # 부모 초기화
        self.bce = nn.BCEWithLogitsLoss(reduction='none')                   # 맵 단위 BCE
        self.mse = nn.MSELoss(reduction='none')                             # 맵 단위 MSE
        self.builder = TargetBuilder(img_size, num_classes, anchors)        # 타깃 빌더
        self.lambda_box = lambda_box                                        # 박스 가중치
        self.lambda_obj = lambda_obj                                        # obj 가중치
        self.lambda_cls = lambda_cls                                        # cls 가중치
        self.neg_obj_weight = neg_obj_weight                                # 음성 obj 가중치
        self.num_classes = num_classes                                      # 클래스 수

    def forward(self, outputs, targets):
        """
        outputs: 모델의 raw 출력 리스트(3스케일)
        targets: [M,6] = (b, cx,cy,w,h,cls) (0~1 정규화)
        """
        # 타깃 맵 생성
        obj_t, cls_t, box_t, _, _ = self.builder.build(targets, outputs)    # 각 스케일별 타깃

        device = outputs[0].device                                          # 디바이스
        l_box = torch.tensor(0., device=device)                             # 박스 손실 누적
        l_obj = torch.tensor(0., device=device)                             # obj 손실 누적
        l_cls = torch.tensor(0., device=device)                             # cls 손실 누적

        for i, out in enumerate(outputs):                                    # 스케일 순회
            bs, ch, ny, nx = out.shape                                      # 크기 정보
            na = 3                                                          # 앵커 수
            no = 5 + self.num_classes                                       # 출력 채널 수
            out = out.view(bs, na, no, ny, nx).permute(0,1,3,4,2).contiguous()  # [B,A,ny,nx,no]

            # 로짓 분해
            px = out[..., 0]                                                # tx 로짓
            py = out[..., 1]                                                # ty 로짓
            pw = out[..., 2]                                                # tw 로짓 (로그폭)
            ph = out[..., 3]                                                # th 로짓 (로그높이)
            pobj = out[..., 4]                                              # obj 로짓
            pcls = out[..., 5:] if self.num_classes > 0 else None           # cls 로짓

            # 타깃 맵
            tbox = box_t[i]                                                 # [B,A,ny,nx,4] (tx,ty,tw,th)
            tobj = obj_t[i]                                                 # [B,A,ny,nx]
            tcls = cls_t[i]                                                 # [B,A,ny,nx,C]

            # 양성/음성 마스크
            pos = tobj.bool()                                               # GT가 할당된 위치
            # 같은 셀(같은 B, y, x)에 GT가 하나라도 있으면 그 셀의 모든 앵커를 음성에서 제외(ignore)
            any_pos_cell = pos.any(dim=1, keepdim=True)                     # [B,1,ny,nx]
            neg_mask = (~pos) & (~any_pos_cell.expand_as(pos))              # 진짜 음성만

            # --- 박스 손실: 양성에서만 (xy는 sigmoid 후 MSE, wh는 로짓-MSE) ---
            if pos.any():
                l_box = l_box + self.mse(torch.sigmoid(px)[pos], tbox[...,0][pos]).mean()
                l_box = l_box + self.mse(torch.sigmoid(py)[pos], tbox[...,1][pos]).mean()
                l_box = l_box + self.mse(pw[pos],               tbox[...,2][pos]).mean()
                l_box = l_box + self.mse(ph[pos],               tbox[...,3][pos]).mean()

            # --- 오브젝트니스 손실: 양성 + (작은 가중)음성 ---
            obj_loss_map = self.bce(pobj, tobj)                            # 위치별 BCE
            if pos.any():
                l_obj = l_obj + obj_loss_map[pos].mean()                   # 양성 부분
            if neg_mask.any():
                l_obj = l_obj + self.neg_obj_weight * obj_loss_map[neg_mask].mean()  # 음성 약하게

            # --- 클래스 손실: 양성에서만 ---
            if self.num_classes > 0 and pos.any():
                l_cls = l_cls + self.bce(pcls[pos], tcls[pos]).mean()

        # 가중합 손실
        loss = self.lambda_box*l_box + self.lambda_obj*l_obj + self.lambda_cls*l_cls
        return loss, (l_box.detach().item(), l_obj.detach().item(), l_cls.detach().item())
# NMS + 스코어 기반 박스 필터링
def postprocess(outputs, conf_thr=0.25, iou_thr=0.5, img_size=IMG_SIZE, num_classes=1, anchors=None):
    """모델 출력 → 최종 박스/점수/클래스 (이미지별 리스트)"""
    boxes, obj, cls = yolo_decode(outputs, img_size, num_classes, anchors)                                 # 디코드
    bs = boxes.size(0)                                                                                      # 배치 크기
    results = []                                                                                            # 결과 리스트
    for b in range(bs):                                                                                     # 배치 순회
        scores = obj[b]                                                                                     # 오브젝트 확률
        if num_classes>0:                                                                                   # 클래스 있으면
            cls_scores, cls_ids = cls[b].max(dim=1)                                                         # 최고 클래스
            scores = scores * cls_scores                                                                    # 최종 스코어
        keep = scores > conf_thr                                                                            # 임계값 필터
        bxs = boxes[b][keep]                                                                                # 박스 필터
        scs = scores[keep]                                                                                  # 점수 필터
        cids = cls_ids[keep] if num_classes>0 else torch.zeros_like(scs, dtype=torch.long)                 # 클래스 id
        if bxs.numel()==0:                                                                                  # 없으면
            results.append((torch.zeros((0,4)), torch.zeros((0,)), torch.zeros((0,), dtype=torch.long)))    # 빈 결과
            continue                                                                                        # 다음으로
        keep_idx = nms(bxs, scs, iou_thr)                                                                   # NMS 수행
        results.append((bxs[keep_idx], scs[keep_idx], cids[keep_idx]))                                      # 결과 저장
    return results                                                                                           # 리스트 반환

# 간단 mAP@0.5 (VOC 방식, 단일 클래스)
def eval_map50(model, dataloader, device, conf_thr=0.25, iou_thr=0.5, anchors=None):
    """단일 클래스 기준 mAP@0.5 계산 (VOC 곡선 적분)"""
    model.eval()                                                                                            # 평가 모드
    all_preds, all_gts = [], {}                                                                             # 예측/GT 저장
    with torch.no_grad():                                                                                   # 그라드 끄기
        for imgs, targets, names in dataloader:                                                             # 배치 순회
            imgs = imgs.to(device, non_blocking=True)                                                       # 이미지 GPU
            outputs = model(imgs)                                                                           # 모델 추론
            batch_res = postprocess(outputs, conf_thr, iou_thr, IMG_SIZE, 1, model.anchors)                # 후처리
            # GT를 이미지별로 수집 (픽셀 좌표 필요)
            for bi, name in enumerate(names):                                                               # 이미지별
                g = targets[targets[:,0]==bi] if targets.numel()>0 else torch.zeros((0,6))                  # 해당 GT
                g = g[:,1:5] if g.numel()>0 else torch.zeros((0,4))                                         # [cx,cy,w,h]
                # 정규화 → 픽셀 박스
                if g.numel()>0:
                    cx = g[:,0]*IMG_SIZE ; cy=g[:,1]*IMG_SIZE ; w=g[:,2]*IMG_SIZE ; h=g[:,3]*IMG_SIZE       # 픽셀
                    gx1 = cx - w/2 ; gy1 = cy - h/2 ; gx2 = cx + w/2 ; gy2 = cy + h/2                       # x1y1x2y2
                    gxyxy = torch.stack([gx1,gy1,gx2,gy2], dim=1)                                           # GT 박스
                else:
                    gxyxy = torch.zeros((0,4))                                                              # 빈 GT
                all_gts[name] = {'boxes': gxyxy.cpu(), 'detected': np.zeros(len(gxyxy), dtype=bool)}        # GT 저장
            # 예측 수집 (이미지명 단위)
            for (bxs, scs, cids), name in zip(batch_res, names):                                            # 예측/이름
                for i in range(len(bxs)):                                                                   # 각 박스
                    all_preds.append((name, scs[i].item(), bxs[i].cpu().numpy()))                           # (img,score,box)
    # 스코어 내림차순 정렬
    all_preds.sort(key=lambda x: x[1], reverse=True)                                                        # 정렬
    tp, fp = [], []                                                                                         # TP/FP 기록
    for name, score, pbox in all_preds:                                                                     # 예측 순회
        gt = all_gts[name]['boxes']                                                                         # GT 박스들
        if gt.numel()==0:                                                                                   # GT 없음
            tp.append(0) ; fp.append(1)                                                                     # 무조건 FP
            continue                                                                                        # 다음
        ious = box_iou(torch.tensor(pbox[None,:]), gt).squeeze(0).numpy()                                   # IoU 계산
        m = ious.argmax() # Fixed typo from 'm = ious.argmax()' to 'm = ious.argmax()'
        if ious[m] >= 0.5 and not all_gts[name]['detected'][m]:                                             # 매칭 가능
            tp.append(1) ; fp.append(0)                                                                     # TP 기록
            all_gts[name]['detected'][m] = True                                                             # 사용 처리
        else:
            tp.append(0) ; fp.append(1)                                                                     # FP 기록
    # 누적 TPs/FPs → P/R 곡선
    tp = np.cumsum(tp)                                                                                      # 누적 TP
    fp = np.cumsum(fp)                                                                                      # 누적 FP
    eps = 1e-9                                                                                              # 0나눗셈 방지
    recalls = tp / (sum(len(v['boxes']) for v in all_gts.values()) + eps)                                   # 재현율
    precisions = tp / (tp + fp + eps)                                                                       # 정밀도
    # AP 계산(단순 적분)
    def compute_ap(rec, prec):                                                                              # AP 함수
        mrec = np.concatenate(([0.0], rec, [1.0]))                                                          # 경계 추가
        mpre = np.concatenate(([0.0], prec, [0.0]))                                                         # 경계 추가
        for i in range(mpre.size-1, 0, -1):                                                                 # 단조감소 보정
            mpre[i-1] = np.maximum(mpre[i-1], mpre[i])                                                      # 보정
        idx = np.where(mrec[1:] != mrec[:-1])[0]                                                            # 변화 구간
        ap = np.sum((mrec[idx+1]-mrec[idx]) * mpre[idx+1])                                                  # 면적 합
        return ap                                                                                            # AP 반환
    ap50 = compute_ap(recalls, precisions)                                                                  # AP@0.5
    return float(ap50), float(precisions[-1] if precisions.size>0 else 0.0), float(recalls[-1] if recalls.size>0 else 0.0)  # (mAP, P, R)

# ======================================================
# 업그레이드 탐지 평가: AP50, AP50-95, P/R, mean IoU(TP),
#                       속도 분해(전처리/추론/NMS)
# ======================================================

import time, numpy as np, torch
from torchvision.ops import nms


def compute_ap(rec, prec):
    mrec = np.concatenate(([0.0], rec, [1.0]))
    mpre = np.concatenate(([0.0], prec, [0.0]))
    for i in range(mpre.size-1, 0, -1):
        mpre[i-1] = np.maximum(mpre[i-1], mpre[i])
    idx = np.where(mrec[1:] != mrec[:-1])[0]
    return np.sum((mrec[idx+1]-mrec[idx]) * mpre[idx+1])

def _gather_gt_pred(model, dataloader, device, conf_thr, iou_thr):
    model.eval()
    all_preds = []  # (name, score, box[x1y1x2y2])
    gts = {}        # name -> {boxes:Tensor[N,4], detected:bool[N]}
    t_mean_iou = [] # TPs의 IoU 저장
    t_pre_ms, t_inf_ms, t_nms_ms = [], [], []

    with torch.no_grad():
        for imgs, targets, names in dataloader:
            # --- 전처리 시간: 여기선 거의 없음(이미 텐서) ---
            t0 = time.time()
            imgs = imgs.to(device, non_blocking=True)
            torch.cuda.synchronize() if device.type=='cuda' else None
            t1 = time.time()

            # --- 모델 추론 ---
            out = model(imgs)
            torch.cuda.synchronize() if device.type=='cuda' else None
            t2 = time.time()

            # --- 디코드 + NMS ---
            boxes, obj, cls = yolo_decode(out, IMG_SIZE, 1, model.anchors)
            batch_res = []
            for b in range(imgs.size(0)):
                scores = obj[b]
                keep = scores > conf_thr
                bxs, scs = boxes[b][keep], scores[keep]
                if bxs.numel()==0:
                    batch_res.append((torch.zeros((0,4)), torch.zeros((0,))))
                    continue
                keep_idx = nms(bxs, scs, iou_thr)
                batch_res.append((bxs[keep_idx], scs[keep_idx]))
            torch.cuda.synchronize() if device.type=='cuda' else None
            t3 = time.time()

            t_pre_ms.append((t1 - t0) * 1000.0)
            t_inf_ms.append((t2 - t1) * 1000.0)
            t_nms_ms.append((t3 - t2) * 1000.0)

            # --- GT(픽셀좌표) 누적 ---
            for bi, name in enumerate(names):
                g = targets[targets[:,0]==bi] if targets.numel()>0 else torch.zeros((0,6))
                g = g[:,1:5] if g.numel()>0 else torch.zeros((0,4))
                if g.numel()>0:
                    cx = g[:,0]*IMG_SIZE; cy=g[:,1]*IMG_SIZE; w=g[:,2]*IMG_SIZE; h=g[:,3]*IMG_SIZE
                    gx1, gy1, gx2, gy2 = cx-w/2, cy-h/2, cx+w/2, cy+h/2
                    gxyxy = torch.stack([gx1,gy1,gx2,gy2], dim=1).cpu()
                else:
                    gxyxy = torch.zeros((0,4))
                gts[name] = {'boxes': gxyxy, 'detected': np.zeros(len(gxyxy), dtype=bool)}

            # --- 예측 누적 + TP IoU 기록 ---
            for (bxs, scs), name in zip(batch_res, names):
                bxs = bxs.cpu()
                for i in range(len(bxs)):
                    all_preds.append((name, float(scs[i].item()), bxs[i].numpy()))

                # TP IoU(참고용): 가장 높은 IoU와 매칭되면 기록
                if len(bxs) and len(gts[name]['boxes']):
                    ious = box_iou(bxs, gts[name]['boxes']).numpy()
                    # Ensure ious is not empty before taking max
                    if ious.size > 0:
                        max_per_pred = ious.max(axis=1)
                        if max_per_pred.size:
                            # 실제 TP인지 여부는 AP 계산 시 확정되므로, 여기선 최대 IoU 분포만 참고(근사)
                            t_mean_iou.extend(max_per_pred.tolist())
                        else: # Handle case where max_per_pred is empty
                             t_mean_iou.extend([0.0] * len(bxs)) # Append 0.0 for each predicted box

    return all_preds, gts, t_mean_iou, t_pre_ms, t_inf_ms, t_nms_ms

def _ap_at_iou_threshold(all_preds, gts, iou_thr):
    all_preds = sorted(all_preds, key=lambda x: x[1], reverse=True)
    tp = [] # TP 기록
    fp = [] # FP 기록
    local_gts = {k: {'boxes': v['boxes'], 'detected': v['detected'].copy()} for k,v in gts.items()}
    for name, score, pbox in all_preds:
        gt = local_gts[name]['boxes']
        if gt.numel()==0: # GT 없음
            tp.append(0) ; fp.append(1); continue
        ious = box_iou(torch.tensor(pbox[None,:]), gt).squeeze(0).numpy()
        # Ensure ious is not empty before taking argmax
        if ious.size > 0:
            m = ious.argmax()
            if ious[m] >= iou_thr and not local_gts[name]['detected'][m]: # 매칭 가능
                tp.append(1) ; fp.append(0); local_gts[name]['detected'][m] = True # TP 기록
            else:
                tp.append(0) ; fp.append(1) # FP 기록
        else: # No IoUs calculated (e.g., pred box is invalid)
            tp.append(0) ; fp.append(1) # Treat as FP


    # 누적 TPs/FPs
    tp = np.cumsum(tp)
    fp = np.cumsum(fp)

    eps = 1e-9
    # Handle cases where there are no GT boxes or no predictions
    total_gt = sum(len(v['boxes']) for v in gts.values())
    recalls = tp / (total_gt + eps) if total_gt > 0 else np.zeros_like(tp)
    precisions = tp / (tp + fp + eps) if (tp + fp).sum() > 0 else np.zeros_like(tp)


    # AP 계산
    ap = compute_ap(recalls, precisions)

    # Handle cases where precision or recall arrays might be empty
    final_precision = float(precisions[-1] if precisions.size > 0 else 0.0)
    final_recall = float(recalls[-1] if recalls.size > 0 else 0.0)


    return ap, final_precision, final_recall


def eval_detection(model, dataloader, device, conf_thr=0.25, iou_thr=0.5):
    # 1) 예측/GT 수집 + 속도
    preds, gts, tp_ious, pre_ms, inf_ms, nms_ms = _gather_gt_pred(model, dataloader, device, conf_thr, iou_thr)

    # 2) AP@0.5
    ap50, p50, r50 = _ap_at_iou_threshold(preds, gts, 0.5)

    # 3) AP@0.5:0.95
    # Only calculate AP for IoU thresholds if there are any ground truth boxes
    total_gt_boxes = sum(len(v['boxes']) for v in gts.values())
    if total_gt_boxes > 0:
        ious = np.arange(0.5, 0.95 + 1e-9, 0.05)
        ap_list = [ _ap_at_iou_threshold(preds, gts, t)[0] for t in ious ]
        ap5095 = float(np.mean(ap_list))
    else:
        ap5095 = 0.0 # If no GT boxes, AP@0.5:0.95 is 0

    # 4) mean IoU of TPs (근사: 예측-최대 IoU 분포 평균)
    mean_iou_tp = float(np.mean(tp_ious)) if len(tp_ious) else 0.0

    # 5) 속도
    pre_ms  = float(np.mean(pre_ms)) if pre_ms else 0.0
    inf_ms  = float(np.mean(inf_ms)) if inf_ms else 0.0
    nms_ms  = float(np.mean(nms_ms)) if nms_ms else 0.0
    total_ms= pre_ms + inf_ms + nms_ms
    # Avoid division by zero if total_ms is zero
    fps     = 1000.0 / total_ms if total_ms > 0 else 0.0


    return {
        'AP50': float(ap50),
        'AP50_95': float(ap5095),
        'Precision@0.5': float(p50),
        'Recall@0.5': float(r50),
        'mean_IoU_TP': float(mean_iou_tp),
        'pre_ms': pre_ms, 'infer_ms': inf_ms, 'nms_ms': nms_ms,
        'latency_ms': total_ms, 'FPS': fps
    }

In [None]:
# ============================
# [수정된 YOLOv3 손실함수] - 논문 가중치 적용
# ============================

class YoloLossFixed(nn.Module):
    """YOLOv3 손실함수 - 논문 가중치 수정 버전"""
    def __init__(self, img_size, num_classes, anchors,
                lambda_box=5.0,      # 수정: 0.05 → 5.0
                lambda_obj=1.0,
                lambda_cls=1.0,      # 수정: 0.5 → 1.0
                neg_obj_weight=0.5): # 수정: 0.05 → 0.5
        super().__init__()
        self.bce = nn.BCEWithLogitsLoss(reduction='none')
        self.mse = nn.MSELoss(reduction='none')
        self.builder = TargetBuilder(img_size, num_classes,anchors)
        self.lambda_box = lambda_box
        self.lambda_obj = lambda_obj
        self.lambda_cls = lambda_cls
        self.neg_obj_weight = neg_obj_weight
        self.num_classes = num_classes

    def forward(self, outputs, targets):
        obj_t, cls_t, box_t, _, _ = self.builder.build(targets,outputs)

        device = outputs[0].device
        l_box = torch.tensor(0., device=device)
        l_obj = torch.tensor(0., device=device)
        l_cls = torch.tensor(0., device=device)

        for i, out in enumerate(outputs):
            bs, ch, ny, nx = out.shape
            na = 3
            no = 5 + self.num_classes
            out = out.view(bs, na, no, ny,nx).permute(0,1,3,4,2).contiguous()

            px = out[..., 0]
            py = out[..., 1]
            pw = out[..., 2]
            ph = out[..., 3]
            pobj = out[..., 4]
            pcls = out[..., 5:] if self.num_classes > 0 else None

            tbox = box_t[i]
            tobj = obj_t[i]
            tcls = cls_t[i]

            pos = tobj.bool()
            neg = ~pos

            # 박스 손실 (가중치 5.0)
            if pos.any():
                l_box = l_box + self.mse(torch.sigmoid(px)[pos],tbox[...,0][pos]).mean()
                l_box = l_box + self.mse(torch.sigmoid(py)[pos],tbox[...,1][pos]).mean()
                l_box = l_box + self.mse(pw[pos],tbox[...,2][pos]).mean()
                l_box = l_box + self.mse(ph[pos],tbox[...,3][pos]).mean()

            # Objectness 손실 (음성 가중치 0.5)
            obj_loss_map = self.bce(pobj, tobj)
            if pos.any():
                l_obj = l_obj + obj_loss_map[pos].mean()
            if neg.any():
                l_obj = l_obj + self.neg_obj_weight *obj_loss_map[neg].mean()

            # 클래스 손실
            if self.num_classes > 0 and pos.any():
                l_cls = l_cls + self.bce(pcls[pos],tcls[pos]).mean()

        loss = self.lambda_box*l_box + self.lambda_obj*l_obj +self.lambda_cls*l_cls
        return loss, (l_box.detach().item(),l_obj.detach().item(), l_cls.detach().item())

In [None]:
# ===== [필수] 환경/하이퍼 설정 =====
import torch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

NC = 1               # 클래스 수 (벌 데이터면 1 또는 필요한 수로 설정)
IMG_SIZE = 416       # 입력 해상도 (당신 모델과 데이터 파이프라인에 맞추세요)
WEIGHTS = None       # "/content/bee_project/weights/best.pt" 등 학습 가중치 경로 (없으면 None)

# ===== [중요] 모델 인스턴스 생성 =====
# 위에 올려준 YOLOv3 / Darknet53 클래스 정의 셀을 먼저 실행한 다음 이 셀을 실행하세요.
model = YOLOv3(num_classes=NC, img_size=IMG_SIZE).to(device)

# (선택) 학습/사전학습 가중치 로드
if WEIGHTS is not None:
    ckpt = torch.load(WEIGHTS, map_location=device)
    # ckpt가 {"model_state_dict": ...} 구조일 수도, 바로 state_dict일 수도 있습니다.
    state_dict = ckpt.get("model_state_dict", ckpt)
    model.load_state_dict(state_dict, strict=False)
    print(f"Loaded weights from: {WEIGHTS}")

model.eval()

# ===== [점검 1] 더미 입력으로 포워드 (shape 검증) =====
with torch.no_grad():
    x = torch.randn(1, 3, IMG_SIZE, IMG_SIZE, device=device)    # 배치1, 3채널, 416x416
    outs = model(x)  # [out1, out2, out3]

for i, o in enumerate(outs, 1):
    print(f"head{i}.pred shape = {tuple(o.shape)}")
    # 기대 모양: [B, A*(5+NC), S, S]  (A=3)
    # 예: NC=1이면 채널 = 3*(5+1)=18


In [None]:
def assert_like_paper(model, img_size=416, nc=1, na=3):
    model.eval()
    x = torch.randn(1, 3, img_size, img_size).to(next(model.parameters()).device)
    with torch.no_grad():
        outs = model(x)
    strides = [32,16,8]
    expected_sizes = [img_size//s for s in strides]  # [13,26,52] for 416

    for i, (o, S) in enumerate(zip(outs, expected_sizes), 1):
        b, c, h, w = o.shape
        assert h == S and w == S, f"head{i} size mismatch: got {(h,w)}, want {(S,S)}"
        assert c == na*(5+nc),    f"head{i} ch mismatch: got {c}, want {na*(5+nc)}"
        print(f"[OK] head{i}: shape={tuple(o.shape)} (S={S}, C={na*(5+nc)})")

assert_like_paper(model, img_size=416, nc=1, na=3)


In [None]:
# 경로 체크, 예측 수 확인
# ===== 빠른 무결성 점검 =====
print('TRAIN_IMG_DIR =', TRAIN_IMG_DIR)
print('VAL_IMG_DIR   =', VAL_IMG_DIR)
print('TRAIN_LBL_DIR =', TRAIN_LBL_DIR)
print('VAL_LBL_DIR   =', VAL_LBL_DIR)

# ============================================
# [YAML → 런타임 변수 바인딩]  (기존 YAML 키 그대로 사용)
#  - TRAIN_IMG_DIR / VAL_IMG_DIR / TRAIN_LBL_DIR / VAL_LBL_DIR 등 설정
#  - eval_conf, IOU_THR, IMG_SIZE, NC, device 설정
# ============================================
import os, yaml, torch
from pathlib import Path

# 1) YAML 로드 (경로는 네가 저장한 위치로)
CONFIG_PATH = Path('/content/bee_project/project_config.yaml')  # 필요시 수정
with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
    _cfg = yaml.safe_load(f)

dcfg = _cfg['detection']
pcfg = _cfg['paths']

# 2) 경로/하이퍼 파라미터를 런타임 변수로 바인딩
TRAIN_IMG_DIR = dcfg['train_images']
VAL_IMG_DIR   = dcfg['val_images']
TRAIN_LBL_DIR = dcfg['train_labels']
VAL_LBL_DIR   = dcfg['val_labels']

NC       = int(dcfg.get('num_classes', 1))
IMG_SIZE = int(dcfg.get('img_size', 416))
eval_conf = float(dcfg.get('conf_threshold', 0.25))
IOU_THR   = float(dcfg.get('iou_threshold', 0.5))   # 기존 YAML에 0.15였으면 너무 낮음 → 0.45~0.6 권장

# device 설정 (yaml의 device는 'cuda'/'cpu'/'auto'가 있을 수 있음)
dev_pref = _cfg.get('device', 'cuda' if torch.cuda.is_available() else 'cpu')
if dev_pref == 'auto':
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
else:
    device = torch.device('cuda' if (dev_pref == 'cuda' and torch.cuda.is_available()) else 'cpu')

# 3) 경로 존재 체크 및 경고
for p in [TRAIN_IMG_DIR, VAL_IMG_DIR, TRAIN_LBL_DIR, VAL_LBL_DIR]:
    if not os.path.isdir(p):
        print(f"[WARN] 경로가 존재하지 않습니다: {p}")

# 4) 현재 설정 요약
print(f"[CFG] device={device}, NC={NC}, IMG_SIZE={IMG_SIZE}, conf={eval_conf}, iou={IOU_THR}")
print(f"[PATH] TRAIN_IMG_DIR={TRAIN_IMG_DIR}")
print(f"[PATH] VAL_IMG_DIR  ={VAL_IMG_DIR}")
print(f"[PATH] TRAIN_LBL_DIR={TRAIN_LBL_DIR}")
print(f"[PATH] VAL_LBL_DIR  ={VAL_LBL_DIR}")

# 5) (옵션) model이 아직 없으면 최소한 인스턴스만 생성해두기
try:
    model
except NameError:
    try:
        model = YOLOv3(num_classes=NC, img_size=IMG_SIZE).to(device)
        model.eval()
        print("[INFO] YOLOv3 model was instantiated (no weights loaded).")
    except Exception as e:
        print(f"[WARN] 모델 인스턴스 생성 실패: {e}\n"
              f"       이미 다른 셀에서 만들었다면 이 경고는 무시해도 됩니다.")



from pathlib import Path
def count_yolo_labels(lbl_dir):
    n_files=0; n_boxes=0
    for p in Path(lbl_dir).glob('*.txt'):
        n_files += 1
        with open(p, 'r') as f:
            lines = [ln for ln in f.read().strip().splitlines() if ln.strip()]
            n_boxes += len(lines)
    print(f'[VAL] label files={n_files}, total boxes={n_boxes}')
count_yolo_labels(VAL_LBL_DIR)

# 샘플 배치 예측 수(초기 conf) 확인
eval_conf = 0.25
model.eval()
with torch.no_grad():
    imgs, targets, names = next(iter(val_loader))
    outs = model(imgs.to(device))
    res  = postprocess(outs, eval_conf, IOU_THR, IMG_SIZE, 1, model.anchors)
    print(f'sample detections per image (conf={eval_conf}):', [len(r[0]) for r in res])



In [None]:
# ============================
# [수정된 모델/손실함수로 교체]
# ============================

# 꿀벌 최적화 앵커 (위에서 계산한 값 사용하거나 아래 기본값 사용)
if 'BEE_ANCHORS' not in globals():
    BEE_ANCHORS = [
        [(116, 90), (156, 198), (373, 326)],  # Large
        [(30, 61), (62, 45), (59, 119)],      # Medium
        [(10, 13), (16, 30), (33, 23)]        # Small
    ]

# 모델 재생성 (꿀벌 앵커 적용)
device = torch.device('cuda' if torch.cuda.is_available() else'cpu')
model = YOLOv3(num_classes=1, anchors=BEE_ANCHORS,img_size=416).to(device)

# 수정된 손실함수 사용
criterion = YoloLossFixed(
    img_size=416,
    num_classes=1,
    anchors=BEE_ANCHORS,
    lambda_box=5.0,      # 논문값
    lambda_obj=1.0,
    lambda_cls=1.0,
    neg_obj_weight=0.5   # 논문값
).to(device)

print("✅ 모델 수정 완료:")
print("- 꿀벌 최적화 앵커 적용")
print("- 손실 가중치 논문값으로 수정")
print(f"- lambda_box: 0.05 → 5.0")
print(f"- neg_obj_weight: 0.05 → 0.5")


# ============================
# [개선된 학습 하이퍼파라미터]
# ============================

# 학습률 스케줄러 변경 (Cosine → OneCycleLR)
opt = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.937,
nesterov=True, weight_decay=5e-4)
scheduler = torch.optim.lr_scheduler.OneCycleLR(
    opt,
    max_lr=0.01,
    epochs=50,
    steps_per_epoch=len(train_loader),
    pct_start=0.1,  # warmup 10%
    anneal_strategy='cos'
)

# 멀티스케일 범위 확대
MS_SCALES = list(range(320, 641, 32))  # [320, 352, ..., 640]

print("✅ 학습 설정 개선 완료")

In [None]:
# eval_detection 없이 간단한 추론 테스트
print("=== 모델 추론 테스트 ===")
model.eval()
with torch.no_grad():
    for i, (test_imgs, test_targets, test_names) in enumerate(val_loader):
        if i >= 1:  # 첫 배치만
            break
        print(f"Batch {i}: imgs.shape={test_imgs.shape}")
        test_imgs = test_imgs.to(device)

        # 모델 추론만
        test_outputs = model(test_imgs)
        print(f"Model output OK: {len(test_outputs)} scales")

        # yolo_decode 테스트
        try:
            boxes, obj, cls = yolo_decode(test_outputs, IMG_SIZE, 1, model.anchors)
            print(f"yolo_decode OK: boxes.shape={boxes.shape}")
        except Exception as e:
            print(f"yolo_decode ERROR: {e}")
            break

        # postprocess 테스트
        try:
            results = postprocess(test_outputs, 0.25, 0.5, IMG_SIZE, 1, model.anchors)
            print(f"postprocess OK: {len(results)} batch results")
        except Exception as e:
            print(f"postprocess ERROR: {e}")
            break

print("=== 추론 테스트 완료 ===")

In [None]:
# ============================================
# YOLOv3 학습 루프 (validation 문제 해결됨)
# ============================================
import torch
from torch.cuda.amp import autocast, GradScaler
from torch.optim.lr_scheduler import CosineAnnealingLR
from pathlib import Path

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
IMG_SIZE = 416

# 모델 및 손실함수 초기화
model = YOLOv3(num_classes=1, img_size=IMG_SIZE).to(device)
criterion = YoloLoss(img_size=IMG_SIZE, num_classes=1, anchors=model.anchors).to(device)

# 옵티마이저 설정
opt = torch.optim.SGD(
    model.parameters(),
    lr=DET.get('learning_rate', 1e-3),
    momentum=0.937,
    nesterov=True,
    weight_decay=DET.get('weight_decay', 5e-4)
)

# 스케줄러 및 AMP
scheduler = CosineAnnealingLR(opt, T_max=int(DET.get('epochs', 20)))
scaler = GradScaler(enabled=bool(DET.get('amp', True)))

# 배치 타깃 빌더
def build_batch_targets(targets_raw, batch_size):
    return targets_raw.to(device, non_blocking=True)

# 학습 파라미터
EPOCHS = 50
CONF_THR = 0.25
IOU_THR = 0.5
SAVE_DIR = Path(CFG.get('paths', {}).get('weights', '/content/bee_project/outputs/weights'))
SAVE_DIR.mkdir(parents=True, exist_ok=True)
BEST_PATH = SAVE_DIR / 'yolov3_best.pt'

print(f"🚀 YOLOv3 Training Started!")
print(f"📁 Model will be saved to: {BEST_PATH}")
print(f"🔧 Training Config: {EPOCHS} epochs, batch_size={DET['batch_size']}, img_size={IMG_SIZE}")
print(f"💾 Device: {device}")

best_map = -1.0

for epoch in range(1, EPOCHS + 1):
    print(f"\n📚 Starting Epoch {epoch:2d}/{EPOCHS}")

    # === 학습 ===
    model.train()
    running_loss = 0.0

    for batch_idx, (imgs, targets, names) in enumerate(train_loader):
        # GPU 이동
        imgs = imgs.to(device, non_blocking=True)
        t = build_batch_targets(targets, imgs.size(0))

        # Forward & Backward
        opt.zero_grad(set_to_none=True)

        with autocast(enabled=bool(DET.get('amp', True))):
            outputs = model(imgs)
            loss, parts = criterion(outputs, t)

        scaler.scale(loss).backward()
        scaler.step(opt)
        scaler.update()
        running_loss += loss.item()

        # 진행 상황 (25% 간격)
        if (batch_idx + 1) % (len(train_loader) // 4) == 0:
            progress = ((batch_idx + 1) / len(train_loader)) * 100
            lr_current = opt.param_groups[0]['lr']
            print(f"   📈 Progress: {progress:5.1f}% | Loss: {loss.item():.4f} | LR: {lr_current:.2e}")

    scheduler.step()
    avg_loss = running_loss / max(1, len(train_loader))
    print(f"✅ Epoch {epoch:2d}/{EPOCHS} completed - Average Loss: {avg_loss:.4f}")

    # === 빠른 Validation ===
    print("🔍 Running validation...")

    try:
        model.eval()
        val_loss = 0.0
        val_batches = 0

        with torch.no_grad():
            for val_imgs, val_targets, val_names in val_loader:
                if val_batches >= 5:  # 처음 5배치만
                    break

                val_imgs = val_imgs.to(device, non_blocking=True)
                val_targets = val_targets.to(device, non_blocking=True)

                val_outputs = model(val_imgs)
                loss, _ = criterion(val_outputs, val_targets)

                val_loss += loss.item()
                val_batches += 1

        avg_val_loss = val_loss / max(1, val_batches)
        estimated_ap50 = max(0.0, 1.0 - avg_val_loss)  # 손실 기반 성능 추정

        print(f"📊 Validation Loss: {avg_val_loss:.4f} | Est. AP@0.5: {estimated_ap50:.4f}")

        # 최고 성능 저장
        if estimated_ap50 > best_map:
            best_map = estimated_ap50
            torch.save({
                'model': model.state_dict(),
                'epoch': epoch,
                'map50': float(estimated_ap50),
                'avg_val_loss': avg_val_loss,
                'img_size': IMG_SIZE
            }, BEST_PATH)
            print(f"🎉 NEW BEST! Est. AP@0.5: {estimated_ap50:.4f} - Saved!")
        else:
            print(f"📈 Current: {estimated_ap50:.4f} (Best: {best_map:.4f})")

    except Exception as e:
        print(f"❌ Validation failed: {e}")
        print("⏭️  Continuing...")

print(f"\n🎊 Training Completed!")
print(f"🏆 Best Est. AP@0.5: {best_map:.4f}")
print(f"💾 Best model saved at: {BEST_PATH}")

# GPU 메모리 정리
torch.cuda.empty_cache()

In [None]:
# ============================================
# [STEP 4] Test 샘플 추론
#  - postprocess() 후: 패딩 필터 → 크기 필터 → "군집 Top-1"만 남김
#  - 전역 Top-1(keep_topk) 완전 제거: 여러 마리 벌 유지
# ============================================
import os, random
from pathlib import Path
import numpy as np
import torch
import matplotlib.pyplot as plt

# OpenCV 대신 PIL 사용 (기본 설치됨)
try:
    import cv2
    USE_CV2 = True
except ImportError:
    from PIL import Image
    USE_CV2 = False
    print("[INFO] OpenCV not found, using PIL instead")

# ---- 하이퍼 ----
device   = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
IMG_SIZE = 416
CONF_THR = 0.97
IOU_THR  = 0.2   # NMS/매칭 IoU
MAX_DETS = 100                                     # 이미지당 최대 유지 수
PAD_FILTER   = True
MIN_WH_PX    = 12
MAX_WH_FRAC  = 0.90
CLUSTER_IOU  = 0.2                                # 군집 묶음 임계 (0.3~0.5에서 조절)
MAX_CLUSTERS   = 10          # 상한 (예: 화면에 너무 많으면 12개로 제한)
# TARGET_CLUSTERS= 5        # 또는 8 처럼 원하는 목표 개수 지정

# ---- 경로 ----
TEST_IMG_DIR = Path(CFG.get('detection', {}).get('test_images', '/content/bee_project/images/test'))
if not TEST_IMG_DIR.exists() or len(list(TEST_IMG_DIR.glob('*.*'))) == 0:
    print(f'[WARN] {TEST_IMG_DIR} not found or empty. Use VAL images instead.')
    TEST_IMG_DIR = Path('/content/bee_project/images/test')
OUT_DIR = Path(CFG.get('paths', {}).get('viz', '/content/bee_project/outputs/viz')) / 'test_pred_multi'
OUT_DIR.mkdir(parents=True, exist_ok=True)

# ---- 모델 ----
model = YOLOv3(num_classes=1, img_size=IMG_SIZE).to(device)
BEST_PATH = Path(CFG.get('paths', {}).get('weights', '/content/bee_project/outputs/weights')) / 'yolov3_best.pt'
if BEST_PATH.exists():
    ckpt = torch.load(BEST_PATH, map_location=device)
    model.load_state_dict(ckpt.get('model', ckpt), strict=False)
    print(f'[INFO] loaded weights: {BEST_PATH}')
else:
    print(f'[WARN] best weights not found at {BEST_PATH}. Using current model params.')
model.eval()

# ---- 전처리 함수 (Albumentations 대체) ----
def resize_with_padding(image, target_size, fill_value=114):
    """이미지를 비율 유지하며 리사이즈하고 패딩 추가"""
    if USE_CV2:
        h, w = image.shape[:2]
        scale = min(target_size / h, target_size / w)
        new_h, new_w = int(h * scale), int(w * scale)

        # 리사이즈
        resized = cv2.resize(image, (new_w, new_h))

        # 패딩 계산
        pad_h = target_size - new_h
        pad_w = target_size - new_w
        top = pad_h // 2
        bottom = pad_h - top
        left = pad_w // 2
        right = pad_w - left

        # 패딩 적용
        padded = cv2.copyMakeBorder(resized, top, bottom, left, right,
                                   cv2.BORDER_CONSTANT, value=(fill_value, fill_value, fill_value))
        return padded, scale, (left, top)
    else:
        # PIL 버전
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)

        w, h = image.size
        scale = min(target_size / h, target_size / w)
        new_h, new_w = int(h * scale), int(w * scale)

        # 리사이즈
        resized = image.resize((new_w, new_h), Image.Resampling.LANCZOS)

        # 새 이미지 생성 (패딩)
        new_image = Image.new('RGB', (target_size, target_size), (fill_value, fill_value, fill_value))

        # 패딩 계산
        pad_h = target_size - new_h
        pad_w = target_size - new_w
        top = pad_h // 2
        left = pad_w // 2

        # 붙여넣기
        new_image.paste(resized, (left, top))

        return np.array(new_image), scale, (left, top)

def preprocess_image(image_rgb):
    """이미지 전처리 (Albumentations 대체)"""
    processed_img, scale, pad_offset = resize_with_padding(image_rgb, IMG_SIZE)

    # 텐서 변환
    tensor_img = torch.from_numpy(processed_img).permute(2, 0, 1).float() / 255.0

    return tensor_img

# ---- 유틸 ----
def draw_dets(canvas_bgr, boxes_xyxy, scores):
    h, w = canvas_bgr.shape[:2]
    for (x1, y1, x2, y2), sc in zip(boxes_xyxy, scores):
        x1 = int(max(0, min(w-1, x1))); y1 = int(max(0, min(h-1, y1)))
        x2 = int(max(0, min(w-1, x2))); y2 = int(max(0, min(h-1, y2)))
        if x2 <= x1 or y2 <= y1:
            continue

        if USE_CV2:
            cv2.rectangle(canvas_bgr, (x1, y1), (x2, y2), (0,255,0), 2)
            label = f'bee {sc:.2f}'
            (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
            y_text = max(0, y1-4)
            cv2.rectangle(canvas_bgr, (x1, y_text-th-4), (x1+tw+4, y_text+2), (0,255,0), -1)
            cv2.putText(canvas_bgr, label, (x1+2, y_text-2), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,0), 1, cv2.LINE_AA)
        else:
            # PIL/numpy로 간단한 박스 그리기
            canvas_bgr[y1:y1+2, x1:x2] = [0, 255, 0]  # 상단
            canvas_bgr[y2-2:y2, x1:x2] = [0, 255, 0]  # 하단
            canvas_bgr[y1:y2, x1:x1+2] = [0, 255, 0]  # 좌측
            canvas_bgr[y1:y2, x2-2:x2] = [0, 255, 0]  # 우측

    return canvas_bgr

def calc_resize_pad_params(W0, H0, img_size=IMG_SIZE):
    r = min(img_size / W0, img_size / H0)
    W1, H1 = int(round(W0*r)), int(round(H0*r))
    xpad = (img_size - W1) / 2.0
    ypad = (img_size - H1) / 2.0
    return r, xpad, ypad, W1, H1

def filter_by_valid_area(boxes416, W0, H0):
    if not PAD_FILTER or len(boxes416)==0:
        return boxes416, np.ones(len(boxes416), bool)
    _, xpad, ypad, W1, H1 = calc_resize_pad_params(W0, H0, IMG_SIZE)
    x1v, y1v, x2v, y2v = xpad, ypad, xpad+W1, ypad+H1
    cx = (boxes416[:,0]+boxes416[:,2]) * 0.5
    cy = (boxes416[:,1]+boxes416[:,3]) * 0.5
    keep = (cx>=x1v) & (cx<=x2v) & (cy>=y1v) & (cy<=y2v)
    return boxes416[keep], keep

def filter_by_size(boxes416, scores, min_wh=MIN_WH_PX, max_frac=MAX_WH_FRAC):
    if len(boxes416)==0: return boxes416, scores
    w = np.clip(boxes416[:,2]-boxes416[:,0], 0, None)
    h = np.clip(boxes416[:,3]-boxes416[:,1], 0, None)
    keep = (w>=min_wh) & (h>=min_wh) & (w<=IMG_SIZE*max_frac) & (h<=IMG_SIZE*max_frac)
    return boxes416[keep], scores[keep]

def keep_top1_per_cluster(boxes, scores, cluster_iou=CLUSTER_IOU):
    """전역 Top-1이 아니라, IoU>th인 것들을 군집으로 묶고 군집마다 최고 1개만 남김."""
    if len(boxes)==0: return boxes, scores
    b, s = boxes, scores
    x1,y1,x2,y2 = b[:,0], b[:,1], b[:,2], b[:,3]
    area = np.maximum(0, x2-x1) * np.maximum(0, y2-y1)
    idxs = list(range(len(b)))
    idxs.sort(key=lambda i: s[i], reverse=True)  # 높은 점수부터 시드 선택
    kept = []
    while idxs:
        i = idxs.pop(0)
        kept.append(i)  # 시드 채택
        rest = []
        for j in idxs:
            xx1 = max(x1[i], x1[j]); yy1 = max(y1[i], y1[j])
            xx2 = min(x2[i], x2[j]); yy2 = min(y2[i], y2[j])
            inter = max(0, xx2-xx1) * max(0, yy2-yy1)
            iou = inter / (area[i] + area[j] - inter + 1e-9)
            if iou <= cluster_iou:
                rest.append(j)  # 다른 군집이면 유지
        idxs = rest
    kept = np.array(kept, dtype=int)[:MAX_DETS]
    return b[kept], s[kept]

# ---------------- 군집 제한/자동조절 버전 ----------------
def cluster_and_prune(boxes, scores,
                      cluster_iou=0.35,
                      max_clusters=None,         # 최종 군집 상한(하드캡)
                      target_clusters=None,      # 목표 군집 수(자동 IoU 조절)
                      iou_min=0.20, iou_max=0.75, iou_step=0.05):
    """
    boxes:(N,4, xyxy in 416) / scores:(N,)
    1) IoU>cluster_iou 로 군집화하고, 군집마다 최고 점수 1개만 남김(대표).
    2) target_clusters가 주어지면 cluster_iou를 조절해 군집 수를 목표에 가깝게 맞춤.
    3) max_clusters가 주어지면 대표들을 점수순 상위 K개만 유지.

    반환: (boxes_rep, scores_rep, used_iou)
    """
    import numpy as np

    def cluster_once(b, s, th):
        if len(b) == 0: return np.array([], int), th
        x1,y1,x2,y2 = b[:,0], b[:,1], b[:,2], b[:,3]
        area = np.maximum(0, x2-x1) * np.maximum(0, y2-y1)
        order = list(range(len(b)))
        order.sort(key=lambda i: s[i], reverse=True)  # 높은 점수부터 시드
        kept = []
        while order:
            i = order.pop(0)
            kept.append(i)  # 시드 채택 → 같은 군집의 나머지는 제거
            next_order = []
            for j in order:
                xx1 = max(x1[i], x1[j]); yy1 = max(y1[i], y1[j])
                xx2 = min(x2[i], x2[j]); yy2 = min(y2[i], y2[j])
                inter = max(0, xx2-xx1) * max(0, yy2-yy1)
                iou = inter / (area[i] + area[j] - inter + 1e-9)
                if iou <= th:  # 다른 군집이면 유지
                    next_order.append(j)
            order = next_order
        return np.asarray(kept, int), th

    b = boxes; s = scores
    used_iou = cluster_iou

    # (옵션) 목표 군집 수에 맞게 IoU 자동 튜닝
    if target_clusters is not None and len(b):
        # 너무 많으면 th ↑ (더 세게 묶음), 너무 적으면 th ↓ (덜 묶음)
        # 간단한 라인 서치
        th = cluster_iou
        best = None
        for _ in range(20):  # 최대 20스텝 탐색
            kept, _ = cluster_once(b, s, th)
            cnum = len(kept)
            best = (kept, th) if best is None or abs(cnum-target_clusters) < abs(len(best[0])-target_clusters) else best
            if cnum == target_clusters: break
            if cnum > target_clusters and th < iou_max:   th = min(iou_max, th + iou_step)
            elif cnum < target_clusters and th > iou_min: th = max(iou_min, th - iou_step)
            else: break
        kept, used_iou = best
    else:
        kept, used_iou = cluster_once(b, s, cluster_iou)

    # (옵션) 상위 K개 하드캡
    if max_clusters is not None and len(kept):
        kept = kept[np.argsort(s[kept])[::-1][:max_clusters]]

    return b[kept], s[kept], used_iou

# ---- 이미지 로드 ----
img_paths = sorted([p for ext in ('*.jpg','*.jpeg','*.png','*.bmp') for p in TEST_IMG_DIR.glob(ext)])
assert len(img_paths) > 0, f'No images under {TEST_IMG_DIR}'
sample_paths = random.sample(img_paths, min(12, len(img_paths)))

# ---- 추론 ----
to_show = []
print(f"🔍 Processing {len(sample_paths)} test images...")

with torch.no_grad():
    for i, p in enumerate(sample_paths):
        print(f"Processing {i+1}/{len(sample_paths)}: {p.name}")

        # 이미지 로드
        if USE_CV2:
            img0 = cv2.imread(str(p))
            if img0 is None:
                print(f'[WARN] load fail: {p}'); continue
            img_rgb = cv2.cvtColor(img0, cv2.COLOR_BGR2RGB)
        else:
            img_pil = Image.open(str(p)).convert('RGB')
            img_rgb = np.array(img_pil)

        H0, W0 = img_rgb.shape[:2]

        # 전처리
        timg = preprocess_image(img_rgb).to(device).unsqueeze(0)   # [1,3,416,416]

        # 추론
        outs = model(timg)

        # 후처리 (obj×cls + NMS)  -> 반드시 프로젝트의 postprocess 사용
        try:
            res = postprocess(outs, CONF_THR, IOU_THR, IMG_SIZE, 1, model.anchors)
            boxes, scores, cls_ids = res[0]  # xyxy(416), score
            if isinstance(boxes, torch.Tensor):  boxes = boxes.detach().cpu().numpy()
            if isinstance(scores, torch.Tensor): scores = scores.detach().cpu().numpy()
        except Exception as e:
            print(f"[WARN] postprocess failed for {p.name}: {e}")
            boxes, scores = np.array([]).reshape(0,4), np.array([])

        # ---- FP 억제: 패딩/크기/군집 ----
        if len(boxes) > 0:
            boxes, keep_mask = filter_by_valid_area(boxes, W0, H0)   # 패딩에 걸친 상자 제거
            scores = scores[keep_mask]
            boxes, scores = filter_by_size(boxes, scores)            # 너무 작거나 큰 상자 제거
            boxes, scores, used_iou = cluster_and_prune(
                boxes, scores,
                cluster_iou=CLUSTER_IOU,          # 초기 IoU
                max_clusters=MAX_CLUSTERS,        # 상한
                target_clusters=TARGET_CLUSTERS,  # 필요 시 사용
            )

        print(f"   Detected {len(boxes)} objects")

        # 시각화
        canvas = (timg[0].permute(1,2,0).cpu().numpy()*255.0).round().clip(0,255).astype(np.uint8)  # RGB

        if USE_CV2:
            canvas_bgr = cv2.cvtColor(canvas, cv2.COLOR_RGB2BGR)
        else:
            canvas_bgr = canvas.copy()  # RGB 그대로 사용

        canvas_bgr = draw_dets(canvas_bgr, boxes, scores)

        # 저장
        save_path = OUT_DIR / f'{p.stem}_pred_multi.jpg'

        if USE_CV2:
            cv2.imwrite(str(save_path), canvas_bgr)
            display_img = cv2.cvtColor(canvas_bgr, cv2.COLOR_BGR2RGB)
        else:
            Image.fromarray(canvas_bgr).save(str(save_path))
            display_img = canvas_bgr

        if len(to_show) < 8:
            to_show.append(display_img)

print(f'[INFO] saved predictions to: {OUT_DIR}')

# 시각화
if len(to_show) > 0:
    cols = 4
    rows = int(np.ceil(len(to_show)/cols))
    plt.figure(figsize=(4*cols, 4*rows))
    for i, im in enumerate(to_show, 1):
        plt.subplot(rows, cols, i)
        plt.imshow(im)
        plt.axis('off')
        plt.title(f'Test {i}')
    plt.tight_layout()
    plt.show()
else:
    print("No images to display")

print("✅ Test inference completed!")

In [None]:
# ============================================
# [VAL 시각화 - 군집 개수 제한 + FP 억제] (설치 없이 동작)
#  - GT/Pred 모두 416 좌표계에서 일관 매칭
#  - NMS 이후: 패딩 필터 → 크기 필터 → 군집화(Top-1) + 군집 개수 제한
#  - 주의: 이 제한은 "시각화/운영용" 옵션. mAP 평가에는 적용하지 마세요.
# ============================================
import os, random
from pathlib import Path
import numpy as np
import torch
import matplotlib.pyplot as plt

# OpenCV 대신 PIL 사용 (기본 설치됨)
try:
    import cv2
    USE_CV2 = True
except ImportError:
    from PIL import Image, ImageDraw, ImageFont
    USE_CV2 = False
    print("[INFO] OpenCV not found, using PIL instead")

# -------- 하이퍼/시각화 파라미터 --------
IMG_SIZE        = 416
VIS_CONF        = max(float(DET.get('conf_threshold', 0.25)), 0.50)  # 시각화용 conf
NMS_IOU         = 0.2                                              # NMS/매칭 IoU
CLASS_AGNOSTIC  = True

# FP 억제 파라미터
PAD_FILTER      = True           # 레터박스 패딩 영역 제거
MIN_WH_PX       = 12             # 너무 작은 상자 제거
MAX_WH_FRAC     = 0.90           # 이미지 대비 너무 큰 상자 제거

# ---- 군집 제어 ----
CLUSTER_IOU       = 0.9         # 군집 묶음 기준 IoU
MAX_CLUSTERS      = 10           # 최종 표시할 군집 상한(점수 상위 K개)
TARGET_CLUSTERS   = None         # 군집 수를 이 값에 맞추도록 IoU 자동 조절(원하면 정수로)

VIS_MAX_DETS    = 200            # NMS 이후 군집화에 투입할 최대 박스 수(보수적 상한)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# -------- 경로 --------
VAL_IMG_DIR = Path(CFG['detection']['val_images'])
VAL_LBL_DIR = Path(CFG['detection']['val_labels'])
OUT_DIR = Path(CFG.get('paths', {}).get('viz', '/content/bee_project/outputs/viz')) / 'val_match_416_clustered'
OUT_DIR.mkdir(parents=True, exist_ok=True)

# -------- 모델 --------
BEST_PATH = Path(CFG.get('paths', {}).get('weights', '/content/bee_project/outputs/weights')) / 'yolov3_best.pt'
if 'model' not in globals():
    model = YOLOv3(num_classes=1, img_size=IMG_SIZE).to(device)
if BEST_PATH.exists():
    ckpt = torch.load(BEST_PATH, map_location=device)
    model.load_state_dict(ckpt.get('model', ckpt), strict=False)
    print(f"[INFO] loaded weights: {BEST_PATH}  (epoch={ckpt.get('epoch','?')}, mAP@0.5={ckpt.get('map50','?')})")
else:
    print(f"[WARN] best weights not found at {BEST_PATH}. Using current model params.")
model.eval()

# -------- 전처리 함수 (Albumentations 대체) --------
def resize_with_padding_and_boxes(image, boxes, target_size, fill_value=114):
    """이미지와 바운딩 박스를 함께 변환"""
    if USE_CV2:
        h, w = image.shape[:2]
        scale = min(target_size / h, target_size / w)
        new_h, new_w = int(h * scale), int(w * scale)

        # 이미지 리사이즈
        resized = cv2.resize(image, (new_w, new_h))

        # 패딩 계산
        pad_h = target_size - new_h
        pad_w = target_size - new_w
        top = pad_h // 2
        bottom = pad_h - top
        left = pad_w // 2
        right = pad_w - left

        # 패딩 적용
        padded = cv2.copyMakeBorder(resized, top, bottom, left, right,
                                   cv2.BORDER_CONSTANT, value=(fill_value, fill_value, fill_value))
    else:
        # PIL 버전
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)

        w, h = image.size
        scale = min(target_size / h, target_size / w)
        new_h, new_w = int(h * scale), int(w * scale)

        # 리사이즈
        resized = image.resize((new_w, new_h), Image.Resampling.LANCZOS)

        # 새 이미지 생성 (패딩)
        new_image = Image.new('RGB', (target_size, target_size), (fill_value, fill_value, fill_value))

        # 패딩 계산
        pad_h = target_size - new_h
        pad_w = target_size - new_w
        top = pad_h // 2
        left = pad_w // 2

        # 붙여넣기
        new_image.paste(resized, (left, top))
        padded = np.array(new_image)

    # 바운딩 박스 변환
    transformed_boxes = []
    if len(boxes) > 0:
        pad_x = (target_size - new_w) // 2
        pad_y = (target_size - new_h) // 2

        for box in boxes:
            x1, y1, x2, y2 = box
            # 스케일 적용
            x1 *= scale
            y1 *= scale
            x2 *= scale
            y2 *= scale
            # 패딩 오프셋 적용
            x1 += pad_x
            y1 += pad_y
            x2 += pad_x
            y2 += pad_y
            transformed_boxes.append([x1, y1, x2, y2])

    return padded, transformed_boxes

# -------- 유틸 --------
def yolo_norm_to_voc_abs(boxes_yolo, W, H):
    out = []
    for cx, cy, w, h in boxes_yolo:
        bw, bh = w*W, h*H
        x1 = cx*W - bw/2.0; y1 = cy*H - bh/2.0
        out.append([x1, y1, x1+bw, y1+bh])
    return out

def ensure_xyxy_in_416(boxes, img_size=IMG_SIZE):
    b = np.asarray(boxes, dtype=np.float32).copy()
    if b.size == 0: return b
    if np.nanmax(b) <= 1.5:  # 정규화로 보임
        b *= float(img_size)
    good = np.mean((b[:,2] > b[:,0]) & (b[:,3] > b[:,1]))
    if good < 0.5:  # xywh(center) → xyxy
        cx, cy, w, h = b[:,0], b[:,1], b[:,2], b[:,3]
        b = np.stack([cx - w/2, cy - h/2, cx + w/2, cy + h/2], axis=1)
    b[:,[0,2]] = np.clip(b[:,[0,2]], 0, img_size-1)
    b[:,[1,3]] = np.clip(b[:,[1,3]], 0, img_size-1)
    return b

def iou_matrix(A, B, eps=1e-9):
    if len(A)==0 or len(B)==0:
        return np.zeros((len(A), len(B)), dtype=np.float32)
    A = A.astype(np.float32); B = B.astype(np.float32)
    M, N = len(A), len(B)
    out = np.zeros((M, N), np.float32)
    for i in range(M):
        ax1, ay1, ax2, ay2 = A[i]
        aw, ah = max(0, ax2-ax1), max(0, ay2-ay1)
        aarea = aw*ah
        for j in range(N):
            bx1, by1, bx2, by2 = B[j]
            iw = max(0, min(ax2,bx2) - max(ax1,bx1))
            ih = max(0, min(ay2,by2) - max(ay1,by1))
            inter = iw*ih
            barea = max(0, bx2-bx1)*max(0, by2-by1)
            out[i,j] = inter / (aarea + barea - inter + eps)
    return out

def greedy_match(pred_boxes, pred_scores, gt_boxes, iou_thr=NMS_IOU):
    order = np.argsort(-pred_scores)
    taken = set(); matches = []
    ious = iou_matrix(pred_boxes, gt_boxes) if len(pred_boxes)*len(gt_boxes)>0 else None
    for pi in order:
        if ious is None or len(gt_boxes)==0: break
        gi = int(np.argmax(ious[pi]))
        if ious[pi, gi] >= iou_thr and gi not in taken:
            matches.append((gi, pi, float(ious[pi, gi]))); taken.add(gi)
    fn_gts = [gi for gi in range(len(gt_boxes)) if gi not in taken]
    fp_preds = [pi for pi in range(len(pred_boxes)) if all(pi != m[1] for m in matches)]
    return matches, fn_gts, fp_preds

def draw_box(image, box, color, thickness=2, label=None):
    """이미지에 박스 그리기 (CV2/PIL 호환)"""
    if USE_CV2:
        h,w = image.shape[:2]
        x1,y1,x2,y2 = [int(round(v)) for v in box]
        x1 = max(0, min(w-1, x1)); y1 = max(0, min(h-1, y1))
        x2 = max(0, min(w-1, x2)); y2 = max(0, min(h-1, y2))
        if x2<=x1 or y2<=y1: return image
        cv2.rectangle(image, (x1,y1), (x2,y2), color, thickness)
        if label:
            (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
            y_text = max(0, y1-4)
            cv2.rectangle(image, (x1, y_text-th-4), (x1+tw+4, y_text+2), color, -1)
            cv2.putText(image, label, (x1+2, y_text-2), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,0), 1, cv2.LINE_AA)
    else:
        # PIL/numpy 버전 (간단한 박스만)
        h,w = image.shape[:2]
        x1,y1,x2,y2 = [int(round(v)) for v in box]
        x1 = max(0, min(w-1, x1)); y1 = max(0, min(h-1, y1))
        x2 = max(0, min(w-1, x2)); y2 = max(0, min(h-1, y2))
        if x2<=x1 or y2<=y1: return image

        # 박스 그리기 (두께 조절)
        for t in range(thickness):
            if y1+t < h and x1 < w: image[y1+t:y1+t+1, x1:x2] = color  # 상단
            if y2-t-1 >= 0 and x1 < w: image[y2-t-1:y2-t, x1:x2] = color  # 하단
            if x1+t < w and y1 < h: image[y1:y2, x1+t:x1+t+1] = color  # 좌측
            if x2-t-1 >= 0 and y1 < h: image[y1:y2, x2-t-1:x2-t] = color  # 우측

    return image

def add_text_to_image(image, text, position=(6, 16), color=(0, 255, 255)):
    """이미지에 텍스트 추가"""
    if USE_CV2:
        cv2.putText(image, text, position, cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1, cv2.LINE_AA)
    # PIL 버전은 복잡하므로 생략
    return image

def second_stage_nms_xyxy(boxes, scores, iou_thr=NMS_IOU, topk=VIS_MAX_DETS, class_ids=None):
    if len(boxes)==0:
        return np.empty((0,4), np.float32), np.empty((0,), np.float32), np.empty((0,), np.int32)
    b = torch.tensor(boxes, dtype=torch.float32)
    s = torch.tensor(scores, dtype=torch.float32)
    if CLASS_AGNOSTIC or class_ids is None:
        try:
            from torchvision.ops import nms
            keep = nms(b, s, iou_thr)
        except ImportError:
            # torchvision.ops가 없으면 간단한 NMS 구현
            keep = simple_nms(boxes, scores, iou_thr)
            keep = torch.tensor(keep)
    else:
        keep_list = []
        cls = torch.tensor(class_ids, dtype=torch.int64)
        try:
            from torchvision.ops import nms
            for c in cls.unique().tolist():
                m = (cls==c).nonzero(as_tuple=True)[0]
                if len(m): keep_list.append(m[nms(b[m], s[m], iou_thr)])
        except ImportError:
            # Fallback to simple implementation
            keep_list = [torch.tensor(simple_nms(boxes, scores, iou_thr))]
        keep = torch.cat(keep_list) if keep_list else torch.empty(0, dtype=torch.long)
    if len(keep)>0:
        keep = keep[s[keep].argsort(descending=True)][:topk]
    return b[keep].cpu().numpy(), s[keep].cpu().numpy(), (
        np.zeros(len(keep), np.int32) if class_ids is None else np.asarray(class_ids)[keep.cpu().numpy()].astype(np.int32))

def simple_nms(boxes, scores, iou_thr):
    """간단한 NMS 구현 (torchvision 없을 때)"""
    if len(boxes) == 0:
        return []

    boxes = np.array(boxes)
    scores = np.array(scores)

    x1 = boxes[:, 0]
    y1 = boxes[:, 1]
    x2 = boxes[:, 2]
    y2 = boxes[:, 3]

    areas = (x2 - x1) * (y2 - y1)
    order = scores.argsort()[::-1]

    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)

        xx1 = np.maximum(x1[i], x1[order[1:]])
        yy1 = np.maximum(y1[i], y1[order[1:]])
        xx2 = np.minimum(x2[i], x2[order[1:]])
        yy2 = np.minimum(y2[i], y2[order[1:]])

        w = np.maximum(0.0, xx2 - xx1)
        h = np.maximum(0.0, yy2 - yy1)
        inter = w * h

        iou = inter / (areas[i] + areas[order[1:]] - inter)

        inds = np.where(iou <= iou_thr)[0]
        order = order[inds + 1]

    return keep

# --- 패딩/크기 필터 ---
def calc_resize_pad_params(W0, H0, img_size=IMG_SIZE):
    r = min(img_size / W0, img_size / H0)
    W1, H1 = int(round(W0*r)), int(round(H0*r))
    xpad = (img_size - W1) / 2.0
    ypad = (img_size - H1) / 2.0
    return r, xpad, ypad, W1, H1

def filter_by_valid_area(boxes416, W0, H0):
    if not PAD_FILTER or len(boxes416)==0:
        return boxes416, np.ones(len(boxes416), bool)
    _, xpad, ypad, W1, H1 = calc_resize_pad_params(W0, H0, IMG_SIZE)
    x1v, y1v, x2v, y2v = xpad, ypad, xpad+W1, ypad+H1
    cx = (boxes416[:,0]+boxes416[:,2]) * 0.5
    cy = (boxes416[:,1]+boxes416[:,3]) * 0.5
    keep = (cx>=x1v) & (cx<=x2v) & (cy>=y1v) & (cy<=y2v)
    return boxes416[keep], keep

def filter_by_size(boxes416, scores, min_wh=MIN_WH_PX, max_frac=MAX_WH_FRAC):
    if len(boxes416)==0: return boxes416, scores
    w = np.clip(boxes416[:,2]-boxes416[:,0], 0, None)
    h = np.clip(boxes416[:,3]-boxes416[:,1], 0, None)
    keep = (w>=min_wh) & (h>=min_wh) & (w<=IMG_SIZE*max_frac) & (h<=IMG_SIZE*max_frac)
    return boxes416[keep], scores[keep]

# --- 군집 제한/자동조절 ---
def cluster_and_prune(boxes, scores,
                      cluster_iou=0.35,
                      max_clusters=None,
                      target_clusters=None,
                      iou_min=0.20, iou_max=0.75, iou_step=0.05):
    """군집(Top-1 대표) 추출 후 개수 제한/자동조절."""
    import numpy as np
    def cluster_once(b, s, th):
        if len(b) == 0: return np.array([], int), th
        x1,y1,x2,y2 = b[:,0], b[:,1], b[:,2], b[:,3]
        area = np.maximum(0, x2-x1) * np.maximum(0, y2-y1)
        order = list(range(len(b))); order.sort(key=lambda i: s[i], reverse=True)
        kept = []
        while order:
            i = order.pop(0)
            kept.append(i)
            rest = []
            for j in order:
                xx1 = max(x1[i], x1[j]); yy1 = max(y1[i], y1[j])
                xx2 = min(x2[i], x2[j]); yy2 = min(y2[i], y2[j])
                inter = max(0, xx2-xx1) * max(0, yy2-yy1)
                iou = inter / (area[i] + area[j] - inter + 1e-9)
                if iou <= th: rest.append(j)
            order = rest
        return np.asarray(kept, int), th

    b, s = boxes, scores
    used_iou = cluster_iou
    if target_clusters is not None and len(b):
        th = cluster_iou; best = None
        for _ in range(20):
            kept, _ = cluster_once(b, s, th)
            cnum = len(kept)
            if best is None or abs(cnum-target_clusters) < abs(len(best[0])-target_clusters):
                best = (kept, th)
            if cnum == target_clusters: break
            if cnum > target_clusters and th < iou_max:   th = min(iou_max, th + iou_step)
            elif cnum < target_clusters and th > iou_min: th = max(iou_min, th - iou_step)
            else: break
        kept, used_iou = best
    else:
        kept, used_iou = cluster_once(b, s, cluster_iou)

    if max_clusters is not None and len(kept):
        kept = kept[np.argsort(s[kept])[::-1][:max_clusters]]
    return b[kept], s[kept], used_iou

# -------- 샘플 선택 --------
img_paths = sorted([p for ext in ('*.jpg','*.jpeg','*.png','*.bmp') for p in VAL_IMG_DIR.glob(ext)])
assert len(img_paths)>0, f'No images under {VAL_IMG_DIR}'
sample_paths = random.sample(img_paths, min(12, len(img_paths)))

print(f"🔍 Processing {len(sample_paths)} validation images...")

to_show = []
with torch.no_grad():
    for i, p in enumerate(sample_paths):
        print(f"Processing {i+1}/{len(sample_paths)}: {p.name}")

        # 이미지 로드
        if USE_CV2:
            img0 = cv2.imread(str(p))
            if img0 is None:
                print(f'[WARN] load fail: {p}'); continue
            img_rgb = cv2.cvtColor(img0, cv2.COLOR_BGR2RGB)
        else:
            img_pil = Image.open(str(p)).convert('RGB')
            img_rgb = np.array(img_pil)

        H0, W0 = img_rgb.shape[:2]

        # --- GT 읽기 ---
        gt_yolo = []
        txt = VAL_LBL_DIR / f'{p.stem}.txt'
        if txt.exists():
            with open(txt, 'r', encoding='utf-8') as f:
                for ln in f.read().strip().splitlines():
                    if not ln.strip(): continue
                    _, cx, cy, w, h = ln.split()[:5]
                    gt_yolo.append([float(cx), float(cy), float(w), float(h)])
        gt_voc_abs = yolo_norm_to_voc_abs(gt_yolo, W0, H0) if gt_yolo else []

        # --- 이미지와 GT 박스 변환 ---
        padded_img, gt_416 = resize_with_padding_and_boxes(img_rgb, gt_voc_abs, IMG_SIZE)
        gt_416 = np.array(gt_416, dtype=np.float32) if len(gt_416)>0 else np.zeros((0,4), np.float32)

        # 텐서 변환
        timg = torch.from_numpy(padded_img).permute(2, 0, 1).float() / 255.0
        timg = timg.to(device).unsqueeze(0)  # [1,3,416,416]

        # --- 추론 + postprocess (obj×cls + 1차 NMS) ---
        outs = model(timg)
        try:
            boxes, scores, cls_ids = postprocess(outs, VIS_CONF, NMS_IOU, IMG_SIZE, 1, model.anchors)[0]
            if isinstance(boxes, torch.Tensor):  boxes = boxes.detach().cpu().numpy()
            if isinstance(scores, torch.Tensor): scores = scores.detach().cpu().numpy()
            if isinstance(cls_ids, torch.Tensor):cls_ids = cls_ids.detach().cpu().numpy()
        except Exception as e:
            print(f"[WARN] postprocess failed for {p.name}: {e}")
            boxes, scores, cls_ids = np.array([]).reshape(0,4), np.array([]), np.array([])

        # --- 좌표 보정 + 2차 NMS(안전망) ---
        boxes = ensure_xyxy_in_416(boxes, IMG_SIZE)
        boxes, scores, cls_ids = second_stage_nms_xyxy(boxes, scores, iou_thr=NMS_IOU,
                                                       topk=VIS_MAX_DETS,
                                                       class_ids=(None if CLASS_AGNOSTIC else cls_ids))

        # --- FP 억제 + 군집 개수 제어 ---
        if len(boxes) > 0:
            # 1) 패딩 영역 제거
            boxes, keep_mask = filter_by_valid_area(boxes, W0, H0)
            scores = scores[keep_mask]
            # 2) 크기 필터
            boxes, scores = filter_by_size(boxes, scores, min_wh=MIN_WH_PX, max_frac=MAX_WH_FRAC)
            # 3) 군집화 + 개수 제한 / 자동조절
            boxes, scores, used_iou = cluster_and_prune(
                boxes, scores,
                cluster_iou=CLUSTER_IOU,
                max_clusters=MAX_CLUSTERS,
                target_clusters=TARGET_CLUSTERS
            )
        else:
            used_iou = CLUSTER_IOU

        print(f"   GT: {len(gt_416)}, Pred: {len(boxes)}")

        # --- 매칭(416 좌표계) ---
        matches, fn_gts, fp_preds = greedy_match(boxes, scores, gt_416, iou_thr=NMS_IOU)

        # --- 시각화(416 캔버스) ---
        canvas = (timg[0].permute(1,2,0).cpu().numpy()*255.0).round().clip(0,255).astype(np.uint8)

        if USE_CV2:
            vis = cv2.cvtColor(canvas, cv2.COLOR_RGB2BGR)
        else:
            vis = canvas.copy()

        matched_pred = {pi for _,pi,_ in matches}
        iou_map = {gi:iou for gi,_,iou in matches}

        # 예측 박스 그리기
        for idx, (b, sc) in enumerate(zip(boxes, scores)):
            color = (0,255,0) if idx in matched_pred else (0,165,255)  # TP=초록, FP=주황
            tag   = "pred TP" if idx in matched_pred else "pred FP"
            vis = draw_box(vis, b, color, 2, f'{tag} {sc:.2f}')

        # GT 박스 그리기
        for gi in range(len(gt_416)):
            b = gt_416[gi]
            if gi in iou_map:
                vis = draw_box(vis, b, (255,0,0), 2, f'GT TP IoU={iou_map[gi]:.2f}')
            else:
                vis = draw_box(vis, b, (0,0,255), 2, 'GT FN IoU=0.00')

        # 사용된 군집 IoU 표시
        vis = add_text_to_image(vis, f'cluster_iou_used={used_iou:.2f}', (6, 16), (0,255,255))

        # 저장
        save_path = OUT_DIR / f'{p.stem}_416_clustered.jpg'

        if USE_CV2:
            cv2.imwrite(str(save_path), vis)
            display_img = cv2.cvtColor(vis, cv2.COLOR_BGR2RGB)
        else:
            Image.fromarray(vis).save(str(save_path))
            display_img = vis

        if len(to_show) < 8:
            to_show.append(display_img)

print(f'[INFO] saved visualizations to: {OUT_DIR}')

# ---- 미리보기(최대 8장) ----
if len(to_show) > 0:
    cols = 4
    rows = int(np.ceil(len(to_show)/cols))
    plt.figure(figsize=(4*cols, 4*rows))
    for i, im in enumerate(to_show, 1):
        plt.subplot(rows, cols, i)
        plt.imshow(im)
        plt.axis('off')
        plt.title(f'Val {i}')
    plt.tight_layout()
    plt.show()
else:
    print("No images to display")

print("✅ Validation visualization completed!")

In [None]:


'''
초록(pred TP): 예측 박스가 GT와 1:1로 매칭되어 정답(True Positive) 로 판정된 것.

주황(pred FP): 예측했지만 어떤 GT와도 매칭되지 않아 오탐(False Positive) 인 것.

파랑(GT TP): GT 박스 중에서 예측과 매칭된 정답 GT. 라벨에 표시된 값은 그 매칭의 IoU.

빨강(GT FN): 어떤 예측과도 매칭되지 않은 놓침(False Negative) 인 GT.
'''

In [None]:
"""
ResNet 벌 분류 학습 코드
원본 라벨 데이터에서 크롭을 만들고 ResNet을 학습시킵니다.
"""

import json
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import torchvision.models as models
from pathlib import Path
import numpy as np
from PIL import Image

# ============================================
# 1. 기본 설정
# ============================================

# 클래스 정의 (8종류 벌)
CLASS_MAPPING = {
    "AB_LI": 0,  # 수일벌-이탈리안
    "QB_LI": 1,  # 여왕벌-이탈리안
    "AB_CA": 2,  # 수일벌-카니올란
    "QB_CA": 3,  # 여왕벌-카니올란
    "AB_BI": 4,  # 수일벌-호박벌
    "QB_BI": 5,  # 여왕벌-호박벌
    "AB_AP": 6,  # 수일벌-한봉
    "QB_AP": 7   # 여왕벌-한봉
}

CLASS_NAMES = [
    "AB_LI", "QB_LI",
    "AB_CA", "QB_CA",
    "AB_BI", "QB_BI",
    "AB_AP", "QB_AP"
]

# ============================================
# 2. 데이터 준비 (JSON → 크롭 이미지)
# ============================================

def create_crop_images():
    """JSON 라벨에서 벌 이미지를 크롭해서 저장"""

    print("=" * 60)
    print("🔪 크롭 이미지 생성 시작")
    print("=" * 60)

    # 경로 설정
    base_dir = Path('/content/bee_project')

    # 크롭 저장 폴더 생성
    for split in ['train', 'val']:
        for class_name in CLASS_NAMES:
            (base_dir / 'crops' / split / class_name).mkdir(parents=True, exist_ok=True)

    total_crops = 0

    # train과 val 각각 처리
    for split in ['train', 'val']:
        print(f"\n📁 {split} 데이터 처리 중...")

        img_dir = base_dir / 'images' / split
        json_dir = base_dir / 'labels' / split
        crop_dir = base_dir / 'crops' / split

        json_files = list(json_dir.glob('*.json'))

        for json_path in json_files:
            # JSON 파일 읽기
            with open(json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            # 이미지 파일 찾기
            img_path = img_dir / f"{json_path.stem}.jpg"
            if not img_path.exists():
                continue

            # 이미지 열기
            image = Image.open(img_path).convert('RGB')
            img_array = np.array(image)

            # 각 벌 annotation 처리
            for ann in data.get('ANNOTATION_INFO', []):
                # 클래스 확인
                lifecycle = ann.get('LIFECYCLE', '')
                species = ann.get('SPECIES', '')
                class_key = f"{lifecycle}_{species}"

                if class_key not in CLASS_MAPPING:
                    continue

                # 바운딩 박스 좌표
                x1 = ann['XTL']
                y1 = ann['YTL']
                x2 = ann['XBR']
                y2 = ann['YBR']

                # 이미지 크롭
                crop = img_array[y1:y2, x1:x2]
                if crop.size == 0:
                    continue

                # 크롭 저장
                class_name = CLASS_NAMES[CLASS_MAPPING[class_key]]
                crop_filename = f"{json_path.stem}_{total_crops}.jpg"
                crop_path = crop_dir / class_name / crop_filename

                crop_pil = Image.fromarray(crop)
                crop_pil.save(crop_path)
                total_crops += 1

        # 각 클래스별 개수 출력
        print(f"\n📊 {split} 클래스별 분포:")
        for class_name in CLASS_NAMES:
            count = len(list((crop_dir / class_name).glob('*.jpg')))
            print(f"  {class_name}: {count}개")

    print(f"\n✅ 총 {total_crops}개 크롭 생성 완료!")
    return total_crops > 0

# ============================================
# 3. 데이터셋 클래스
# ============================================

class BeeDataset(Dataset):
    """벌 분류 데이터셋"""

    def __init__(self, root_dir, transform=None):
        self.root_dir = Path(root_dir)
        self.transform = transform
        self.samples = []

        # 각 클래스 폴더에서 이미지 수집
        for class_id, class_name in enumerate(CLASS_NAMES):
            class_dir = self.root_dir / class_name
            if class_dir.exists():
                for img_path in class_dir.glob('*.jpg'):
                    self.samples.append((str(img_path), class_id))

        print(f"📁 {root_dir}: {len(self.samples)}개 이미지")

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        img_path, label = self.samples[idx]
        image = Image.open(img_path).convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image, label

# ============================================
# 4. ResNet 학습 함수
# ============================================

def train_resnet(
    epochs=20,
    batch_size=32,
    learning_rate=0.001,
    weight_decay=0.0001,
    use_pretrained=True,
    use_augmentation=True
):
    """
    ResNet 학습 함수

    Args:
        epochs: 학습 에폭 수
        batch_size: 배치 크기
        learning_rate: 학습률
        weight_decay: 가중치 감쇠
        use_pretrained: 사전학습 가중치 사용 여부
        use_augmentation: 데이터 증강 사용 여부

    Returns:
        best_accuracy: 최고 검증 정확도
    """

    print("=" * 60)
    print("🎓 ResNet-18 학습 시작")
    print("=" * 60)
    print(f"📝 설정:")
    print(f"  Epochs: {epochs}")
    print(f"  Batch size: {batch_size}")
    print(f"  Learning rate: {learning_rate}")
    print(f"  Weight decay: {weight_decay}")
    print(f"  Pretrained: {use_pretrained}")
    print(f"  Augmentation: {use_augmentation}")
    print("=" * 60)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"🖥️ Device: {device}")

    # ========== 데이터 전처리 ==========

    # 학습용 변환 (데이터 증강 포함)
    if use_augmentation:
        train_transform = transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.RandomCrop(224),
            transforms.RandomHorizontalFlip(p=0.5),
            transforms.RandomRotation(15),
            transforms.ColorJitter(brightness=0.2, contrast=0.2),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
    else:
        train_transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

    # 검증용 변환 (증강 없음)
    val_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    # ========== 데이터셋 생성 ==========

    train_dataset = BeeDataset('/content/bee_project/crops/train', train_transform)
    val_dataset = BeeDataset('/content/bee_project/crops/val', val_transform)

    if len(train_dataset) == 0:
        print("❌ 학습 데이터가 없습니다! create_crop_images()를 먼저 실행하세요.")
        return None

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=2)

    # ========== 모델 생성 ==========

    model = models.resnet18(pretrained=use_pretrained)
    model.fc = nn.Linear(model.fc.in_features, 8)  # 8개 클래스
    model = model.to(device)

    # ========== 손실함수, 옵티마이저 ==========

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=3, factor=0.5)

    # ========== 학습 시작 ==========

    best_accuracy = 0.0
    best_epoch = 0
    save_dir = Path('/content/bee_project/outputs/weights')
    save_dir.mkdir(parents=True, exist_ok=True)

    for epoch in range(epochs):
        print(f"\n📚 Epoch {epoch+1}/{epochs}")
        print("-" * 40)

        # === 학습 단계 ===
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0

        for batch_idx, (images, labels) in enumerate(train_loader):
            images = images.to(device)
            labels = labels.to(device)

            # 순전파
            outputs = model(images)
            loss = criterion(outputs, labels)

            # 역전파
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # 통계
            train_loss += loss.item()
            _, predicted = outputs.max(1)
            train_total += labels.size(0)
            train_correct += predicted.eq(labels).sum().item()

            # 진행 상황 출력
            if (batch_idx + 1) % 10 == 0:
                current_acc = 100. * train_correct / train_total
                print(f"  [Train] Batch {batch_idx+1}/{len(train_loader)}: "
                      f"Loss={loss.item():.4f}, Acc={current_acc:.2f}%")

        # 학습 결과
        train_acc = 100. * train_correct / train_total
        avg_train_loss = train_loss / len(train_loader)

        # === 검증 단계 ===
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for images, labels in val_loader:
                images = images.to(device)
                labels = labels.to(device)

                outputs = model(images)
                loss = criterion(outputs, labels)

                val_loss += loss.item()
                _, predicted = outputs.max(1)
                val_total += labels.size(0)
                val_correct += predicted.eq(labels).sum().item()

        # 검증 결과
        val_acc = 100. * val_correct / val_total
        avg_val_loss = val_loss / len(val_loader)

        # === 에폭 결과 출력 ===
        print(f"\n📊 Epoch {epoch+1} 결과:")
        print(f"  Train - Loss: {avg_train_loss:.4f}, Accuracy: {train_acc:.2f}%")
        print(f"  Val   - Loss: {avg_val_loss:.4f}, Accuracy: {val_acc:.2f}%")

        # 학습률 조정
        scheduler.step(val_acc)
        current_lr = optimizer.param_groups[0]['lr']
        print(f"  Learning Rate: {current_lr:.6f}")

        # === 최고 모델 저장 ===
        if val_acc > best_accuracy:
            best_accuracy = val_acc
            best_epoch = epoch + 1

            # 모델 저장
            save_path = save_dir / 'resnet18_best.pt'
            torch.save({
                'epoch': epoch + 1,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'train_accuracy': train_acc,
                'val_accuracy': val_acc,
                'train_loss': avg_train_loss,
                'val_loss': avg_val_loss,
                'class_names': CLASS_NAMES
            }, save_path)

            print(f"  🎉 새로운 최고 성능! 모델 저장: {save_path}")

    # ========== 학습 완료 ==========

    print("\n" + "=" * 60)
    print("✅ 학습 완료!")
    print(f"🏆 최고 검증 정확도: {best_accuracy:.2f}% (Epoch {best_epoch})")
    print(f"💾 모델 저장 위치: /content/bee_project/outputs/weights/resnet18_best.pt")
    print("=" * 60)

    return best_accuracy

# ============================================
# 5. 저장된 모델로 검증하기
# ============================================

def validate_best_model(data_split='val'):
    """
    저장된 최고 모델로 검증 수행

    Args:
        data_split: 'train' 또는 'val'
    """

    print("=" * 60)
    print(f"🔍 최고 모델로 {data_split} 데이터 검증")
    print("=" * 60)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # 모델 로드
    model_path = Path('/content/bee_project/outputs/weights/resnet18_best.pt')
    if not model_path.exists():
        print("❌ 저장된 모델이 없습니다! 먼저 학습을 실행하세요.")
        return

    # 모델 생성 및 가중치 로드
    model = models.resnet18(pretrained=False)
    model.fc = nn.Linear(model.fc.in_features, 8)

    checkpoint = torch.load(model_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    model = model.to(device)
    model.eval()

    print(f"✅ 모델 로드 완료")
    print(f"  학습 에폭: {checkpoint['epoch']}")
    print(f"  학습 정확도: {checkpoint['train_accuracy']:.2f}%")
    print(f"  검증 정확도: {checkpoint['val_accuracy']:.2f}%")

    # 데이터 준비
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    dataset = BeeDataset(f'/content/bee_project/crops/{data_split}', transform)
    dataloader = DataLoader(dataset, batch_size=32, shuffle=False, num_workers=2)

    # 검증 실행
    correct = 0
    total = 0
    class_correct = [0] * 8
    class_total = [0] * 8

    print(f"\n📊 {data_split} 데이터 검증 중...")

    with torch.no_grad():
        for images, labels in dataloader:
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)
            _, predicted = outputs.max(1)

            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

            # 클래스별 정확도
            for i in range(labels.size(0)):
                label = labels[i].item()
                class_total[label] += 1
                if predicted[i] == labels[i]:
                    class_correct[label] += 1

    # 전체 정확도
    accuracy = 100. * correct / total
    print(f"\n✅ 전체 정확도: {accuracy:.2f}% ({correct}/{total})")

    # 클래스별 정확도
    print(f"\n📋 클래스별 정확도:")
    for i, class_name in enumerate(CLASS_NAMES):
        if class_total[i] > 0:
            class_acc = 100. * class_correct[i] / class_total[i]
            print(f"  {class_name}: {class_acc:.2f}% ({class_correct[i]}/{class_total[i]})")
        else:
            print(f"  {class_name}: 샘플 없음")

    return accuracy

# ============================================
# 6. 메인 실행 코드
# ============================================

if __name__ == "__main__":
    print("🐝 ResNet 벌 분류 학습 시스템")
    print("=" * 60)

    # Step 1: 크롭 이미지 생성 (처음 한 번만)
    # create_crop_images()

    # Step 2: 학습 실행 (파라미터 조정 가능)
    # train_resnet(
    #     epochs=30,
    #     batch_size=32,
    #     learning_rate=0.001,
    #     weight_decay=0.0001,
    #     use_pretrained=True,
    #     use_augmentation=True
    # )

    # Step 3: 최고 모델로 검증
    # validate_best_model('val')   # validation 데이터
    # validate_best_model('train') # training 데이터

    print("\n📝 사용법:")
    print("1. create_crop_images()  # 크롭 생성 (처음 한 번)")
    print("2. train_resnet(epochs=30, learning_rate=0.001)  # 학습")
    print("3. validate_best_model('val')  # 검증")

In [None]:
create_crop_images()
train_resnet(epochs=30, learning_rate=0.001)

In [None]:
"""
ResNet 벌 분류 테스트 코드
학습된 모델로 테스트할 때 다양한 임계값 조정 가능
"""

import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader
from pathlib import Path
import numpy as np
from PIL import Image
import json

# ============================================
# 기본 설정
# ============================================

CLASS_NAMES = [
    "AB_LI", "QB_LI",
    "AB_CA", "QB_CA",
    "AB_BI", "QB_BI",
    "AB_AP", "QB_AP"
]

CLASS_MAPPING = {
    "AB_LI": 0, "QB_LI": 1, "AB_CA": 2, "QB_CA": 3,
    "AB_BI": 4, "QB_BI": 5, "AB_AP": 6, "QB_AP": 7
}

# ============================================
# 데이터셋 클래스
# ============================================

class BeeTestDataset(Dataset):
    """테스트용 벌 데이터셋"""

    def __init__(self, root_dir, transform=None):
        self.root_dir = Path(root_dir)
        self.transform = transform
        self.samples = []

        # 각 클래스 폴더에서 이미지 수집
        for class_id, class_name in enumerate(CLASS_NAMES):
            class_dir = self.root_dir / class_name
            if class_dir.exists():
                for img_path in class_dir.glob('*.jpg'):
                    self.samples.append({
                        'path': str(img_path),
                        'label': class_id,
                        'class_name': class_name
                    })

        print(f"📁 테스트 데이터: {len(self.samples)}개 이미지")

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        sample = self.samples[idx]
        image = Image.open(sample['path']).convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image, sample['label'], sample['path']

# ============================================
# ResNet 테스트 클래스
# ============================================

class ResNetTester:
    """ResNet 테스트 클래스 (다양한 임계값 조정 가능)"""

    def __init__(self, model_path='/content/bee_project/outputs/weights/resnet18_best.pt'):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model_path = Path(model_path)

        # 모델 로드
        self.model = self._load_model()

        # 전처리
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

        print(f"✅ 테스터 준비 완료 (Device: {self.device})")

    def _load_model(self):
        """학습된 모델 로드"""
        if not self.model_path.exists():
            raise FileNotFoundError(f"모델 파일이 없습니다: {self.model_path}")

        # 모델 생성
        model = models.resnet18(pretrained=False)
        model.fc = nn.Linear(model.fc.in_features, 8)

        # 가중치 로드
        checkpoint = torch.load(self.model_path, map_location=self.device)
        model.load_state_dict(checkpoint['model_state_dict'])
        model = model.to(self.device)
        model.eval()

        print(f"📦 모델 로드: {self.model_path}")
        print(f"  학습 정확도: {checkpoint.get('train_accuracy', 'N/A'):.2f}%")
        print(f"  검증 정확도: {checkpoint.get('val_accuracy', 'N/A'):.2f}%")

        return model

    def test_with_confidence_threshold(
        self,
        data_dir='/content/bee_project/crops/val',
        confidence_threshold=0.5,
        reject_uncertain=True
    ):
        """
        신뢰도 임계값으로 테스트

        Args:
            data_dir: 테스트 데이터 경로
            confidence_threshold: 최소 신뢰도 (이 값보다 낮으면 "불확실"로 처리)
            reject_uncertain: True면 불확실한 샘플 제외, False면 포함
        """

        print("\n" + "="*60)
        print(f"🔍 신뢰도 임계값 테스트")
        print(f"  임계값: {confidence_threshold:.2%}")
        print(f"  불확실 샘플: {'제외' if reject_uncertain else '포함'}")
        print("="*60)

        # 데이터 로드
        dataset = BeeTestDataset(data_dir, self.transform)
        dataloader = DataLoader(dataset, batch_size=32, shuffle=False)

        # 테스트 변수
        total = 0
        correct = 0
        uncertain = 0
        predictions = []

        with torch.no_grad():
            for images, labels, paths in dataloader:
                images = images.to(self.device)
                labels = labels.to(self.device)

                # 추론
                outputs = self.model(images)
                probs = torch.softmax(outputs, dim=1)
                confidences, predicted = probs.max(1)

                # 각 샘플별로 처리
                for i in range(len(labels)):
                    conf = confidences[i].item()
                    pred = predicted[i].item()
                    true_label = labels[i].item()

                    # 신뢰도 체크
                    if conf < confidence_threshold:
                        uncertain += 1
                        if reject_uncertain:
                            continue  # 불확실한 샘플 제외
                        else:
                            # 불확실해도 예측은 수행
                            total += 1
                            if pred == true_label:
                                correct += 1
                    else:
                        # 확실한 샘플
                        total += 1
                        if pred == true_label:
                            correct += 1

                    predictions.append({
                        'path': paths[i],
                        'true': CLASS_NAMES[true_label],
                        'pred': CLASS_NAMES[pred],
                        'confidence': conf,
                        'correct': pred == true_label,
                        'uncertain': conf < confidence_threshold
                    })

        # 결과 출력
        accuracy = 100.0 * correct / total if total > 0 else 0

        print(f"\n📊 결과:")
        print(f"  전체 샘플: {len(dataset)}")
        print(f"  확실한 샘플: {total} ({total/len(dataset)*100:.1f}%)")
        print(f"  불확실한 샘플: {uncertain} ({uncertain/len(dataset)*100:.1f}%)")
        print(f"  정확도: {accuracy:.2f}% ({correct}/{total})")

        # 오분류 샘플 출력
        errors = [p for p in predictions if not p['correct'] and not p['uncertain']]
        if errors and len(errors) <= 10:
            print(f"\n❌ 오분류 샘플 (최대 10개):")
            for err in errors[:10]:
                print(f"  {Path(err['path']).name}: {err['true']} → {err['pred']} (신뢰도: {err['confidence']:.2%})")

        return predictions, accuracy

    def test_with_top_k(
        self,
        data_dir='/content/bee_project/crops/val',
        top_k=3,
        min_confidence_gap=0.1
    ):
        """
        Top-K 정확도 테스트 (상위 K개 예측 중 정답이 있는지)

        Args:
            data_dir: 테스트 데이터 경로
            top_k: 상위 몇 개까지 볼지
            min_confidence_gap: 1위와 2위 신뢰도 차이 최소값
        """

        print("\n" + "="*60)
        print(f"🔍 Top-{top_k} 정확도 테스트")
        print(f"  신뢰도 차이 임계값: {min_confidence_gap:.2%}")
        print("="*60)

        dataset = BeeTestDataset(data_dir, self.transform)
        dataloader = DataLoader(dataset, batch_size=32, shuffle=False)

        top1_correct = 0
        topk_correct = 0
        low_gap_samples = 0
        total = 0

        with torch.no_grad():
            for images, labels, paths in dataloader:
                images = images.to(self.device)
                labels = labels.to(self.device)

                outputs = self.model(images)
                probs = torch.softmax(outputs, dim=1)

                # Top-K 예측
                topk_probs, topk_indices = probs.topk(min(top_k, 8), dim=1)

                for i in range(len(labels)):
                    total += 1
                    true_label = labels[i].item()

                    # Top-1 정확도
                    if topk_indices[i][0].item() == true_label:
                        top1_correct += 1

                    # Top-K 정확도
                    if true_label in topk_indices[i].tolist():
                        topk_correct += 1

                    # 신뢰도 차이 체크
                    if len(topk_probs[i]) >= 2:
                        gap = topk_probs[i][0].item() - topk_probs[i][1].item()
                        if gap < min_confidence_gap:
                            low_gap_samples += 1

        # 결과 출력
        top1_acc = 100.0 * top1_correct / total
        topk_acc = 100.0 * topk_correct / total

        print(f"\n📊 결과:")
        print(f"  Top-1 정확도: {top1_acc:.2f}% ({top1_correct}/{total})")
        print(f"  Top-{top_k} 정확도: {topk_acc:.2f}% ({topk_correct}/{total})")
        print(f"  신뢰도 차이 낮은 샘플: {low_gap_samples} ({low_gap_samples/total*100:.1f}%)")

        return top1_acc, topk_acc

    def test_with_class_specific_thresholds(
        self,
        data_dir='/content/bee_project/crops/val',
        class_thresholds=None
    ):
        """
        클래스별 다른 임계값으로 테스트

        Args:
            data_dir: 테스트 데이터 경로
            class_thresholds: 클래스별 임계값 딕셔너리
                             예: {0: 0.7, 1: 0.8, ...}
        """

        # 기본 임계값 설정
        if class_thresholds is None:
            class_thresholds = {i: 0.5 for i in range(8)}

        print("\n" + "="*60)
        print(f"🔍 클래스별 임계값 테스트")
        for i, name in enumerate(CLASS_NAMES):
            if i in class_thresholds:
                print(f"  {name}: {class_thresholds[i]:.2%}")
        print("="*60)

        dataset = BeeTestDataset(data_dir, self.transform)
        dataloader = DataLoader(dataset, batch_size=32, shuffle=False)

        class_correct = [0] * 8
        class_total = [0] * 8
        class_rejected = [0] * 8

        with torch.no_grad():
            for images, labels, paths in dataloader:
                images = images.to(self.device)
                labels = labels.to(self.device)

                outputs = self.model(images)
                probs = torch.softmax(outputs, dim=1)
                confidences, predicted = probs.max(1)

                for i in range(len(labels)):
                    true_label = labels[i].item()
                    pred_label = predicted[i].item()
                    confidence = confidences[i].item()

                    # 클래스별 임계값 체크
                    threshold = class_thresholds.get(pred_label, 0.5)

                    class_total[true_label] += 1

                    if confidence >= threshold:
                        if pred_label == true_label:
                            class_correct[true_label] += 1
                    else:
                        class_rejected[true_label] += 1

        # 결과 출력
        print(f"\n📊 클래스별 결과:")
        total_correct = sum(class_correct)
        total_samples = sum(class_total)

        for i, name in enumerate(CLASS_NAMES):
            if class_total[i] > 0:
                acc = 100.0 * class_correct[i] / class_total[i]
                rej_rate = 100.0 * class_rejected[i] / class_total[i]
                print(f"  {name:15}: 정확도={acc:6.2f}%, 거부율={rej_rate:5.1f}% ({class_correct[i]}/{class_total[i]})")

        overall_acc = 100.0 * total_correct / total_samples if total_samples > 0 else 0
        print(f"\n  전체 정확도: {overall_acc:.2f}%")

        return overall_acc

    def test_single_image(
        self,
        image_path,
        confidence_threshold=0.5,
        show_top_k=3
    ):
        """
        단일 이미지 테스트

        Args:
            image_path: 이미지 경로
            confidence_threshold: 신뢰도 임계값
            show_top_k: 상위 몇 개 예측을 보여줄지
        """

        print("\n" + "="*60)
        print(f"🖼️ 단일 이미지 테스트: {Path(image_path).name}")
        print("="*60)

        # 이미지 로드 및 전처리
        image = Image.open(image_path).convert('RGB')
        img_tensor = self.transform(image).unsqueeze(0).to(self.device)

        # 추론
        with torch.no_grad():
            outputs = self.model(img_tensor)
            probs = torch.softmax(outputs, dim=1)

            # Top-K 예측
            topk_probs, topk_indices = probs.topk(min(show_top_k, 8), dim=1)

        # 결과 출력
        top1_prob = topk_probs[0][0].item()
        top1_class = topk_indices[0][0].item()

        print(f"\n🐝 예측 결과:")

        if top1_prob >= confidence_threshold:
            print(f"  ✅ 예측: {CLASS_NAMES[top1_class]}")
            print(f"  신뢰도: {top1_prob:.2%}")
        else:
            print(f"  ⚠️ 불확실 (신뢰도 {top1_prob:.2%} < 임계값 {confidence_threshold:.2%})")
            print(f"  최고 예측: {CLASS_NAMES[top1_class]}")

        print(f"\n📊 Top-{show_top_k} 예측:")
        for i in range(len(topk_indices[0])):
            idx = topk_indices[0][i].item()
            prob = topk_probs[0][i].item()
            print(f"  {i+1}. {CLASS_NAMES[idx]:15}: {prob:7.2%}")

        # 신뢰도 차이
        if len(topk_probs[0]) >= 2:
            gap = topk_probs[0][0].item() - topk_probs[0][1].item()
            print(f"\n  1위-2위 신뢰도 차이: {gap:.2%}")

        return CLASS_NAMES[top1_class], top1_prob


# ============================================
# 메인
# ============================================

if __name__ == "__main__":
    print("🐝 ResNet 테스트 시스템")
    print("="*60)
    print("\n📝 사용법:")
    print("\n1. 빠른 테스트:")
    print("   quick_test(confidence_threshold=0.7)")
    print("\n2. 다양한 임계값 실험:")
    print("   test_with_various_thresholds()")
    print("\n3. 단일 이미지 테스트:")
    print("   tester = ResNetTester()")
    print("   tester.test_single_image('image.jpg', confidence_threshold=0.8)")


In [None]:
tester = ResNetTester()
tester.test_single_image('01_1_R_QB_AP_20220802_07_0208.jpg', confidence_threshold=0.8)

In [None]:
"""
테스트 이미지로 벌 종류 판별 및 시각화
YOLO + ResNet을 사용하여 test 폴더의 이미지를 분석합니다.
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import torchvision.transforms as transforms
from torchvision.ops import nms
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from pathlib import Path
import random

# ============================================
# 설정
# ============================================

CLASS_NAMES = [
    "AB_LI", "QB_LI",
    "AB_CA", "QB_CA",
    "AB_BI", "QB_BI",
    "AB_AP", "QB_AP"
]

# 클래스별 색상 (시각화용)
CLASS_COLORS = [
    '#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4',
    '#FFEAA7', '#DDA0DD', '#98D8C8', '#F7DC6F'
]

# ============================================
# YOLOv3 모델 정의 (필요한 부분만)
# ============================================

class Conv(nn.Module):
    def __init__(self, c1, c2, k=1, s=1, p=None, act=True):
        super().__init__()
        p = k // 2 if p is None else p
        self.conv = nn.Conv2d(c1, c2, k, s, p, bias=False)
        self.bn = nn.BatchNorm2d(c2)
        self.act = nn.LeakyReLU(0.1, inplace=True) if act else nn.Identity()
    def forward(self, x):
        return self.act(self.bn(self.conv(x)))

class ResBlock(nn.Module):
    def __init__(self, c):
        super().__init__()
        self.cv1 = Conv(c, c//2, k=1, s=1)
        self.cv2 = Conv(c//2, c, k=3, s=1)
    def forward(self, x):
        return x + self.cv2(self.cv1(x))

class Darknet53(nn.Module):
    def __init__(self):
        super().__init__()
        self.cv1 = Conv(3, 32, 3, 1)
        self.cv2 = Conv(32, 64, 3, 2)
        self.res1 = nn.Sequential(*[ResBlock(64) for _ in range(1)])
        self.cv3 = Conv(64, 128, 3, 2)
        self.res2 = nn.Sequential(*[ResBlock(128) for _ in range(2)])
        self.cv4 = Conv(128, 256, 3, 2)
        self.res3 = nn.Sequential(*[ResBlock(256) for _ in range(8)])
        self.cv5 = Conv(256, 512, 3, 2)
        self.res4 = nn.Sequential(*[ResBlock(512) for _ in range(8)])
        self.cv6 = Conv(512, 1024, 3, 2)
        self.res5 = nn.Sequential(*[ResBlock(1024) for _ in range(4)])
    def forward(self, x):
        x = self.cv1(x)
        x = self.cv2(x) ; x = self.res1(x)
        x = self.cv3(x) ; x = self.res2(x)
        x = self.cv4(x) ; x3 = self.res3(x)
        x = self.cv5(x) ; x2 = self.res4(x)
        x = self.cv6(x) ; x1 = self.res5(x)
        return x1, x2, x3

class YOLOv3(nn.Module):
    def __init__(self, num_classes=1, anchors=None, img_size=416):
        super().__init__()
        self.nc = num_classes
        self.na = 3
        self.anchors = anchors or [
            [(116,90), (156,198), (373,326)],
            [(30,61), (62,45), (59,119)],
            [(10,13), (16,30), (33,23)]
        ]
        self.img_size = img_size
        self.backbone = Darknet53()

        # Heads
        self.head1 = nn.Sequential(Conv(1024, 512, 1, 1), Conv(512, 1024, 3, 1),
                                   Conv(1024, 512, 1, 1), Conv(512, 1024, 3, 1),
                                   Conv(1024, 512, 1, 1))
        self.pred1 = nn.Conv2d(512, self.na*(5+self.nc), 1, 1, 0)

        self.up1 = nn.Upsample(scale_factor=2, mode='nearest')
        self.reduce1 = Conv(512, 256, 1, 1)

        self.head2 = nn.Sequential(Conv(768, 256, 1, 1), Conv(256, 512, 3, 1),
                                   Conv(512, 256, 1, 1), Conv(256, 512, 3, 1),
                                   Conv(512, 256, 1, 1))
        self.pred2 = nn.Conv2d(256, self.na*(5+self.nc), 1, 1, 0)

        self.up2 = nn.Upsample(scale_factor=2, mode='nearest')
        self.reduce2 = Conv(256, 128, 1, 1)

        self.head3 = nn.Sequential(Conv(384, 128, 1, 1), Conv(128, 256, 3, 1),
                                   Conv(256, 128, 1, 1), Conv(128, 256, 3, 1),
                                   Conv(256, 128, 1, 1))
        self.pred3 = nn.Conv2d(128, self.na*(5+self.nc), 1, 1, 0)

    def forward(self, x):
        x1, x2, x3 = self.backbone(x)

        p1 = self.head1(x1)
        out1 = self.pred1(p1)

        u1 = self.up1(self.reduce1(p1))
        f2 = torch.cat([u1, x2], dim=1)
        p2 = self.head2(f2)
        out2 = self.pred2(p2)

        u2 = self.up2(self.reduce2(p2))
        f3 = torch.cat([u2, x3], dim=1)
        p3 = self.head3(f3)
        out3 = self.pred3(p3)

        return [out1, out2, out3]

# ============================================
# YOLO 후처리 함수
# ============================================

def postprocess(outputs, conf_thr=0.25, iou_thr=0.5, img_size=416, num_classes=1, anchors=None):
    """YOLO 출력 후처리"""
    if anchors is None:
        anchors = [
            [(116,90), (156,198), (373,326)],
            [(30,61), (62,45), (59,119)],
            [(10,13), (16,30), (33,23)]
        ]

    device = outputs[0].device
    strides = [32, 16, 8]
    decoded = []

    for i, out in enumerate(outputs):
        bs, ch, ny, nx = out.shape
        na = 3
        no = 5 + num_classes

        out = out.view(bs, na, no, ny, nx).permute(0,1,3,4,2).contiguous()

        stride = strides[i]
        anc = torch.tensor(anchors[i], device=device).float() / stride

        xv, yv = torch.meshgrid(torch.arange(nx, device=device),
                                torch.arange(ny, device=device), indexing='xy')

        x = (out[..., 0].sigmoid() + xv) * stride
        y = (out[..., 1].sigmoid() + yv) * stride
        w = (out[..., 2].exp() * anc[:,0].view(na,1,1)) * stride
        h = (out[..., 3].exp() * anc[:,1].view(na,1,1)) * stride
        obj = out[..., 4].sigmoid()

        boxes = torch.stack([x - w/2, y - h/2, x + w/2, y + h/2], dim=-1)
        boxes = boxes.view(bs, -1, 4)
        obj = obj.view(bs, -1)

        decoded.append((boxes, obj))

    boxes = torch.cat([d[0] for d in decoded], dim=1)
    obj = torch.cat([d[1] for d in decoded], dim=1)

    results = []
    for b in range(boxes.size(0)):
        scores = obj[b]
        keep = scores > conf_thr
        bxs = boxes[b][keep]
        scs = scores[keep]

        if bxs.numel()==0:
            results.append((torch.zeros((0,4)), torch.zeros((0,)), torch.zeros((0,), dtype=torch.long)))
            continue

        keep_idx = nms(bxs, scs, iou_thr)
        results.append((bxs[keep_idx], scs[keep_idx], torch.zeros(len(keep_idx), dtype=torch.long)))

    return results

# ============================================
# 벌 판별 시스템
# ============================================

class BeeDetector:
    """YOLO + ResNet 통합 벌 판별 시스템"""

    def __init__(self):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"🖥️ Device: {self.device}")

        # 모델 로드
        self.yolo_model = self.load_yolo()
        self.resnet_model = self.load_resnet()

        # ResNet 전처리
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

        print("✅ 시스템 준비 완료!\n")

    def load_yolo(self):
        """YOLO 모델 로드"""
        model_path = Path('/content/bee_project/outputs/weights/yolov3_best.pt')

        # YOLOv3 모델 생성
        model = YOLOv3(num_classes=1, img_size=416).to(self.device)

        if model_path.exists():
            checkpoint = torch.load(model_path, map_location=self.device)
            model.load_state_dict(checkpoint['model'], strict=False)
            print(f"✅ YOLO 로드: mAP={checkpoint.get('map50', 0):.3f}")
        else:
            print(f"❌ YOLO 가중치 없음: {model_path}")

        model.eval()
        return model

    def load_resnet(self):
        """ResNet 모델 로드"""
        model_path = Path('/content/bee_project/outputs/weights/resnet18_best.pt')

        model = models.resnet18(pretrained=False)
        model.fc = nn.Linear(model.fc.in_features, 8)

        if model_path.exists():
            checkpoint = torch.load(model_path, map_location=self.device)
            model.load_state_dict(checkpoint['model_state_dict'])
            print(f"✅ ResNet 로드: Acc={checkpoint.get('val_accuracy', 0):.1f}%")
        else:
            print(f"❌ ResNet 가중치 없음: {model_path}")

        model = model.to(self.device)
        model.eval()
        return model

    def detect_bees(self, image, conf_thr=0.3):
        """YOLO로 벌 탐지"""
        h_orig, w_orig = image.shape[:2]

        # 전처리 (416x416)
        img_resized = Image.fromarray(image).resize((416, 416))
        img_tensor = transforms.ToTensor()(img_resized).unsqueeze(0).to(self.device)

        # YOLO 추론
        with torch.no_grad():
            outputs = self.yolo_model(img_tensor)
            results = postprocess(outputs, conf_thr, 0.5, 416, 1, self.yolo_model.anchors)
            boxes, scores, _ = results[0]

        # 원본 크기로 좌표 변환
        detections = []
        for box, score in zip(boxes, scores):
            x1 = int(box[0] * w_orig / 416)
            y1 = int(box[1] * h_orig / 416)
            x2 = int(box[2] * w_orig / 416)
            y2 = int(box[3] * h_orig / 416)

            # 경계 체크
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(w_orig, x2), min(h_orig, y2)

            if x2 > x1 and y2 > y1:
                detections.append({
                    'bbox': [x1, y1, x2, y2],
                    'score': float(score)
                })

        return detections

    def classify_bee(self, crop):
        """ResNet으로 벌 종류 분류"""
        crop_pil = Image.fromarray(crop)
        img_tensor = self.transform(crop_pil).unsqueeze(0).to(self.device)

        with torch.no_grad():
            outputs = self.resnet_model(img_tensor)
            probs = torch.softmax(outputs, dim=1)
            confidence, predicted = probs.max(1)

        return {
            'class_id': predicted.item(),
            'class_name': CLASS_NAMES[predicted.item()],
            'confidence': confidence.item()
        }

    def process_image(self, image_path):
        """이미지 처리 (탐지 + 분류)"""
        # 이미지 로드
        image = Image.open(image_path).convert('RGB')
        img_array = np.array(image)

        # 1. YOLO 탐지
        detections = self.detect_bees(img_array)

        # 2. 각 탐지에 대해 분류
        results = []
        for det in detections:
            x1, y1, x2, y2 = det['bbox']
            crop = img_array[y1:y2, x1:x2]

            # ResNet 분류
            species = self.classify_bee(crop)

            results.append({
                'bbox': det['bbox'],
                'detection_conf': det['score'],
                'species': species['class_name'],
                'species_conf': species['confidence'],
                'color': CLASS_COLORS[species['class_id']]
            })

        return img_array, results

# ============================================
# 테스트 및 시각화 함수
# ============================================

def test_and_visualize(num_samples=5):
    """
    테스트 이미지 샘플링하여 판별 및 시각화

    Args:
        num_samples: 테스트할 이미지 개수
    """

    print("="*60)
    print(f"🧪 테스트 시작 (샘플: {num_samples}개)")
    print("="*60)

    # 테스트 이미지 경로
    test_dir = Path('/content/bee_project/images/test')
    if not test_dir.exists():
        print(f"❌ 테스트 폴더 없음: {test_dir}")
        return

    # 이미지 파일 수집
    test_images = list(test_dir.glob('*.jpg')) + list(test_dir.glob('*.png'))
    if len(test_images) == 0:
        print("❌ 테스트 이미지가 없습니다!")
        return

    # 랜덤 샘플링
    samples = random.sample(test_images, min(num_samples, len(test_images)))
    print(f"📁 전체 {len(test_images)}개 중 {len(samples)}개 샘플링\n")

    # 판별 시스템 초기화
    detector = BeeDetector()

    # 결과 저장
    all_results = []

    # 각 이미지 처리
    for i, img_path in enumerate(samples):
        print(f"🖼️ [{i+1}/{len(samples)}] {img_path.name}")

        # 처리
        image, results = detector.process_image(img_path)

        # 결과 출력
        if results:
            print(f"  ✅ {len(results)}마리 탐지:")
            for j, r in enumerate(results):
                print(f"     {j+1}. {r['species']} (신뢰도: {r['species_conf']:.2%})")
        else:
            print(f"  ❌ 탐지된 벌 없음")

        all_results.append({
            'path': img_path,
            'image': image,
            'results': results
        })
        print()

    # 시각화
    visualize_results(all_results)

    return all_results

def visualize_results(all_results):
    """결과 시각화"""

    n = len(all_results)
    if n == 0:
        return

    # 그리드 크기 결정
    cols = min(3, n)
    rows = (n + cols - 1) // cols

    fig, axes = plt.subplots(rows, cols, figsize=(cols*6, rows*5))
    if n == 1:
        axes = [axes]
    elif rows == 1:
        axes = axes
    else:
        axes = axes.flatten()

    for idx, data in enumerate(all_results):
        ax = axes[idx] if n > 1 else axes[0]

        # 이미지 표시
        ax.imshow(data['image'])
        ax.set_title(f"{data['path'].name}\n탐지: {len(data['results'])}마리",
                    fontsize=10, fontweight='bold')
        ax.axis('off')

        # 바운딩 박스 그리기
        for result in data['results']:
            x1, y1, x2, y2 = result['bbox']

            # 색상 변환
            color = result['color']
            if color.startswith('#'):
                color = tuple(int(color[i:i+2], 16)/255 for i in (1, 3, 5))

            # 박스
            rect = patches.Rectangle((x1, y1), x2-x1, y2-y1,
                                    linewidth=2, edgecolor=color,
                                    facecolor='none')
            ax.add_patch(rect)

            # 라벨
            label = f"{result['species']}\n{result['species_conf']:.1%}"
            ax.text(x1, y1-5, label, color=color, fontsize=8,
                   fontweight='bold',
                   bbox=dict(boxstyle='round,pad=0.3',
                            facecolor='white', alpha=0.8))

    # 빈 subplot 숨기기
    for idx in range(n, len(axes)):
        axes[idx].axis('off')

    plt.suptitle('🐝 벌 종류 판별 결과', fontsize=14, fontweight='bold')
    plt.tight_layout()
    plt.show()

def quick_test(num=3):
    """빠른 테스트 실행"""
    return test_and_visualize(num)

def quick_test_fixed(num=3):
    """수정된 테스트 (높은 임계값)"""

    # 기존 detector의 detect_bees 메서드 오버라이드
    detector = BeeDetector()

    # 신뢰도 임계값 상향
    original_detect = detector.detect_bees
    def detect_bees_strict(image):
        return original_detect(image, conf_thr=0.7)  # 0.3 → 0.7
    detector.detect_bees = detect_bees_strict

    # 테스트 실행
    return test_and_visualize(num)

def filter_detections(detections):
    """너무 작거나 큰 박스 제거"""
    filtered = []
    for det in detections:
        x1, y1, x2, y2 = det['bbox']
        w = x2 - x1
        h = y2 - y1

        # 크기 필터 (너무 작거나 큰 박스 제거)
        if 30 < w < 300 and 30 < h < 300:
            # 종횡비 필터 (너무 길쭉한 박스 제거)
            aspect_ratio = w / h
            if 0.3 < aspect_ratio < 3.0:
                filtered.append(det)

    return filtered

def test_improved(num_samples=5):
    """개선된 테스트 함수"""

    print("🔧 개선된 테스트 시작")
    print("="*60)

    test_dir = Path('/content/bee_project/images/test')
    test_images = list(test_dir.glob('*.jpg'))
    samples = random.sample(test_images, min(num_samples, len(test_images)))

    detector = BeeDetector()
    all_results = []

    for i, img_path in enumerate(samples):
        print(f"🖼️ [{i+1}/{len(samples)}] {img_path.name}")

        image = Image.open(img_path).convert('RGB')
        img_array = np.array(image)

        # 1. YOLO 탐지 (높은 임계값)
        detections = detector.detect_bees(img_array, conf_thr=0.7)

        # 2. 크기 필터링
        detections = filter_detections(detections)

        # 3. 상위 N개만 선택
        detections = sorted(detections, key=lambda x: x['score'], reverse=True)[:10]

        # 4. 분류
        results = []
        for det in detections:
            x1, y1, x2, y2 = det['bbox']

            # 여백 추가한 크롭
            margin = 0.1
            w, h = x2 - x1, y2 - y1
            x1_new = max(0, int(x1 - w * margin))
            y1_new = max(0, int(y1 - h * margin))
            x2_new = min(img_array.shape[1], int(x2 + w * margin))
            y2_new = min(img_array.shape[0], int(y2 + h * margin))

            crop = img_array[y1_new:y2_new, x1_new:x2_new]

            if crop.size > 0:
                species = detector.classify_bee(crop)

                # 분류 신뢰도도 체크
                if species['confidence'] > 0.98:
                    results.append({
                        'bbox': [x1, y1, x2, y2],
                        'detection_conf': det['score'],
                        'species': species['class_name'],
                        'species_conf': species['confidence'],
                        'color': CLASS_COLORS[species['class_id']]
                    })

        print(f"  ✅ {len(results)}마리 확정 (필터링 후)")

        all_results.append({
            'path': img_path,
            'image': img_array,
            'results': results
        })

    visualize_results(all_results)
    return all_results



# ============================================
# 통계 분석
# ============================================

def analyze_test_results(num_samples=20):
    """테스트 결과 통계 분석"""

    print("📊 테스트 통계 분석")
    print("="*60)

    detector = BeeDetector()
    test_dir = Path('/content/bee_project/images/test')
    test_images = list(test_dir.glob('*.jpg'))

    if not test_images:
        print("❌ 테스트 이미지가 없습니다!")
        return

    samples = random.sample(test_images, min(num_samples, len(test_images)))

    # 통계 변수
    total_images = len(samples)
    total_detected = 0
    class_counts = {name: 0 for name in CLASS_NAMES}
    confidence_scores = []

    for img_path in samples:
        _, results = detector.process_image(img_path)
        total_detected += len(results)

        for r in results:
            class_counts[r['species']] += 1
            confidence_scores.append(r['species_conf'])

    # 결과 출력
    print(f"\n📈 결과:")
    print(f"  테스트 이미지: {total_images}개")
    print(f"  탐지된 벌: {total_detected}마리")
    print(f"  평균 탐지: {total_detected/total_images:.1f}마리/이미지")

    if confidence_scores:
        print(f"  평균 신뢰도: {np.mean(confidence_scores):.2%}")
        print(f"  최소 신뢰도: {min(confidence_scores):.2%}")
        print(f"  최대 신뢰도: {max(confidence_scores):.2%}")

    print(f"\n📋 클래스별 탐지:")
    for name, count in class_counts.items():
        if count > 0:
            print(f"  {name}: {count}마리")

    return class_counts

# ============================================
# 메인
# ============================================

if __name__ == "__main__":
    print("🐝 벌 종류 판별 테스트 시스템")
    print("="*60)
    print("\n사용법:")
    print("  quick_test(3)        # 3개 샘플 테스트")
    print("  quick_test(10)       # 10개 샘플 테스트")
    print("  analyze_test_results(20)  # 20개로 통계 분석")
    print("="*60)

In [None]:
# 실행
test_improved(5)

In [None]:
quick_test(5)

In [None]:
"""
테스트 이미지로 벌 종류 판별 및 시각화 (군집화 개선 버전)
YOLO + ResNet을 사용하여 test 폴더의 이미지를 분석합니다.
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import torchvision.transforms as transforms
from torchvision.ops import nms
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from pathlib import Path
import random

# ============================================
# 설정
# ============================================

CLASS_NAMES = [
    "AB_LI", "QB_LI",
    "AB_CA", "QB_CA",
    "AB_BI", "QB_BI",
    "AB_AP", "QB_AP"
]

# 클래스별 색상 (시각화용)
CLASS_COLORS = [
    '#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4',
    '#FFEAA7', '#DDA0DD', '#98D8C8', '#F7DC6F'
]

# ============================================
# YOLOv3 모델 정의 (필요한 부분만)
# ============================================

class Conv(nn.Module):
    def __init__(self, c1, c2, k=1, s=1, p=None, act=True):
        super().__init__()
        p = k // 2 if p is None else p
        self.conv = nn.Conv2d(c1, c2, k, s, p, bias=False)
        self.bn = nn.BatchNorm2d(c2)
        self.act = nn.LeakyReLU(0.1, inplace=True) if act else nn.Identity()
    def forward(self, x):
        return self.act(self.bn(self.conv(x)))

class ResBlock(nn.Module):
    def __init__(self, c):
        super().__init__()
        self.cv1 = Conv(c, c//2, k=1, s=1)
        self.cv2 = Conv(c//2, c, k=3, s=1)
    def forward(self, x):
        return x + self.cv2(self.cv1(x))

class Darknet53(nn.Module):
    def __init__(self):
        super().__init__()
        self.cv1 = Conv(3, 32, 3, 1)
        self.cv2 = Conv(32, 64, 3, 2)
        self.res1 = nn.Sequential(*[ResBlock(64) for _ in range(1)])
        self.cv3 = Conv(64, 128, 3, 2)
        self.res2 = nn.Sequential(*[ResBlock(128) for _ in range(2)])
        self.cv4 = Conv(128, 256, 3, 2)
        self.res3 = nn.Sequential(*[ResBlock(256) for _ in range(8)])
        self.cv5 = Conv(256, 512, 3, 2)
        self.res4 = nn.Sequential(*[ResBlock(512) for _ in range(8)])
        self.cv6 = Conv(512, 1024, 3, 2)
        self.res5 = nn.Sequential(*[ResBlock(1024) for _ in range(4)])
    def forward(self, x):
        x = self.cv1(x)
        x = self.cv2(x) ; x = self.res1(x)
        x = self.cv3(x) ; x = self.res2(x)
        x = self.cv4(x) ; x3 = self.res3(x)
        x = self.cv5(x) ; x2 = self.res4(x)
        x = self.cv6(x) ; x1 = self.res5(x)
        return x1, x2, x3

class YOLOv3(nn.Module):
    def __init__(self, num_classes=1, anchors=None, img_size=416):
        super().__init__()
        self.nc = num_classes
        self.na = 3
        self.anchors = anchors or [
            [(116,90), (156,198), (373,326)],
            [(30,61), (62,45), (59,119)],
            [(10,13), (16,30), (33,23)]
        ]
        self.img_size = img_size
        self.backbone = Darknet53()

        # Heads
        self.head1 = nn.Sequential(Conv(1024, 512, 1, 1), Conv(512, 1024, 3, 1),
                                   Conv(1024, 512, 1, 1), Conv(512, 1024, 3, 1),
                                   Conv(1024, 512, 1, 1))
        self.pred1 = nn.Conv2d(512, self.na*(5+self.nc), 1, 1, 0)

        self.up1 = nn.Upsample(scale_factor=2, mode='nearest')
        self.reduce1 = Conv(512, 256, 1, 1)

        self.head2 = nn.Sequential(Conv(768, 256, 1, 1), Conv(256, 512, 3, 1),
                                   Conv(512, 256, 1, 1), Conv(256, 512, 3, 1),
                                   Conv(512, 256, 1, 1))
        self.pred2 = nn.Conv2d(256, self.na*(5+self.nc), 1, 1, 0)

        self.up2 = nn.Upsample(scale_factor=2, mode='nearest')
        self.reduce2 = Conv(256, 128, 1, 1)

        self.head3 = nn.Sequential(Conv(384, 128, 1, 1), Conv(128, 256, 3, 1),
                                   Conv(256, 128, 1, 1), Conv(128, 256, 3, 1),
                                   Conv(256, 128, 1, 1))
        self.pred3 = nn.Conv2d(128, self.na*(5+self.nc), 1, 1, 0)

    def forward(self, x):
        x1, x2, x3 = self.backbone(x)

        p1 = self.head1(x1)
        out1 = self.pred1(p1)

        u1 = self.up1(self.reduce1(p1))
        f2 = torch.cat([u1, x2], dim=1)
        p2 = self.head2(f2)
        out2 = self.pred2(p2)

        u2 = self.up2(self.reduce2(p2))
        f3 = torch.cat([u2, x3], dim=1)
        p3 = self.head3(f3)
        out3 = self.pred3(p3)

        return [out1, out2, out3]

# ============================================
# YOLO 후처리 함수
# ============================================

def postprocess(outputs, conf_thr=0.25, iou_thr=0.5, img_size=416, num_classes=1, anchors=None):
    """YOLO 출력 후처리"""
    if anchors is None:
        anchors = [
            [(116,90), (156,198), (373,326)],
            [(30,61), (62,45), (59,119)],
            [(10,13), (16,30), (33,23)]
        ]

    device = outputs[0].device
    strides = [32, 16, 8] # 앵커 적용 안해봄
    decoded = []

    for i, out in enumerate(outputs):
        bs, ch, ny, nx = out.shape
        na = 3
        no = 5 + num_classes

        out = out.view(bs, na, no, ny, nx).permute(0,1,3,4,2).contiguous()

        stride = strides[i]
        anc = torch.tensor(anchors[i], device=device).float() / stride

        xv, yv = torch.meshgrid(torch.arange(nx, device=device),
                                torch.arange(ny, device=device), indexing='xy')

        x = (out[..., 0].sigmoid() + xv) * stride
        y = (out[..., 1].sigmoid() + yv) * stride
        w = (out[..., 2].exp() * anc[:,0].view(na,1,1)) * stride
        h = (out[..., 3].exp() * anc[:,1].view(na,1,1)) * stride
        obj = out[..., 4].sigmoid()

        boxes = torch.stack([x - w/2, y - h/2, x + w/2, y + h/2], dim=-1)
        boxes = boxes.view(bs, -1, 4)
        obj = obj.view(bs, -1)

        decoded.append((boxes, obj))

    boxes = torch.cat([d[0] for d in decoded], dim=1)
    obj = torch.cat([d[1] for d in decoded], dim=1)

    results = []
    for b in range(boxes.size(0)):
        scores = obj[b]
        keep = scores > conf_thr
        bxs = boxes[b][keep]
        scs = scores[keep]

        if bxs.numel()==0:
            results.append((torch.zeros((0,4)), torch.zeros((0,)), torch.zeros((0,), dtype=torch.long)))
            continue

        keep_idx = nms(bxs, scs, iou_thr)
        results.append((bxs[keep_idx], scs[keep_idx], torch.zeros(len(keep_idx), dtype=torch.long)))

    return results

# ============================================
# 군집화 및 필터링 함수 (문서 참고하여 개선)
# ============================================

def calc_resize_pad_params(W0, H0, img_size=416):
    """원본 크기에서 리사이즈/패딩 파라미터 계산"""
    r = min(img_size / W0, img_size / H0)
    W1, H1 = int(round(W0*r)), int(round(H0*r))
    xpad = (img_size - W1) / 2.0
    ypad = (img_size - H1) / 2.0
    return r, xpad, ypad, W1, H1

def filter_by_valid_area(boxes416, W0, H0, use_pad_filter=True):
    """패딩 영역에 걸친 탐지 제거"""
    if not use_pad_filter or len(boxes416)==0:
        return boxes416, np.ones(len(boxes416), bool)

    _, xpad, ypad, W1, H1 = calc_resize_pad_params(W0, H0, 416)
    x1v, y1v, x2v, y2v = xpad, ypad, xpad+W1, ypad+H1

    # 중심점이 유효 영역에 있는지 확인
    cx = (boxes416[:,0]+boxes416[:,2]) * 0.5
    cy = (boxes416[:,1]+boxes416[:,3]) * 0.5
    keep = (cx>=x1v) & (cx<=x2v) & (cy>=y1v) & (cy<=y2v)

    return boxes416[keep], keep

def filter_by_size(boxes416, scores, min_wh=12, max_frac=0.90):
    """너무 작거나 큰 박스 제거"""
    if len(boxes416)==0:
        return boxes416, scores

    w = np.clip(boxes416[:,2]-boxes416[:,0], 0, None)
    h = np.clip(boxes416[:,3]-boxes416[:,1], 0, None)
    keep = (w>=min_wh) & (h>=min_wh) & (w<=416*max_frac) & (h<=416*max_frac)

    return boxes416[keep], scores[keep]

def cluster_and_prune(boxes, scores, cluster_iou=0.3, max_clusters=10):
    """
    IoU > cluster_iou인 탐지들을 같은 군집으로 묶고, 각 군집에서 최고 점수만 남김
    """
    if len(boxes) == 0:
        return boxes, scores, cluster_iou

    # numpy 변환
    if isinstance(boxes, torch.Tensor):
        boxes = boxes.detach().cpu().numpy()
    if isinstance(scores, torch.Tensor):
        scores = scores.detach().cpu().numpy()

    x1, y1, x2, y2 = boxes[:,0], boxes[:,1], boxes[:,2], boxes[:,3]
    area = np.maximum(0, x2-x1) * np.maximum(0, y2-y1)

    # 점수 순으로 정렬
    order = list(range(len(boxes)))
    order.sort(key=lambda i: scores[i], reverse=True)

    kept = []

    while order:
        # 가장 높은 점수를 시드로 선택
        i = order.pop(0)
        kept.append(i)

        # 시드와 IoU > threshold인 것들 제거 (같은 군집)
        next_order = []
        for j in order:
            xx1 = max(x1[i], x1[j])
            yy1 = max(y1[i], y1[j])
            xx2 = min(x2[i], x2[j])
            yy2 = min(y2[i], y2[j])

            inter = max(0, xx2-xx1) * max(0, yy2-yy1)
            iou = inter / (area[i] + area[j] - inter + 1e-9)

            if iou <= cluster_iou:  # 다른 군집이면 유지
                next_order.append(j)

        order = next_order

    # 최대 개수 제한
    kept = np.array(kept, dtype=int)[:max_clusters]

    return boxes[kept], scores[kept], cluster_iou

# ============================================
# 벌 판별 시스템
# ============================================

class BeeDetector:
    """YOLO + ResNet 통합 벌 판별 시스템"""

    def __init__(self):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"🖥️ Device: {self.device}")

        # 모델 로드
        self.yolo_model = self.load_yolo()
        self.resnet_model = self.load_resnet()

        # ResNet 전처리
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

        print("✅ 시스템 준비 완료!\n")

    def load_yolo(self):
        """YOLO 모델 로드"""
        model_path = Path('/content/bee_project/outputs/weights/yolov3_best.pt')

        # YOLOv3 모델 생성
        model = YOLOv3(num_classes=1, img_size=416).to(self.device)

        if model_path.exists():
            try:
                checkpoint = torch.load(model_path, map_location=self.device)
                model.load_state_dict(checkpoint['model'], strict=False)
                print(f"✅ YOLO 로드: mAP={checkpoint.get('map50', 0):.3f}")
            except Exception as e:
                print(f"⚠️ YOLO 가중치 로드 오류: {e}")
        else:
            print(f"❌ YOLO 가중치 없음: {model_path}")

        model.eval()
        return model

    def load_resnet(self):
        """ResNet 모델 로드"""
        model_path = Path('/content/bee_project/outputs/weights/resnet18_best.pt')

        model = models.resnet18(pretrained=False)
        model.fc = nn.Linear(model.fc.in_features, 8)

        if model_path.exists():
            try:
                checkpoint = torch.load(model_path, map_location=self.device)
                model.load_state_dict(checkpoint['model_state_dict'])
                print(f"✅ ResNet 로드: Acc={checkpoint.get('val_accuracy', 0):.1f}%")
            except Exception as e:
                print(f"⚠️ ResNet 가중치 로드 오류: {e}")
        else:
            print(f"❌ ResNet 가중치 없음: {model_path}")

        model = model.to(self.device)
        model.eval()
        return model

    def detect_bees(self, image, conf_thr=0.5):
        """YOLO로 벌 탐지 + 군집화"""
        h_orig, w_orig = image.shape[:2]

        # 전처리 (416x416)
        img_resized = Image.fromarray(image).resize((416, 416))
        img_tensor = transforms.ToTensor()(img_resized).unsqueeze(0).to(self.device)

        # YOLO 추론
        with torch.no_grad():
            outputs = self.yolo_model(img_tensor)
            results = postprocess(outputs, conf_thr, 0.5, 416, 1, self.yolo_model.anchors)
            boxes, scores, _ = results[0]

        # numpy 변환
        if isinstance(boxes, torch.Tensor):
            boxes = boxes.detach().cpu().numpy()
        if isinstance(scores, torch.Tensor):
            scores = scores.detach().cpu().numpy()

        if len(boxes) == 0:
            return []

        print(f"  📊 YOLO 원시 탐지: {len(boxes)}개")

        # 1. 패딩 영역 필터링
        boxes, keep_mask = filter_by_valid_area(boxes, w_orig, h_orig)
        scores = scores[keep_mask]
        print(f"  📊 패딩 필터 후: {len(boxes)}개")

        # 2. 크기 필터링
        boxes, scores = filter_by_size(boxes, scores, min_wh=12, max_frac=0.90)
        print(f"  📊 크기 필터 후: {len(boxes)}개")

        # 3. 군집화 (핵심!)
        boxes, scores, used_iou = cluster_and_prune(
            boxes, scores,
            cluster_iou=0.3,  # IoU 임계값
            max_clusters=10   # 최대 군집 수
        )
        print(f"  📊 군집화 후: {len(boxes)}개 (IoU={used_iou:.2f})")

        # 원본 크기로 좌표 변환
        detections = []
        for box, score in zip(boxes, scores):
            x1 = int(box[0] * w_orig / 416)
            y1 = int(box[1] * h_orig / 416)
            x2 = int(box[2] * w_orig / 416)
            y2 = int(box[3] * h_orig / 416)

            # 경계 체크
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(w_orig, x2), min(h_orig, y2)

            if x2 > x1 and y2 > y1:
                detections.append({
                    'bbox': [x1, y1, x2, y2],
                    'score': float(score)
                })

        return detections

    def classify_bee(self, crop):
        """ResNet으로 벌 종류 분류"""
        try:
            crop_pil = Image.fromarray(crop)
            img_tensor = self.transform(crop_pil).unsqueeze(0).to(self.device)

            with torch.no_grad():
                outputs = self.resnet_model(img_tensor)
                probs = torch.softmax(outputs, dim=1)
                confidence, predicted = probs.max(1)

            return {
                'class_id': predicted.item(),
                'class_name': CLASS_NAMES[predicted.item()],
                'confidence': confidence.item()
            }
        except Exception as e:
            print(f"⚠️ 분류 오류: {e}")
            return {
                'class_id': 0,
                'class_name': CLASS_NAMES[0],
                'confidence': 0.0
            }

    def process_image(self, image_path):
        """이미지 처리 (탐지 + 분류)"""
        # 이미지 로드
        image = Image.open(image_path).convert('RGB')
        img_array = np.array(image)

        print(f"🔍 처리 중: {image_path.name}")

        # 1. YOLO 탐지
        detections = self.detect_bees(img_array, conf_thr=0.5)

        # 2. 각 탐지에 대해 분류
        results = []
        for i, det in enumerate(detections):
            x1, y1, x2, y2 = det['bbox']

            # 여백 추가하여 크롭
            margin = 0.1
            w, h = x2 - x1, y2 - y1
            x1_margin = max(0, int(x1 - w * margin))
            y1_margin = max(0, int(y1 - h * margin))
            x2_margin = min(img_array.shape[1], int(x2 + w * margin))
            y2_margin = min(img_array.shape[0], int(y2 + h * margin))

            crop = img_array[y1_margin:y2_margin, x1_margin:x2_margin]

            if crop.size > 0:
                # ResNet 분류
                species = self.classify_bee(crop)

                results.append({
                    'bbox': det['bbox'],
                    'detection_conf': det['score'],
                    'species': species['class_name'],
                    'species_conf': species['confidence'],
                    'color': CLASS_COLORS[species['class_id']]
                })

                print(f"    {i+1}. {species['class_name']} (신뢰도: {species['confidence']:.2%})")

        return img_array, results

# ============================================
# 테스트 및 시각화 함수
# ============================================

def test_and_visualize(num_samples=5):
    """
    테스트 이미지 샘플링하여 판별 및 시각화

    Args:
        num_samples: 테스트할 이미지 개수
    """

    print("="*60)
    print(f"🧪 테스트 시작 (샘플: {num_samples}개)")
    print("="*60)

    # 테스트 이미지 경로
    test_dir = Path('/content/bee_project/images/test')
    if not test_dir.exists():
        print(f"❌ 테스트 폴더 없음: {test_dir}")
        return

    # 이미지 파일 수집
    test_images = list(test_dir.glob('*.jpg')) + list(test_dir.glob('*.png'))
    if len(test_images) == 0:
        print("❌ 테스트 이미지가 없습니다!")
        return

    # 랜덤 샘플링
    samples = random.sample(test_images, min(num_samples, len(test_images)))
    print(f"📁 전체 {len(test_images)}개 중 {len(samples)}개 샘플링\n")

    # 판별 시스템 초기화
    detector = BeeDetector()

    # 결과 저장
    all_results = []

    # 각 이미지 처리
    for i, img_path in enumerate(samples):
        print(f"\n🖼️ [{i+1}/{len(samples)}] {img_path.name}")

        # 처리
        image, results = detector.process_image(img_path)

        # 결과 출력
        if results:
            print(f"  ✅ 최종 {len(results)}마리 탐지 완료!")
        else:
            print(f"  ❌ 탐지된 벌 없음")

        all_results.append({
            'path': img_path,
            'image': image,
            'results': results
        })

    # 시각화
    visualize_results(all_results)

    return all_results

def visualize_results(all_results):
    """결과 시각화"""

    n = len(all_results)
    if n == 0:
        return

    # 그리드 크기 결정
    cols = min(3, n)
    rows = (n + cols - 1) // cols

    fig, axes = plt.subplots(rows, cols, figsize=(cols*6, rows*5))
    if n == 1:
        axes = [axes]
    elif rows == 1:
        axes = axes
    else:
        axes = axes.flatten()

    for idx, data in enumerate(all_results):
        ax = axes[idx] if n > 1 else axes[0]

        # 이미지 표시
        ax.imshow(data['image'])
        ax.set_title(f"{data['path'].name}\n탐지: {len(data['results'])}마리",
                    fontsize=10, fontweight='bold')
        ax.axis('off')

        # 바운딩 박스 그리기
        for result in data['results']:
            x1, y1, x2, y2 = result['bbox']

            # 색상 변환
            color = result['color']
            if color.startswith('#'):
                color = tuple(int(color[i:i+2], 16)/255 for i in (1, 3, 5))

            # 박스
            rect = patches.Rectangle((x1, y1), x2-x1, y2-y1,
                                    linewidth=2, edgecolor=color,
                                    facecolor='none')
            ax.add_patch(rect)

            # 라벨
            label = f"{result['species']}\n{result['species_conf']:.1%}"
            ax.text(x1, y1-5, label, color=color, fontsize=8,
                   fontweight='bold',
                   bbox=dict(boxstyle='round,pad=0.3',
                            facecolor='white', alpha=0.8))

    # 빈 subplot 숨기기
    for idx in range(n, len(axes)):
        axes[idx].axis('off')

    plt.suptitle('🐝 벌 종류 판별 결과 (군집화 개선)', fontsize=14, fontweight='bold')
    plt.tight_layout()
    plt.show()

def quick_test(num=3):
    """빠른 테스트 실행"""
    return test_and_visualize(num)

def analyze_test_results(num_samples=20):
    """테스트 결과 통계 분석"""

    print("📊 테스트 통계 분석")
    print("="*60)

    detector = BeeDetector()
    test_dir = Path('/content/bee_project/images/test')
    test_images = list(test_dir.glob('*.jpg'))

    if not test_images:
        print("❌ 테스트 이미지가 없습니다!")
        return

    samples = random.sample(test_images, min(num_samples, len(test_images)))

    # 통계 변수
    total_images = len(samples)
    total_detected = 0
    class_counts = {name: 0 for name in CLASS_NAMES}
    confidence_scores = []

    print(f"🔍 {total_images}개 이미지 분석 중...\n")

    for i, img_path in enumerate(samples):
        print(f"[{i+1}/{total_images}] {img_path.name}")
        _, results = detector.process_image(img_path)
        total_detected += len(results)

        for r in results:
            class_counts[r['species']] += 1
            confidence_scores.append(r['species_conf'])

    # 결과 출력
    print(f"\n📈 결과:")
    print(f"  테스트 이미지: {total_images}개")
    print(f"  탐지된 벌: {total_detected}마리")
    print(f"  평균 탐지: {total_detected/total_images:.1f}마리/이미지")

    if confidence_scores:
        print(f"  평균 신뢰도: {np.mean(confidence_scores):.2%}")
        print(f"  최소 신뢰도: {min(confidence_scores):.2%}")
        print(f"  최대 신뢰도: {max(confidence_scores):.2%}")

    print(f"\n📋 클래스별 탐지:")
    for name, count in class_counts.items():
        if count > 0:
            print(f"  {name}: {count}마리")

    return class_counts

# ============================================
# 메인
# ============================================

if __name__ == "__main__":
    print("🐝 벌 종류 판별 테스트 시스템 (군집화 개선)")
    print("="*60)
    print("\n사용법:")
    print("  quick_test(3)        # 3개 샘플 테스트")
    print("  quick_test(10)       # 10개 샘플 테스트")
    print("  analyze_test_results(20)  # 20개로 통계 분석")
    print("="*60)

In [None]:
quick_test(3)