![data_loader](./ResNet.png)

In [208]:
import numpy as np
import torch 
import torch.nn as nn

from torchvision import datasets, transforms
from torch.utils.data import TensorDataset, DataLoader

In [209]:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((224,224)),
    transforms.Normalize((0.5,0.5,0.5),(0.5,0.5,0.5)),
])

train_img = datasets.CIFAR10(
    root = 'data',
    train = True,
    download = True,
    transform = transform,
)

test_img = datasets.CIFAR10(
    root = 'data',
    train = False,
    download = True,
    transform = transform,
)

Files already downloaded and verified
Files already downloaded and verified


In [210]:
EPOCH = 10
BATCH_SIZE = 32
LEARNING_RATE = 1e-3
torch.cuda.set_device(3)
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using Device:", DEVICE)

train_loader = DataLoader(train_img, batch_size = BATCH_SIZE, shuffle = True)
test_loader = DataLoader(test_img, batch_size = BATCH_SIZE, shuffle = False)

Using Device: cuda


In [235]:
# short cut의 stride가 2인 부분은 H x W도 맞춰주기 위해서 
# 일반적인 skip connection은 channel수만 맞춰도 된다.
# 각 block에서 first를 제외한 second, third, fourth는 처음 단의 layer의 stride가 2로 H x W를 반으로 줄여주는 역할을 제공한다.
# stride가 2가 아니라면 padding이 존재하기 때문에 H x W가 바뀌지 않음. 
class ResNet50(nn.Module):
    def __init__(self, num_channels, num_classes):
        super(ResNet50, self).__init__()

        self.first_features = nn.Sequential(
            nn.Conv2d(num_channels, 64, kernel_size = 7, stride = 2, padding = 3), # padding을 3으로 줘야 112 x 112의 output을 얻을 수 있다.
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=False),
            nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
        )
        
        self.first_block_1 = self._make_layers([64,64,256], in_channels = 64)
        self.first_block_2 = self._make_layers([64,64,256], in_channels = 256)
        self.first_block_3 = self._make_layers([64,64,256], in_channels = 256)
        self.short_1 = self._short_cut(64,256)
        
        self.second_block_1 = self._make_layers([128,128,512], in_channels = 256, stride = 2)
        self.second_block_2 = self._make_layers([128,128,512], in_channels = 512)
        self.second_block_3 = self._make_layers([128,128,512], in_channels = 512)
        self.second_block_4 = self._make_layers([128,128,512], in_channels = 512)
        self.short_2 = self._short_cut(256,512, stride = 2)
        
        self.third_block_1 = self._make_layers([256,256,1024], in_channels = 512, stride = 2)
        self.third_block_2 = self._make_layers([256,256,1024], in_channels = 1024)
        self.third_block_3 = self._make_layers([256,256,1024], in_channels = 1024)
        self.third_block_4 = self._make_layers([256,256,1024], in_channels = 1024)
        self.third_block_5 = self._make_layers([256,256,1024], in_channels = 1024)
        self.third_block_6 = self._make_layers([256,256,1024], in_channels = 1024)
        self.short_3 = self._short_cut(512,1024, stride = 2)
        
        self.fourth_block_1 = self._make_layers([512,512,2048], in_channels = 1024, stride = 2)
        self.fourth_block_2 = self._make_layers([512,512,2048], in_channels = 2048)
        self.fourth_block_3 = self._make_layers([512,512,2048], in_channels = 2048)
        self.short_4 = self._short_cut(1024,2048, stride = 2)

        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((1,1)),
            nn.Flatten(),
            nn.Linear(2048, num_classes),
            nn.ReLU(inplace=False)
        )

        self.relu = nn.ReLU(inplace=False)
        
    def _make_layers(self, cfg, in_channels, stride = 1):
        layers = []

        c1 ,c2, c3 = cfg[0], cfg[1], cfg[2]
        layers +=[
            nn.Conv2d(in_channels, c1, kernel_size = 1),
            nn.BatchNorm2d(c1),
            nn.ReLU(inplace=False),
            nn.Conv2d(c1, c2, kernel_size = 3, stride = stride, padding = 1),
            nn.BatchNorm2d(c2),
            nn.ReLU(inplace=False),
            nn.Conv2d(c2, c3, kernel_size = 1),
            nn.BatchNorm2d(c3),
        ]
    
            

        return nn.Sequential(*layers)

    def _short_cut(self, in_channels, out_channels, stride = 1):
        layers = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size = 1, stride = stride),
            nn.BatchNorm2d(out_channels)
        )

        return layers 
        
    def forward(self,x):
        
        x = self.first_features(x) # 64 x 56 x 56
        identity = x.clone() 
        
        x = self.relu(self.first_block_1(x))
        x = self.relu(self.first_block_2(x))
        x = self.relu(self.first_block_3(x))
        x = x + self.short_1(identity)
        x = self.relu(x)
        identity = x.clone()
        
        
        x = self.relu(self.second_block_1(x))
        x = self.relu(self.second_block_2(x))
        x = self.relu(self.second_block_3(x))
        x = self.relu(self.second_block_4(x))
        x = x + self.short_2(identity)
        x = self.relu(x)
        identity = x.clone()
        
        x = self.relu(self.third_block_1(x))
        x = self.relu(self.third_block_2(x))
        x = self.relu(self.third_block_3(x))
        x = self.relu(self.third_block_4(x))
        x = self.relu(self.third_block_5(x))
        x = self.relu(self.third_block_6(x))
        x = x + self.short_3(identity)
        x = self.relu(x)
        identity = x.clone()
        
        x = self.relu(self.fourth_block_1(x))
        x = self.relu(self.fourth_block_2(x))
        x = self.relu(self.fourth_block_3(x))
        x = x + self.short_4(identity)
        x = self.relu(x)
        # avg pooling 전까지 skip connection
        x = self.classifier(x)
        
        return x 
        

In [236]:
model = ResNet50(3, 10).to(DEVICE)
print(model)

ResNet50(
  (first_features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  )
  (first_block_1): Sequential(
    (0): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU()
    (6): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1))
    (7): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (first_block_2): Sequential(
    (0): Conv2d(256, 64, kernel_size=(1, 1), stride=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_ru

In [237]:
def train(train_loader, model, loss_fn, optimizer):
    model.train()
    
    size = len(train_loader.dataset)
    
    for batch, (X, y) in enumerate(train_loader):
        X, y = X.to(DEVICE), y.to(DEVICE)
        pred = model(X)

        # 손실 계산
        loss = loss_fn(pred, y)

        # 역전파
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f'loss: {loss:>7f}  [{current:>5d}]/{size:5d}')

In [238]:
def test(test_loader, model, loss_fn):
    model.eval()

    size = len(test_loader.dataset)
    num_batches = len(test_loader)
    test_loss, correct = 0, 0

    with torch.no_grad():
        for X, y in test_loader:
            X, y = X.to(DEVICE), y.to(DEVICE)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:8f}\n")

In [239]:
loss = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr = LEARNING_RATE, momentum=0.9)

In [240]:
import gc
gc.collect()
torch.cuda.empty_cache()

In [241]:
for i in range(EPOCH) :
    print(f"Epoch {i+1} \n------------------------")
    train(train_loader, model, loss, optimizer)
    test(test_loader, model, loss)
print("Done!")

Epoch 1 
------------------------
loss: 2.355829  [    0]/50000
loss: 2.335413  [ 3200]/50000
loss: 2.294910  [ 6400]/50000
loss: 2.288863  [ 9600]/50000
loss: 2.303936  [12800]/50000
loss: 2.358929  [16000]/50000
loss: 2.240303  [19200]/50000
loss: 2.253047  [22400]/50000
loss: 2.251022  [25600]/50000
loss: 2.363446  [28800]/50000
loss: 2.180888  [32000]/50000
loss: 2.318145  [35200]/50000
loss: 2.275696  [38400]/50000
loss: 2.272674  [41600]/50000
loss: 2.189645  [44800]/50000
loss: 2.252161  [48000]/50000
Test Error: 
 Accuracy: 18.1%, Avg loss: 2.192346

Epoch 2 
------------------------
loss: 2.283826  [    0]/50000
loss: 2.334354  [ 3200]/50000
loss: 2.314361  [ 6400]/50000
loss: 2.156222  [ 9600]/50000
loss: 2.127439  [12800]/50000
loss: 2.132214  [16000]/50000
loss: 2.287440  [19200]/50000
loss: 2.265868  [22400]/50000
loss: 2.235168  [25600]/50000
loss: 2.420713  [28800]/50000
loss: 2.282444  [32000]/50000
loss: 2.214091  [35200]/50000
loss: 2.266836  [38400]/50000
loss: 1.996