In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms
from tqdm import trange
from torch.utils.data import DataLoader
from torchvision.datasets import CIFAR10

In [2]:
# These are RGB mean+std values across a large photo dataset.
rgb_mean = [0.485, 0.456, 0.406]
rgb_std = [0.229, 0.224, 0.225]

# Convert each image to a tensor, resize it to 224x224, then normalize per channel.
preprocess = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((224, 224)),
    transforms.Normalize(mean=rgb_mean, std=rgb_std),
])

# CIFAR-10 training split, downloaded into data/cifar10 if it is not there yet.
cifar_data = CIFAR10('data/cifar10', train=True, download=True, transform=preprocess)

# Shuffled mini-batches of 8 images, loaded in the main process (num_workers=0).
data_loader = DataLoader(cifar_data, batch_size=8, shuffle=True, num_workers=0)

Files already downloaded and verified


In [3]:
def build_resnet18(n_classes=10):
    """Build a ``ResNet`` from ``BasicBlock`` with per-stage block counts [2, 2, 2, 2].

    ``n_classes`` is forwarded to ``ResNet`` unchanged.
    """
    stage_depths = [2, 2, 2, 2]
    return ResNet(BasicBlock, block_num=stage_depths, n_classes=n_classes)

def build_resnet18v2(n_classes=10):
    """Build a ``ResNet`` from ``BasicBlockV2`` with per-stage block counts [2, 2, 2, 2].

    ``n_classes`` is forwarded to ``ResNet`` unchanged.
    """
    stage_depths = [2, 2, 2, 2]
    return ResNet(BasicBlockV2, block_num=stage_depths, n_classes=n_classes)

def build_resnet34(n_classes=10):
    """Build a ``ResNet`` from ``BasicBlock`` with per-stage block counts [3, 4, 6, 3].

    ``n_classes`` is forwarded to ``ResNet`` unchanged.
    """
    stage_depths = [3, 4, 6, 3]
    return ResNet(BasicBlock, block_num=stage_depths, n_classes=n_classes)

def build_resnet34v2(n_classes=10):
    """Build a ``ResNet`` from ``BasicBlockV2`` with per-stage block counts [3, 4, 6, 3].

    ``n_classes`` is forwarded to ``ResNet`` unchanged.
    """
    stage_depths = [3, 4, 6, 3]
    return ResNet(BasicBlockV2, block_num=stage_depths, n_classes=n_classes)

def build_resnet50(n_classes=10):
    """Build a ``ResNet`` from ``BottleneckBlock`` with per-stage block counts [3, 4, 6, 3].

    ``n_classes`` is forwarded to ``ResNet`` unchanged.
    """
    stage_depths = [3, 4, 6, 3]
    return ResNet(BottleneckBlock, block_num=stage_depths, n_classes=n_classes)

def build_resnet101(n_classes=10):
    """Build a ``ResNet`` from ``BottleneckBlock`` with per-stage block counts [3, 4, 23, 3].

    ``n_classes`` is forwarded to ``ResNet`` unchanged.
    """
    stage_depths = [3, 4, 23, 3]
    return ResNet(BottleneckBlock, block_num=stage_depths, n_classes=n_classes)

def build_resnet152(n_classes=10):
    """Build a ``ResNet`` from ``BottleneckBlock`` with per-stage block counts [3, 8, 36, 3].

    ``n_classes`` is forwarded to ``ResNet`` unchanged.
    """
    stage_depths = [3, 8, 36, 3]
    return ResNet(BottleneckBlock, block_num=stage_depths, n_classes=n_classes)

In [4]:
class BottleneckBlock(nn.Module):
    """Bottleneck residual block: 1x1 reduce -> 3x3 -> 1x1 expand, added to a shortcut.

    The block outputs ``out_channel * expansion`` channels. Every convolution on
    the main path is followed by batch normalization, matching the shortcut
    projection of this block and the other block classes in this file.

    Args:
        in_channel: number of channels of the input tensor.
        out_channel: width of the inner 1x1/3x3 convolutions; the block's output
            has ``out_channel * expansion`` channels.
        stride: stride of the first 1x1 convolution and of the shortcut projection.
    """

    # expansion of output
    expansion = 4

    def __init__(self, in_channel, out_channel, stride=1):
        super().__init__()

        expanded_channel = out_channel * self.expansion

        self.diff_size = stride != 1 or in_channel != expanded_channel
        # when the input and output sizes differ, the shortcut uses a
        # linear projection, in this case a 1x1 convolution
        if self.diff_size:
            self.ws = nn.Sequential(
                nn.Conv2d(in_channel, expanded_channel, 1, stride=stride, bias=False),
                nn.BatchNorm2d(expanded_channel),
            )

        # building block; the convs carry no bias because each one is
        # immediately followed by a BatchNorm2d with its own shift term
        self.conv = nn.Sequential(
            nn.Conv2d(in_channel, out_channel, kernel_size=1, stride=stride, bias=False),
            nn.BatchNorm2d(out_channel),
            nn.ReLU(),
            nn.Conv2d(out_channel, out_channel, kernel_size=3, padding=1, stride=1, bias=False),
            nn.BatchNorm2d(out_channel),
            nn.ReLU(),
            nn.Conv2d(out_channel, expanded_channel, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm2d(expanded_channel),
        )

        self.relu = nn.ReLU()

    def forward(self, X):
        """Return ``relu(F(X) + shortcut(X))``."""
        # F(X)
        out = self.conv(X)
        if self.diff_size:
            X = self.ws(X)

        # F + x
        y = out + X
        return self.relu(y)

In [5]:
class BasicBlock(nn.Module):
    """Residual block made of two 3x3 conv + batch-norm layers and a shortcut.

    Args:
        in_channel: number of channels of the input tensor.
        out_channel: number of channels produced by the block.
        stride: stride of the first 3x3 convolution and of the shortcut projection.
    """

    expansion = 1

    def __init__(self, in_channel, out_channel, stride=1):
        super().__init__()

        # The shortcut is a plain identity only when neither the resolution nor
        # the channel count changes; otherwise it is a 1x1 conv projection.
        self.diff_size = not (stride == 1 and in_channel == out_channel)
        if self.diff_size:
            projection = nn.Conv2d(in_channel, out_channel, 1, stride=stride, bias=False)
            self.ws = nn.Sequential(projection, nn.BatchNorm2d(out_channel))

        # Main path: conv-BN-ReLU-conv-BN. The last ReLU is applied after the addition.
        main_path = [
            nn.Conv2d(in_channel, out_channel, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channel),
            nn.ReLU(),
            nn.Conv2d(out_channel, out_channel, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channel),
        ]
        self.conv = nn.Sequential(*main_path)
        self.relu = nn.ReLU()

    def forward(self, X):
        """Return ``relu(F(X) + shortcut(X))``."""
        residual = self.conv(X)
        shortcut = self.ws(X) if self.diff_size else X
        return self.relu(residual + shortcut)

In [6]:
# using pre-activation in https://arxiv.org/abs/1603.05027
class BasicBlockV2(nn.Module):
    """Pre-activation residual block (https://arxiv.org/abs/1603.05027).

    Each 3x3 convolution is preceded by batch-norm and ReLU, and no activation
    is applied after the addition with the shortcut.

    Args:
        in_channel: number of channels of the input tensor.
        out_channel: number of channels produced by the block.
        stride: stride of the first 3x3 convolution and of the shortcut projection.
    """

    expansion = 1

    def __init__(self, in_channel, out_channel, stride=1):
        super().__init__()

        # The shortcut is a plain identity only when neither the resolution nor
        # the channel count changes; otherwise it is a 1x1 conv projection.
        self.diff_size = not (stride == 1 and in_channel == out_channel)
        if self.diff_size:
            projection = nn.Conv2d(in_channel, out_channel, 1, stride=stride, bias=False)
            self.ws = nn.Sequential(projection, nn.BatchNorm2d(out_channel))

        # Main path: BN-ReLU-conv twice (pre-activation ordering).
        main_path = [
            nn.BatchNorm2d(in_channel),
            nn.ReLU(),
            nn.Conv2d(in_channel, out_channel, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channel),
            nn.ReLU(),
            nn.Conv2d(out_channel, out_channel, kernel_size=3, padding=1, bias=False),
        ]
        self.conv = nn.Sequential(*main_path)
        # Not used by forward(); kept so the module's attributes stay the same.
        self.relu = nn.ReLU()

    def forward(self, X):
        """Return ``F(X) + shortcut(X)`` with no activation after the sum."""
        residual = self.conv(X)
        shortcut = self.ws(X) if self.diff_size else X
        return residual + shortcut

In [7]:
class ResNet(nn.Module):
    """Residual network: 7x7 stem conv, max-pool, four block stages, pooling, linear head.

    Args:
        block: block class used for every stage. It is called as
            ``block(in_channel, out_channel, stride=...)`` and must expose an
            ``expansion`` class attribute (output channels = out_channel * expansion).
        block_num: sequence of four ints, the number of blocks in each stage.
        n_classes: size of the final linear layer's output.

    ``forward`` returns log-probabilities of shape ``(batch, n_classes)``.
    """

    def __init__(self, block, block_num, n_classes=10):
        super().__init__()

        # Running channel count; updated by _make_conv_layers after each stage.
        self.input_channel = 64
        # NOTE(review): the stem has no BatchNorm/ReLU after conv1 -- confirm this is intended.
        self.conv1 = nn.Conv2d(3, self.input_channel, 7, stride=2, padding=3)
        self.max_pool = nn.MaxPool2d(3, 2, padding=1)
        self.conv2 = self._make_conv_layers(block, output_channel=64, time=block_num[0])
        self.conv3 = self._make_conv_layers(block, output_channel=128, time=block_num[1])
        self.conv4 = self._make_conv_layers(block, output_channel=256, time=block_num[2])
        self.conv5 = self._make_conv_layers(block, output_channel=512, time=block_num[3])
        # Adaptive pooling averages the whole feature map down to 1x1 for any
        # input resolution. A fixed 4x4 window would ignore part of a 7x7 map
        # and reject maps smaller than 4x4.
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.linear = nn.Linear(self.input_channel, n_classes)

    def _make_conv_layers(self, block, output_channel, time):
        """Build one stage of ``time`` blocks and advance ``self.input_channel``.

        The first block of the stage uses stride 2 whenever the current channel
        count differs from ``output_channel``; all other blocks use stride 1.
        """
        # downsample, use stride 2
        stride = 2 if self.input_channel != output_channel else 1

        layers = [block(self.input_channel, output_channel, stride=stride)]

        self.input_channel = output_channel * block.expansion
        # remaining blocks, stride 1 because there is no further downsampling in this stage
        layers.extend(block(self.input_channel, output_channel) for _ in range(1, time))

        return nn.Sequential(*layers)

    def forward(self, X):
        """Run the network on ``X`` and return per-class log-probabilities."""
        x = self.conv1(X)
        x = self.max_pool(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        x = self.avg_pool(x)
        # reshape for FC linear (num_sample, features)
        x = x.view(x.size(0), -1)

        x = self.linear(x)
        return F.log_softmax(x, dim=1)

In [8]:
# Instantiate the pre-activation 18-layer variant with its default of 10 output classes.
resnet = build_resnet18v2(n_classes=10)

In [9]:
# Negative log-likelihood loss; the network's forward() already ends in log_softmax.
cost_func = nn.NLLLoss()

# SGD with momentum over every parameter of the network.
optimizer = optim.SGD(params=resnet.parameters(), lr=0.01, momentum=0.9)

In [10]:
epoch = 1

# Progress bar over epochs; its description is refreshed with the latest loss.
t = trange(epoch)
for i in t:
    for images, targets in data_loader:
        # Clear the gradients accumulated by the previous backward pass.
        resnet.zero_grad()

        # Forward pass, loss, backward pass, parameter update.
        yhat = resnet(images)
        cost = cost_func(yhat, targets)
        cost.backward()
        optimizer.step()

        t.set_description(f'cost is {cost.item()}')

        # Stop after the first mini-batch of the epoch
        # (presumably a quick end-to-end check rather than real training).
        break

cost is 2.360973834991455: 100%|█████████████████████████████████████████████████████████| 1/1 [00:00<00:00,  1.06it/s]


In [11]:
# inspect architecture: a bare module expression makes the notebook display
# the nested layer-by-layer repr of the network
resnet

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
  (max_pool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (conv2): Sequential(
    (0): BasicBlockV2(
      (conv): Sequential(
        (0): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (1): ReLU()
        (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (3): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (4): ReLU()
        (5): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      )
      (relu): ReLU()
    )
    (1): BasicBlockV2(
      (conv): Sequential(
        (0): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (1): ReLU()
        (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (3): BatchNorm2d(64, eps=1e-05, momentum=0.1, affin

In [12]:
# study the output height of AvgPool2d (the width follows the same formula)
H, W = 7, 7
kernel_size = (4, 4)
stride = kernel_size  # the stride is set equal to the kernel size here
padding = (0, 0)

# floor((H + 2 * pad - kernel) / stride) + 1 along the height axis
padded_h = H + 2 * padding[0]
H_out = (padded_h - kernel_size[0]) // stride[0] + 1
H_out

1

In [13]:
# study the output height of Conv2d and MaxPool2d (the width follows the same formula)
H = W = 112
kernel, stride, padding, dilation = 3, 2, 1, 1

# extent covered by the (possibly dilated) kernel along one axis
effective_kernel = dilation * (kernel - 1) + 1
H_out = (H + 2 * padding - effective_kernel) // stride + 1
H_out

56