In [None]:
# 라이브러리 불러오기
import matplotlib.pyplot as plt
import os
import time
import csv

from tqdm import tqdm

import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.models as models

from torchvision import datasets
from torchvision.transforms import Compose, Normalize
from torch.utils.data import DataLoader
from torch.optim.adam import Adam

In [None]:
# 데이터셋에 적용할 transform
transform = Compose([
    transforms.ToTensor(),
    transforms.Resize((224, 224)),
    Normalize(mean=(0.4914, 0.4822, 0.4465), std=(0.247, 0.243, 0.261))
])

# 데이터 경로
data_path = 'data'
# split_data(data_path)

# 주요 하이퍼 파라미터
learning_rate = 0.0001
batch_size = 32

device = 'cuda' if torch.cuda.is_available() else 'cpu'
# device = 'cpu'
epoch_num = 100

# Train 데이터셋 및 DataLoader 생성
train_dataset = datasets.ImageFolder(f'{data_path}/train', transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Validation 데이터셋 및 DataLoader 생성
val_dataset = datasets.ImageFolder(f'{data_path}/val', transform=transform)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)

# Test 데이터셋 및 DataLoader 생성
test_dataset = datasets.ImageFolder(f'{data_path}/test', transform=transform)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
class EarlyStopping:
    def __init__(self, patience=5, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_val_loss = float('inf')
        self.early_stop = False

    def __call__(self, val_loss):
        if val_loss < self.best_val_loss - self.min_delta:
            self.best_val_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True

# 모델 학습 결과 csv 파일로 저장         
def save_metrics(mode, output_dir, epoch, loss, accuracy):
    csv_file = f'{output_dir}/{mode}_metrics.csv'
    if not os.path.exists(csv_file):
        with open(csv_file, 'w') as csvfile:
            fieldnames = ['epoch', 'loss', 'accuracy']
            writer = csv.writer(csvfile)
            writer.writerow(fieldnames)
            writer.writerow([epoch, loss, accuracy])
    else:
        with open(csv_file, 'a') as csvfile:
            fieldnames = ['epoch', 'loss', 'accuracy']
            writer = csv.writer(csvfile)
            writer.writerow([epoch, loss, accuracy])

In [None]:
# from torchsummary import summary
# device = 'cuda' if torch.cuda.is_available() else 'cpu'
# model = models.vgg11(pretrained=True)
# model.classifier[6] = nn.Sequential(
#     nn.Linear(4096, 1),
#     nn.Sigmoid()  
# )
# model.to(device)
# # 모델 요약
# summary(model, (3, 224, 224), batch_size=batch_size)

In [None]:

# 시작 시간 설정
train_start_time = time.time()

# 모델 선언
model = models.vgg11(pretrained=True)
model.classifier[6] = nn.Sequential(
    nn.Linear(4096, 1),
    nn.Sigmoid()  
)
model.to(device)
# 모델 경로 설정
model_pth = 'model_result'

# optimizer 설정
optim = Adam(model.parameters(), lr=learning_rate)

# earlystopping 설정
early_stopping = EarlyStopping(patience=5, min_delta=0.005)
best_val_loss = float('inf')

# csv 파일 저장
train_metrics_file = 'train_metrics.csv'
validation_metrics_file = 'validation_metrics.csv'
with open(os.path.join(model_pth,train_metrics_file), 'w', newline='') as train_file, open(os.path.join(model_pth,validation_metrics_file), 'w', newline='') as val_file:
    train_writer = csv.writer(train_file)
    val_writer = csv.writer(val_file)
    train_writer.writerow(['Epoch', 'Train Loss', 'Train Accuracy'])
    val_writer.writerow(['Epoch', 'Validation Loss', 'Validation Accuracy'])

# Train
for epoch in tqdm(range(epoch_num)):
    model.train()
    correct_train = 0
    total_train = 0
    for data, label in train_loader:
        optim.zero_grad()
        preds = model(data.to(device)).squeeze(dim=1)
        label = label.float()
        loss = nn.BCELoss()(preds, label.to(device))
        loss.backward()
        optim.step()
        
        # Train accuracy 계산
        predicted_train = (preds > 0.5).int()
        total_train += label.size(0)
        correct_train += (predicted_train == label.to(device)).sum().item()

    # total_accuracy 계산
    train_accuracy = 100 * correct_train / total_train
    # train 결과 저장
    save_metrics('train', model_pth, epoch+1, loss.item(), train_accuracy)
    # 모델 weights 저장
    torch.save(model.state_dict(), f'{model_pth}/VGG{epoch+1}.pt')

    # Validation
    model.eval()
    val_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for data, label in val_loader:
            preds = model(data.to(device)).squeeze(dim=1)
            label = label.float()
            val_loss += nn.BCELoss()(preds, label.to(device)).item()
            predicted = (preds > 0.5).int()
            total += label.size(0)
            correct += (predicted == label.to(device)).sum().item()

    # Validation accuracy와 loss 계산
    val_loss /= len(val_loader)
    val_accuracy = 100 * correct / total
    # Validation 결과 저장
    save_metrics('validation', model_pth, epoch+1, val_loss, val_accuracy)

    # Early Stopping
    if early_stopping(val_loss):
        print('Early stopping triggered!')
        break

# 총 학습시간 기록
print(f'training duration: {time.time()-train_start_time}')

In [None]:
# 모델 load
model.load_state_dict(torch.load(f'{model_pth}/CNN_best.pt'))
model.eval()
test_loss = 0
correct = 0
total = 0

with torch.no_grad():
    for data, label in test_loader:
        preds = model(data.to(device)).squeeze(dim=1)
        label = label.float()
        test_loss += nn.BCELoss()(preds, label.to(device)).item()
        predicted = (preds > 0.5).int()
        total += label.size(0)
        correct += (predicted == label.to(device)).sum().item()

# Test accuracy와 loss 계산
test_loss /= len(test_loader)
accuracy = 100 * correct / total
# Test 결과 기록
print(f'Test loss: {test_loss}, Test Accuracy: {accuracy}%')