In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data
import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models
import os

import matplotlib.pyplot as plt
import numpy as np

from collections import namedtuple

In [None]:
# ResNet은 여러 개의 Block으로 구성되어 있다
# 이미지들이 Block을 통과하는데, 해당 Block을 통과할 때마다 Block의 output과 input이 그대로 더해진다
# output과 input의 차원이 다르면 downsample과정을 통하여 input의 차원을 output의 차원에 맞도록 조정하는데
# 이러한 과정을 skip connection이라고 한다

class BasicBlock(nn.Module):
  expansion = 1

  def __init__(self, in_channels, out_channels, stride = 1, downsample = False):
    super().__init__()
    self.conv1 = nn.Conv2D(in_channels, out_channels, kernel_size = 3, stride = stride, padding=1, bias = False)
    self.bn1 = nn.BatchNorm2d(out_channels)
    self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
    self.bn2 = nn.BatchNorm2d(out_channels)
    self.relu = nn.ReLU(inplace = True)

    if downsample:
      conv = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False)
      bn = nn.BatchNorm2d(out_channels)
      downsample = nn.Sequential(conv.bn)
    else:
      downsample = None
    self.downsample = downsample

  def forward(self, x):
    i = x
    x = self.conv1(x)
    x = self.bn1(x)
    x = self.relu(x)
    x = self.conv2(x)
    x = self.bn2(x)

    if self.downsample is not None:
      i = self.downsample(i)

    x += i
    x = self.relu(x)

    return x


In [None]:
# ResNet50, 101, 152에서는 기본 Block 이외에 BottleNeck Block이라는 특수한 구조의 Block을 사용한다
# Block과 BottleNeck은 input을 output에 더해준다는 면에서 똑같다고 볼 수 있다.
# 똑같은 input과 똑같은 output형상이 나온다고 해도 BottleNeck을 특수한 구조로 설계하여
# 훨씬 더 Weight가 적게 할 수 있다.
# 그로 인해 Model의 Complexity가 감소하게 된다

class Bottleneck(nn.Module):
  expansion = 4
  # resnet 모델 에서 BottleNeck block은 차례대로 64 channels, 64 channel, 256 channels인 conv계층으로 이루어져있다.
  # 해당 64channels에서 256channels로 맞추기 위하여 expansion값(4)를 곱해주는 것이다

  def __init__(self, in_channels, out_channels, stride=1, downsample=False):
    super().__init__()
    self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, bias=False)
    self.bn1 = nn.BatchNorm2d(out_channels)
    self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
    self.bn2 = nn.BatchNorm2d(out_channels)
    self.conv3 = nn.Conv2d(out_channels, self.expansion * out_channels, kernel_size=1, stride=1, bias=False)
    self.bn3 = nn.BatchNorm2d(self.expansion * out_channels)
    self.relu = nn.ReLU(inplace=True)

    if downsample:
      conv = nn.Conv2d(in_channels, self.expansion * out_channels, kernel_size=1, stride=stride, bias=False)
      bn = nn.BatchNorm2d(self.expansion * out_channels)
      downsample = nn.Sequential(conv, bn)
    else:
      downsample = None
    self.downsample = downsample

  def forward(self, x):
    i = x
    x = self.conv1(x)
    x = self.bn1(x)
    x = self.relu(x)
    x = self.conv2(x)
    x = self.bn2(x)
    x = self.relu(x)
    x = self.conv3(x)
    x = self.bn3(x)

    if self.downsample is not None:
      i = self.downsample(i)

    x += i
    x = self.relu(x)
    return x

In [None]:
# ResNet모델에 대한 네트워크 정의하기
class ResNet(nn.Module):
  def __init__(self, config, output_dim, zero_init_residual=False):
    super().__init__()

    # ResNet을 호출할 때 넘겨준 config값에 따라서 다양한 ResNet 모델을 만들 수 있도록 정의한 것이다
    # 즉 config기반의 Model을 만들기 위해서 config라는 변수에 설정 parameter를 넣을 수 있도록 한 것이다
    block, n_blocks, channels = config
    self.in_channels = channels[0]
    assert len(n_blocks) == len(channels) == 4

    self.conv1 = nn.Conv2d(3, self.in_channels, kernel_size=7, stride=2, padding=3, bias=False)
    self.bn1 = nn.BatchNorm2d(self.in_channels)
    self.relu = nn.ReLU(inplace = True)
    self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

    self.layer1 = self.get_resnet_layer(block, n_blocks[0], channels[0])
    self.layer2 = self.get_resnet_layer(block, n_blocks[1], channels[1], stride=2)
    self.layer3 = self.get_resnet_layer(block, n_blocks[2], channels[2], stride=2)
    self.layer4 = self.get_resnet_layer(block, n_blocks[3], channels[3], stride=2)

    self.avgpool = nn.AdaptiveAvgPool2d((1,1))
    self.fc = nn.Linear(self.in_channels, output_dim)


    # 각 Residual Block의 마지막 Batch normalization을 0으로 초기화하여 다음 Residual 분기를 0에서 시작할 수 있도록 한다
    # 논문에 의하면 이러한 방식을 사용할 경우 모델 성능이 0.2 ~ 0.3%정도 향상한다고 하여 많은 ResNet에서 이용하고 있다
    if zero_init_residual:
      for m in self.modules():
        if isinstance(m, Bottleneck):
          nn.init.constant_(m.bn3.weight, 0)
        elif isinstance(m ,BasicBlock):
          nn.init.constant_(m.bn2.weight, 0)

  def get_resnet_layer(self, block, n_blocks, channels, stride=1):
    layers = []

    # 만약 해당 Block에서 입력채널과 출력채널이 다르다면 Downsampling을 해주어야 한다
    if self.in_channels != block.expansion * channels:
      downsample = True
    else:
      downsample = False

    layers.append(block(self.in_channels, channels, stride, downsample))
    for i in range(1, n_blocks):
      layers.append(block(block.expansion * channels, channels))

    self.in_channels = block.expansion * channels
    return nn.Sequential(*layers)

  def forward(self, x):
    x = self.conv1(x)
    x = self.bn1(x)
    x = self.relu(x)
    x = self.maxpool(x)
    x = self.layer1(x)
    x = self.layer2(x)
    x = self.layer3(x)
    x = self.layer4(x)
    x = self.avgpool(x)
    x = torch.flatten(x,1) # Batch 단위로 Flatten하여 Dense layer에 입력될 수 있도록 한다
    x = self.fc(x)
    return x

In [None]:
# ResNet모델을 만들 때 config옵션을 전달하여 다양하고 가지각색의 ResNet모델을 만들 수 있도록 정의하였다.
# 다음과 같이 namedTuple형태로 config를 정의할 수 있도록 하였다.

ResNetConfig = namedtuple('ResNetConfig',['block','n_blocks', 'channels'])

In [None]:
# Resnet18, 34 model에는 BottleNeck block이 아닌 BasicBlock을 사용한다.
# block : 모델에서 사용하는 블록의 종류 (BasicBlock이냐 BottleNeck Block이냐)
# n_blocks : 모델에서 Residual Block을 입력된 개수만큼 짝지어서 하나의 layer로 구현할 예정이다.
# channels : 각 layer를 빠져나올 때 출력되는 최종 channels

resnet18_config = ResNetConfig(block=BasicBlock, n_blocks=[2,2,2,2], channels=[64,128,256,512])
resnet34_config = ResNetConfig(block=BasicBlock, n_blocks=[3,4,6,3], channels=[64,128,256,512])

In [None]:
# Resnet50, 101, 152 model에는 BottleNeck block을 사용한다
# block : 모델에서 사용하는 블록의 종류 (BasicBlock이냐 BottleNeck Block이냐)
# n_blocks : 모델에서 Residual Block을 입력된 개수만큼 짝지어서 하나의 layer로 구현할 예정이다.
# channels : 각 layer를 빠져나올 때 출력되는 최종 channels

resnet50_config = ResNetConfig(block=Bottleneck, n_blocks=[3,4,6,3], channels=[64,128,256,512])
resnet101_config = ResNetConfig(block=Bottleneck, n_blocks=[3,4,23,3], channels=[64,128,256,512])
resnet152_config = ResNetConfig(block=Bottleneck, n_blocks=[3,8,36,3], channels=[64,128,256,512])

In [None]:
# 각 종류별 모델을 생성하는 함수를 제작한다
output_dim = 10 # CIFAR10 Dataset의 클래스는 총 10개이다
def ResNet18():
    return ResNet(resnet18_config, output_dim)

def ResNet34():
    return ResNet(resnet34_config, output_dim)

def ResNet50():
    return ResNet(resnet50_config, output_dim)

def ResNet101():
    return ResNet(resnet101_config, output_dim)

def ResNet152():
    return ResNet(resnet152_config, output_dim)

In [None]:
# 모델 정의가 완료되었다
# 학습 도중 Learning Rate를 지속적으로 조절하기 위한 Scheduler를 제작한다
# 50 에폭 이상일 때는 Default learning rate의 10%
# 100 에폭 이상일 때는 Dafault learning rate의 1%로 조절한다
def lr_scheduler(optimizer, epoch):
  lr = learning_rate
  if epoch >= 50:
    lr /= 10
  if epoch >= 100:
    lr /= 10
  for param_group in optimizer.param_groups:
    param_group['lr'] = lr

# Dense Layer의 경우에는 가중치를 Xavier초기값으로 설정
# 만약 전달된 객체가 nn.Linear를 상속한 계층이라면, 즉 Fully Connected Layer라면
# 해당 모듈의 가중치를 xavier가중치로 초기화한다
# 그리고 bias를 0.01로 초기화한다
def init_weights(m):
  if isinstance(m, nn.Linear):
    torch.nn.init.xavier_uniform(m.weight)
    m.bias.data.fill_(0.01)

In [None]:
# Dataset을 Augmentation 및 Preprocess하기 위한 전처리기를 제작
transform_train = transforms.Compose([
  transforms.RandomCrop(32, padding=4), # 32 by 32사이즈로 Loading하고 각 border에 4의 패딩을 붙여 40 by 40 size를 나오게 한다
  transforms.RandomHorizontalFlip(),    # 불러오는 데이터들을 무작위로 수평반전
  transforms.ToTensor()                 # pixel값들을 정규화하고 Tensor자료형으로 변환시킨다
])

transform_test = transforms.Compose([
  transforms.ToTensor()
])

# 모델의 Train 및 Test로 사용할 CIFAR10 Dataset를 불러온다
train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
test_dataset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=256, shuffle=True, num_workers=8)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=256, shuffle=False, num_workers=8)


Files already downloaded and verified
Files already downloaded and verified


  cpuset_checked))


In [None]:
# Device 및 model준비하기
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ResNet50()
# 모델의 Weight Initiallization에 앞서 정의한 함수를 이용한다
model.apply(init_weights)
model = model.to(device)



In [None]:
# Train을 위한 각종 변수 초기화하기
learning_rate = 0.1
num_epoch = 150
model_name = "model.pth"

loss_fn = nn.CrossEntropyLoss()
# 가중치를 0.0001이하로 떨어지지 않도록 다음과 같이 제한한다
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=0.0001)



In [None]:
# Train 시작
best_acc = 0

for epoch in range(num_epoch):
  print("=====","{} epoch of {}".format(epoch+1, num_epoch), "=====")
  model.train()
  lr_scheduler(optimizer, epoch)
  train_loss = 0
  valid_loss = 0
  correct = 0
  total_cnt = 0

  for step, (x,y) in enumerate(train_loader):
    x, y = x.to(device), y.to(device)
    optimizer.zero_grad()

    preds = model(x)
    loss = loss_fn(preds, y)
    loss.backward()

    optimizer.step()
    train_loss += loss.item()
    predict = preds.max(1)[1]

    total_cnt += y.size(0)
    correct += predict.eq(y).sum().item()
    
    if step % 100 == 0 and step != 0:
        print(f"\n====== { step } Step of { len(train_loader) } ======")
        print(f"Train Acc : { correct / total_cnt }")
        print(f"Train Loss : { loss.item() / y.size(0) }")

  correct = 0
  total_cnt = 0
  

  # Test Data로 Epoch마다 성능평가하기
  
  with torch.no_grad():
    model.eval()
    for step, (x,y) in enumerate(test_loader):
        # input and target
        x, y = x.to(device), y.to(device)
        total_cnt += y.size(0)
        preds = model(x)
        valid_loss += loss_fn(preds, y)
        predict = preds.max(1)[1]
        correct += predict.eq(y).sum().item()
    valid_acc = correct / total_cnt
    print(f"\nValid Acc : { valid_acc }")    
    print(f"Valid Loss : { valid_loss / total_cnt }")

    if(valid_acc > best_acc):
        best_acc = valid_acc
        torch.save(model, model_name)
        print("Model Saved!")

===== 1 epoch of 150 =====


  cpuset_checked))



Train Acc : 0.37488397277227725
Train Loss : 0.006810068618506193

Valid Acc : 0.3728
Valid Loss : 0.007474941201508045
Model Saved!
===== 2 epoch of 150 =====

Train Acc : 0.3954981435643564
Train Loss : 0.006263743620365858

Valid Acc : 0.426
Valid Loss : 0.0067781987600028515
Model Saved!
===== 3 epoch of 150 =====

Train Acc : 0.4095374381188119
Train Loss : 0.006532766856253147

Valid Acc : 0.4519
Valid Loss : 0.006374410819262266
Model Saved!
===== 4 epoch of 150 =====

Train Acc : 0.43019028465346537
Train Loss : 0.006118895020335913

Valid Acc : 0.4629
Valid Loss : 0.006350953597575426
Model Saved!
===== 5 epoch of 150 =====

Train Acc : 0.43742264851485146
Train Loss : 0.0055789886973798275

Valid Acc : 0.4762
Valid Loss : 0.006258875597268343
Model Saved!
===== 6 epoch of 150 =====

Train Acc : 0.44523514851485146
Train Loss : 0.0058228811249136925

Valid Acc : 0.4731
Valid Loss : 0.006167639512568712
===== 7 epoch of 150 =====

Train Acc : 0.4544399752475248
Train Loss : 0.

KeyboardInterrupt: ignored