## ResNet 

In [20]:
!pip install git+https://github.com/deepvision-class/starter-code

Collecting git+https://github.com/deepvision-class/starter-code
  Cloning https://github.com/deepvision-class/starter-code to /tmp/pip-req-build-0_cevplb
  Running command git clone -q https://github.com/deepvision-class/starter-code /tmp/pip-req-build-0_cevplb
Building wheels for collected packages: Colab-Utils
  Building wheel for Colab-Utils (setup.py) ... [?25l[?25hdone
  Created wheel for Colab-Utils: filename=Colab_Utils-0.1.dev0-cp36-none-any.whl size=10324 sha256=db26751e6dc17367109fda586f505b9e2bb8dbdc6d397ced98cd637992a1ed7e
  Stored in directory: /tmp/pip-ephem-wheel-cache-sn9pb3_v/wheels/63/d1/27/a208931527abb98d326d00209f46c80c9d745851d6a1defd10
Successfully built Colab-Utils
Installing collected packages: Colab-Utils
Successfully installed Colab-Utils-0.1.dev0


In [21]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.data import sampler
import coutils
from coutils import fix_random_seed

import torchvision.datasets as dset
import torchvision.transforms as T

In [5]:
dtype = torch.float
ltype = torch.long

if torch.cuda.is_available():
  device = torch.device('cuda:0')
else:
  device = torch.device('cpu')

# Constant to control how frequently we print train loss
print_every = 100

print('using device:', device)

using device: cuda:0


### Load Data

In [4]:
NUM_TRAIN = 49000
transform = T.Compose([
                T.ToTensor(),
                T.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
            ])

cifar10_train = dset.CIFAR10('./datasets', train=True, download=True,
                             transform=transform)
loader_train = DataLoader(cifar10_train, batch_size=64, 
                          sampler=sampler.SubsetRandomSampler(range(NUM_TRAIN)))

cifar10_val = dset.CIFAR10('./datasets', train=True, download=True,
                           transform=transform)
loader_val = DataLoader(cifar10_val, batch_size=64, 
                        sampler=sampler.SubsetRandomSampler(range(NUM_TRAIN, 50000)))

cifar10_test = dset.CIFAR10('./datasets', train=False, download=True, 
                            transform=transform)
loader_test = DataLoader(cifar10_test, batch_size=64)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./datasets/cifar-10-python.tar.gz


HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))

Extracting ./datasets/cifar-10-python.tar.gz to ./datasets
Files already downloaded and verified
Files already downloaded and verified


In [24]:
def check_accuracy(loader, model):
  if loader.dataset.train:
    print('Checking accuracy on validation set')
  else:
    print('Checking accuracy on test set')   
  num_correct = 0
  num_samples = 0
  model.eval()  # set model to evaluation mode
  with torch.no_grad():
    for x, y in loader:
      x = x.to(device=device, dtype=dtype)  # move to device, e.g. GPU
      y = y.to(device=device, dtype=ltype)
      scores = model(x)
      _, preds = scores.max(dim=1)
      num_correct += (preds == y).sum()
      num_samples += preds.size(0)
    acc = float(num_correct) / num_samples
    print('Got %d / %d correct (%.2f)' % (num_correct, num_samples, 100 * acc))
  return acc

In [25]:
def adjust_learning_rate(optimizer, lrd, epoch, schedule):
  """
  Multiply lrd to the learning rate if epoch is in schedule
  
  Inputs:
  - optimizer: An Optimizer object we will use to train the model
  - lrd: learning rate decay; a factor multiplied at scheduled epochs
  - epochs: the current epoch number
  - schedule: the list of epochs that requires learning rate update
  
  Returns: Nothing, but learning rate might be updated
  """
  if epoch in schedule:
    for param_group in optimizer.param_groups:
      print('lr decay from {} to {}'.format(param_group['lr'], param_group['lr'] * lrd))
      param_group['lr'] *= lrd




def train(model, optimizer, epochs=1, learning_rate_decay=.1, schedule=[]
                                                                  , verbose=True):
  model = model.to(device=device)  # move the model parameters to CPU/GPU
  num_iters = epochs * len(loader_train)
  if verbose:
    num_prints = num_iters // print_every + 1
  else:
    num_prints = epochs
  acc_history = torch.zeros(num_prints, dtype=torch.float)
  iter_history = torch.zeros(num_prints, dtype=torch.long)
  # for each epoch
  for e in range(epochs):
    
    adjust_learning_rate(optimizer, learning_rate_decay, e, schedule)
    
    # for every minibatch
    for t, (x, y) in enumerate(loader_train):
      model.train()  # put model to training mode
      x = x.to(device=device, dtype=dtype)  # move to device, e.g. GPU
      y = y.to(device=device, dtype=ltype)

      scores = model(x)
      loss = F.cross_entropy(scores, y)

      # Zero out all of the gradients for the variables which the optimizer
      # will update.
      optimizer.zero_grad()

      # This is the backwards pass: compute the gradient of the loss with
      # respect to each  parameter of the model.
      loss.backward()

      # Actually update the parameters of the model using the gradients
      # computed by the backwards pass.
      optimizer.step()

      # 화면 출력
      tt = t + e * len(loader_train)

      if verbose and (tt % print_every == 0 or (e == epochs-1 and t == len(loader_train)-1)):
        print('Epoch %d, Iteration %d, loss = %.4f' % (e, tt, loss.item()))
        acc = check_accuracy(loader_val, model)
        acc_history[tt // print_every] = acc
        iter_history[tt // print_every] = tt
        print()
      elif not verbose and (t == len(loader_train)-1):
        print('Epoch %d, Iteration %d, loss = %.4f' % (e, tt, loss.item()))
        acc = check_accuracy(loader_val, model)
        acc_history[e] = acc
        iter_history[e] = tt
        print()
  return acc_history, iter_history

### Plain block
residual connection이 없는 plain block을 먼저 구현해봅니다! 
PreResNet은 Conv 레이어 전에 BatchNorm과 ReLU를 먼저 끼워넣는 순서입니다. 그래서 PreResNet이름은 pre-activation architecture에서 온 것입니다. downsampling을 위해서 maapool 레이어를 넣는 대신 block의 첫번째 Conv 레이어에서 stride를 2로 설정합니다.

plain block은 shape가 $C_{in} \times H_{in} \times W_{out}$ 인 feature map을 받아서 shape가 $C_{out} \times H_{out} \times W_{out}$인 feature map을 생성합니다. block이 downsampling을 수행할 경우 $W_{out}=W_{in}/2$, $H_{out}=H_{in}/2$ 가 되고, 그렇지 않을 경우 $H_{out}=H_{in}$, $W_{out}=W_{in}$ 일 것입니다. 

* plain block의 레이어 구성
1. Spatial Batch normalization
2. ReLU
3. Conv layer with Cout 3x3 filters, zero-padding of 1, stride 1 (downsampling할 경우 stride 2)
4. Spatial Batch normalization
5. ReLU
6. Conv layer with Cout 3x3 filters, zero-padding of 1, stride 1 (downsampling할 경우 stride 2)


In [7]:
class PlainBlock(nn.Module):
  def __init__(self, Cin, Cout, downsample=False):
    super().__init__()

    self.net = None
    self.net = nn.Sequential(nn.BatchNorm2d(Cin),
                          nn.ReLU(),
                          nn.Conv2d(Cin, Cout, 3, padding=1, stride=2 if downsample else 1),
                          nn.BatchNorm2d(Cout),
                          nn.ReLU(),
                          nn.Conv2d(Cout, Cout, 3, padding=1))  
  def forward(self, x):
    return self.net(x)
    

In [8]:
### dimension check code
data = torch.zeros(2, 3, 5, 6)
model = PlainBlock(3, 10)
if list(model(data).shape) == [2, 10, 5, 6]:
  print('The output of PlainBlock without downsampling has a *correct* dimension!')
else:
  print('The output of PlainBlock without downsampling has an *incorrect* dimension! expected:', [2, 10, 5, 6], 'got:', list(model(data).shape))

data = torch.zeros(2, 3, 5, 6)
model = PlainBlock(3, 10, downsample=True)
if list(model(data).shape) == [2, 10, 3, 3]:
  print('The output of PlainBlock with downsampling has a *correct* dimension!')
else:
  print('The output of PlainBlock with downsampling has an *incorrect* dimension! expected:', [2, 10, 3, 3], 'got:', list(model(data).shape))

The output of PlainBlock without downsampling has a *correct* dimension!
The output of PlainBlock with downsampling has a *correct* dimension!


### Residual Block
Residual block은 plain block에서 residual connection을 더한 것이라고 발표에서 말씀드렸죠? $\mathcal{F}$ 가 plain block이고 $\mathcal{H}$ 가 plain block의 residual version이라고 하면 $\mathcal{F}$ 는

$\mathcal{H}(x) = \mathcal{F}(x) + x$

입니다.

하지만 이 식은 plain block $\mathcal{F}(x)$ 의 output이 input $x$ 와 동일한 shape일 경우입니다. 위의 plain block 구현에서 알 수 있듯, output이 input과 shape이 달라지는 경우를 두 가지로 생각해 볼 수 있습니다.
  1. output channel수 $C_{out}$ 가 input channel수 $C_{in}$과 다름
  2. plain block $\mathcal{F}$ 가 spatial downsampling을 수행했을 때

따라서 두 가지 케이스를 일반화하기 위해 shortcut connection $\mathcal{G}$ 를 추가하겠습니다.

$\mathcal{R}(x) = \mathcal{F}(x) + \mathcal{G}(x)$

그럼 $\mathcal{G}$ 는 세 가지의 경우가 있습니다.

1. $C_{in}=C_{out}$ 일 때

    : $\mathcal{F}$가 downsampling을 하지 않을 것이므로 $\mathcal{F}(x)$ 와 $x$의 shape이 같을 것

    :  $\mathcal{G}$ 는 Identity function:   $\mathcal{G}(x) = x$

2. $C_{in} \neq C_{out}$ && $\mathcal{F}$ 가 downsampling 하지 않았을 때

    : $\mathcal{G}$ 는 1X1 conv, $C_{out}$ filters, stride 1

3. $\mathcal{F}$ 가 downsampling

  : $\mathcal{G}$ 는 1X1 conv, $C_{out}$ filters, stride 2


In [9]:
class ResidualBlock(nn.Module):
  def __init__(self, Cin, Cout, downsample=False):
    super().__init__()

    self.block = None # F
    self.shortcut = None # G
    self.block = PlainBlock(Cin, Cout, downsample)
    if downsample:
      self.shortcut = nn.Conv2d(Cin, Cout, 1, padding = 0, stride = 2)
    else:
      if Cin == Cout:
        self.shortcut = nn.Identity()
      else:
        self.shortcut = nn.Conv2d(Cin, Cout, 1, padding = 0)
  
  def forward(self, x):
    return self.block(x) + self.shortcut(x)

In [10]:
data = torch.zeros(2, 3, 5, 6)
model = ResidualBlock(3, 10)
if list(model(data).shape) == [2, 10, 5, 6]:
  print('The output of ResidualBlock without downsampling has a *correct* dimension!')
else:
  print('The output of ResidualBlock without downsampling has an *incorrect* dimension! expected:', [2, 10, 5, 6], 'got:', list(model(data).shape))

data = torch.zeros(2, 3, 5, 6)
model = ResidualBlock(3, 10, downsample=True)
if list(model(data).shape) == [2, 10, 3, 3]:
  print('The output of ResidualBlock with downsampling has a *correct* dimension!')
else:
  print('The output of ResidualBlock with downsampling has an *incorrect* dimension! expected:', [2, 10, 3, 3], 'got:', list(model(data).shape))

The output of ResidualBlock without downsampling has a *correct* dimension!
The output of ResidualBlock with downsampling has a *correct* dimension!


### Residual stage

In [11]:
class ResNetStage(nn.Module):
  def __init__(self, Cin, Cout, num_blocks, downsample=True,
               block=ResidualBlock):
    super().__init__()
    blocks = [block(Cin, Cout, downsample)] # 첫번째 block에서 한번 다운샘플 하고
    for _ in range(num_blocks - 1):         # num_block - 1 개만큼 쌓음
      blocks.append(block(Cout, Cout))
    self.net = nn.Sequential(*blocks)       
      # *blocks 앞의 별표(*)는 리스트를 unpacking하는 역할 
      # 참조: https://mingrammer.com/understanding-the-asterisk-of-python/
  def forward(self, x):
    return self.net(x)

In [12]:
### 이제 만든 plain block과 residual block들을 stage로 만들어서 뽑아봅시다!
print('Plain block stage:')
print(ResNetStage(3, 4, 2, block=PlainBlock))
print('Residual block stage:')
print(ResNetStage(3, 4, 2, block=ResidualBlock))

Plain block stage:
ResNetStage(
  (net): Sequential(
    (0): PlainBlock(
      (net): Sequential(
        (0): BatchNorm2d(3, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (1): ReLU()
        (2): Conv2d(3, 4, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        (3): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (4): ReLU()
        (5): Conv2d(4, 4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      )
    )
    (1): PlainBlock(
      (net): Sequential(
        (0): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (1): ReLU()
        (2): Conv2d(4, 4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (3): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (4): ReLU()
        (5): Conv2d(4, 4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      )
    )
  )
)
Residual block stage:
ResNetStage(
  (net): Sequential(
    (0): 

### Residual stem
stem layer는 네트워크가 시작할때 채널 수는 증가시키고 다른 dimension들은 유지할 때 수행합니다.

In [13]:
class ResNetStem(nn.Module):
  def __init__(self, Cin=3, Cout=8):
    super().__init__()
    layers = [
        nn.Conv2d(Cin, Cout, kernel_size=3, padding=1, stride=1),
        nn.ReLU(),
    ]
    self.net = nn.Sequential(*layers)
    
  def forward(self, x):
    return self.net(x)

In [14]:
# dimention check
data = torch.zeros(2, 3, 5, 6)
model = ResNetStem(3, 10)
if list(model(data).shape) == [2, 10, 5, 6]:
  print('The output of ResidualBlock without downsampling has a *correct* dimension!')
else:
  print('The output of ResidualBlock without downsampling has an *incorrect* dimension! expected:', [2, 10, 5, 6], 'got:', list(model(data).shape))

The output of ResidualBlock without downsampling has a *correct* dimension!


### ResNet class
이제 block들을 이용해서 resnet class를 만듭니다!

networks : 미리 stage를 구체화해논 것! get_resnet(key) 하면 key에 해당하는 모델을 반환합니다. 예를 들어 get_resnet('resnet32') 를 하면 32 layer의 ResNet을 반환합니다.

'stahe_args' key에 해당하는 value는 tuple의 형태로, 각 tuple은 (num_in_channels, num_out_channels, num_blocks, whether_do_downsample) 입니다.

input size의 영향을 받지 않기 위해 conv 부분의 끝에 average pooling을 적용합니다. 따라서 linear layer에 들어가는 input sizze는 항상 (batch_size, stage_arg[-1][1]) 입니다.

In [15]:
# network example
networks = {
  'plain32': {
    'block': PlainBlock,
    'stage_args': [
      (8, 8, 5, False),
      (8, 16, 5, True),
      (16, 32, 5, True),
    ]
  },
  'resnet32': {
    'block': ResidualBlock,
    'stage_args': [
      (8, 8, 5, False),
      (8, 16, 5, True),
      (16, 32, 5, True),
    ]
  },
}

In [16]:
class ResNet(nn.Module):
  def __init__(self, stage_args, Cin=3, block=ResidualBlock, num_classes=10):
    super().__init__()

    self.cnn = None
    # ResNetStem, ResNetStage을 이용해서 conv 파트 구현하고 nn.Sequential 모듈로 감싸기
    # 모델을 self.cnn에 저장                                             
    layers = []
    layers.append(ResNetStem(Cin, stage_args[0][0]))
    for i in range (len(stage_args)):
      layers.append(ResNetStage(*stage_args[i])) 
    self.cnn = nn.Sequential(*layers)
    self.fc = nn.Linear(stage_args[-1][1], num_classes)
  
  # ResNet의 forward function 구현
  # scores에 output 저장
  def forward(self, x):
    scores = None
    x = self.cnn(x)
    x = nn.AvgPool2d(x.shape[2])(x)
    x = x.flatten(start_dim=1, end_dim=-1)
    scores = self.fc(x)
    return scores

def get_resnet(name):
  return ResNet(**networks[name])
# **networks[name] 앞의 쌍별표(**)는 튜플을 unpacking하는 역할 
# 참조: https://mingrammer.com/understanding-the-asterisk-of-python/    

### Training the model

In [None]:
names = ['plain32', 'resnet32']
acc_history_dict = {}
iter_history_dict = {}
for name in names:
  fix_random_seed(0)
  print(name, '\n')
  model = get_resnet(name)
#   init_module(model)
  
  optimizer = optim.SGD(model.parameters(), lr=1e-2, momentum=.9, weight_decay=1e-4)

  acc_history, iter_history = train(model, optimizer, epochs=10, schedule=[6, 8], verbose=False)
  acc_history_dict[name] = acc_history
  iter_history_dict[name] = iter_history

plain32 

Epoch 0, Iteration 765, loss = 1.1885
Checking accuracy on validation set
Got 574 / 1000 correct (57.40)

Epoch 1, Iteration 1531, loss = 1.1080
Checking accuracy on validation set
Got 548 / 1000 correct (54.80)

Epoch 2, Iteration 2297, loss = 1.0254
Checking accuracy on validation set
Got 658 / 1000 correct (65.80)

Epoch 3, Iteration 3063, loss = 0.4737
Checking accuracy on validation set
Got 696 / 1000 correct (69.60)

Epoch 4, Iteration 3829, loss = 0.5694
Checking accuracy on validation set
Got 721 / 1000 correct (72.10)

Epoch 5, Iteration 4595, loss = 0.8150
Checking accuracy on validation set
Got 743 / 1000 correct (74.30)

lr decay from 0.01 to 0.001
Epoch 6, Iteration 5361, loss = 0.6112
Checking accuracy on validation set
Got 799 / 1000 correct (79.90)

Epoch 7, Iteration 6127, loss = 0.4992
Checking accuracy on validation set
Got 806 / 1000 correct (80.60)

lr decay from 0.001 to 0.0001
Epoch 8, Iteration 6893, loss = 0.5738
Checking accuracy on validation set
Go

### Residual bottleneck block
bottleneck block은 연산량을 줄이면서 input을 더 많이 가공할 수 있기 때문에(더 깊은 레이어를 거침) 효율적입니다!

bottleneck block의 레이어는 다음과 같습니다.

1. Spatial Batch normalization
2. ReLU
3. Convolutional layer with `Cout // 4` 1x1 filters, stride 2 if downsampling; otherwise stride 1
4. Spatial Batch normalization
5. ReLU
6. Convolutional layer with `Cout // 4` 3x3 filters, with zero-padding of 1
7. Spatial Batch normalization
8. ReLU
9. Convolutional layer with `Cout` 1x1 filters



In [30]:
class ResidualBottleneckBlock(nn.Module):
  def __init__(self, Cin, Cout, downsample=False):
    super().__init__()

    self.block = None
    self.shortcut = None

    self.block = nn.Sequential(nn.BatchNorm2d(Cin),
                               nn.ReLU(),
                               nn.Conv2d(Cin, Cout//4, 1, stride=2 if downsample else 1),
                               nn.BatchNorm2d(Cout//4),
                               nn.ReLU(),
                               nn.Conv2d(Cout//4, Cout//4, 3, padding=1),
                               nn.BatchNorm2d(Cout//4),
                               nn.ReLU(),
                               nn.Conv2d(Cout//4, Cout, 1))
    if downsample:
      self.shortcut = nn.Conv2d(Cin, Cout, 1, padding = 0, stride = 2)
    else:
      if Cin == Cout:
        self.shortcut = nn.Identity()
      else:
        self.shortcut = nn.Conv2d(Cin, Cout, 1, padding = 0)
    


  def forward(self, x):
    return self.block(x) + self.shortcut(x)

In [31]:
# dimension check
data = torch.zeros(2, 3, 5, 6)
model = ResidualBottleneckBlock(3, 10)
if list(model(data).shape) == [2, 10, 5, 6]:
  print('The output of ResidualBlock without downsampling has a *correct* dimension!')
else:
  print('The output of ResidualBlock without downsampling has an *incorrect* dimension! expected:', [2, 10, 5, 6], 'got:', list(model(data).shape))

data = torch.zeros(2, 3, 5, 6)
model = ResidualBottleneckBlock(3, 10, downsample=True)
if list(model(data).shape) == [2, 10, 3, 3]:
  print('The output of ResidualBlock with downsampling has a *correct* dimension!')
else:
  print('The output of ResidualBlock with downsampling has an *incorrect* dimension! expected:', [2, 10, 3, 3], 'got:', list(model(data).shape))

The output of ResidualBlock without downsampling has a *correct* dimension!
The output of ResidualBlock with downsampling has a *correct* dimension!
