# 라이브러리

In [1]:
import torch
from torch import nn
from torch import optim
from torch.utils.data import DataLoader

import torchvision
from torchvision.transforms import v2
from torchvision import datasets

# 디바이스 선택

In [2]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
  device = 'cuda'
else:
  device = 'cpu'

print(device)

cuda


# 구글 드라이브 마운트

In [3]:
# Mount Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# 데이터셋 다운받기

In [4]:
# Preprocessing pipeline: resize to 224, then convert to a float32 image
# tensor scaled to [0, 1].
# v2.ToTensor() is deprecated; ToImage() followed by
# ToDtype(torch.float32, scale=True) is the documented replacement.
transforms = v2.Compose([
    v2.Resize(224),
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True),
])



In [5]:
# Directory on the mounted Drive where the CIFAR-10 files are stored.
root = '/content/drive/MyDrive/Colab Notebooks/DL_impl/ResNet/datasets'

# Build the training split first, then the test split, with identical settings.
train_ds, test_ds = (
    datasets.CIFAR10(root, train=is_train, transform=transforms, download=True)
    for is_train in (True, False)
)

Files already downloaded and verified
Files already downloaded and verified


In [6]:
# Report how many samples each split contains.
n_train_samples = len(train_ds)
n_test_samples = len(test_ds)
print(f'train dataset size: {n_train_samples}')
print(f'test dataset size: {n_test_samples}')

train dataset size: 50000
test dataset size: 10000


# 모델 정의하기
- 원본 코드에서 전반적인 구조를 참고했으나, 당장 필요해 보이는 부분을 제외하고는 제거하였음.
- 또한 downsampling 하는 과정에서 원본은 downsample 인자로 `Conv2d` 또는 `None`을 전달하지만, 나는 Bool 타입으로 전달하고 내부적으로 생성하는 방식을 이용했음.  
  - 구현 당시 떠오른 방식대로 작성한 것임.
  - 추후에 변경할 수도 있음.

In [7]:
class ResBlock(nn.Module):
  """Residual block: 3x3 conv -> BN -> ReLU -> 3x3 conv -> BN, plus a shortcut.

  Args:
    in_channel: Number of channels of the input feature map.
    out_channel: Number of channels produced by both 3x3 convolutions.
    downsample: When true, the first convolution uses stride 2 and the
      shortcut is a stride-2 1x1 convolution followed by batch norm, so the
      shortcut matches the residual branch's shape. When false, the input is
      added back unchanged, so its shape has to be compatible with the
      residual branch (normally ``in_channel == out_channel``).
  """

  def __init__(self, in_channel, out_channel, downsample):
    super().__init__()

    # (stride of conv1, stride of conv2); only conv1 ever reduces resolution.
    self.stride = (2, 1) if downsample else (1, 1)
    self.conv1 = nn.Conv2d(in_channel, out_channel, kernel_size=3, stride=self.stride[0], padding=1)
    self.bn1 = nn.BatchNorm2d(out_channel)
    self.relu1 = nn.ReLU()
    self.conv2 = nn.Conv2d(out_channel, out_channel, kernel_size=3, stride=self.stride[1], padding=1)
    self.bn2 = nn.BatchNorm2d(out_channel)
    self.relu2 = nn.ReLU()

    # Projection shortcut, or None for a plain identity shortcut.
    if downsample:
      self.downsample = nn.Sequential(
          nn.Conv2d(in_channel, out_channel, kernel_size=1, stride=2),
          nn.BatchNorm2d(out_channel)
      )
    else:
      self.downsample = None

  def forward(self, x):
    """Return ``relu(residual(x) + shortcut(x))``."""
    # Compare against None explicitly: truthiness of a Sequential depends on
    # its length, not on whether a shortcut module was configured.
    identity = x if self.downsample is None else self.downsample(x)

    res = self.conv1(x)
    res = self.bn1(res)
    res = self.relu1(res)
    res = self.conv2(res)
    res = self.bn2(res)

    # The final ReLU is applied after the shortcut has been added.
    out = res + identity
    out = self.relu2(out)

    return out

In [8]:
class ResNet34(nn.Module):
  """ResNet-style classifier: stem, four residual stages, pooling, linear head.

  The depth is determined by ``blocks_per_layer``; the stage widths are fixed
  at 64, 128, 256 and 512 channels.

  Args:
    block: Callable ``block(in_channel, out_channel, downsample=...)`` that
      builds one residual block.
    blocks_per_layer: Sequence whose first four entries give the number of
      blocks in each of the four stages.
    n_targets: Number of output features of the final linear layer.
  """

  def __init__(self, block, blocks_per_layer, n_targets):
    super().__init__()

    # Stem: 7x7 stride-2 convolution followed by a stride-2 max pool.
    self.input_layer = nn.Sequential(
        nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
        nn.BatchNorm2d(64),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
    )

    # (in_channel, out_channel) of each stage. Only the first stage keeps
    # the channel count, so it is the only one without downsampling.
    stage_channels = [(64, 64), (64, 128), (128, 256), (256, 512)]
    for idx, (c_in, c_out) in enumerate(stage_channels):
      stage = self._make_layer(block, c_in, c_out, blocks_per_layer[idx])
      setattr(self, f'layer{idx + 1}', stage)

    self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
    self.classifier = nn.Sequential(
        nn.Flatten(),
        nn.Linear(512, n_targets)
    )

  def _make_layer(self, block, in_channel, out_channel, n_blocks):
    """Build one stage: a leading block plus ``n_blocks - 1`` follow-up blocks."""
    # The leading block downsamples exactly when the channel count changes;
    # equal channel counts mean a stride-1 first block.
    needs_downsample = in_channel != out_channel
    stage = [block(in_channel, out_channel, downsample=needs_downsample)]

    # Every remaining block keeps both the resolution and the channel count.
    stage.extend(
        block(out_channel, out_channel, downsample=False)
        for _ in range(1, n_blocks)
    )

    return nn.Sequential(*stage)

  def forward(self, x):
    """Run the stem, the four stages, global average pooling and the head."""
    features = self.input_layer(x)
    for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
      features = stage(features)

    return self.classifier(self.avgpool(features))

In [9]:
# Stage depths passed to the model and the number of output classes.
n_targets = 10
blocks_per_layer = [3, 4, 6, 3]

model = ResNet34(block=ResBlock, blocks_per_layer=blocks_per_layer, n_targets=n_targets)
model = model.to(device)

In [10]:
!pip install torchinfo

Collecting torchinfo
  Downloading torchinfo-1.8.0-py3-none-any.whl (23 kB)
Installing collected packages: torchinfo
Successfully installed torchinfo-1.8.0


In [11]:
import torchinfo

# Summarise the model for one batch of 3-channel 224x224 inputs.
batch_size = 256
summary_input_shape = (batch_size, 3, 224, 224)
torchinfo.summary(model, input_size=summary_input_shape)

Layer (type:depth-idx)                   Output Shape              Param #
ResNet34                                 [256, 10]                 --
├─Sequential: 1-1                        [256, 64, 56, 56]         --
│    └─Conv2d: 2-1                       [256, 64, 112, 112]       9,472
│    └─BatchNorm2d: 2-2                  [256, 64, 112, 112]       128
│    └─ReLU: 2-3                         [256, 64, 112, 112]       --
│    └─MaxPool2d: 2-4                    [256, 64, 56, 56]         --
├─Sequential: 1-2                        [256, 64, 56, 56]         --
│    └─ResBlock: 2-5                     [256, 64, 56, 56]         --
│    │    └─Conv2d: 3-1                  [256, 64, 56, 56]         36,928
│    │    └─BatchNorm2d: 3-2             [256, 64, 56, 56]         128
│    │    └─ReLU: 3-3                    [256, 64, 56, 56]         --
│    │    └─Conv2d: 3-4                  [256, 64, 56, 56]         36,928
│    │    └─BatchNorm2d: 3-5             [256, 64, 56, 56]         128
│

# 학습 & 검증

In [12]:
# Cross-entropy loss, optimised with SGD (momentum plus weight decay).
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(
    model.parameters(),
    lr=0.1,
    momentum=0.9,
    weight_decay=0.0001,
)

In [13]:
def train(dl, transforms, model, loss_fn, optimizer):
  """Run one training pass over ``dl``, printing progress every 10 batches.

  Batches are moved to the module-level ``device`` before the forward pass.

  Args:
    dl: Iterable yielding ``(X, y)`` batches.
    transforms: Callable applied to each input batch before it is moved to
      the device; pass ``None`` to use the batches unchanged.
    model: Module to optimise; it is switched to training mode.
    loss_fn: Callable computing the loss from ``(pred, y)``.
    optimizer: Optimizer stepped once per batch.
  """
  model.train()
  for idx, (X, y) in enumerate(dl):
    if transforms is not None:
      X = transforms(X)
    X, y = X.to(device), y.to(device)

    pred = model(X)
    loss = loss_fn(pred, y)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if idx % 10 == 0:
      # Compute the batch accuracy only when it is reported: .item() copies
      # the value to the host, which is wasted work on the other batches.
      correct = (pred.argmax(dim=1) == y).sum().item()
      print(f'Train loss: {loss.item()} -- Train correct: {correct / len(X)}')

def validate(dl, model, loss_fn):
  """Evaluate ``model`` on ``dl``; print and return mean loss and accuracy.

  Batches are moved to the module-level ``device`` before the forward pass.

  Args:
    dl: Iterable yielding ``(X, y)`` batches.
    model: Module to evaluate; it is switched to evaluation mode.
    loss_fn: Callable computing the loss from ``(pred, y)``.

  Returns:
    ``(mean_loss, accuracy)``: the average of the per-batch losses and the
    fraction of correctly classified samples among those actually seen.
    Both are NaN when ``dl`` yields no batches.
  """
  loss_sum, correct = 0.0, 0
  n_batches, n_samples = 0, 0

  model.eval()
  with torch.no_grad():
    for X, y in dl:
      X, y = X.to(device), y.to(device)

      pred = model(X)
      loss_sum += loss_fn(pred, y).item()
      correct += (pred.argmax(dim=1) == y).sum().item()

      # Count what was really processed instead of trusting len(dl.dataset),
      # which is wrong when the loader drops or subsamples data.
      n_batches += 1
      n_samples += len(X)

  # An empty loader reports NaN instead of raising ZeroDivisionError.
  mean_loss = loss_sum / n_batches if n_batches else float('nan')
  accuracy = correct / n_samples if n_samples else float('nan')

  print(f'Val loss: {mean_loss} -- Val correct: {accuracy}')
  return mean_loss, accuracy

In [14]:
batch_size = 256
# Reshuffle the training data every epoch so SGD does not see the samples in
# the same fixed order each time; the test loader keeps the dataset order.
train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
test_dl = DataLoader(test_ds, batch_size=batch_size)

In [15]:
eps = 5

# Each epoch: one training pass, then an evaluation on the test split.
for e in range(eps):
  epoch_no = e + 1
  print(f'===== epoch: {epoch_no}/{eps} =====')
  train(dl=train_dl, transforms=transforms, model=model, loss_fn=loss_fn, optimizer=optimizer)
  validate(dl=test_dl, model=model, loss_fn=loss_fn)

===== epoch: 1/5 =====




Train loss: 2.6265506744384766 -- Train correct: 0.08203125
Train loss: 3.1622865200042725 -- Train correct: 0.09375
Train loss: 3.0855393409729004 -- Train correct: 0.11328125
Train loss: 2.4562137126922607 -- Train correct: 0.12890625
Train loss: 2.3425276279449463 -- Train correct: 0.1484375
Train loss: 2.3114895820617676 -- Train correct: 0.12890625
Train loss: 2.243732213973999 -- Train correct: 0.171875
Train loss: 2.267669439315796 -- Train correct: 0.19140625
Train loss: 2.1771304607391357 -- Train correct: 0.18359375
Train loss: 2.037295341491699 -- Train correct: 0.22265625
Train loss: 2.151080846786499 -- Train correct: 0.19921875
Train loss: 2.0738768577575684 -- Train correct: 0.23828125
Train loss: 2.046452045440674 -- Train correct: 0.2265625
Train loss: 2.0712244510650635 -- Train correct: 0.265625
Train loss: 1.9917190074920654 -- Train correct: 0.2421875
Train loss: 1.9757195711135864 -- Train correct: 0.29296875
Train loss: 1.9731031656265259 -- Train correct: 0.2578