## 00 Install

In [None]:
# !pip install torchinfo
# !pip install modules
# !pip install pymysql sqlalchemy

## 01 Import

In [None]:
# import libraries
import os
import numpy as np
import pandas as pd
from pathlib import Path
import os.path
import warnings
warnings.filterwarnings("ignore")
import time

import torch
import torchvision
from torchvision import models
from torch import nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
from torchinfo import summary
import torchvision.transforms as transforms

from pathlib import Path
from typing import Tuple, List, Dict, Optional
import random
from PIL import Image, ImageDraw
from torchvision.utils import make_grid

# 디바이스 설정
device = 'cuda' if torch.cuda.is_available() else 'cpu'

## 02 Load Dataset

In [None]:
# 데이터 경로 설정
data_dir = 'your_path/fruit-and-vegetable-image'

# 데이터 불러오기
train_dir = os.path.join(data_dir, 'train')
train_filepaths = list(Path(train_dir).rglob('*.jpg')) + \
                  list(Path(train_dir).rglob('*.jpeg')) + \
                  list(Path(train_dir).rglob('*.png')) + \
                  list(Path(train_dir).rglob('*.JPG'))

test_dir = os.path.join(data_dir, 'test')
test_filepaths = list(Path(test_dir).rglob('*.jpg')) + \
                 list(Path(test_dir).rglob('*.jpeg')) + \
                 list(Path(test_dir).rglob('*.png')) + \
                 list(Path(test_dir).rglob('*.JPG'))

val_dir = os.path.join(data_dir, 'validation')
val_filepaths = list(Path(val_dir).rglob('*.jpg')) + \
                 list(Path(val_dir).rglob('*.jpeg')) + \
                 list(Path(val_dir).rglob('*.png')) + \
                 list(Path(val_dir).rglob('*.JPG'))


def proc_img(filepath):
    """
    이미지 파일의 경로와 라벨을 포함하는 DataFrame을 생성하는 함수
    """

    # 파일 경로에서 라벨(폴더명) 추출
    labels = [str(filepath[i]).split("/")[-2] for i in range(len(filepath))]

    # 파일 경로를 pandas Series로 변환하고 문자열로 저장
    filepath = pd.Series(filepath, name='Filepath').astype(str)

    # 라벨을 pandas Series로 변환
    labels = pd.Series(labels, name='Label')

    # 파일 경로와 라벨을 하나의 DataFrame으로 합치기
    df = pd.concat([filepath, labels], axis=1)

    # 데이터 프레임을 랜덤하게 섞고 인덱스 초기화
    df = df.sample(frac=1).reset_index(drop=True)

    return df

# 학습(train), 테스트(test), 검증(validation) 데이터 프레임 생성
train_df = proc_img(train_filepaths)
test_df = proc_img(test_filepaths)
val_df = proc_img(val_filepaths)

# 클래스 목록 가져오기
class_labels = train_df.Label.unique()

## 03 Create Transformer

In [None]:
# ResNet-18 모델의 기본 사전 학습된 가중치(Weights) 불러오기
ResNet_weights = models.ResNet50_Weights.DEFAULT

# 해당 가중치에 맞는 변환(transform) 객체 가져오기
ResNet_transformer = ResNet_weights.transforms()  # 모델이 요구하는 데이터 전처리 파이프라인이 반환

# 변환 객체 출력 (어떤 변환이 적용되는지 확인)
ResNet_transformer

ImageClassification(
    crop_size=[224]
    resize_size=[232]
    mean=[0.485, 0.456, 0.406]
    std=[0.229, 0.224, 0.225]
    interpolation=InterpolationMode.BILINEAR
)

In [None]:
# 랜덤한 사각형 노이즈 추가 함수
def add_rectangle_noise(image):
    """이미지에 랜덤한 크기와 위치의 사각형 노이즈 추가"""
    draw = ImageDraw.Draw(image)
    width, height = image.size

    # 노이즈 크기 (전체 이미지 크기의 30~50%)
    box_width = random.randint(int(0.3 * width), int(0.5 * width))
    box_height = random.randint(int(0.3 * height), int(0.5 * height))

    # 랜덤 위치
    x1 = random.randint(0, width - box_width)
    y1 = random.randint(0, height - box_height)
    x2 = x1 + box_width
    y2 = y1 + box_height

    # 랜덤 색상 (0~255 범위)
    noise_color = tuple(np.random.randint(0, 256, size=3).tolist())

    # 사각형 그리기
    draw.rectangle([x1, y1, x2, y2], fill=noise_color)
    return image

# 데이터셋 증강 및 변환 적용
augmented_transform = transforms.Compose([
    transforms.RandomRotation(20),  # -20도 ~ 20도 회전
    transforms.RandomHorizontalFlip(),  # 50% 확률로 좌우 반전
    transforms.Lambda(lambda img: add_rectangle_noise(img)),  # 랜덤한 사각형 노이즈 추가
    ResNet_transformer  # MobileNetV3 기본 변환 적용
])

# 원본과 증강 데이터셋을 각각 생성
train_data_original = ImageFolder(root=train_dir, transform=ResNet_transformer)  # 원본 데이터
train_data_augmented = ImageFolder(root=train_dir, transform=augmented_transform)  # 증강 데이터

validation_data_original = ImageFolder(root=val_dir, transform=ResNet_transformer)  # 검증 원본 데이터
validation_data_augmented = ImageFolder(root=val_dir, transform=augmented_transform)  # 검증 증강 데이터

# 결합
train_data_transformed = torch.utils.data.ConcatDataset([train_data_original, train_data_augmented])
validation_data_transformed = torch.utils.data.ConcatDataset([validation_data_original, validation_data_augmented])

print(f"Original dataset size: {len(train_data_original)}")
print(f"Augmented dataset size: {len(train_data_augmented)}")
print(f"Combined dataset size: {len(train_data_transformed)}")
print(f"Validation dataset szie: {len(validation_data_transformed)}")

# 테스트 데이터는 원본 그대로 유지
test_data_transformed = ImageFolder(root=test_dir, transform=ResNet_transformer)

# 변환된 데이터셋 확인
print(train_data_original, '\n\n', train_data_augmented)

Original dataset size: 3115
Augmented dataset size: 3115
Combined dataset size: 6230
Validation dataset szie: 702
Dataset ImageFolder
    Number of datapoints: 3115
    Root location: /content/drive/MyDrive/Colab_Notebooks/fruit-and-vegetable-image/train
    StandardTransform
Transform: ImageClassification(
               crop_size=[224]
               resize_size=[232]
               mean=[0.485, 0.456, 0.406]
               std=[0.229, 0.224, 0.225]
               interpolation=InterpolationMode.BILINEAR
           ) 

 Dataset ImageFolder
    Number of datapoints: 3115
    Root location: /content/drive/MyDrive/Colab_Notebooks/fruit-and-vegetable-image/train
    StandardTransform
Transform: Compose(
               RandomRotation(degrees=[-20.0, 20.0], interpolation=nearest, expand=False, fill=0)
               RandomHorizontalFlip(p=0.5)
               Lambda()
               ImageClassification(
               crop_size=[224]
               resize_size=[232]
               mean=[0.4

## 04 Dataloader

In [None]:
# 배치 크기 및 워커(worker) 수 설정
BATCH_SIZE = 32  # 한 번에 학습할 데이터 샘플 수
NUM_WORKERS = os.cpu_count()  # 사용할 CPU 코어 개수 (최대 성능 활용)

# 학습 데이터 로더 생성
train_dataloader = DataLoader(dataset=train_data_transformed,  # 학습 데이터셋
                              batch_size=BATCH_SIZE,  # 배치 크기 설정
                              num_workers=NUM_WORKERS,  # CPU 워커 개수 설정
                              shuffle=True,  # 학습 데이터는 랜덤으로 섞어서 로드
                              pin_memory=True)  # CUDA 사용 시 메모리 핀 설정 (속도 향상)

# 검증 데이터 로더 생성
validation_dataloader = DataLoader(dataset=validation_data_transformed,
                                   batch_size=BATCH_SIZE,
                                   num_workers=NUM_WORKERS,
                                   shuffle=False,
                                   pin_memory=True)

# 테스트 데이터 로더 생성
test_dataloader = DataLoader(dataset=test_data_transformed,
                             batch_size=BATCH_SIZE,
                             num_workers=NUM_WORKERS,
                             shuffle=False,
                             pin_memory=True)

## 05 Modeling

In [None]:
# ResNet-50 모델 생성 및 사전 학습된 가중치 적용
ResNet_model = models.resnet50(weights=ResNet_weights).to(device)

# 특징 추출기(Feature Extractor) 부분 동결
for param in ResNet_model.parameters():
    param.requires_grad = False  # 기존 가중치를 고정하여 학습되지 않도록 설정

# 랜덤 시드 고정
torch.manual_seed(42) # 동일한 결과를 얻기 위해 랜덤 시드 설정 (CPU)
torch.cuda.manual_seed(42) # 동일한 결과를 얻기 위해 랜덤 시드 설정 (GPU)

# 클래스 수 정의 (이미 정의된 class_labels 사용)
num_classes = len(class_labels)

num_ftrs = ResNet_model.fc.in_features  # 기본적으로 ResNet-50의 fc는 2048 입력

# 분류기(Classifier) 레이어 재구성
ResNet_model.fc = nn.Sequential(
    nn.Dropout(p=0.2, inplace=True), # 과적합 방지를 위한 드롭아웃 추가 (20% 확률로 뉴런 비활성화))
    nn.Linear(in_features=num_ftrs, out_features=len(class_labels)) # 출력 뉴런 수를 클래스 개수(len(class_labels))로 설정
)

# 모델 디바이스 이동
ResNet_model = ResNet_model.to(device)

# 모델의 구조를 다시 확인하여 변경된 부분을 확인
summary(model=ResNet_model,  # 모델 지정
        input_size=[32, 3, 224, 224],  # 입력 데이터 크기 (batch_size=32, 채널=3, 높이=224, 너비=224)
        col_names=['input_size', 'output_size', 'num_params', 'trainable'],  # 출력할 정보 설정
        row_settings=['var_names'])  # 변수명 출력

Downloading: "https://download.pytorch.org/models/resnet50-11ad3fa6.pth" to /root/.cache/torch/hub/checkpoints/resnet50-11ad3fa6.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 174MB/s]


Layer (type (var_name))                  Input Shape               Output Shape              Param #                   Trainable
ResNet (ResNet)                          [32, 3, 224, 224]         [32, 36]                  --                        Partial
├─Conv2d (conv1)                         [32, 3, 224, 224]         [32, 64, 112, 112]        (9,408)                   False
├─BatchNorm2d (bn1)                      [32, 64, 112, 112]        [32, 64, 112, 112]        (128)                     False
├─ReLU (relu)                            [32, 64, 112, 112]        [32, 64, 112, 112]        --                        --
├─MaxPool2d (maxpool)                    [32, 64, 112, 112]        [32, 64, 56, 56]          --                        --
├─Sequential (layer1)                    [32, 64, 56, 56]          [32, 256, 56, 56]         --                        False
│    └─Bottleneck (0)                    [32, 64, 56, 56]          [32, 256, 56, 56]         --                        False


## 06 Training

In [None]:
def train_step(model: torch.nn.Module,
               dataloader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               device: torch.device) -> Tuple[float, float]:
    """
    단일 학습 스텝을 수행하는 함수
    """
    model.train()  # 학습 모드 설정
    train_loss, train_acc = 0, 0  # 학습 손실 및 정확도 초기화

    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)  # 입력 데이터와 라벨을 지정된 장치로 이동
        optimizer.zero_grad()  # 이전 단계의 그래디언트 초기화
        y_pred = model(X)  # 모델 예측 수행
        loss = loss_fn(y_pred, y)  # 손실 계산
        loss.backward()  # 역전파(Backpropagation)
        optimizer.step()  # 가중치 업데이트

        train_loss += loss.item()  # 손실 합산
        acc = (y_pred.argmax(dim=1) == y).sum().item() / len(y)  # 정확도 계산
        train_acc += acc  # 정확도 합산

    return train_loss / len(dataloader), train_acc / len(dataloader)  # 평균 손실 및 정확도 반환


def eval_step(model: torch.nn.Module,
              dataloader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module,
              device: torch.device) -> Tuple[float, float]:
    """
    단일 평가 스텝을 수행하는 함수
    """
    model.eval()  # 평가 모드 설정
    eval_loss, eval_acc = 0, 0  # 평가 손실 및 정확도 초기화

    with torch.inference_mode():  # 평가 시 그래디언트 계산 비활성화 (속도 향상)
        for batch, (X, y) in enumerate(dataloader):
            X, y = X.to(device), y.to(device)  # 입력 데이터와 라벨을 지정된 장치로 이동
            y_pred = model(X)  # 모델 예측 수행
            loss = loss_fn(y_pred, y)  # 손실 계산
            eval_loss += loss.item()  # 손실 합산
            acc = (y_pred.argmax(dim=1) == y).sum().item() / len(y)  # 정확도 계산
            eval_acc += acc  # 정확도 합산

    return eval_loss / len(dataloader), eval_acc / len(dataloader)  # 평균 손실 및 정확도 반환


def train_eval(model: torch.nn.Module,
               train_dataloader: torch.utils.data.DataLoader,
               val_dataloader: torch.utils.data.DataLoader,
               epochs: int,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               device: torch.device) -> List[Dict[str, float]]:
    """
    지정된 에포크(epochs) 동안 모델을 학습하고 평가하는 함수
    """
    results = []  # 학습 및 평가 결과를 저장할 리스트

    # 전체 학습 시작 시간 기록
    total_start_time = time.time()

    for epoch in range(epochs):
        # 학습 단계 수행
        train_loss, train_acc = train_step(model, train_dataloader, loss_fn, optimizer, device)
        # 평가 단계 수행
        val_loss, val_acc = eval_step(model, val_dataloader, loss_fn, device)

        # 현재 학습률 확인
        current_lr = optimizer.param_groups[0]['lr']

        # 학습 및 평가 결과 출력
        print(f"Epoch {epoch+1}/{epochs}: \n"
              f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f} \n"
              f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f} | LR: {current_lr:.6f}")

        # 결과 저장
        results.append({
            "epoch": epoch+1,
            "Train_loss": train_loss,
            "Train_acc": train_acc,
            "Val_loss": val_loss,
            "Val_acc": val_acc,
            "lr": current_lr
        })

    # 전체 학습 끝 시간 기록
    total_end_time = time.time()
    total_minutes = (total_end_time - total_start_time) / 60

    print(f"\n🕒 전체 학습 시간: {total_minutes:.2f}분")

    return results

### train()

In [None]:
# 손실 함수 및 옵티마이저 초기화
loss_fn = nn.CrossEntropyLoss()  # 다중 분류 문제에 적합한 크로스 엔트로피 손실 함수 사용
optimizer = optim.Adam(params=ResNet_model.parameters(), lr=0.001, weight_decay=0.0001)  # Adam 옵티마이저 사용, 학습률(lr) 설정

# 학습할 총 에포크 수 설정
NUM_EPOCHS = 50

# 학습 실행
results = train_eval(model=ResNet_model,
                     train_dataloader=train_dataloader,
                     val_dataloader=validation_dataloader,
                     epochs=NUM_EPOCHS,
                     loss_fn=loss_fn,
                     optimizer=optimizer,
                     device=device)

Epoch 1/50: 
Train Loss: 1.8686 | Train Acc: 0.6470 
Val Loss: 0.7285 | Val Acc: 0.8889 | LR: 0.001000
Epoch 2/50: 
Train Loss: 0.7845 | Train Acc: 0.8430 
Val Loss: 0.4207 | Val Acc: 0.9289 | LR: 0.001000
Epoch 3/50: 
Train Loss: 0.5523 | Train Acc: 0.8772 
Val Loss: 0.3141 | Val Acc: 0.9332 | LR: 0.001000
Epoch 4/50: 
Train Loss: 0.4446 | Train Acc: 0.9015 
Val Loss: 0.2666 | Val Acc: 0.9304 | LR: 0.001000
Epoch 5/50: 
Train Loss: 0.3670 | Train Acc: 0.9174 
Val Loss: 0.2277 | Val Acc: 0.9529 | LR: 0.001000
Epoch 6/50: 
Train Loss: 0.3196 | Train Acc: 0.9269 
Val Loss: 0.2110 | Val Acc: 0.9444 | LR: 0.001000
Epoch 7/50: 
Train Loss: 0.2955 | Train Acc: 0.9277 
Val Loss: 0.1860 | Val Acc: 0.9616 | LR: 0.001000
Epoch 8/50: 
Train Loss: 0.2641 | Train Acc: 0.9377 
Val Loss: 0.1784 | Val Acc: 0.9574 | LR: 0.001000
Epoch 9/50: 
Train Loss: 0.2390 | Train Acc: 0.9481 
Val Loss: 0.1850 | Val Acc: 0.9445 | LR: 0.001000
Epoch 10/50: 
Train Loss: 0.2231 | Train Acc: 0.9474 
Val Loss: 0.1629 | 

## 07 Result

In [None]:
# 결과를 데이터프레임으로 변환
results_df = pd.DataFrame(results)

In [None]:
# 테스트 셋 평가
test_loss, test_acc = eval_step(model=ResNet_model,
                                dataloader=test_dataloader,
                                loss_fn=loss_fn,
                                device=device)

In [None]:
# 결과
print(f"Test accuracy: {test_acc:.6f}")
print(f"Test Loss: {test_loss:.6f}")

Test accuracy: 0.968750
Test Loss: 0.104553


In [None]:
# Test accuracy: 0.971354
# Test Loss: 0.098235

## 08 DB 저장

In [None]:
from sqlalchemy import create_engine

# DB 연결 정보 입력 (Aiven MySQL 기준)
host = 'YOUR_AIVEN_HOST'
port = 'YOUR_AIVEN_PORT'
username = 'YOUR_USERNAME'
password = 'YOUR_PASSWORD'
database = 'YOUR_DATABASE'

# SQLAlchemy 엔진 생성
engine = create_engine(f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}")
# DataFrame을 MySQL에 저장
results_df.to_sql(name='train_logs_resnet', con=engine, if_exists='append', index=False)

# 테스트 결과를 DataFrame으로 변환
test_result_df = pd.DataFrame([{
    "model_name": "ResNet50",
    "test_loss": test_loss,
    "test_acc": test_acc
}])

test_result_df.to_sql(name='test_results', con=engine, if_exists='append', index=False)

1