# 라이브러리

In [1]:
import torch

# data loading & dataloader
import torchvision
from torchvision import datasets
from torchvision.transforms import ToTensor
from torchvision.transforms import v2
from torch.utils.data import DataLoader

# image visualization
import matplotlib.pyplot as plt

# model
from torch import nn, optim


In [2]:
device = (
    'cuda' if torch.cuda.is_available()
    else 'mps' if torch.backends.mps.is_available()
    else 'cpu'
)

print(f'{device} is available')

cuda is available


# 데이터셋 불러오기
- Dataset: STL10
- 이미지 규격: 96x96


In [3]:
# # 구글 드라이브 연동하기
# from google.colab import drive
# drive.mount('/content/drive')

In [4]:
# # 데이터셋을 다운로드 받을 구글 드라이브 경로 지정하고 다운받기
# dir_path = '/content/drive/MyDrive/Colab Notebooks/'

dir_path = '../datasets/'

# 데이터셋을 받는 시점에서 일단 transform을 torch.transforms.ToTensro()만 적용하고
# 이후에 augmentation을 할 때 바꿔서 진행한다.
train_dataset = datasets.STL10(root=dir_path, split='train', download=True, transform=ToTensor())
test_dataset = datasets.STL10(root=dir_path, split='test', download=True, transform=ToTensor())

Files already downloaded and verified
Files already downloaded and verified


In [5]:
# 다운 받은 데이터셋 크기 확인
print(f'Train dataset size: {len(train_dataset)}')
print(f'Test dataset size: {len(test_dataset)}')

Train dataset size: 5000
Test dataset size: 8000


# 데이터셋 분리: validation set 만들기
- STL10은 train 5000개, test 8000개로 구성되어 있고, validation을 따로 분할해서 사용해야 한다.
- VGGNet 논문에서는 다음과 같이 데이터셋을 이용했다.  
  - Train: 1.3M
  - Validation: 50K
  - Test: 100K
  - Train : Validation : Test = 2.6 : 1 : 2
- 정확하게 같은 비율로 하려면 코드 수정 소요가 많아지니까 test에서 4000개를 분할해서 validation을 할 때 사용하겠다.
- 데이터의 차원 구조가 [C, H, W]로 되어 있는데 type이 np.array여서 tensor로 변경하면 차원 구조가 변경된다. 그래서 `torchvision.tv_tensors.Image`을 이용해서 차원 구조는 유지하되 type만 변경했다.

In [6]:
from sklearn.model_selection import train_test_split
from torchvision.tv_tensors import Image

val_imgs, test_imgs, val_labels, test_labels = train_test_split(test_dataset.data, test_dataset.labels, test_size=0.5, stratify=test_dataset.labels)
val_dataset = [(Image(img, dtype=torch.float), label) for img, label in zip(val_imgs, val_labels)]
test_dataset = [(Image(img, dtype=torch.float), label) for img, label in zip(test_imgs, test_labels)]

print('Validation dataset')
print(f'Size: {len(val_dataset)}')
print(f'Type: {type(val_dataset[0][0])}')
print(f'Shape: {val_dataset[0][0].shape}\n')

print('Test dataset')
print(f'Size: {len(test_dataset)}')
print(f'Type: {type(test_dataset[0][0])}')
print(f'Shape: {test_dataset[0][0].shape}\n')


Validation dataset
Size: 4000
Type: <class 'torchvision.tv_tensors._image.Image'>
Shape: torch.Size([3, 96, 96])

Test dataset
Size: 4000
Type: <class 'torchvision.tv_tensors._image.Image'>
Shape: torch.Size([3, 96, 96])



# 이미지 시각화
- training dataset에 포함되어 있는 이미지를 출력하기!

In [None]:
batch_size = 64
train_dataloader = DataLoader(train_dataset, batch_size=batch_size)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size)

In [None]:
def show_images(X, y, transform=None):
  labels =  ['airplane', 'bird', 'car', 'cat', 'deer', 'dog', 'horse', 'monkey', 'ship', 'truck']
  batch_size = 64
  r, c = 8, 8

  plt.figure(figsize=(24, 24))
  for i in range(batch_size):
    img = X[i, :, :, :]
    if transform:
      img = transform(img)
    img = img.permute(1, 2, 0)
    label = labels[y[i]]

    # ax = fig.add_subplot(r, c, i+1)
    # ax.imshow(img)
    # ax.title.set_text(label)
    # ax.axis('off')

    plt.subplot(r, c, i+1)
    plt.title(label)
    plt.imshow(img)
    plt.axis('off')

  plt.tight_layout()
  plt.show()


In [None]:
X, y = next(iter(train_dataloader))

show_images(X, y)

# Data augmentation  
  - 기존 방법
  1. rescale  
    - 224 이상이면 상관 없지만, 256 사용
    - 256인 이유: 224보다 작은 이미지를 키우는 것이기 때문에 + 논문에서 이용한 가장 작은 값
  2. 224x224 random crop  
    - 논문에서는 몇 장의 crop을 만들 것인지 나온 게 없다.
    - 고민 중
  3. horizontal flip  
    - v2.RandomHorizontalFlip()을 하면 두 장이 나오는게 아니다.
    - 방법 고민 중
  4. random RGB color shift  
    - v2.ColorJittering? 이런 게 있긴 한데 원하는 건 아닌 듯.
    - 생략할 수도. 고민 중  
  - 참고한 자료  
  https://www.kaggle.com/code/joonasyoon/rps-classification-vgg16-bn?scriptVersionId=91176352

In [7]:
# Normalize에 이용하는 값 찾기
import numpy as np

train_meanRGB = [np.mean(x.numpy(), axis=(1,2)) for x, _ in train_dataset]
train_stdRGB = [np.std(x.numpy(), axis=(1,2)) for x, _ in train_dataset]

train_meanR = np.mean([m[0] for m in train_meanRGB])
train_meanG = np.mean([m[1] for m in train_meanRGB])
train_meanB = np.mean([m[2] for m in train_meanRGB])

train_stdR = np.mean([s[0] for s in train_stdRGB])
train_stdG = np.mean([s[1] for s in train_stdRGB])
train_stdB = np.mean([s[2] for s in train_stdRGB])

del train_meanRGB
del train_stdRGB

In [8]:
# 기본적으로 적용하는 transforms와 data augmentation에 적용하는 transforms 정의하기
# AlexNet에서 한 대로 five crop을 하려고 했으나 그럼 램이 너무 작아서 폭발함
default_transforms = v2.Compose([
  v2.Resize(256),
  v2.RandomCrop(224),
  v2.Normalize([train_meanR, train_meanG, train_meanB], [train_stdR, train_stdG, train_stdB])
])

aug_transforms = v2.Compose([
    v2.RandomHorizontalFlip(p=1.0),
])

In [13]:
# Data augmentation 수행하고 dataloader 만들기
from tqdm.notebook import tqdm

aug_train_dataset = []
aug_val_dataset = []

for img, label in tqdm(train_dataset, desc=f'Training dataset augmentation'):
  img = default_transforms(img)
  aug_img = aug_transforms(img)
  aug_train_dataset.append((img, label))
  aug_train_dataset.append((aug_img, label))

for img, label in tqdm(val_dataset, desc='Validation dataset augmentation'):
  img = default_transforms(img)
  aug_img = aug_transforms(img)
  aug_val_dataset.append((img, label))
  aug_val_dataset.append((aug_img, label))

batch_size = 256
train_dl = DataLoader(aug_train_dataset, batch_size=batch_size)
val_train_dl = DataLoader(aug_val_dataset, batch_size=batch_size)

del train_dataset
del val_dataset

ImportError: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html

# 모델 정의하기

In [None]:
def Block2(in_dim, out_dim):
  block = nn.Sequential(
      nn.Conv2d(in_dim, out_dim, kernel_size=3, padding=1),
      nn.ReLU(),
      nn.Conv2d(out_dim, out_dim, kernel_size=3, padding=1),
      nn.ReLU(),
      nn.MaxPool2d(2, 2)
  )
  return block

In [None]:
def Block3(in_dim, out_dim):
  block = nn.Sequential(
      nn.Conv2d(in_dim, out_dim, kernel_size=3, padding=1),
      nn.ReLU(),
      nn.Conv2d(out_dim, out_dim, kernel_size=3, padding=1),
      nn.ReLU(),
      nn.Conv2d(out_dim, out_dim, kernel_size=3, padding=1),
      nn.ReLU(),
      nn.MaxPool2d(2, 2)
  )
  return block

In [None]:
class VGGNet(nn.Module):
  def __init__(self):
    super().__init__()
    self.block1 = Block2(3, 64)
    self.block2 = Block2(64, 128)
    self.block3 = Block3(128, 256)
    self.block4 = Block3(256, 512)
    self.block5 = Block3(512, 512)
    self.flatten = nn.Flatten()
    self.classifier = nn.Sequential(
        nn.Linear(7*7*512, 4096),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.Linear(4096, 4096),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.Linear(4096, 10),
    )

  def forward(self, x):
    x = self.block1(x)
    x = self.block2(x)
    x = self.block3(x)
    x = self.block4(x)
    x = self.block5(x)
    x = self.flatten(x)
    results = self.classifier(x)

    return results

In [None]:
import torchinfo, torchsummaryX

model = VGGNet().to(device)

# print(model)
torchinfo.summary(model, (batch_size, 3, 224, 224))

# 학습 & 검증

In [None]:
def print_loss_correct(mode, loss, correct):
  print(f'{mode}_loss: {loss:<7f} -- {mode}_correct: {correct:<7f}')

In [None]:
def train(dl, model, loss_fn, optimizer):
  n_batch = len(dl)
  n_print = n_batch // 10

  model.train()
  for batch_idx, (X, y) in enumerate(dl):
    X, y = X.to(device), y.to(device)

    pred = model(X)
    loss = loss_fn(pred, y)
    correct = (pred.argmax(dim=1) == y).dtype(torch.float).sum().item()

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if (batch_idx + 1) % n_print == 0:
      print_loss_correct('train', loss.item(), correct / dl.batch_size)

In [None]:
def validate(val_dl, model, loss_fn):
  data_size = len(val_dl.dataset)
  val_loss, val_correct = 0, 0

  model.eval()
  with model.no_grad():
    for X, y in val_dl:
      X, y = X.to(device), y.to(device)

      pred = model(X)
      batch_val_loss = loss_fn(pred, y)
      batch_val_correct = (pred.argmax(dim=1) == y).dtype(float).sum().item()
      val_loss += batch_val_loss
      val_correct += batch_val_correct

  val_loss /= data_size
  val_correct /= data_size
  print_loss_correct('val', val_loss, val_correct)

  return val_loss, val_correct

In [None]:
from torch.optim.lr_scheduler import ReduceLROnPlateau

loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=1e-2, momentum=0.9, weight_decay=5*1e-4)
lr_scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)

In [None]:
epochs = 20

for e in range(epochs):
  print(f'Epoch {e + 1}\n--------------------------------------')
  train(train_dl, model, loss_fn, optimizer)
  val_loss, val_correct = validate(val_dl, model, loss_fn)
  lr_scheduler.step(val_loss)

In [None]:
X, y = next(iter(train_dl))
img = X[0]

type(img)