In [None]:
'''
torchvision: 画像認識のためのプロジェクト
CNN チャネル方法にも畳み込みを計算、得られる特徴量を」特徴マップと呼ぶ
ストライド：畳み込み演算を適用する間隔
パディング：特徴量マップの恥を適当な値で埋める処理、０で埋めることが多い
プーリング：機械的にカーネルの値をまとめる

カーネルと同じ形のブロックを想定する
最大プーリンぎう：ブロック内の最大値を次の層の画素値とする
平均プーリング：」ブロック内の平均値

プーリング・ストライドを使って特徴マップを縮小させる意味
・チャネル数が大きくなるので、計算量を抑える必要がある
・特徴量を集約
・受容野の拡大（プーリングの方が処理が計量

ResNet
残渣接続・スキップ接続

残渣接続の利点
・前の層で得られた特徴量を活躍しやすくなる
・勾配喪失が起きにくくなる
'''

'\ntorchvision: 画像認識のためのプロジェクト\nCNN チャネル方法にも畳み込みを計算、得られる特徴量を」特徴マップと呼ぶ\nストライド：畳み込み演算を適用する間隔\nパディング：特徴量マップの恥を適当な値で埋める処理、０で埋めることが多い\nプーリング：機械的にカーネルの値をまとめる\n\nカーネルと同じ形のブロックを想定する\n最大プーリンぎう：ブロック内の最大値を次の層の画素値とする\n平均プーリング：」ブロック内の平均値\n\nプーリング・ストライドを使って特徴マップを縮小させる意味\n・チャネル数が大きくなるので、計算量を抑える必要がある\n・特徴量を集約\n・受容野の拡大（プーリングの方が処理が計量\n\nResNet\n残渣接続・スキップ接続\n\n残渣接続の利点\n・前の層で得られた特徴量を活躍しやすくなる\n・勾配喪失が起きにくくなる\n'

In [None]:
from collections import deque
import copy
from tqdm import tqdm

import torch
from torch import nn, optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
import torchvision
import torchvision.transforms as T

# Googleドライブをマウント
from google.colab import drive
drive.mount('/content/drive')

import sys
sys.path.append('drive/MyDrive/python_image_recognition/4_classification/4_2_cnn')

import util
import eval

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
'''
データの標準化に使うへきん・標準偏差の計算処理の実装
画像データの標準化は各チャネルで行う
'''
def get_dataset_statistics(dataset: Dataset):
    data = []
    for i in range(len(dataset)):
        img = dataset[i][0]
        data.append(img)
    data = torch.stack(data)
    channel_mean, channel_std = data.mean(dim=(0,2,3)), data.std(dim=(0,2,3))

    return channel_mean, channel_std

In [None]:
class BasicBlock(nn.Module):
    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        stride: int = 1
    ):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)

        self.downsample = None
        if stride > 1: # 残渣接続で特徴マップを縮小する場合、スキップ接続を通る特徴マップを縮小する必要がある
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x: torch.Tensor):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            x = self.downsample(x)

        out += x
        out = self.relu(out)

        return out

In [None]:
class ResNet18(nn.Module):
    def __init__(self, num_classes: int):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = nn.Sequential(
            BasicBlock(64, 64),
            BasicBlock(64, 64)
        )
        self.layer2 = nn.Sequential(
            BasicBlock(64, 128, stride=2),
            BasicBlock(128, 128)
        )
        self.layer3 = nn.Sequential(
            BasicBlock(128, 256, stride=2),
            BasicBlock(256, 256)
        )
        self.layer4 = nn.Sequential(
            BasicBlock(256, 512, stride=2),
            BasicBlock(512, 512)
        )

        self.avg_pool = nn.AdaptiveAvgPool2d(1) # 特徴マップの幅・大きさを1
        self.linear = nn.Linear(512, num_classes)

    def forward(self, x: torch.Tensor, return_embed: bool=False):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.max_pool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avg_pool(x)
        x = x.flatten(1)

        return x if return_embed else self.linear(x)

    def get_device(self):
        return self.linear.weight.device

    def copy(self):
        return copy.deepcopy(self)

In [None]:
class Config:
    '''
    ハイパーパラメータとオプションの設定
    '''
    def __init__(self):
        self.val_ratio = 0.2   # 検証に使う学習セット内のデータの割合
        self.num_epochs = 30   # 学習エポック数
        self.lr = 1e-2         # 学習率
        self.moving_avg = 20   # 移動平均で計算する損失と正確度の値の数
        self.batch_size = 32   # バッチサイズ
        self.num_workers = 2   # データローダに使うCPUプロセスの数
        self.device = 'cuda'   # 学習に使うデバイス
        self.num_samples = 200 # t-SNEでプロットするサンプル数

def train_eval():
    config = Config()

    # 入力データ正規化のために学習セットのデータを使って
    # 各チャネルの平均と標準偏差を計算
    dataset = torchvision.datasets.CIFAR10(
        root='data', train=True, download=True,
        transform=T.ToTensor())
    channel_mean, channel_std = get_dataset_statistics(dataset)

    # 画像の整形を行うクラスのインスタンスを用意
    transforms = T.Compose((
        T.ToTensor(),
        T.Normalize(mean=channel_mean, std=channel_std),
    ))

    # 学習、評価セットの用意
    train_dataset = torchvision.datasets.CIFAR10(
        root='data', train=True, download=True,
        transform=transforms)
    test_dataset = torchvision.datasets.CIFAR10(
        root='data', train=False, download=True,
        transform=transforms)

    # 学習・検証セットへ分割するためのインデックス集合の生成
    val_set, train_set = util.generate_subset(
        train_dataset, config.val_ratio)

    print(f'学習セットのサンプル数　: {len(train_set)}')
    print(f'検証セットのサンプル数　: {len(val_set)}')
    print(f'テストセットのサンプル数: {len(test_dataset)}')

    # インデックス集合から無作為にインデックスをサンプルするサンプラー
    train_sampler = SubsetRandomSampler(train_set)

    # DataLoaderを生成
    train_loader = DataLoader(
        train_dataset, batch_size=config.batch_size,
        num_workers=config.num_workers, sampler=train_sampler)
    val_loader = DataLoader(
        train_dataset, batch_size=config.batch_size,
        num_workers=config.num_workers, sampler=val_set)
    test_loader = DataLoader(
        test_dataset, batch_size=config.batch_size,
        num_workers=config.num_workers)

    # 目的関数の生成
    loss_func = F.cross_entropy

    # 検証セットの結果による最良モデルの保存用変数
    val_loss_best = float('inf')
    model_best = None

    # ResNet18モデルの生成
    model = ResNet18(len(train_dataset.classes))

    # モデルを指定デバイスに転送(デフォルトはGPU)
    model.to(config.device)

    # 最適化器の生成
    optimizer = optim.SGD(model.parameters(), lr=config.lr)

    for epoch in range(config.num_epochs):
        model.train()

        with tqdm(train_loader) as pbar:
            pbar.set_description(f'[エポック {epoch + 1}]')

            # 移動平均計算用
            losses = deque()
            accs = deque()
            for x, y in pbar:
                # データをモデルと同じデバイスに転送
                x = x.to(model.get_device())
                y = y.to(model.get_device())

                # パラメータの勾配をリセット
                optimizer.zero_grad()

                # 順伝播
                y_pred = model(x)

                # 学習データに対する損失と正確度を計算
                loss = loss_func(y_pred, y)
                accuracy = (y_pred.argmax(dim=1) == \
                            y).float().mean()

                # 誤差逆伝播
                loss.backward()

                # パラメータの更新
                optimizer.step()

                # 移動平均を計算して表示
                losses.append(loss.item())
                accs.append(accuracy.item())
                if len(losses) > config.moving_avg:
                    losses.popleft()
                    accs.popleft()
                pbar.set_postfix({
                    'loss': torch.Tensor(losses).mean().item(),
                    'accuracy': torch.Tensor(accs).mean().item()})

        # 検証セットを使って精度評価
        val_loss, val_accuracy = eval.evaluate(
            val_loader, model, loss_func)
        print(f'検証　: loss = {val_loss:.3f}, '
                f'accuracy = {val_accuracy:.3f}')

        # より良い検証結果が得られた場合、モデルを記録
        if val_loss < val_loss_best:
            val_loss_best = val_loss
            model_best = model.copy()

    # テスト
    test_loss, test_accuracy = eval.evaluate(
        test_loader, model_best, loss_func)
    print(f'テスト: loss = {test_loss:.3f}, '
          f'accuracy = {test_accuracy:.3f}')

    # t-SNEを使って特徴量の分布をプロット
    util.plot_t_sne(test_loader, model_best, config.num_samples)

In [None]:
train_eval()

Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
学習セットのサンプル数　: 40000
検証セットのサンプル数　: 10000
テストセットのサンプル数: 10000


RuntimeError: Found no NVIDIA driver on your system. Please check that you have an NVIDIA GPU and installed a driver from http://www.nvidia.com/Download/index.aspx