In [None]:
# 필요한 라이브러리 호출

import copy
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data
import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as Datasets
import time
from tqdm import trange
from matplotlib import pyplot as plt

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# # VGG 모델 정의

# class VGG(nn.Module):

#     # 완전연결층과 출력층 정의
#     def __init__(self, features, output_dim):
#         super().__init__()
#         # VGG 모델에 대한 매개변수에서 받아 온 features 값을 self.features에 넣어 줌
#         self.features = features
#         self.avgpool = nn.AdaptiveAvgPool2d(7)
#         self.classifier = nn.Sequential(
#             nn.Linear(512*7*7, 4096),
#             nn.ReLU(inplace=True),
#             nn.Dropout(0.5),
#             nn.Linear(4096, 4096),
#             nn.ReLU(inplace=True),
#             nn.Dropout(0.5),
#             nn.Linear(4096, output_dim),
#         )
    
#     def forward(self, x):
#         x = self.features(x)
#         x = self.avgpool(x)
#         h = x.view(x.shape[0], -1)
#         x = self.classifier(h)
#         return x, h

In [None]:
# # 모델 유형 정의

# # 숫자는 Conv2d를 수행, M은 최대 풀링 수행
# # 8(합성곱층) + 3(풀링층) = 11(전체 계층) = VGG11
# vgg_11_config = [64, 'M', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M']

# # 10(합성곱층) + 3(풀링층) = 13(전체 계층) = VGG13
# vgg_13_config = [64, 64, 'M', 128, 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M']

# # 13(합성곱층) + 3(풀링층) = 16(전체 계층) = VGG16
# vgg_16_config = [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M']

# # 16(합성곱층) + 3(풀링층) = 19(전체 계층) = VGG19
# vgg_19_config = [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512, 'M']

# # 내가 이렇게 정의해서 사용할 수 있다.
# My_Vgg = [64, 64, 64, 'M', 128, 128, 128, 'M', 256, 256, 256, 'M']

In [None]:
# # VGG 계층 정의

# def get_vgg_layers(config, batch_norm):
#     layers = []
#     in_channels = 3

#     # vggn_config 값을 가져옵니다.
#     for c in config:
#         # c가 M이 아니거나 정수가 아니라면 오류
#         assert c == 'M' or isinstance(c, int)
#         # 불러온 값이 M이면 최대 풀링(MaxPool2d)을 적용
#         if c == 'M':
#             layers += [nn.MaxPool2d(kernel_size=2)]
#         # 불러온 값이 숫자이면 합성곱(Conv2d)을 적용
#         else:
#             conv2d = nn.Conv2d(in_channels, c, kernel_size=3, padding=1)
#             # 배치 정규화를 적용할지에 대한 코드
#             if batch_norm:
#                 # 배치 정규화가 적용될 경우 배치 정규화 + ReLU 적용
#                 layers += [conv2d, nn.BatchNorm2d(c), nn.ReLU(inplace=True)]
#             else:
#                 # 배치 정규화가 적용되지 않을 경우 ReLU만 적용
#                 layers += [conv2d, nn.ReLU(inplace=True)]
#             in_channels = c
    
#     # 네트워크의 모든 계층을 반환
#     return nn.Sequential(*layers)
        

In [None]:
# # 모델 계층 생성

# vgg19_layers = get_vgg_layers(vgg_19_config, batch_norm=True)

In [None]:
# print(vgg19_layers)

In [None]:
# # VGG19 전체에 대한 네트워크

# # 개와 고양이 두 개의 클래스 사용
# OUTPUT_DIM = 2
# model = VGG(vgg19_layers, OUTPUT_DIM)
# print(model)

In [None]:
# 사전 훈련된 VGG11 모델 사용

import torchvision.models as models
from torchvision.models import VGG11_BN_Weights

# pretrained_model = models.vgg19_bn(pretrained=True)
pretrained_model = models.vgg11_bn(weights=VGG11_BN_Weights.IMAGENET1K_V1)
print(pretrained_model)

In [None]:
# 이미지 데이터 전처리

train_transforms = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomRotation(5),
    transforms.RandomHorizontalFlip(0.5),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

test_transforms = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [None]:
# ImageFolder를 이용하여 데이터셋 불러오기

train_path = './data/catanddog/train'
test_path = './data/catanddog/test'

train_dataset = torchvision.datasets.ImageFolder(
    train_path,
    transform=train_transforms,
)

test_dataset = torchvision.datasets.ImageFolder(
    test_path,
    transform=test_transforms
)

print(len(train_dataset)), print(len(test_dataset))

In [None]:
# 훈련과 검증 데이터 분할

VALID_RATIO = 0.9
n_train_examples = int(len(train_dataset) * VALID_RATIO)
n_valid_examples = int(len(train_dataset) - n_train_examples)

# 데이터 로더로 넘어간 이후에는 random_split()이 불가능하다.
train_data, valid_data = data.random_split(train_dataset, [n_train_examples, n_valid_examples])

In [None]:
# 검증 데이터 전처리
valid_data = copy.deepcopy(valid_data)
valid_data.dataset.transform = test_transforms

In [None]:
# 데이터가 훈련, 검증, 테스트로 분류되었기 때문에 각각 몇 개의 이미지 데이터가 있는지 확인

print(f'Number of training examples: {len(train_data)}')
print(f'Number of validation examples: {len(valid_data)}')
print(f'Number of testing examples: {len(test_dataset)}')

In [None]:
# 메모리로 데이터 불러오기

BATCH_SIZE = 16
train_iterator = data.DataLoader(train_data,
                                 # 훈련 데이터는 임의로 섞어서 가져오기
                                 shuffle=True,
                                 batch_size=BATCH_SIZE)

valid_iterator = data.DataLoader(valid_data,
                                 batch_size=BATCH_SIZE)

test_iterator = data.DataLoader(test_dataset,
                                 batch_size=BATCH_SIZE)

In [None]:
# 옵티마이저와 손실 함수 정의

# lr: 학습률, 1e-7: 10의 -7승, 매우 작은 값이다.
optimizier = optim.Adam(pretrained_model.parameters(), lr=1e-7)
criterion = nn.CrossEntropyLoss()

model = pretrained_model.to(device)
criterion = criterion.to(device)

In [None]:
# 모델 정확도 측정 함수

def calculate_accuracy(y_pred, y):
    # y_pred에서 열을 따라 가장 큰 값의 색인의 값을 top_pred에 할당
    top_pred = y_pred.argmax(1, keepdim=True)
    # 예측과 정답이 일치하는 것의 개수를 합산하겠다.
    correct = top_pred.eq(y.view_as(top_pred)).sum()
    # 텐서의 첫 번째 차원의 크기, 샘플의 개수로 나누어준다.
    acc = correct.float() / y.shape[0]
    return acc

In [None]:
# 모델 학습 함수 정의

def train(model, iterator, optimizer, criterion, device, accumulation_steps=4):
    epoch_loss, epoch_acc = 0, 0

    model.train()
    for (x, y) in (iterator):
        x = x.to(device)
        y = y.to(device)

        optimizier.zero_grad()
        y_pred = model(x)
        loss = criterion(y_pred, y)
        acc = calculate_accuracy(y_pred, y)
        loss.backward()
        epoch_loss += loss.item()
        epoch_acc += acc.item()
    
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [None]:
# 모델 성능 측정 함수

def evaluate(model, iterator, criterion, device):
    epoch_loss, epoch_acc = 0, 0
    model.eval()
    
    with torch.no_grad():
        for (x, y) in iterator:
            x = x.to(device)
            y = y.to(device)
            y_pred = model(x)
            loss = criterion(y_pred, y)
            acc = calculate_accuracy(y_pred, y)
            epoch_loss += loss.item()
            epoch_acc += acc.item()
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [None]:
# 학습 시간 측정 함수

def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

In [None]:
# 모델 학습

EPOCHS = 5
best_valid_loss = float('inf')
# 정수를 반복할 때는 trange
for epoch in trange(EPOCHS, leave=False):
    # 시스템의 단조 증가 시간(시스템 시작 이후 경과한 시간)을 반환
    start_time = time.monotonic()
    # 훈련 데이터셋을 모델에 적용한 결과(오차와 정확도)를 train_loss와 train_acc에 저장
    train_loss, train_acc = train(model, train_iterator, optimizier, criterion, device)
    # 검증 데이터셋을 모델에 적용한 결과(오차와 정확도)를 valid_loss와 valid_acc에 저장
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion, device)
    
    # valid_loss가 가장 작은 값을 구하고, 그 상태의 모델을 VGG-model.pt 이름으로 저장
    if best_valid_loss < valid_loss:
        best_valid_loss = valid_loss
        # .pt 파일은 pytorch에서 모델의 가중치를 저장하기 위해 사용하는 파일
        torch.save(model.state_dict(), './data/VGG-model.pt')
    
    end_time = time.monotonic()
    # 모델 훈련에 대한 시작과 종료 시간을 저장
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Valid. Loss: {valid_loss:.3f} | Valid. Acc: {valid_acc*100:.2f}%')

In [None]:
# 테스트 데이터셋을 이용한 모델 성능 측정

# 저장된 모델 가중치를 현재 모델에 적용함
model.load_state_dict(torch.load('./data/VGG-model.pt'))
# 테스트 데이터셋에 대한 모델의 손실과 정확도를 측정
test_loss, test_acc = evaluate(model, test_iterator, criterion, device)
print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

In [None]:
# 테스트 데이터셋을 이용한 모델의 예측 확인 함수

def get_predictions(model, iterator):
    model.eval()
    images, labels, probs = [], [], []

    with torch.no_grad():
        for (x, y) in iterator:
            x = x.to(device)
            y_pred = model(x)
            y_prob = F.softmax(y_pred, dim=1)
            top_pred = y_prob.argmax(1, keepdim=True)
            images.append(x.cpu())
            labels.append(y.cpu())
            probs.append(y_prob.cpu())

    # 행을 기준으로 images를 이어 붙여라
    images = torch.cat(images, dim=0)
    labels = torch.cat(labels, dim=0)
    probs = torch.cat(probs, dim=0)
    return images, labels, probs

In [None]:
# 예측 중에서 정확하게 예측한 것을 추출

images, labels, probs = get_predictions(model, test_iterator)
pred_labels = torch.argmax(probs, 1)

# 예측과 정답이 같은지 비교
corrects = torch.eq(labels, pred_labels)
correct_examples = []

for image, label, prob, correct in zip(images, labels, probs, corrects):
    if correct:
        correct_examples.append((image, label, prob))

# 행을 기준으로 prob의 값을 기준으로 내림차순으로 정렬
correct_examples.sort(reverse=True, key=lambda x: torch.max(x[2] , dim=0).value)

In [None]:
# 이미지 출력을 위한 전처리

def normalize_image(image):
    image_min = image.min()
    image_max = image.max()
    # torch.clamp는 이미지의 모든 픽셀 값이 [min, max] 범위 내에 있도록 조정
    image.clamp_(min=image_min, max=image_max)
    # div를 통해 최대 - 최소 정규화를 할 수 있다.
    image.add_(-image_min).div_(image_max - image_min + 1e-5)
    return image

In [None]:
# 모델이 정확하게 예측한 이미지 출력 함수

def plot_most_correct(correct, classes, n_images, normalize=True):
    # np.sqrt는 제곱근을 계산(0.5를 거듭 제곱) 왜 ? 이미지를 출력할 때 레이아웃을 최대한 정사각형의 가깝게 만들기 위해
    rows = int(np.sqrt(n_images))
    cols = int(np.sqrt(n_images))
    fig = plt.figure(figsize=(25, 20))

    for i in range(rows * cols):
        # 출력하려는 그래프 개수만큼 subplot을 만듦
        ax = fig.add_subplot(rows, cols, i+1)
        image, true_label, probs = correct[i]
        # 이미지의 차원을 변경하기 위한 메서드
        image = image.permute(1, 2, 0)
        true_prob = probs[true_label]
        correct_prob, correct_label = torch.max(probs, dim=0)
        true_class = classes[true_label]
        correct_class = classes[correct_label]

        # 본래 이미지대로 출력하기 위해 normalize_image 함수 호출
        if normalize:
            image = normalize_image(image)
        
        ax.imshow(image.cpu().numpy())
        ax.set_title(f'true label: {true_class} ({true_prob:.3f})\n' \
                     f'pred label: {correct_class} ({correct_prob:.3f})')
        ax.axis('off')

    fig.subplots_adjust(hspace=0.4)

In [None]:
# 예측 결과 이미지 출력

classes = test_dataset.classes
N_IMAGES = 5
plot_most_correct(correct_examples, classes, N_IMAGES)