# 1. 데이터 전처리 코드

데이터 전처리 코드입니다. (좌우 반전, 랜덤 회전 증강)
증강한 데이터는 코드와 같은 위치에 저장되도록 경로를 설정해놨습니다. 


In [None]:
# 데이터 전처리 코드입니다.

"""
- EDA 인사이트
1) EDA를 통해 클래스 별 불균형 확인 (가장 큰건 300장이 넘어가지만, 8장의 이미지를 가지는 클래스도 존재)
-> 불균형을 고려한 데이터 증강 필요성

추가하고 싶은 것) Train 데이터와 test 데이터의 분포 비교. (이거 해도 되는지 모르겠음..)
-> intensity, 주파수 영역 분석해서 분포가 비슷한걸 파악함 -> 이미지 분포를 크게 바꾸지 않는 증강만 고려


- 전처리 과정: 데이터 불균형을 고려한 데이터 증강
1) 클래스가 새, 자동차, 비행기이므로 좌우 반전 증강 가능.

2) 클래스가 새, 자동차, 비행기이므로 현실 세계를 고려하여 45도 이내의 시계/ 반시계 방향 랜덤 회전 
-> 이렇게 진행 시 모든 데이터 3배 증가함.

3) 이렇게 증강해도 300장이 되지 않는 데이터들에 대하여 45도 이내의 시계/ 반시계 방향 랜덤 회전 또 진행
-> 가장 작은 클래스도 300장이 넘어가도록 하여 데이터 불균형 해소

"""
import os
import random
from PIL import Image, ImageOps
from torchvision.transforms import functional as F

# 데이터 경로 설정
train_dir = "/test/final_exam/challenge/train"
augmented_dir = "augmented_train"
os.makedirs(augmented_dir, exist_ok=True)

# 증강 함수 정의
def augment_images(class_path, save_path, min_count=300):
    images = [f for f in os.listdir(class_path) if f.endswith(".jpg") or f.endswith(".png")]
    
    # 원본 이미지 불러오기
    image_paths = [os.path.join(class_path, img) for img in images]
    augmented_images = []

    # 좌우 반전
    for img_path in image_paths:
        with Image.open(img_path) as img:
            flipped = ImageOps.mirror(img)
            augmented_images.append(flipped)
            augmented_images.append(img.copy())

    # 랜덤 회전 (시계방향 & 반시계방향 45도 이내)
    final_images = []
    for img in augmented_images:
        for _ in range(2):
            angle = random.uniform(-45, 45)
            rotated = img.rotate(angle)
            final_images.append(rotated)

    # 이미지 저장 (최소 100장 확보)
    while len(final_images) < min_count:
        for img in augmented_images:
            angle = random.uniform(-45, 45)
            rotated = img.rotate(angle)
            final_images.append(rotated)
            if len(final_images) >= min_count:
                break

    # 저장
    os.makedirs(save_path, exist_ok=True)
    for idx, img in enumerate(final_images):
        img.save(os.path.join(save_path, f"aug_{idx}.jpg"))

# 클래스별 증강 실행
for class_name in os.listdir(train_dir):
    class_path = os.path.join(train_dir, class_name)
    if os.path.isdir(class_path):
        save_path = os.path.join(augmented_dir, class_name)
        augment_images(class_path, save_path)

print("Data augmentation complete.")

Data augmentation complete.


# 2. 훈련 + 평가 코드

여기서 부터 시드를 저장하고 훈련을 시작합니다.
사용한 모델은 EfficientNet_b5입니다. (AdamW + CosineAnnealingLR)

batch_size = 36,
epochs = 30,
num_classes = 300,
adamW_lr = 0.001,
weight_decay = 1e-4

평가는 가장 마지막 모델 훈련 상태 기준으로 진행합니다. (30 에폭 학습 후)


In [8]:
import os, random, torch
import numpy as np

def set_seed(seed):
    os.environ['PYTHONHASHSEED'] = str(seed) 
    random.seed(seed)  
    np.random.seed(seed)  
    torch.manual_seed(seed)  
    torch.cuda.manual_seed(seed)  
    torch.cuda.manual_seed_all(seed)  
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# 예시
set_seed(123)

In [9]:
######################################################
# (1) 라이브러리 임포트 및 기본 설정
######################################################
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torch.utils.data import DataLoader, random_split, Subset
from torchvision import datasets, transforms, models

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

from tqdm import tqdm
from sklearn.metrics import confusion_matrix, classification_report

# 디바이스 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

Using device: cuda


In [10]:
######################################################
# (2) 하이퍼파라미터 & 경로 설정
######################################################
batch_size = 36
epochs = 30
num_classes = 300
adamW_lr = 0.001
weight_decay = 1e-4

# 체크포인트 저장 폴더
checkpoint_dir = "./checkpoint_efficientnet_b5"
os.makedirs(checkpoint_dir, exist_ok=True)  # 없으면 생성

In [11]:
######################################################
# (3) 데이터 변환 & 데이터셋/로더 정의
######################################################

size = (456, 456)

transform = torchvision.transforms.Compose([
    torchvision.transforms.Resize(size),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
])

origin_train_dataset = torchvision.datasets.ImageFolder(root='./augmented_train', transform=transform)
test_dataset = torchvision.datasets.ImageFolder(root='/test/final_exam/challenge/test', transform=transform)

# train, valid 층화추출
# Dataset 전체의 클래스 레이블 가져오기
targets = [origin_train_dataset.targets[i] for i in range(len(origin_train_dataset))]

# train, valid 셋 클래스 비율을 맞추기 위한 층화추출
# 층화추출을 위한 StratifiedShuffleSplit 설정
from sklearn.model_selection import StratifiedShuffleSplit
split = StratifiedShuffleSplit(n_splits=1, test_size=0.05, random_state=42)  # train: 95%, valid: 5%

# train_index와 valid_index 생성
for train_index, valid_index in split.split(np.zeros(len(targets)), targets):
    train_indices = train_index
    valid_indices = valid_index

# Subset을 사용하여 Train과 Valid Dataset 생성
train_dataset = Subset(origin_train_dataset, train_indices)
validation_dataset = Subset(origin_train_dataset, valid_indices)

# DataLoader 설정
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
validation_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

print(f"Training samples:   {len(train_dataset)}")
print(f"Validation samples: {len(validation_dataset)}")
print(f"Test samples:       {len(test_dataset)}")

Training samples:   89178
Validation samples: 4694
Test samples:       4178


In [12]:
######################################################
# (4) 모델 정의 (EfficientNet-B5, scratch) & (선택)체크포인트 로드
######################################################
# 처음부터(scratch) → weights=None
effnet_b5 = models.efficientnet_b5(weights=None)

# 분류기 부분 수정 → 출력 차원 = 300개
in_features = effnet_b5.classifier[1].in_features  # 일반적으로 1536
effnet_b5.classifier[1] = nn.Linear(in_features, num_classes)
model = effnet_b5.to(device)

# [원하는 체크포인트 로드 시 주석 해제 예시]
# ckpt_path = os.path.join(checkpoint_dir, "checkpoint_epoch_37.pth")
# model.load_state_dict(torch.load(ckpt_path))
# print(f"Loaded checkpoint: {ckpt_path}")

In [13]:
######################################################
# (5) 옵티마이저, 스케줄러, 손실함수 정의
######################################################
criterion = nn.CrossEntropyLoss()
adamw_optimizer = torch.optim.AdamW(model.parameters(), lr=adamW_lr, weight_decay=weight_decay)
adamw_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(adamw_optimizer, T_max=epochs)

In [14]:
######################################################
# (6) 학습 함수, 검증 함수, 테스트 함수
######################################################
def train_model(num_epochs=epochs):
    for epoch in range(num_epochs):
        
        optimizer, scheduler = adamw_optimizer, adamw_scheduler
        
        model.train()
        running_loss  = 0.0
        total_batches = len(train_loader)

        print(f"Epoch {epoch+1}/{num_epochs}")
        for batch_idx, (inputs, labels) in enumerate(train_loader):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            if (batch_idx + 1) % 10 == 0 or (batch_idx + 1) == total_batches:
                print(f"  [Batch {batch_idx+1}/{total_batches}] Loss: {loss.item():.4f}")

        # Validation Loss
        val_loss = validate_model()

        # 매 Epoch마다 체크포인트 저장
        ckpt_path = os.path.join(checkpoint_dir, f"checkpoint_epoch_{epoch+1}.pth")
        torch.save(model.state_dict(), ckpt_path)
        print(f"Checkpoint saved: {ckpt_path}")

        # 스케줄러 step
        scheduler.step()

        print(f"==> Epoch [{epoch+1}/{num_epochs}] "
              f"Train Loss: {running_loss/total_batches:.4f} | "
              f"Val Loss: {val_loss:.4f} | "
              f"LR: {scheduler.get_last_lr()[0]:.6f}\n")

def validate_model():
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for inputs, labels in validation_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
    return val_loss / len(validation_loader)

def test_model(model, data_loader):
    model.eval()
    correct    = 0
    total      = 0
    all_preds  = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)

            total   += labels.size(0)
            correct += (predicted == labels).sum().item()

            # F1, 혼동행렬 등을 위해 저장
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    acc = 100.0 * correct / total
    print(f"Test Accuracy: {acc:.2f}%")
    return all_labels, all_preds

In [15]:
######################################################
# (7) 학습
######################################################
train_model(epochs)
print("----- Training Finished -----")

Epoch 1/30
  [Batch 10/2478] Loss: 5.7307
  [Batch 20/2478] Loss: 5.7816
  [Batch 30/2478] Loss: 5.8059


KeyboardInterrupt: 

In [17]:
######################################################
# (8) 평가(Eval) & CSV 제출
######################################################
# (선택) 원하는 체크포인트 로드 - 주석 해제 시 사용
# custom_ckpt = os.path.join(checkpoint_dir, "checkpoint_epoch_14.pth")
# model.load_state_dict(torch.load(custom_ckpt))
# print(f"Loaded checkpoint: {custom_ckpt}")

print("[Step 8] Evaluating current model & Saving CSV...")
all_labels_main, all_preds_main = test_model(model, test_loader)

submission_main = pd.read_csv('./sample_submission.csv')
submission_main['Label'] = all_preds_main
submission_main.to_csv('./Competition.csv', index=False)
print("Competition file saved as 'Competition.csv'.")

[Step 8] Evaluating current model & Saving CSV...


KeyboardInterrupt: 

# 3. 추가 학습 + 평가

위에서 30 에폭 학습 후, 아직 val loss가 더 아래로 수렴할 가능성이 있다고 판단하여 추가 학습을 진행했습니다.

damW_lr = 5e-5 (추가 학습에 사용할 LR),
weight_decay = 1e-4
추가 에폭 수 = 20 (전부 다 사용하지 않음)

시간 제약을 고려하여 val loss가 어느정도 수렴하는 지점인 37에폭 (기존 30 + 추가 7에폭)에서 학습을 종료 했습니다.

평가는 모든 가중치에 대하여 할 수 있도록 구현했습니다.


In [16]:
######################################################
# (8) 추가 학습: 원하는 체크포인트 로드 후 이어서 학습
######################################################

# 체크포인트 디렉토리 설정
checkpoint_dir = "checkpoint_efficientnet_b5"
new_checkpoint_dir = "checkpoint_efficientnet_b5_fine"
os.makedirs(new_checkpoint_dir, exist_ok=True)  # 새로운 디렉토리 생성

# 모델 및 옵티마이저 초기화
adamW_lr = 5e-5  # 추가 학습에 사용할 학습률
weight_decay = 1e-4

criterion = torch.nn.CrossEntropyLoss()
adamw_optimizer = torch.optim.AdamW(model.parameters(), lr=adamW_lr, weight_decay=weight_decay)
adamw_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(adamw_optimizer, T_max=7)  # 20 에폭 기준

# 원하는 체크포인트 로드
ckpt_path = os.path.join(checkpoint_dir, "checkpoint_epoch_30.pth")  # 체크포인트 경로
if os.path.exists(ckpt_path):
    model.load_state_dict(torch.load(ckpt_path))
    print(f"Loaded checkpoint from: {ckpt_path}")
else:
    print(f"Checkpoint not found: {ckpt_path}")

# 추가 학습 함수
def fine_tune_model(start_epoch=31, num_epochs=20):
    for epoch in range(start_epoch, start_epoch + num_epochs):
        model.train()
        running_loss = 0.0
        total_batches = len(train_loader)

        print(f"Epoch {epoch}/{start_epoch + num_epochs - 1}")
        for batch_idx, (inputs, labels) in enumerate(train_loader):
            inputs, labels = inputs.to(device), labels.to(device)

            adamw_optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            adamw_optimizer.step()

            running_loss += loss.item()

            if (batch_idx + 1) % 10 == 0 or (batch_idx + 1) == total_batches:
                print(f"  [Batch {batch_idx+1}/{total_batches}] Loss: {loss.item():.4f}")

        # Validation Loss
        val_loss = validate_model()

        # 체크포인트 저장
        ckpt_path = os.path.join(new_checkpoint_dir, f"checkpoint_epoch_{epoch}.pth")
        torch.save(model.state_dict(), ckpt_path)
        print(f"Checkpoint saved: {ckpt_path}")

        # 스케줄러 step
        adamw_scheduler.step()

        print(f"==> Epoch [{epoch}/{start_epoch + num_epochs - 1}] "
              f"Train Loss: {running_loss/total_batches:.4f} | "
              f"Val Loss: {val_loss:.4f} | "
              f"LR: {adamw_scheduler.get_last_lr()[0]:.6f}\n")

# 추가 학습 실행
fine_tune_model(start_epoch=31, num_epochs=20)
print("----- Fine-tuning Finished -----")

  model.load_state_dict(torch.load(ckpt_path))


Loaded checkpoint from: checkpoint_efficientnet_b5/checkpoint_epoch_30.pth
Epoch 31/37
  [Batch 10/2478] Loss: 0.0001


KeyboardInterrupt: 

In [18]:
############## 원하는 가중치로 eval 코드 : 37 에폭 (이건 결과 보고) ###############
# 저장된 가중치 불러오기
ckpt_path = "./checkpoint_efficientnet_b5_fine/checkpoint_epoch_37.pth"  # 학습된 가중치 경로
model.load_state_dict(torch.load(ckpt_path))
print(f"Loaded checkpoint: {ckpt_path}")

# 테스트 데이터에 대한 예측 수행
print("[Step 8] Evaluating current model & Saving CSV...")
all_labels_main, all_preds_main = test_model(model, test_loader)

submission_main = pd.read_csv('./sample_submission.csv')  # 기존 제공된 샘플 파일 로드
submission_main['Label'] = all_preds_main  # 예측 결과 저장
submission_main.to_csv('./Competition_epoch37_b5.csv', index=False)  # CSV 파일로 저장
print("Competition file saved as 'Competition_epochMain37_b5.csv'.")

  model.load_state_dict(torch.load(ckpt_path))


Loaded checkpoint: ./checkpoint_efficientnet_b5_fine/checkpoint_epoch_37.pth
[Step 8] Evaluating current model & Saving CSV...


KeyboardInterrupt: 