In [1]:
import torch
import torchvision.datasets as dsets
import torchvision.transforms as transforms
import torch.nn.functional as F
import torchsummary
import torch.nn as nn

import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

from torch.utils.data import random_split
from torch import optim

## GPU 사용 & Seed값 조정


In [2]:
# == GPU ==
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

# == Seed값 ==
torch.manual_seed(777)

if device == 'cuda':
  torch.cuda.manual_seed(777)

cuda:0


<torch._C.Generator at 0x78b7f0141610>

## Dataset

In [3]:
def get_train(flag = True):
  train = dsets.CIFAR10(root = '/content/data',
                        train = True,
                        download = True)

  test = dsets.CIFAR10(root = '/content/data',
                       train = False,
                       download = True)

  return train, test

train, test = get_train(flag = True)

print('Train Lenght : ', len(train))
print('Test Lenght : ', len(test))

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to /content/data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:12<00:00, 13155001.88it/s]


Extracting /content/data/cifar-10-python.tar.gz to /content/data
Files already downloaded and verified
Train Lenght :  50000
Test Lenght :  10000


## Data Processing

In [4]:
train.transform = transforms.ToTensor()
test.transform = transforms.ToTensor()

train_meanRGB = [np.mean(x.numpy(), axis = (1, 2)) for x, _ in train]
train_stdRGB = [np.std(x.numpy(), axis = (1, 2)) for x, _ in train]

meanR = np.mean([m[0] for m in train_meanRGB])
meanG = np.mean([m[1] for m in train_meanRGB])
meanB = np.mean([m[2] for m in train_meanRGB])

stdR = np.mean([s[0] for s in train_stdRGB])
stdG = np.mean([s[1] for s in train_stdRGB])
stdB = np.mean([s[2] for s in train_stdRGB])

train_mean = [meanR, meanG, meanB]
train_std = [stdR, stdG, stdB]

test_meanRGB = [np.mean(x.numpy(), axis = (1, 2)) for x, _ in test]
test_stdRGB = [np.std(x.numpy(), axis = (1, 2)) for x, _ in test]

meanR = np.mean([m[0] for m in test_meanRGB])
meanG = np.mean([m[1] for m in test_meanRGB])
meanB = np.mean([m[2] for m in test_meanRGB])

stdR = np.mean([s[0] for s in test_stdRGB])
stdG = np.mean([s[1] for s in test_stdRGB])
stdB = np.mean([s[2] for s in test_stdRGB])


test_mean = [meanR, meanG, meanB]
test_std = [stdR, stdG, stdB]

print(' == == == == Train == == == ==')
print('각 Channel당 pixel Mean 값 : ', train_mean)
print('각 Channel당 pixel Std 값 : ', train_std)
print(' == == == == == == == == == ==')

print(' == == == == Test == == == ==')
print('각 Channel당 pixel Mean 값 : ', test_mean)
print('각 Channel당 pixel Std 값 : ', test_std)
print(' == == == == == == == == == ==')

 == == == == Train == == == ==
각 Channel당 pixel Mean 값 :  [0.49139965, 0.48215845, 0.4465309]
각 Channel당 pixel Std 값 :  [0.20220213, 0.19931543, 0.20086348]
 == == == == == == == == == ==
 == == == == Test == == == ==
각 Channel당 pixel Mean 값 :  [0.49421427, 0.48513138, 0.45040908]
각 Channel당 pixel Std 값 :  [0.20189482, 0.19902097, 0.20103233]
 == == == == == == == == == ==


## Data Augmentation

In [26]:
train_tf = transforms.Compose([
    transforms.Resize(256),
    transforms.RandomCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness = 0.2, contrast = 0.2, saturation = 0.2, hue = 0.2),
    transforms.ToTensor(),
    transforms.Normalize(train_mean, train_std)
])

test_tf = transforms.Compose([
    transforms.Resize(224),
    transforms.ToTensor(),
    transforms.Normalize(test_mean, test_std)
])

train.transform = train_tf
test.transform = test_tf

# 모델 구현

## SEblock

In [6]:
class SEblock(nn.Module):
    def __init__(self, in_channels, reduction=16):
        super(SEblock, self).__init__()
        self.squeeze_avg = nn.AdaptiveAvgPool2d(1)
        self.excitation = nn.Sequential(
            nn.Linear(in_channels, in_channels // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(in_channels // reduction, in_channels, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        out = self.squeeze_avg(x)
        out = out.view(out.size(0), -1)
        out = self.excitation(out)
        out = out.view(out.size(0), out.size(1), 1, 1)
        return x * out

## VGG16 + SEblock

In [7]:
class VGG16(nn.Module):
  def __init__(self):
    super(VGG16, self).__init__()

    self.num_class = 1000 # cifar10은 10개의 class
    self.conv = self._make_conv_layers([64, 128, 128, 256, 512, 512], [2, 2, 2, 2, 2, 2])
    self.fc = self._fc_layers()
    self._initialize_weights()


  def _make_conv_layers(self, channels, blocks):
    layers = []
    in_channels = 3

    # 1x1 conv도 있다는 것을 명심
    for i, (out_channels, block_count) in enumerate(zip(channels, blocks), 1):
      for _ in range(block_count):
        layers.append(self._conv_layers(in_channels, out_channels))
        in_channels = out_channels

      # 1x1 conv -> 4~6번째 layer
      if i >= 4:
        layers.append(self._conv_layers(in_channels, out_channels, kernel_size = 1, padding = 0))

      # 마지막 layer에는 maxpooling 없음
      if i < 6:
        layers.append(nn.MaxPool2d(kernel_size = 2, stride = 2))

    return nn.Sequential(*layers)

  # == Conv ==
  def _conv_layers(self, in_channels, out_channels, kernel_size = 3, stride = 1, padding = 1):
    return nn.Sequential(
        nn.Conv2d(in_channels = in_channels, out_channels = out_channels, kernel_size = kernel_size, stride = stride, padding = padding),
        nn.BatchNorm2d(out_channels),
        SEblock(out_channels), # SEblock 추가함
        nn.ReLU(inplace = True)
    )

  # == Fc ==
  def _fc_layers(self):
    feature_vector = 512 * 7 * 7
    return nn.Sequential(
        # fc 1

        nn.Linear(feature_vector, 4096),
        nn.ReLU(inplace = True),
        nn.Dropout(p = 0.5),

        # fc 2
        nn.Linear(4096, 4096),
        nn.ReLU(inplace = True),
        nn.Dropout(p = 0.5),

        # output layer
        nn.Linear(4096, self.num_class)
    )

  # == 가중치 초기화 ==
  def _initialize_weights(self):
    for layer in self.modules():
        if isinstance(layer, nn.Conv2d):
            nn.init.kaiming_normal_(layer.weight, mode='fan_out', nonlinearity='relu')
            if layer.bias is not None:
                nn.init.constant_(layer.bias, 0)
        elif isinstance(layer, nn.Linear):
            nn.init.normal_(layer.weight, 0, 0.01)
            if layer.bias is not None:
                nn.init.constant_(layer.bias, 0)

  def forward(self, x):
    out = self.conv(x)
    out = out.view(out.size(0), -1)
    out = self.fc(out)
    return out

In [8]:
model = VGG16().to(device)
torchsummary.summary(model, input_size = (3, 224, 224), device = 'cuda')

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 224, 224]           1,792
       BatchNorm2d-2         [-1, 64, 224, 224]             128
 AdaptiveAvgPool2d-3             [-1, 64, 1, 1]               0
            Linear-4                    [-1, 4]             256
              ReLU-5                    [-1, 4]               0
            Linear-6                   [-1, 64]             256
           Sigmoid-7                   [-1, 64]               0
           SEblock-8         [-1, 64, 224, 224]               0
              ReLU-9         [-1, 64, 224, 224]               0
           Conv2d-10         [-1, 64, 224, 224]          36,928
      BatchNorm2d-11         [-1, 64, 224, 224]             128
AdaptiveAvgPool2d-12             [-1, 64, 1, 1]               0
           Linear-13                    [-1, 4]             256
             ReLU-14                   

##  Train & Valid

In [9]:
batch_size = 64
learning_rate = 0.01
momentum = 0.9
weight_decay = 0.00005
learnign_rate = 0.01
epochs = 10 # 10으로 조정

In [10]:
# == split train, valud ==
train_ratio = 0.8
val_ratio = 0.2

train_size = int(len(train) * train_ratio)
val_size = len(train) - train_size

split_train, split_train = random_split(train, [train_size, val_size])

In [11]:
train_loader = torch.utils.data.DataLoader(dataset = split_train,
                                           batch_size=  batch_size,
                                           shuffle = True,
                                           drop_last = True)

val_loader = torch.utils.data.DataLoader(dataset = split_train,
                                        batch_size = batch_size,
                                        shuffle = True,
                                        drop_last = True)

In [12]:
optimizer = optim.SGD(model.parameters(), lr = learning_rate, momentum = momentum, weight_decay = weight_decay)
criterion = nn.CrossEntropyLoss().to(device)

In [13]:
# Step 4: 반복적인 학습
for epoch in range(epochs):
    model.train()  # 모델을 학습 모드로 설정


    '''
    1. for문 1번 반복할때마다 batch_idx는 batch_size만큼 증가
     ex) 1 iter: 0 ~ 127
         2 iter: 128 ~ 255

         전체 훈련 데이터를 다 쓰기전까지 반복.
    '''
    for batch_idx, (data, target) in enumerate(train_loader):
        # 데이터와 레이블을 GPU로 이동 (if available)
        # data = data.view(-1, 3, 224, 224) # multi-crop 대체
        data, target = data.to(device), target.to(device)

        # Forward 연산
        output = model(data)

        # 손실 계산
        loss = criterion(output, target)

        # Backward 연산 및 매개변수 업데이트
        optimizer.zero_grad() # 미분값 중복 방지
        loss.backward() # 오차역전파
        optimizer.step() # parameter 업데이트

        # 일정 간격으로 손실 출력


        if batch_idx % 100 == 0:
            #print(len(train_loader)) 항상 312로 동일한 값
            print(batch_idx)
            print('Epoch {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch + 1, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item())) # 100.은 실수형을 말한다.(실수형으로 출력하기 위함)
                # len(train_loader)는 배치 개수를 나타낸다. 여기서는 312개(312번 가중치 update)

    # 검증 데이터셋을 사용한 모델 평가
    model.eval()  # 모델을 평가 모드로 설정
    val_loss = 0
    correct = 0

    # 미분값 계산 x
    with torch.no_grad():
        for data, target in val_loader:
            data, target = data.to(device), target.to(device)

            # Forward 계산
            output = model(data)

            val_loss += criterion(output, target).item() # 손실함수 계산
            pred = output.argmax(dim=1, keepdim=True) # 예측값(행마다)에서 가장 큰 값의 인덱스 가져옴, (n, 1)형식
            correct += pred.eq(target.view_as(pred)).sum().item() # target을 pred와 같은 shape으로 만들어준다.
            '''
            .eq()는 두 tensor의 shape이 동일해야 비교 연산이 가능하다.
            '''

    val_loss /= len(val_loader.dataset) # 검증 손실함수
    val_accuracy = 100. * correct / len(val_loader.dataset) # 검증 정확도

    # 검증 결과 출력
    print('\nValidation set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
        val_loss, correct, len(val_loader.dataset), val_accuracy))

0
100

Validation set: Average loss: 0.0326, Accuracy: 2797/10000 (27.97%)

0
100

Validation set: Average loss: 0.0283, Accuracy: 3464/10000 (34.64%)

0
100

Validation set: Average loss: 0.0248, Accuracy: 4315/10000 (43.15%)

0
100

Validation set: Average loss: 0.0217, Accuracy: 4975/10000 (49.75%)

0
100

Validation set: Average loss: 0.0204, Accuracy: 5396/10000 (53.96%)

0
100

Validation set: Average loss: 0.0195, Accuracy: 5589/10000 (55.89%)

0
100

Validation set: Average loss: 0.0187, Accuracy: 5688/10000 (56.88%)

0
100

Validation set: Average loss: 0.0164, Accuracy: 6297/10000 (62.97%)

0
100

Validation set: Average loss: 0.0140, Accuracy: 6806/10000 (68.06%)

0
100

Validation set: Average loss: 0.0132, Accuracy: 7018/10000 (70.18%)



## 모델 테스트

In [24]:
test_loader = torch.utils.data.DataLoader(test,
                                          batch_size = batch_size,
                                          shuffle = False,
                                          drop_last = True)

In [27]:
model.eval()
correct = 0
total = 0

with torch.no_grad():
  for images, labels in test_loader:
    images = images.to(device)
    labels = labels.to(device)

    outputs = model(images)
    _, predicted = torch.max(outputs.data, 1) # 가장 높은값의 인덱스, 값 반환 --> 여기서는 가장 높은 값만 반환했다.

    total += labels.size(0) # 전체 데이터 예측 개수(len(cifar_test_loader) 와 같다)
    correct += (predicted == labels.to(torch.long)).sum().item() # predicted, labels의 텐서 타입이 일치하지 않아서 long으로 바꾸어줌.
    # 맞으면 correct에 true, 틀리면 false 반환한다. 맞은 것만 개수 센다.

accuracy = 100 * correct / total
print(f'Accuracy : {accuracy}')

Accuracy : 65.87540064102564
