# Writing `ResNet` from Scratch in PyTorch



이 강좌는 [이 블로그](https://blog.paperspace.com/writing-resnet-from-scratch-in-pytorch/)의 내용을 번역하고 부분적으로 수정한 것이다.

Resnet은 Residual Connections의 개념을 도입하여 네트워크가 너무 깊을 경우 네트워크 성능이 저하되는 문제를 해결한 Computer Vision의 주요 혁신중 하나이다.
우선 ResNet의 아키텍처와 그 뒤에 숨어있는 직관적인 아이디어를 살펴보는 것으로 시작한다.



## Residual Block과 `Resnet`

Resnet의 핵심 구성요소는 resudual block이다. 아래 그림이 하나의 residual block을 보여준다.



<img src="https://raw.githubusercontent.com/ohheum/DS2022/7ef336792c0a4ae3044653a7e020818354656b55/assets/image-9.png" width="600" height="360">





위의 그림에서 보이듯이 순차적인 연결 외에 모델의 일부 계층을 건너뛰는 연결(**skipped connection**)이 있다. 건너뛰기 연결이 없을 때 이 모델이 학습해야할 매핑을 `H(x)`라고 한다면 건너뛰기 연결을 추가할 경우 순차 연결 부분이 학습해야할 매핑은 `F(x) = H(x) - x`가 될 것이다. 이 잔여(residual) 매핑  `F(x)`를 학습하는 것이 원래의 매핑 `H(x)`를 학습하는 것보다 쉽다. (극단적인 예로 `H(x)`가 identity mapping이라면 residual mapping F(x)는 zero mapping이다. identity mapping보다는 zero mapping이 학습하기 쉬울 것이다.)

아래는 34 레이어 ResNet의 아키텍처입니다.

<img src="https://raw.githubusercontent.com/ohheum/DS2022/7ef336792c0a4ae3044653a7e020818354656b55/assets/image-10.png" width="800" height="1800">

## Dataset

CIFAR-10 데이터 세트를 사용한다. CIFAR-10 데이터 세트는 10개 클래스의 60000개의 32x32 컬러 이미지로 구성되며 클래스당 6000개의 이미지가 있다. 50000개의 훈련 이미지와 10000개의 테스트 이미지가 있다.

데이터 세트는 각각 10000개의 이미지가 있는 5개의 훈련 배치와 1개의 테스트 배치로 나뉜다. 테스트 배치에는 각 클래스에서 무작위로 선택된 정확히 1000개의 이미지가 포함된다. 학습 배치에는 나머지 이미지가 무작위 순서로 포함되지만 일부 학습 배치에는 한 클래스의 이미지가 다른 클래스보다 더 많을 수 있다. 

<img src="https://pytorch.org/tutorials/_images/cifar10.png" width="600" height="420">

## Importing the Libraries

In [1]:
import numpy as np
import torch
import torch.nn as nn
from torchvision import datasets
from torchvision import transforms
from torch.utils.data.sampler import SubsetRandomSampler


# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


## Loading the Dataset

토치비전 라이브러리를 사용하여 데이터 세트를 로드한다.

* 아래의의 `data_loader` 함수는 트레이닝 데이터와 테스트 데이터에 대한 `DataLoader`를 생성하여 반환한다. 
* 데이터 세트의 각 채널(빨간색, 녹색 및 파란색)의 평균 및 표준 편차로 정규화를 수행한다.
* 데이터 로더를 사용하면 데이터를 일괄적으로 반복할 수 있으며 데이터는 반복하는 동안 로드되며 시작 시 한 번에 모두 RAM에 로드되지는 않는다. 이것은 대규모 데이터 세트를 처리하는 경우 매우 유용하다.
* 매개변수 `test=False`인 경우 트레이닝 분할을, `test=True`인 경우 테스트 분할을 로드한다. train의 경우 분할은 무작위로 train과 validation set(0.9:0.1)으로 나뉜다.

In [3]:
def data_loader(data_dir,
                batch_size,
                random_seed=42,
                valid_size=0.1,
                shuffle=True,
                test=False):
  
    normalize = transforms.Normalize( # empirical parameter, probably mean and stddev of whole dataset
        mean=[0.4914, 0.4822, 0.4465],
        std=[0.2023, 0.1994, 0.2010],
    )

    # define transforms
    transform = transforms.Compose([ # function composition
            transforms.Resize((224,224)),
            transforms.ToTensor(),
            normalize,
    ])

    if test:
        dataset = datasets.CIFAR10(
          root=data_dir, train=False,
          download=True, transform=transform,
        )

        data_loader = torch.utils.data.DataLoader(
            dataset, batch_size=batch_size, shuffle=shuffle
        )

        return data_loader

    # load the dataset
    train_dataset = datasets.CIFAR10(
        root=data_dir, train=True,
        download=True, transform=transform,
    )

    valid_dataset = datasets.CIFAR10(
        root=data_dir, train=True,
        download=True, transform=transform,
    )

    num_train = len(train_dataset)
    indices = list(range(num_train)) # list of [0, ..., num_train-1]
    split = int(np.floor(valid_size * num_train))

    if shuffle:
        np.random.seed(42)
        np.random.shuffle(indices)

    train_idx, valid_idx = indices[split:], indices[:split]
    train_sampler = SubsetRandomSampler(train_idx)
    valid_sampler = SubsetRandomSampler(valid_idx)

    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, sampler=train_sampler)
 
    valid_loader = torch.utils.data.DataLoader(
        valid_dataset, batch_size=batch_size, sampler=valid_sampler)

    return (train_loader, valid_loader)


# CIFAR10 dataset 
train_loader, valid_loader = data_loader(data_dir='./data',
                                         batch_size=64)

test_loader = data_loader(data_dir='./data',
                              batch_size=64,
                              test=True)

Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified


## How models work in PyTorch

 ResNet에는 PyTorch가 제공하는 다양한 유형의 레이어가 사용된다.

* nn.Conv2d: 커널 크기와 함께 입력 및 출력 채널의 수를 인수로 받아들이는 컨볼루션 계층. 
* nn.BatchNorm2d: 컨볼루션 레이어의 출력에 일괄 정규화를 적용, 각 레이어 정규화 하는데 그 정규화 파라메터도 최적화
* nn.ReLU: 네트워크의 다양한 출력에 적용되는 활성화 함수
* nn.MaxPool2d : 최대 풀링을 적용
* nn.Dropout: 주어진 확률로 출력에 드롭아웃을 적용 (prob 을 줘서 레이어 각 아웃풋을 prob확률로 0으로 강제), overfit 방지
* nn.Linear: 완전히 연결된 레이어
* nn.Sequential: 여러 계층을 결합하는데 사용됨

## Residual Block

먼저 네트워크 전체에서 재사용할 수 있는 ResidualBlock을 정의한다. 

In [4]:
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride = 1, downsample = None):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Sequential(
                        nn.Conv2d(in_channels, out_channels, kernel_size = 3, stride = stride, padding = 1),
                        nn.BatchNorm2d(out_channels),
                        nn.ReLU())
        self.conv2 = nn.Sequential(
                        nn.Conv2d(out_channels, out_channels, kernel_size = 3, stride = 1, padding = 1),
                        nn.BatchNorm2d(out_channels))
        self.downsample = downsample
        self.relu = nn.ReLU()
        self.out_channels = out_channels
        
    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.conv2(out)
        if self.downsample: # stride에 따라 residual과 x의 사이즈 일치시키기 위한 operation
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out

## Resnet

이제 ResidualBlock을 만들었으므로 ResNet을 빌드할 수 있다.

아키텍처에는 각각 3, 3, 6, 3개의 레이어를 포함하는 3개의 블록이 있다. 이 블록을 만들기 위해 도우미 함수 _make_layer를 만든다. 이 기능은 Residual Block과 함께 레이어를 하나씩 추가한다. 블록 다음에 평균 풀링과 최종 선형 레이어를 추가한다.

In [5]:
class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes = 10):
        super(ResNet, self).__init__()
        self.inplanes = 64
        self.conv1 = nn.Sequential(
                        nn.Conv2d(3, 64, kernel_size = 7, stride = 2, padding = 3),
                        nn.BatchNorm2d(64),
                        nn.ReLU())
        self.maxpool = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
        self.layer0 = self._make_layer(block, 64, layers[0], stride = 1)
        self.layer1 = self._make_layer(block, 128, layers[1], stride = 2)
        self.layer2 = self._make_layer(block, 256, layers[2], stride = 2)
        self.layer3 = self._make_layer(block, 512, layers[3], stride = 2)
        self.avgpool = nn.AvgPool2d(7, stride=1)
        self.fc = nn.Linear(512, num_classes)
        
    def _make_layer(self, block, planes, blocks, stride=1):
        downsample = None
        if stride != 1 or self.inplanes != planes:
            
            downsample = nn.Sequential(
                nn.Conv2d(self.inplanes, planes, kernel_size=1, stride=stride),
                nn.BatchNorm2d(planes),
            )
        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes
        for i in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)
    
    
    def forward(self, x):     # x.shape=(3,224,224)
        x = self.conv1(x)     # (112, 112)
        x = self.maxpool(x)   # (56, 56)
        x = self.layer0(x)    # (56, 56)
        x = self.layer1(x)    # (28, 28)
        x = self.layer2(x)    # (14, 14)
        x = self.layer3(x)    # (7, 7)

        x = self.avgpool(x)   # (1,1)
        x = x.view(x.size(0), -1)   # 512 # flatten
        x = self.fc(x)        

        return x

## Setting Hyperparameters

In [6]:
num_classes = 10
num_epochs = 20
batch_size = 16
learning_rate = 0.01

model = ResNet(ResidualBlock, [3, 4, 6, 3]).to(device)

from torchsummary import summary
print(summary(model, (3, 224, 224)))

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, weight_decay = 0.001, momentum = 0.9)  

# Train the model
total_step = len(train_loader)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 112, 112]           9,472
       BatchNorm2d-2         [-1, 64, 112, 112]             128
              ReLU-3         [-1, 64, 112, 112]               0
         MaxPool2d-4           [-1, 64, 56, 56]               0
            Conv2d-5           [-1, 64, 56, 56]          36,928
       BatchNorm2d-6           [-1, 64, 56, 56]             128
              ReLU-7           [-1, 64, 56, 56]               0
            Conv2d-8           [-1, 64, 56, 56]          36,928
       BatchNorm2d-9           [-1, 64, 56, 56]             128
             ReLU-10           [-1, 64, 56, 56]               0
    ResidualBlock-11           [-1, 64, 56, 56]               0
           Conv2d-12           [-1, 64, 56, 56]          36,928
      BatchNorm2d-13           [-1, 64, 56, 56]             128
             ReLU-14           [-1, 64,

## Training

In [None]:
import gc
total_step = len(train_loader)

for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):  
        # Move tensors to the configured device
        images = images.to(device)
        labels = labels.to(device)
        
        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)
        
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        del images, labels, outputs
        torch.cuda.empty_cache()
        gc.collect()

    print ('Epoch [{}/{}], Loss: {:.4f}' 
                   .format(epoch+1, num_epochs, loss.item()))
            
    # Validation
    with torch.no_grad():
        correct = 0
        total = 0
        for images, labels in valid_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            del images, labels, outputs
    
        print('Accuracy of the network on the {} validation images: {} %'.format(5000, 100 * correct / total)) 

Epoch [1/20], Loss: 1.2589
Accuracy of the network on the 5000 validation images: 56.68 %
Epoch [2/20], Loss: 0.4178
Accuracy of the network on the 5000 validation images: 73.86 %
Epoch [3/20], Loss: 0.4106
Accuracy of the network on the 5000 validation images: 78.14 %
Epoch [4/20], Loss: 0.2229
Accuracy of the network on the 5000 validation images: 79.06 %
Epoch [5/20], Loss: 1.0332
Accuracy of the network on the 5000 validation images: 81.64 %


코드의 출력을 보면 매 에포크마다 검증(validation) 세트의 정확도가 증가하고 손실이 감소함에 따라 모델이 학습하고 있음을 알 수 있다. 

## Testing

테스트를 위해 유효성 검사와 정확히 동일한 코드를 사용하지만 test_loader를 사용한다.

In [None]:
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        del images, labels, outputs

    print('Accuracy of the network on the {} test images: {} %'.format(10000, 100 * correct / total))   

Accuracy of the network on the 10000 test images: 83.93 %



위의 코드를 사용하고 10개의 에포크 동안 모델을 훈련하여 테스트 세트에서 82.87%의 정확도를 달성할 수 있었다.