In [None]:
import os, math, time
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from sklearn.metrics import classification_report, confusion_matrix, f1_score
from tqdm.auto import tqdm

from common.config import (
    BASE_DIR,
    NUM_CLASSES,
    BATCH_SIZE,
    EPOCHS,
    WARMUP_EPOCHS,
    SEED,
    BASE_LEARNING_RATE,
    WD,
    SEED,
    DEVICE,
    WORKER_NUM,
    CLASS_NAMES,
    TRANSFORM,
    TRAIN_TRANSFORM,
    VAL_TRANSFORM,
    TRAIN_IMAGE_DIR,
    TRAIN_LABEL_DIR,
    VAL_IMAGE_DIR,
    VAL_LABEL_DIR,
    TEST_IMAGE_DIR,
    TEST_LABEL_DIR,
    LOG_DIR,
    TRAIN_LOG_DIR,
    TEST_LOG_DIR,
    MODEL_DIR,
    DROPOUT_RATE,
)
from common.utils import (
    set_seed,
    worker_init_fn,
    get_mean_std_from_weights,
    get_device,
    device_pretty,
    human_time,
    count_parameters,
    calculate_top_k_accuracy,
    calculate_f1_score,
    calculate_auroc,
    save_auroc_data,
    save_checkpoint,
)
from common.dataset import CustomDataset
from common.logger import Logger
from common.evaluate import evaluate_test_set, print_test_results

# initail seed 설정
set_seed(SEED)

# Check PyTorch version and CUDA availability
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
print(f"CUDA device count: {torch.cuda.device_count()}")

if torch.cuda.is_available():
    print(f"Current CUDA device: {torch.cuda.current_device()}")
    print(f"CUDA device name: {torch.cuda.get_device_name()}")

# Set device for computations
device = torch.device(DEVICE)
print(f"Using device: {device}")

In [None]:
import os, glob

print("CWD:", os.getcwd())
print("LABEL_DIR:", TRAIN_LABEL_DIR)
print("IMAGE_DIR:", TRAIN_IMAGE_DIR)
print("LABEL_DIR exists:", os.path.isdir(TRAIN_LABEL_DIR))
print("IMAGE_DIR exists:", os.path.isdir(TRAIN_IMAGE_DIR))

json_paths = glob.glob(os.path.join(TRAIN_LABEL_DIR, "**", "*.json"), recursive=True)
print("Found json files:", len(json_paths))

# 1. 데이터 셋 로드


In [None]:
MODEL_FILE_NAME = "best_convnet_model.pth"

# 1) 데이터 증강,변환(색 왜곡은 과하지 않게)
train_dataset = CustomDataset(
    label_folder=TRAIN_LABEL_DIR, image_folder=TRAIN_IMAGE_DIR, transform=TRAIN_TRANSFORM
)
val_datatset = CustomDataset(
    label_folder=VAL_LABEL_DIR, image_folder=VAL_IMAGE_DIR, transform=VAL_TRANSFORM
)
test_datatset = CustomDataset(
    label_folder=TEST_LABEL_DIR, image_folder=TEST_IMAGE_DIR, transform=VAL_TRANSFORM
)

train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=WORKER_NUM,
    generator=torch.Generator(device=DEVICE),
    worker_init_fn=worker_init_fn,
)
val_loader = DataLoader(
    val_datatset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=WORKER_NUM,
    generator=torch.Generator(device=DEVICE),
)
test_loader = DataLoader(
    val_datatset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=WORKER_NUM,
    generator=torch.Generator(device=DEVICE),
)

# 2. 모델 설정 - ConvNet

## 각 모델별로 다르게 설정!!

In [None]:
class ConvNet(nn.Module):
    def __init__(self, num_classes=15, dropout_rate=0.5):
        super(ConvNet, self).__init__()

        self.features = nn.Sequential(
            # First block: Conv(3→64)→BN→ReLU→MaxPool
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # Second block: [Conv(64→128)→BN→ReLU]×2→MaxPool
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # Third block: [Conv(128→256)→BN→ReLU]×2→MaxPool
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),  # Global Average Pooling
            nn.Flatten(),
            nn.Dropout(dropout_rate),
            nn.Linear(256, num_classes),
        )

        # He initialization
        self._initialize_weights()

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

In [None]:
# 2) 모델: ConvNet - CNN 기반 모델
def create_model():
    model = ConvNet(num_classes=NUM_CLASSES, dropout_rate=DROPOUT_RATE)  # 15, 0.5
    return model.to(device)

model = create_model()
model

# 3. Optima, Scheduler, Scaler 설정

In [None]:
# 3) 손실/옵티마/스케줄러
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = torch.optim.AdamW(
    filter(lambda p: p.requires_grad, model.parameters()), lr=BASE_LEARNING_RATE, weight_decay=WD
)

def lr_lambda(current_epoch):
    if current_epoch < WARMUP_EPOCHS:
        return float(current_epoch + 1) / WARMUP_EPOCHS
    # cosine
    t = (current_epoch - WARMUP_EPOCHS) / max(1, (EPOCHS - WARMUP_EPOCHS))
    return 0.5 * (1 + math.cos(math.pi * t))


# 스케쥴러
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)

# 스케일러
# Mixed Precision Training 설정 (GPU에서만 사용)
if torch.cuda.is_available():
    scaler = torch.amp.GradScaler("cuda")
    print("✅ Mixed Precision Training enabled (GPU)")
else:
    scaler = None
    print("⚠️  Mixed Precision Training disabled (CPU mode)")

# Logger 설정
logger = Logger(LOG_DIR, "train_log.json")

# 4. 평가지표 변수 설정

In [None]:
train_losses, val_losses = [], []
train_accuracies, val_accuracies = [], []
train_top3_accuracies, val_top3_accuracies = [], []
train_aurocs, val_aurocs = [], []
train_f1_scores, val_f1_scores = [], []

best_val_loss = float("inf")
best_composite_score = 0.0  # 복합 점수 초기화
best_epoch = 0
early_stopping_counter = 0
patience = 20

# 학습 시작 시간
total_start_time = time.time()
print(
    f"Training started at {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(total_start_time))}"
)

# 5. 모델 학습 함수 정의

In [None]:
def train_one_epoch(
    model, train_loader, criterion, optimizer, device, scaler=None, desc=None
):
    """한 에포크 학습"""
    model.train()
    epoch_loss = 0.0
    epoch_correct = 0
    epoch_top3_correct = 0
    epoch_total = 0

    train_outputs = []
    train_labels = []

    iterator = train_loader
    if desc is not None:
        try:
            from tqdm.auto import tqdm

            iterator = tqdm(train_loader, desc=desc)
        except Exception:
            iterator = train_loader

    for images, labels in iterator:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()

        if scaler is not None:
            with torch.amp.autocast(device_type="cuda", dtype=torch.float16):
                outputs = model(images)
                loss = criterion(outputs, labels)

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            outputs = model(images)
            loss = criterion(outputs, labels)

            loss.backward()
            optimizer.step()

        epoch_loss += loss.item()
        _, preds = torch.max(outputs, 1)
        batch_size = labels.size(0)
        epoch_total += batch_size
        epoch_correct += (preds == labels).sum().item()
        epoch_top3_correct += calculate_top_k_accuracy(outputs, labels, k=3)

        train_outputs.extend(outputs.detach().cpu().tolist())
        train_labels.extend(labels.detach().cpu().tolist())

    epoch_loss /= max(1, len(train_loader))
    epoch_acc = epoch_correct / max(1, epoch_total)
    epoch_top3_acc = epoch_top3_correct / max(1, epoch_total)

    return {
        "loss": epoch_loss,
        "accuracy": epoch_acc,
        "top3_accuracy": epoch_top3_acc,
        "outputs": train_outputs,
        "labels": train_labels,
    }


def validation_one_epoch(model, val_loader, criterion, device, desc=None):
    """한 에포크 검증"""
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_top3_correct = 0
    val_total = 0

    val_outputs = []
    val_labels = []

    iterator = val_loader
    if desc is not None:
        try:
            from tqdm.auto import tqdm

            iterator = tqdm(val_loader, desc=desc)
        except Exception:
            iterator = val_loader

    with torch.no_grad():
        for images, labels in iterator:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)

            val_loss += loss.item()
            _, preds = torch.max(outputs, 1)
            batch_size = labels.size(0)
            val_total += batch_size
            val_correct += (preds == labels).sum().item()
            val_top3_correct += calculate_top_k_accuracy(outputs, labels, k=3)

            val_outputs.extend(outputs.detach().cpu().tolist())
            val_labels.extend(labels.detach().cpu().tolist())

    val_loss /= max(1, len(val_loader))
    val_acc = val_correct / max(1, val_total)
    val_top3_acc = val_top3_correct / max(1, val_total)

    return {
        "loss": val_loss,
        "accuracy": val_acc,
        "top3_accuracy": val_top3_acc,
        "outputs": val_outputs,
        "labels": val_labels,
    }

# 6. 모델 훈련

In [None]:
for epoch in range(EPOCHS):
    start_time = time.time()  # 에포크 시작 시간 기록

    # ===============================================
    # Training
    # ===============================================
    train_metrics = train_one_epoch(
        model=model,
        train_loader=train_loader,
        criterion=criterion,
        optimizer=optimizer,
        device=device,
        scaler=scaler,
        desc=f"Training Epoch {epoch+1}/{EPOCHS}",
    )

    train_losses.append(train_metrics["loss"])
    train_accuracies.append(train_metrics["accuracy"])
    train_top3_accuracies.append(train_metrics["top3_accuracy"])
    train_f1_scores.append(
        calculate_f1_score(
            torch.tensor(train_metrics["outputs"]),
            torch.tensor(train_metrics["labels"]),
        )
    )
    train_aurocs.append(
        calculate_auroc(
            torch.tensor(train_metrics["outputs"]),
            torch.tensor(train_metrics["labels"]),
        )
    )
    save_auroc_data(
        train_metrics["outputs"],
        train_metrics["labels"],
        epoch + 1,
        "train",
        TRAIN_LOG_DIR,
    )

    # ===============================================
    # Validation
    # ===============================================
    val_metrics = validation_one_epoch(
        model=model,
        val_loader=val_loader,
        criterion=criterion,
        device=device,
        desc="Validating",
    )

    val_losses.append(val_metrics["loss"])
    val_accuracies.append(val_metrics["accuracy"])
    val_top3_accuracies.append(val_metrics["top3_accuracy"])
    val_f1_scores.append(
        calculate_f1_score(
            torch.tensor(val_metrics["outputs"]), torch.tensor(val_metrics["labels"])
        )
    )
    val_aurocs.append(
        calculate_auroc(
            torch.tensor(val_metrics["outputs"]), torch.tensor(val_metrics["labels"])
        )
    )
    save_auroc_data(
        val_metrics["outputs"], val_metrics["labels"], epoch + 1, "val", TRAIN_LOG_DIR
    )

    # Scheduler
    scheduler.step()  # ReduceLROnPlateau 사용 시: scheduler.step(val_metrics["loss"])

    # 에포크 소요 시간 계산
    end_time = time.time()
    epoch_duration = end_time - start_time

    # 에포크 결과 출력
    print(
        f"[Epoch {epoch+1}/{EPOCHS}] "
        f"Time: {epoch_duration:.2f}s, "
        f"Train Loss: {train_losses[-1]:.4f}, Train Acc: {train_accuracies[-1]:.4f}, Train Top-3: {train_top3_accuracies[-1]:.4f}, Train F1: {train_f1_scores[-1]:.4f}, "
        f"Val Loss: {val_losses[-1]:.4f}, Val Acc: {val_accuracies[-1]:.4f}, Val Top-3: {val_top3_accuracies[-1]:.4f}, Val F1: {val_f1_scores[-1]:.4f}"
    )

    # 의료 이미지 분류에 최적화된 복합 점수 계산
    composite_score = (
        0.4 * val_accuracies[-1]  # 40% - 정확도 (가장 중요)
        + 0.3 * val_aurocs[-1]  # 30% - AUROC (의료 분류에서 중요)
        + 0.2 * val_f1_scores[-1]  # 20% - F1 Score (클래스 불균형 고려)
        + 0.1 * (1 - val_losses[-1])  # 10% - Loss (낮을수록 좋음)
    )

    # 최고 성능 모델 저장
    if composite_score > best_composite_score:
        best_composite_score = composite_score
        best_epoch = epoch + 1
        early_stopping_counter = 0
        save_checkpoint(
            {
                "epoch": epoch + 1,  # 현재 에포크
                "state_dict": model.state_dict(),
                "optimizer_state_dict": optimizer.state_dict(),
                "train_loss": train_losses[-1],
                "train_accuracy": train_accuracies[-1],
                "train_top3_accuracy": train_top3_accuracies[-1],
                "train_f1_score": train_f1_scores[-1],
                "train_auroc": train_aurocs[-1],
                "val_loss": val_losses[-1],
                "val_accuracy": val_accuracies[-1],
                "val_top3_accuracy": val_top3_accuracies[-1],
                "val_f1_score": val_f1_scores[-1],
                "val_auroc": val_aurocs[-1],
                "composite_score": composite_score,
                "epoch_duration": epoch_duration,
            },
            MODEL_DIR,
            MODEL_FILE_NAME,
        )
        print(f"🎯 New best model saved! Composite Score: {composite_score:.4f}")
    else:
        early_stopping_counter += 1

    if early_stopping_counter >= patience:
        print(f"Early stopping at epoch {epoch+1}. Best epoch was {best_epoch}.")
        break

    # 로그 기록
    logger.log(
        {
            "epoch": epoch + 1,
            "train_loss": train_losses[-1],
            "train_accuracy": train_accuracies[-1],
            "train_top3_accuracy": train_top3_accuracies[-1],
            "train_f1_score": train_f1_scores[-1],
            "train_auroc": train_aurocs[-1],
            "val_loss": val_losses[-1],
            "val_accuracy": val_accuracies[-1],
            "val_top3_accuracy": val_top3_accuracies[-1],
            "val_f1_score": val_f1_scores[-1],
            "val_auroc": val_aurocs[-1],
            "composite_score": composite_score,
            "epoch_duration": epoch_duration,
        }
    )

In [None]:
# 로그 저장
logger.save()

# 학습 종료 시간
total_end_time = time.time()
total_duration = total_end_time - total_start_time
print(
    f"Training completed at {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(total_end_time))}"
)
print(f"Total training time: {total_duration:.2f}s")

# 7. Test 검증

In [None]:
# 체크포인트 로드
checkpoint = torch.load(os.path.join(MODEL_DIR, MODEL_FILE_NAME), weights_only=False)

# 체크포인트의 키들 확인
print("Checkpoint keys:", list(checkpoint.keys()))
print("State dict keys (first 10):", list(checkpoint["state_dict"].keys())[:10])

In [None]:
# 모델 생성 (기존 모델 정의 코드 필요)
model = create_model()

# 체크포인트에서 가중치 로드
model.load_state_dict(checkpoint["state_dict"])

# 평가 실행
results = evaluate_test_set(model, test_loader, criterion, device)
print_test_results(results, "ResNet-101")