In [1]:
# 패키지 다운로드
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
import torchvision.datasets as datasets

# CUDA 지원 활성화
if torch.cuda.is_available():
    device = torch.device("cuda")
    print("CUDA 사용 가능")
else:
    device = torch.device("cpu")
    print("CUDA 사용 불가")

# 시드 고정
seed = 42
torch.manual_seed(seed)
# GPU 절약
torch.backends.cudnn.benchmark = True

# print(torch.cuda.device_count()) # 사용 가능한 장치가 몇 개인지 확인합니다.
# print(torch.cuda.get_device_name(0)) # 첫번째 GPU의 장치명을 확인합니다.
# print(torch.cuda.get_device_name(1)) # 두번째 GPU의 장치명을 확인합니다.


CUDA 사용 가능


In [2]:
# 이미지 라벨 생성
def get_label_from_foldername(foldername):
    # 각 폴더의 이름에 따라 라벨을 할당
    if foldername == "preprocessed_brown_glass":
        return "0"
    elif foldername == "preprocessed_brown_glass_packaging":
        return "1"
    elif foldername == "preprocessed_clear_glass":
        return "2"
    elif foldername == "preprocessed_clear_glass_packaging":
        return "3"
    elif foldername == "preprocessed_green_glass":
        return "4"
    elif foldername == "preprocessed_green_glass_packaging":
        return "5"
    elif foldername == "preprocessed_reused_glass":
        return "6"
    elif foldername == "preprocessed_reused_glass_packaging":
        return "7"
    elif foldername == "preprocessed_unclassified_glass":
        return "8"
    else:
        raise ValueError(f"Invalid folder name: {foldername}")

In [3]:
# 데이터셋 경로
dataset_path = "preprocessed_image"

# 데이터셋 불러오기
dataset = datasets.ImageFolder(
    dataset_path,
    # 데이터 증강을 위한 변환 함수들 정의
    transform = transforms.Compose([
        transforms.Resize((224, 224)),  # 이미지 크기 조정
        # transforms.RandomResizedCrop(size=224),  # 무작위로 잘라내고 크기 조정
        transforms.RandomHorizontalFlip(),  # 수평으로 무작위로 뒤집기
        transforms.RandomRotation(degrees=30),  # 무작위로 회전 (±30도 범위)
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),  # 색감 조정
        transforms.ToTensor(),  # 이미지를 텐서로 변환
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # 이미지 정규화
    ])
)

# 전체 데이터셋을 훈련 데이터와 검증 데이터로 나누기
train_ratio = 0.8
train_size = int(train_ratio * len(dataset))
val_size = len(dataset) - train_size

train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])

# 데이터로더 생성
batch_size = 4
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size)

In [4]:
# 모델 레이어 생성
class Model(nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        self.conv1 = nn.Conv2d(3, 96, kernel_size=11, stride=4)
        self.batchnorm1 = nn.BatchNorm2d(96)
        self.pool1 = nn.MaxPool2d(3, 2)
        
        self.conv2 = nn.Conv2d(96, 256, kernel_size=5, padding=2)
        self.batchnorm2 = nn.BatchNorm2d(256)
        self.pool2 = nn.MaxPool2d(3, 2)
        
        self.conv3 = nn.Conv2d(256, 384, kernel_size=3, padding=1)
        self.batchnorm3 = nn.BatchNorm2d(384)
        
        self.conv4 = nn.Conv2d(384, 384, kernel_size=3, padding=1)
        self.batchnorm4 = nn.BatchNorm2d(384)
        
        self.conv5 = nn.Conv2d(384, 256, kernel_size=3, padding=1)
        self.batchnorm5 = nn.BatchNorm2d(256)
        self.pool3 = nn.AdaptiveMaxPool2d((6, 6))
        
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(256 * 6 * 6, 128)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(128, 9)
        
    def forward(self, x):
        x = nn.functional.relu(self.conv1(x))
        x = self.batchnorm1(x)
        x = self.pool1(x)
        
        x = nn.functional.relu(self.conv2(x))
        x = self.batchnorm2(x)
        x = self.pool2(x)
        
        x = nn.functional.relu(self.conv3(x))
        x = self.batchnorm3(x)
        
        x = nn.functional.relu(self.conv4(x))
        x = self.batchnorm4(x)
        
        x = nn.functional.relu(self.conv5(x))
        x = self.batchnorm5(x)
        x = self.pool3(x)
        
        x = self.flatten(x)
        x = nn.functional.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)

        return x.to(device)

# 모델 인스턴스 생성 및 CUDA 장치로 이동
model = Model().to(device)

In [5]:
print(model)
print(device)

Model(
  (conv1): Conv2d(3, 96, kernel_size=(11, 11), stride=(4, 4))
  (batchnorm1): BatchNorm2d(96, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool1): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(96, 256, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (batchnorm2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv2d(256, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (batchnorm3): BatchNorm2d(384, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv4): Conv2d(384, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (batchnorm4): BatchNorm2d(384, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv5): Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (batchnorm5): BatchNorm2d(256, eps=1e-05, momentum=0.1,

In [6]:
# 모델 학습시키기
# 손실 함수와 최적화 알고리즘 설정
criterion = nn.CrossEntropyLoss().to(device)  # 손실 함수 수정
optimizer = optim.Adam(model.parameters(), lr=0.003)

# 학습 반복
epochs = 10
best_accuracy = 0.0  # 가장 좋은 정확도를 저장할 변수 초기화
best_model_path = "best_model.pt" # 가장 좋은 모델 매개변수의 저장 경로
prev_loss = None

# 손실 값 저장 리스트 초기화
loss_values = []

for epoch in range(epochs):
    running_loss = 0.0
    for images, labels in train_dataloader:
        # 이미지와 라벨을 CUDA 장치로 이동
        images = images.to(device)
        labels = labels.to(device)
        # 그래디언트 초기화
        optimizer.zero_grad()
        # 모델에 이미지 전달하여 예측 수행
        outputs = model(images)
        # 손실 계산
        loss = criterion(outputs, labels)

        # 가중치 감소 (L2 규제)
        for param in model.parameters():
            loss += 0.001 * torch.norm(param)

        # 역전파 및 가중치 업데이트
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    print(f"Epoch {epoch+1} - Loss: {running_loss / len(train_dataloader)}")

    # Early Stopping 체크
    if prev_loss is not None and running_loss >= prev_loss:
        print("Early stopping due to increasing loss")
        break
    prev_loss = running_loss
  

Epoch 1 - Loss: 2.410658129274905
Epoch 2 - Loss: 2.2589997268051
Epoch 3 - Loss: 2.1972432211716275
Epoch 4 - Loss: 2.196359983413502
Epoch 5 - Loss: 2.1943523368112436
Epoch 6 - Loss: 2.193998463527343
Epoch 7 - Loss: 2.1948614157593838
Early stopping due to increasing loss


In [7]:
# 검증 데이터셋을 통한 모델 평가
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in val_dataloader:
        # 이미지와 라벨을 CUDA 장치로 이동
        images = images.to(device)
        labels = labels.to(device)

        # 모델에 이미지 전달하여 예측 수행
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)  # 예측 수정

        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    print(f"Test Accuracy: {accuracy}%")
    
    # 현재 epoch의 결과가 이전의 가장 좋은 결과보다 좋은 경우, 모델 상태 저장
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        torch.save(model.state_dict(), best_model_path)
        print("Best model saved.")
        

Test Accuracy: 21.98591076819859%
Best model saved.
