<a href="https://colab.research.google.com/github/yunyoungwoo/2024S-Ajou-ML-FP/blob/main/CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
from torch.utils.data import DataLoader
import torch.optim as optim
import torch.nn as nn
from torchvision import transforms
import torchvision.transforms.functional as v2
from tqdm import tqdm
import numpy as np
import os

In [None]:
# 데이터프레임을 사용하여 커스텀 데이터셋 생성
class BaseDataset(torch.utils.data.Dataset):
    def __init__(self, dataframe, transform=None):
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        image_path = self.dataframe.iloc[idx, 0]  # 이미지 파일 경로
        image = Image.open(image_path).convert('RGB')  # 이미지 열기
        label = self.dataframe.iloc[idx, 1]  # 레이블

        if self.transform:
            image = self.transform(image)

        return image, label

In [None]:
# 데이터프레임 로드 및 데이터 전처리 파이프라인 정의
train_transforms = transforms.Compose([
    transforms.Resize((224, 224)),  # 이미지 크기 조정
    transforms.ToTensor(),  # torch 텐서로 변환
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # 정규화
    transforms.RandomRotation(degrees=90),  # 랜덤 회전
    transforms.RandomHorizontalFlip(p=0.5)  # 랜덤 수평 뒤집기
])

In [None]:
# 데이터프레임을 사용하여 데이터셋 생성
train_dataset = BaseDataset(train_df, transform=train_transforms)
val_dataset = BaseDataset(val_df, transform=train_transforms)
test_dataset = BaseDataset(test_df, transform=train_transforms)

In [None]:
# 데이터로더 생성
BATCH_SIZE = 64
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

In [None]:
import torch.nn.functional as F

class ImprovedCNN(nn.Module):
    def __init__(self):
        super(ImprovedCNN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, padding=1)
        self.conv5 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
        self.conv6 = nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, padding=1)
        self.conv7 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, padding=1)
        self.conv8 = nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(in_features=512 * 7 * 7, out_features=4096)
        self.fc2 = nn.Linear(in_features=4096, out_features=4096)
        self.fc3 = nn.Linear(in_features=4096, out_features=10)  # 예시로 10개의 클래스에 대한 출력을 가정합니다.
        self.dropout = nn.Dropout(p=0.5)
        self.batchnorm1 = nn.BatchNorm2d(64)
        self.batchnorm2 = nn.BatchNorm2d(64)
        self.batchnorm3 = nn.BatchNorm2d(128)
        self.batchnorm4 = nn.BatchNorm2d(128)
        self.batchnorm5 = nn.BatchNorm2d(256)
        self.batchnorm6 = nn.BatchNorm2d(256)
        self.batchnorm7 = nn.BatchNorm2d(512)
        self.batchnorm8 = nn.BatchNorm2d(512)

    def forward(self, x):
        x = self.pool(F.relu(self.batchnorm1(self.conv1(x))))
        x = self.pool(F.relu(self.batchnorm2(self.conv2(x))))
        x = self.pool(F.relu(self.batchnorm3(self.conv3(x))))
        x = self.pool(F.relu(self.batchnorm4(self.conv4(x))))
        x = self.pool(F.relu(self.batchnorm5(self.conv5(x))))
        x = self.pool(F.relu(self.batchnorm6(self.conv6(x))))
        x = self.pool(F.relu(self.batchnorm7(self.conv7(x))))
        x = self.pool(F.relu(self.batchnorm8(self.conv8(x))))
        x = x.view(-1, 512 * 7 * 7)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.fc3(x)
        return x

In [None]:
# 모델 생성 및 CUDA 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ImprovedCNN().to(device)

In [None]:
# 손실 함수 및 최적화 함수 정의
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
# 학습 및 평가 함수 정의
def train(model, train_loader, optimizer, criterion):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
    train_loss = running_loss / len(train_loader)
    train_acc = 100. * correct / total
    return train_loss, train_acc

In [None]:
def evaluate(model, val_loader, criterion):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
    val_loss = running_loss / len(val_loader)
    val_acc = 100. * correct / total
    return val_loss, val_acc

In [None]:
# Early stopping 관련 변수 설정
patience = 5  # 성능이 향상되지 않은 에포크를 얼마나 기다릴 것인지 지정
counter = 0  # 성능이 향상되지 않은 에포크 카운터 초기화
best_val_loss = np.inf  # 가장 낮은 검증 손실값을 저장할 변수 초기화

In [1]:
# 학습 및 검증 실행
NUM_EPOCHS = 10
best_val_acc = 0.0

for epoch in range(NUM_EPOCHS):
    train_loss, train_acc = train(model, train_loader, optimizer, criterion)
    val_loss, val_acc = evaluate(model, val_loader, criterion)
    print(f"Epoch {epoch+1}/{NUM_EPOCHS}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")

    # 검증 손실이 이전 최고 손실값보다 낮은 경우 모델 저장
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'best_model.pth')
        counter = 0  # 성능이 향상되었으므로 카운터 초기화
    else:
        counter += 1  # 성능이 향상되지 않았으므로 카운터 증가

    # 카운터가 지정된 인내심(patience)을 초과하면 학습 종료
    if counter >= patience:
        print(f"Early stopping at epoch {epoch+1} as validation loss did not improve for {patience} epochs.")
        break

NameError: name 'train' is not defined