# ResNetを再現実装

In [1]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset

import numpy as np
import matplotlib.pyplot as plt

from torchsummary import summary

from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter

In [2]:
# Project root used for the data files and training artifacts.
# The RESNET_BASE_DIR environment variable overrides it, so the notebook can run
# on another machine without editing this cell; the default is the original path.
BASE_DIR = os.environ.get("RESNET_BASE_DIR", '/home/mori/dev/DL-Models-Collection/ResNet')

In [3]:
def conv3x3(in_ch, out_ch, stride=1, padding=1):
    """Build a bias-free 3x3 ``nn.Conv2d``.

    Args:
        in_ch (int): number of input channels.
        out_ch (int): number of output channels.
        stride (int): convolution stride.
        padding (int): padding passed to the convolution.

    Returns:
        nn.Conv2d: the configured convolution layer (``bias=False``).
    """
    layer = nn.Conv2d(
        in_channels=in_ch,
        out_channels=out_ch,
        kernel_size=3,
        stride=stride,
        padding=padding,
        bias=False,
    )
    return layer

def pad_channels(x, out_ch):
    """Append all-zero channels to ``x`` until it has ``out_ch`` channels.

    Args:
        x (torch.Tensor): tensor whose dimension 1 is the channel dimension.
        out_ch (int): desired number of channels.

    Returns:
        torch.Tensor: ``x`` itself when it already has at least ``out_ch``
        channels, otherwise a new tensor with zeros concatenated along dim 1.
    """
    missing = out_ch - x.size(1)
    if missing > 0:
        # new_zeros keeps the dtype and device of x
        filler_shape = (x.size(0), missing) + tuple(x.shape[2:])
        filler = x.new_zeros(filler_shape)
        return torch.cat((x, filler), dim=1)
    return x

class BasicBlock(nn.Module):
    """
    Basic residual block: two 3x3 convolutions (each followed by batch norm)
    whose output is added to a shortcut of the input, then passed through ReLU.

    The shortcut has no parameters ("option A"): when the spatial stride or the
    channel count changes, the input is subsampled by strided slicing and padded
    with zero channels; otherwise it is the identity.

    Args:
        in_ch (int): number of input channels.
        out_ch (int): number of output channels.
        stride (int): stride of the first convolution (and of the shortcut subsampling).
    """
    def __init__(self, in_ch, out_ch, stride=1):
        super().__init__()
        self.conv1 = conv3x3(in_ch, out_ch, stride)
        self.bn1 = nn.BatchNorm2d(out_ch)
        self.conv2 = conv3x3(out_ch, out_ch)
        self.bn2 = nn.BatchNorm2d(out_ch)

        # Plain attributes describing the shortcut. The shortcut is a regular
        # method rather than a lambda stored on the instance, because a local
        # lambda makes the whole module unpicklable (e.g. torch.save(model)).
        self.stride = stride
        self.out_ch = out_ch
        self.needs_adjust = stride != 1 or in_ch != out_ch

    def shortcut(self, x):
        """Return the shortcut branch for input ``x``.

        Identity when shapes already match; otherwise subsample H and W with the
        block's stride and zero-pad the channel dimension up to ``out_ch``.
        """
        if not self.needs_adjust:
            return x
        return pad_channels(x[:, :, ::self.stride, ::self.stride], self.out_ch)

    def forward(self, x):
        """Apply conv-bn-relu, conv-bn, add the shortcut, then ReLU."""
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

class ResNet20(nn.Module):
    """
    ResNet20: a 3x3 stem convolution, three stages of three BasicBlocks
    (16, 32 and 64 channels), global average pooling and a linear classifier.

    Input images are expected to be 32x32.

    Args:
        num_classes (int): size of the classifier output.
    """
    def __init__(self, num_classes=10):
        super().__init__()
        # Channel count fed into the next stage; updated by _make_layer
        self.in_ch = 16

        # Layer 1: stem convolution + batch norm
        self.conv1 = conv3x3(3, self.in_ch)
        self.bn1 = nn.BatchNorm2d(self.in_ch)

        # Layers 2-19: three stages, each made of three two-conv blocks
        self.layer1 = self._make_layer(out_ch=16, blocks=3, stride=1)
        self.layer2 = self._make_layer(out_ch=32, blocks=3, stride=2)
        self.layer3 = self._make_layer(out_ch=64, blocks=3, stride=2)

        # Layer 20: global average pooling followed by the classifier
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(64, num_classes)

        # Re-initialize every submodule's weights
        self.apply(self._init_weights)

    def _make_layer(self, out_ch, blocks, stride):
        """
        Build one stage consisting of ``blocks`` stacked BasicBlocks.

        Only the first block receives ``stride`` and the channel change; the
        remaining blocks keep ``out_ch`` channels and stride 1.

        Args:
            out_ch (int): number of output channels of the stage.
            blocks (int): number of BasicBlocks to stack.
            stride (int): stride of the first block.

        Returns:
            nn.Sequential: the stacked blocks.
        """
        head = BasicBlock(self.in_ch, out_ch, stride)
        self.in_ch = out_ch
        tail = [BasicBlock(out_ch, out_ch) for _ in range(blocks - 1)]
        return nn.Sequential(head, *tail)

    @staticmethod
    def _init_weights(m):
        """Initialize one submodule; intended to be used with ``Module.apply``."""
        if isinstance(m, nn.BatchNorm2d):
            # Batch norm starts as the identity affine transform
            nn.init.constant_(m.weight, 1)
            nn.init.constant_(m.bias, 0)
            return
        if not isinstance(m, (nn.Conv2d, nn.Linear)):
            return
        # Conv and linear weights: Kaiming normal, fan_out mode, ReLU gain
        nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
        if isinstance(m, nn.Linear) and m.bias is not None:
            nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Return the class logits for a batch of images ``x``."""
        x = F.relu(self.bn1(self.conv1(x)))
        for stage in (self.layer1, self.layer2, self.layer3):
            x = stage(x)
        pooled = self.avgpool(x)
        features = torch.flatten(pooled, 1)
        return self.fc(features)

In [4]:
# Smoke test: run one random 3x32x32 input through a freshly built model
# and print the output shape.
resnet20 = ResNet20()
dummy_input = torch.randn(1, 3, 32, 32)
output = resnet20(dummy_input)
print(output.shape)

torch.Size([1, 10])


In [5]:
# Check that the network has 20 weighted layers
def count_layers(model):
    """Return how many ``nn.Conv2d`` / ``nn.Linear`` modules ``model`` contains.

    The model itself and all nested submodules are inspected.
    """
    weighted_types = (nn.Conv2d, nn.Linear)
    return sum(1 for module in model.modules() if isinstance(module, weighted_types))

# Expected: 20 = 19 convolutions (stem + 3 stages x 3 blocks x 2 convs) + 1 linear layer
print("Layer count:", count_layers(resnet20))

Layer count: 20


In [6]:
# Count the trainable parameters and report them in millions
total_params = 0
for p in resnet20.parameters():
    if p.requires_grad:
        total_params += p.numel()
print(f"Total parameters: {total_params/1e6:.2f}M")

Total parameters: 0.27M


In [7]:
# Cross-check the architecture and parameter count with torchsummary as well
summary(resnet20, (3, 32, 32), device="cpu")

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 16, 32, 32]             432
       BatchNorm2d-2           [-1, 16, 32, 32]              32
            Conv2d-3           [-1, 16, 32, 32]           2,304
       BatchNorm2d-4           [-1, 16, 32, 32]              32
            Conv2d-5           [-1, 16, 32, 32]           2,304
       BatchNorm2d-6           [-1, 16, 32, 32]              32
        BasicBlock-7           [-1, 16, 32, 32]               0
            Conv2d-8           [-1, 16, 32, 32]           2,304
       BatchNorm2d-9           [-1, 16, 32, 32]              32
           Conv2d-10           [-1, 16, 32, 32]           2,304
      BatchNorm2d-11           [-1, 16, 32, 32]              32
       BasicBlock-12           [-1, 16, 32, 32]               0
           Conv2d-13           [-1, 16, 32, 32]           2,304
      BatchNorm2d-14           [-1, 16,

### CIFAR-10の読み込み & 前処理

In [8]:
# Load the CIFAR-10 arrays from the .npy files stored under BASE_DIR/data
_data_dir = os.path.join(BASE_DIR, 'data')
x_train, y_train, x_test, y_test = (
    np.load(os.path.join(_data_dir, file_name))
    for file_name in ('x_train.npy', 'y_train.npy', 'x_test.npy', 'y_test.npy')
)

# Print the shape of each array
for _array in (x_train, y_train, x_test, y_test):
    print(_array.shape)

(50000, 32, 32, 3)
(50000,)
(10000, 32, 32, 3)
(10000,)


In [9]:
class CustomDataset(Dataset):
    """Dataset over parallel sample/label containers with a per-sample transform.

    Args:
        x: indexable container of samples; its length defines the dataset size.
        y: indexable container of labels, indexed in parallel with ``x``.
        transform: callable applied to each sample when it is fetched.
    """
    def __init__(self, x, y, transform):
        self.x, self.y, self.transform = x, y, transform

    def __len__(self):
        """Number of samples (length of ``x``)."""
        return len(self.x)

    def __getitem__(self, idx):
        """Return ``(transformed sample, label as a torch.long tensor)`` for ``idx``."""
        raw_sample, raw_label = self.x[idx], self.y[idx]
        sample = self.transform(raw_sample)
        label = torch.as_tensor(raw_label, dtype=torch.long)
        return sample, label

# Transform definitions: per-channel mean / std used for standardization
mean = (0.4914, 0.4822, 0.4465)
std = (0.2023, 0.1994, 0.2010)

# Training pipeline: augmentation followed by tensor conversion and standardization
_train_steps = [
    transforms.ToPILImage(),                     # NumPy array -> PIL image
    transforms.RandomCrop((32, 32), padding=4),  # pad 4 px per side, then random 32x32 crop
    transforms.RandomHorizontalFlip(p=0.5),      # horizontal flip with probability 0.5
    transforms.ToTensor(),                       # -> tensor
    transforms.Normalize(mean, std),             # standardize each channel
]
transform_train = transforms.Compose(_train_steps)

# Test pipeline: no augmentation, only tensor conversion and standardization
_test_steps = [
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
]
transform_test = transforms.Compose(_test_steps)

# Build the datasets
train_dataset = CustomDataset(x=x_train, y=y_train, transform=transform_train)
test_dataset = CustomDataset(x=x_test, y=y_test, transform=transform_test)

# 学習

In [10]:
class TrainerAndTester:
    """
    Train a model for a fixed number of iterations and evaluate it by error rate.

    The model is moved to CUDA when available, otherwise it stays on the CPU.
    Train/test error rates are written to TensorBoard under ``<save_dir>/logs``.

    Args:
        model: the network to train.
        optimizer: optimizer already bound to the model's parameters.
        max_iter (int): total number of optimizer steps to run.
        train_loader: iterable yielding ``(images, labels)`` training batches.
        test_loader: iterable yielding ``(images, labels)`` test batches.
        save_dir (str): directory under which the ``logs`` folder is created.
        eval_interval (int): evaluate and log the error rates every this many iterations.
        lr_milestones: iteration counts at which every param group's learning
            rate is multiplied by ``lr_gamma``.
        lr_gamma (float): multiplicative learning-rate decay factor.

    Raises:
        ValueError: if ``eval_interval`` is not positive.
    """
    def __init__(self, model, optimizer, max_iter, train_loader, test_loader, save_dir,
                 eval_interval=4000, lr_milestones=(32000, 48000), lr_gamma=0.1):
        if eval_interval <= 0:
            raise ValueError("eval_interval must be a positive integer")

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.model = model.to(self.device)
        self.optimizer = optimizer

        self.max_iter = max_iter
        self.eval_interval = eval_interval
        self.lr_milestones = frozenset(lr_milestones)
        self.lr_gamma = lr_gamma

        self.train_loader = train_loader
        self.test_loader = test_loader

        # Kept for compatibility; nothing in this class writes to this path.
        self.plot_save_path = os.path.join(save_dir, "plot.png")

        self.criterion = nn.CrossEntropyLoss()

        log_dir = os.path.join(save_dir, "logs")
        os.makedirs(log_dir, exist_ok=True)
        self.writer = SummaryWriter(log_dir)

    def train(self):
        """
        Run the training loop for ``max_iter`` iterations.

        The training loader is re-iterated as many times as needed. Every
        ``eval_interval`` iterations the train/test error rates are computed and
        logged; at each milestone the learning rate is decayed.

        Returns:
            float: error rate (percent) on the test loader after training.

        Raises:
            ValueError: if the training loader yields no batches.
        """
        current_iter = 0
        pbar = tqdm(total=self.max_iter, desc="Training")

        try:
            while current_iter < self.max_iter:
                batches_this_pass = 0
                for images, labels in self.train_loader:
                    batches_this_pass += 1

                    # -------- forward / backward --------
                    # calc_error leaves the model in eval mode, so re-enable
                    # training mode on every step.
                    self.model.train()
                    images = images.to(self.device, non_blocking=True)
                    labels = labels.to(self.device, non_blocking=True)

                    self.optimizer.zero_grad()
                    outputs = self.model(images)
                    loss = self.criterion(outputs, labels)
                    loss.backward()
                    self.optimizer.step()

                    # -------- bookkeeping --------
                    current_iter += 1
                    pbar.update(1)

                    # Periodically log the error rates to TensorBoard
                    if current_iter % self.eval_interval == 0:
                        train_err = self.calc_error(self.train_loader)
                        test_err = self.calc_error(self.test_loader)

                        self.writer.add_scalars(
                            main_tag="Error",
                            tag_scalar_dict={
                                "train_err": train_err,
                                "test_err": test_err,
                            },
                            global_step=current_iter,
                        )
                        self.writer.flush()

                        pbar.set_postfix(
                            TrainErr=f"{train_err:.4f}",
                            TestErr=f"{test_err:.4f}",
                            LR=self.optimizer.param_groups[0]['lr']
                        )

                    # Learning-rate schedule: step decay at the configured milestones
                    if current_iter in self.lr_milestones:
                        for g in self.optimizer.param_groups:
                            g['lr'] *= self.lr_gamma

                    # Stop as soon as the requested number of iterations is reached
                    if current_iter >= self.max_iter:
                        break

                # Without this guard an empty loader would loop forever,
                # because current_iter could never advance.
                if batches_this_pass == 0:
                    raise ValueError("train_loader yielded no batches; cannot train")
        finally:
            pbar.close()
            self.writer.close()

        # Report the final test error
        return self.calc_error(self.test_loader)

    def calc_error(self, loader):
        """
        Compute the classification error rate over a data loader.

        The model is switched to eval mode and left in that mode.

        Args:
            loader: iterable yielding ``(images, labels)`` batches.

        Returns:
            float: error rate in percent.

        Raises:
            ValueError: if the loader yields no samples.
        """
        self.model.eval()
        correct = 0
        total = 0

        # Count the correct predictions without tracking gradients
        with torch.no_grad():
            for images, labels in loader:
                images = images.to(self.device, non_blocking=True)
                labels = labels.to(self.device, non_blocking=True)

                predicted = self.model(images).argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        if total == 0:
            raise ValueError("loader yielded no samples; the error rate is undefined")

        # Convert accuracy to an error percentage
        return (1 - correct / total) * 100

In [11]:
# Hyperparameters: learning rate, batch size, number of training iterations
lr         = 0.1
batch_size = 128
max_iter   = 64000

# Model
model = ResNet20()

# Optimizer: split the parameters so batch-norm parameters get no weight decay.
# Grouping is decided by module type rather than by the substring 'bn' in the
# parameter name, so it does not depend on how attributes happen to be named.
bn_params, other_params = [], []
for module in model.modules():
    group = bn_params if isinstance(module, nn.BatchNorm2d) else other_params
    # recurse=False: take only the parameters owned directly by this module
    group.extend(module.parameters(recurse=False))

optimizer = torch.optim.SGD(
    [{'params': other_params, 'weight_decay': 1e-4},
     {'params': bn_params,    'weight_decay': 0.}],
    lr=lr, momentum=0.9
)

# Data loaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=True)
test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, pin_memory=True)

# Train and evaluate; artifacts go under the project root defined in the config cell
save_dir = BASE_DIR
last_err = TrainerAndTester(model, optimizer, max_iter, train_loader, test_loader, save_dir).train()
print(f"テストデータでの最終誤差率: {last_err}")

Training: 100%|██████████| 64000/64000 [28:41<00:00, 37.18it/s, LR=0.001, TestErr=8.7500, TrainErr=0.4460]  


テストデータでの最終誤差率: 8.750000000000002
