##### データセットの場所やバッチサイズなどの定数値の設定

In [3]:
import os
os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'


# 使用するデバイス
# GPU を使用しない環境（CPU環境）で実行する場合は DEVICE = 'cpu' とする
DEVICE = 'cuda:0'

# 全ての訓練データを一回ずつ使用することを「1エポック」として，何エポック分学習するか
# 再開モードの場合も, このエポック数の分だけ追加学習される（N_EPOCHSは最終エポック番号ではない）
N_EPOCHS = 100

# 学習時のバッチサイズ
BATCH_SIZE = 100

# 訓練データセット（画像ファイルリスト）のファイル名
TRAIN_DATASET_CSV = './tinySTL10/train_list.csv'

# テストデータセット（画像ファイルリスト）のファイル名
TEST_DATASET_CSV = './tinySTL10/test_list.csv'

# 画像ファイルの先頭に付加する文字列（データセットが存在するディレクトリのパス）
DATA_DIR = './tinySTL10/'

# 画像サイズ
H = 96 # 縦幅（実際にはニューラルネットワーク内部の前処理にて 224x224 にリサイズするので, この値は使わない）
W = 96 # 横幅（同上）
C = 3 # チャンネル数（カラー画像なら3，グレースケール画像なら1）

# 学習結果の保存先フォルダ
MODEL_DIR = './ViT_models/'

# 学習結果のニューラルネットワークの保存先
MODEL_FILE = os.path.join(MODEL_DIR, 'ViT_object_recognizer_model.pth')

# 損失関数としてBCE損失を使用するか否か
# これを False にすると通常のクロスエントロピー損失が使用される
USE_BCE_LOSS = True

# 中断／再開の際に用いる一時ファイル
CHECKPOINT_EPOCH = os.path.join(MODEL_DIR, 'checkpoint_epoch.pkl')
CHECKPOINT_MODEL = os.path.join(MODEL_DIR, 'checkpoint_model.pth')
CHECKPOINT_OPT = os.path.join(MODEL_DIR, 'checkpoint_opt.pth')

##### ニューラルネットワークモデルの定義

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms


# 画像のパッチ分割および各パッチの平坦化を実行するクラス
#   - in_channels: 入力画像のチャンネル数（グレースケール画像なら1, カラー画像なら3）
#   - patch_size: パッチのサイズ（入力画像の縦横幅がパッチサイズの整数倍となるように設定すること．なお，パッチは正方形とする）
#   - layer_norm: パッチ分割後にLayer Normalizationを実行するか否か（実行する場合は True, 実行しない場合は False を指定）
class ToFlattenedPatches(nn.Module):
    def __init__(self, in_channels, patch_size, layer_norm=True):
        super(ToFlattenedPatches, self).__init__()
        self.patch_size = patch_size
        if layer_norm:
            self.ln = nn.LayerNorm([in_channels*(patch_size**2)])
        else:
            self.ln = None
    def forward(self, x):
        h = torch.cat([
            torch.cat([
                torch.reshape(q, (len(q), 1, -1)) for q in torch.split(p, self.patch_size, dim=3)
            ], dim=1) for p in torch.split(x, self.patch_size, dim=2)
        ], dim=1) # パッチ分割 + 各パッチ平坦化
        if self.ln is not None:
            h = self.ln(h) # Layer Normalization
        return h


# Linear Projectionによる各パッチの埋め込みを実行するクラス
#   - patch_dim: 埋め込み前のパッチの次元数（パッチ分割前の画像のチャンネル数 * パッチサイズの2乗）
#   - embed_dim: 埋め込み後のパッチ特徴量の次元数
#   - add_cls_token: CLSトークンを付加するか否か（付加する場合は True, 付加しない場合は False を指定）
#   - layer_norm: 埋め込み後にLayer Normalizationを実行するか否か（実行する場合は True, 実行しない場合は False を指定）
#   - bias: Linear Projectionのパラメータにバイアスベクトルを含めるか否か（含める場合は True, 含めない場合は False を指定）
class LinearProjection(nn.Module):
    def __init__(self, patch_dim, embed_dim=128, add_cls_token=False, layer_norm=True, bias=True):
        super(LinearProjection, self).__init__()
        self.linear_projection= nn.Linear(in_features=patch_dim, out_features=embed_dim, bias=bias)
        if add_cls_token:
            self.cls = nn.Parameter(torch.zeros(1, 1, embed_dim))
        else:
            self.cls = None
        if layer_norm:
            self.ln = nn.LayerNorm([embed_dim])
        else:
            self.ln = None
    def forward(self, x):
        h = self.linear_projection(x) # 平坦化後の各パッチを埋め込みベクトルに変換
        if self.ln is not None:
            h = self.ln(h) # Layer Normalization
        if self.cls is not None:
            h = torch.cat([self.cls.repeat_interleave(x.size()[0], dim=0), h], dim=1) # CLSトークン付加
        return h


# 位置エンコーディングの付加処理を実行するクラス
#   - n_patches: 想定されるパッチの数
#   - embed_dim: 埋め込み後のパッチ特徴量の次元数（偶数を想定）
#   - trainable: 位置エンコーディングを学習可能なパラメータとして設定するか否か（学習する場合は True, 学習しない場合は False を指定）
#   - random_initialize: パラメータをランダムに初期化するか，三角関数に基づいて初期化するか（trainable=False の場合は常に三角関数に基づいて設定される）
class PositionEmbeddings(nn.Module):
    def __init__(self, n_patches=256, embed_dim=128, trainable=False, random_initialize=False):
        super(PositionEmbeddings, self).__init__()
        if trainable and random_initialize:
            self.embeddings = nn.Parameter(torch.randn(1, n_patches, embed_dim))
        else:
            d = embed_dim // 2
            t = torch.arange(n_patches).repeat(1, 1)
            e = torch.log(torch.tensor(10000, device=t.device)) / d
            e = torch.exp(torch.arange(d, device=t.device) * -e)
            e = torch.unsqueeze(t, dim=-1) * e.repeat([1 for i in t.size()])
            self.embeddings = nn.Parameter(torch.cat((e.sin(), e.cos()), dim=-1))
        if not trainable:
            for param in self.parameters():
                param.requires_grad = False
    def forward(self):
        return self.embeddings


# Transformer Encoderの構成単位となる層
# 下記の TransformerEncoder クラスの内部で使用
class EncodingLayer(nn.Module):
    def __init__(self, num_heads=16, embed_dim=128, ffn_dim=128, ffn_dropout_ratio=0.1):
        super(EncodingLayer, self).__init__()
        self.ln1 = nn.LayerNorm([embed_dim])
        self.ln2 = nn.LayerNorm([embed_dim])
        self.mha = nn.MultiheadAttention(embed_dim=embed_dim, num_heads=num_heads, batch_first=True)
        if 0 < ffn_dropout_ratio and ffn_dropout_ratio <= 0.5:
            self.ffn = nn.Sequential(
                nn.Linear(in_features=embed_dim, out_features=ffn_dim),
                nn.GELU(),
                nn.Dropout(p=ffn_dropout_ratio),
                nn.Linear(in_features=ffn_dim, out_features=embed_dim),
                nn.Dropout(p=ffn_dropout_ratio),
            )
        else:
            self.ffn = nn.Sequential(
                nn.Linear(in_features=embed_dim, out_features=ffn_dim),
                nn.GELU(),
                nn.Linear(in_features=ffn_dim, out_features=embed_dim),
            )
    def forward(self, x):
        h = self.ln1(x) # Layer Normalization
        h, _ = self.mha(h, h, h) # Multihead Attention
        x = h + x # skip connection
        h = self.ln2(x) # Layer Normalization
        h = self.ffn(h) # Feed Forward Network
        y = h + x # skip connection
        return y


# Transformer Encoderの処理を実行するクラス
#   - num_layers: (Multi-Head Attention + Feed Forward Network)を何層分実行するか
#   - num_heads: Multi-Head Attentionにおけるヘッド数
#   - embed_dim: 埋め込み後のパッチ特徴量の次元数
#   - ffn_dim: Feed Forward Network内部における中間層のユニット数
#   - ffn_dropout_ratio: Feed Forward Network内部におけるドロップアウト層のドロップアウト率
#   - layer_norm: 最後ににLayer Normalizationを実行するか否か（実行する場合は True, 実行しない場合は False を指定）
class TransformerEncoder(nn.Module):
    def __init__(self, num_layers=6, num_heads=16, embed_dim=128, ffn_dim=128, ffn_dropout_ratio=0.1, layer_norm=True):
        super(TransformerEncoder, self).__init__()
        layers = [EncodingLayer(num_heads, embed_dim, ffn_dim, ffn_dropout_ratio) for i in range(num_layers)]
        self.encoder = nn.Sequential(*layers)
        if layer_norm:
            self.ln = nn.LayerNorm([embed_dim])
        else:
            self.ln = None
    def forward(self, x):
        h = self.encoder(x)
        if self.ln is not None:
            h = self.ln(h) # Layer Normalization
        return h


# Vision TransformerによりSTL10物体画像認識AIを実現するニューラルネットワーク
class SimpleViT(nn.Module):

    # C: 入力画像のチャンネル数
    # N: 認識対象となるクラスの数
    # image_size: 画像を最初に正方形にリサイズする際の一辺の長さ
    # patch_size: 画像を正方形のパッチに分割する際のパッチサイズ（各パッチの一辺の長さ）
    # num_layers: (Multi-Head Attention + Feed Forward Network)を何層分実行するか
    # num_heads: Multi-Head Attentionにおけるヘッドの数
    # embed_dim: 各パッチを何次元のベクトル（埋め込みベクトル）で表現するか
    # ffn_dim: Feed Forward Networkにおける中間層のユニット数
    # dropout_ratio: Feed Forward Networkにおけるドロップアウト層のドロップアウト率
    def __init__(self, C, N, image_size=224, patch_size=16, num_layers=6, num_heads=8, embed_dim=128, ffn_dim=128, dropout_ratio=0.0):
        super(SimpleViT, self).__init__()

        # データ拡張
        self.data_augment = transforms.Compose(torch.nn.ModuleList([
            transforms.RandomHorizontalFlip(p=0.5), # 確率0.5で左右反転
            transforms.ColorJitter(brightness=0.2, contrast=0.15, saturation=0.1, hue=0.05), # カラージッター（明度を±20%，コントラストを±15%，彩度を±10%，色相を±5%の範囲内でランダムに変更）
            transforms.RandomAffine(degrees=(-15, 15), scale=(0.8, 1.2), translate=(0.1, 0.1)), # -15度～15度の範囲でランダムに回転，±10%の範囲でランダムに平行移動，さらに80%～120%の範囲内でランダムにスケーリング
            transforms.RandomErasing(p=0.5), # 確率0.5で一部を消去
        ]))

        # 前処理
        self.preprocess = transforms.Resize(image_size, antialias=False) # 入力画像を正方形にリサイズ

        # パッチ分割・埋め込み
        self.patch_embed = nn.Sequential(
            ToFlattenedPatches(in_channels=C, patch_size=patch_size, layer_norm=True),
            LinearProjection(patch_dim=C*(patch_size**2), embed_dim=embed_dim, add_cls_token=True, layer_norm=True),
        )

        # 位置エンコーディング付加
        self.pos_embed = PositionEmbeddings(embed_dim=embed_dim, n_patches=(image_size//patch_size)**2, trainable=False)

        # Transformer Encoder
        self.transformer_encoder = TransformerEncoder(
            num_layers=num_layers,
            num_heads=num_heads,
            embed_dim=embed_dim,
            ffn_dim=ffn_dim,
            ffn_dropout_ratio=dropout_ratio,
            layer_norm=True
        )

        # クラス分類のためのMLPヘッド
        self.mlp = nn.Linear(in_features=embed_dim, out_features=N)

    def forward(self, x, testmode=False):
        if not testmode:
            x = self.data_augment(x) # 訓練時のみデータ拡張（テスト時は実行しない）

        # 前処理（入力画像を正方形にリサイズ）
        h = self.preprocess(x)

        # 入力画像をパッチ分割し，各パッチを埋め込みベクトルに変換
        h = self.patch_embed(h)

        # 位置エンコーディングの情報を付加（ただし，先頭のCLSトークンには位置エンコーディングを付加しない）
        h[:, 1:] += self.pos_embed()

        # （マルチヘッドアテンション + MLP）× num_layers
        h = self.transformer_encoder(h)

        # 先頭のCLSトークンに対応する情報のみを取り出す
        h = h[:, 0]

        # 最終層
        y = self.mlp(h)
        return y

##### 訓練データセットの読み込み

In [None]:
import pickle
from torch.utils.data import DataLoader, random_split
from mylib.data_io import CSVBasedDataset
from mylib.utility import save_datasets, load_datasets_from_file


# 前回の試行の続きを行いたい場合は True にする -> 再開モードになる
RESTART_MODE = False


# 再開モードの場合は，前回使用したデータセットをロードして使用する
if RESTART_MODE:
    train_dataset, valid_dataset = load_datasets_from_file(MODEL_DIR)
    if train_dataset is None:
        print('error: there is no checkpoint previously saved.')
        exit()
    train_size = len(train_dataset)
    valid_size = len(valid_dataset)
    with open(os.path.join(MODEL_DIR, 'fdicts.pkl'), 'rb') as fdicts_file:
        fdicts = pickle.load(fdicts_file)
    n_classes = len(fdicts[1])

# そうでない場合は，データセットを読み込む
else:

    # CSVファイルを読み込み, 訓練データセットを用意
    dataset = CSVBasedDataset(
        filename = TRAIN_DATASET_CSV,
        items = [
            'File Path', # X
            'Class Name', # Y
        ],
        dtypes = [
            'image', # Xの型
            'label', # Yの型
        ],
        dirname = DATA_DIR,
        img_mode = 'color', # 強制的にカラー画像として読み込む
    )
    with open(os.path.join(MODEL_DIR, 'fdicts.pkl'), 'wb') as fdicts_file:
        pickle.dump(dataset.forward_dicts, fdicts_file)

    # 認識対象のクラス数を取得
    n_classes = len(dataset.forward_dicts[1])

    # 訓練データセットを分割し，一方を検証用に回す
    dataset_size = len(dataset)
    valid_size = int(0.05 * dataset_size) # 全体の 5% を検証用に
    train_size = dataset_size - valid_size # 残りの 95% を学習用に
    train_dataset, valid_dataset = random_split(dataset, [train_size, valid_size])

    # データセット情報をファイルに保存
    save_datasets(MODEL_DIR, train_dataset, valid_dataset)

# 訓練データおよび検証用データをミニバッチに分けて使用するための「データローダ」を用意
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, pin_memory=True)
valid_dataloader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False, pin_memory=True)

##### 学習処理の実行

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from mylib.visualizers import LossVisualizer
from mylib.utility import save_checkpoint, load_checkpoint


# 前回の試行の続きを行いたい場合は True にする -> 再開モードになる
RESTART_MODE = False


# エポック番号
INIT_EPOCH = 0 # 初期値
LAST_EPOCH = INIT_EPOCH + N_EPOCHS # 最終値

# ニューラルネットワークの作成
model = SimpleViT(C=C, N=n_classes, image_size=112, patch_size=14, num_layers=6, num_heads=16, embed_dim=128, ffn_dim=128, dropout_ratio=0.1).to(DEVICE)

# 最適化アルゴリズムの指定（ここでは SGD でなく Adam を使用）
optimizer = optim.Adam(model.parameters())

# 再開モードの場合は，前回チェックポイントから情報をロードして学習再開
if RESTART_MODE:
    INIT_EPOCH, LAST_EPOCH, model, optimizer = load_checkpoint(CHECKPOINT_EPOCH, CHECKPOINT_MODEL, CHECKPOINT_OPT, N_EPOCHS, model, optimizer)
    print('')

# 損失関数
loss_func = nn.BCEWithLogitsLoss() if USE_BCE_LOSS else nn.CrossEntropyLoss()

# 損失関数値を記録する準備
loss_viz = LossVisualizer(['train loss', 'valid loss', 'valid accuracy'], init_epoch=INIT_EPOCH)

# 勾配降下法による繰り返し学習
for epoch in range(INIT_EPOCH, LAST_EPOCH):

    print('Epoch {0}:'.format(epoch + 1))

    # 学習
    model.train()
    sum_loss = 0
    for X, Y in tqdm(train_dataloader): # X, Y は CSVBasedDataset クラスの __getitem__ 関数の戻り値に対応
        for param in model.parameters():
            param.grad = None
        X = X.to(DEVICE)
        Y = Y.to(DEVICE)
        Y_pred = model(X) # 入力画像 X を現在のニューラルネットワークに入力し，出力の推定値を得る
        if USE_BCE_LOSS:
            loss = loss_func(Y_pred, F.one_hot(Y, num_classes=n_classes).to(torch.float32)) # 損失関数の現在値を計算（BCE損失を用いる場合）
        else:
            loss = loss_func(Y_pred, Y) # 損失関数の現在値を計算（クロスエントロピー損失を用いる場合）
        loss.backward() # 誤差逆伝播法により，個々のパラメータに関する損失関数の勾配（偏微分）を計算
        optimizer.step() # 勾配に沿ってパラメータの値を更新
        sum_loss += float(loss) * len(X)
    avg_loss = sum_loss / train_size
    loss_viz.add_value('train loss', avg_loss) # 訓練データに対する損失関数の値を記録
    print('train loss = {0:.6f}'.format(avg_loss))

    # 検証
    model.eval()
    sum_loss = 0
    n_failed = 0
    with torch.inference_mode():
        for X, Y in tqdm(valid_dataloader):
            X = X.to(DEVICE)
            Y = Y.to(DEVICE)
            Y_pred = model(X, testmode=True)
            if USE_BCE_LOSS:
                loss = loss_func(Y_pred, F.one_hot(Y, num_classes=n_classes).to(torch.float32))
            else:
                loss = loss_func(Y_pred, Y)
            sum_loss += float(loss) * len(X)
            n_failed += float(torch.count_nonzero(torch.argmax(Y_pred, dim=1) - Y)) # 推定値と正解値が一致していないデータの個数を数える
    avg_loss = sum_loss / valid_size
    accuracy = (valid_size - n_failed) / valid_size
    loss_viz.add_value('valid loss', avg_loss) # 検証用データに対する損失関数の値を記録
    loss_viz.add_value('valid accuracy', accuracy) # 検証用データに対する認識精度の値を記録
    print('valid loss = {0:.6f}'.format(avg_loss))
    print('accuracy = {0:.2f}%'.format(100 * accuracy))
    print('')

    # 現在の学習状態を一時ファイルに保存
    save_checkpoint(CHECKPOINT_EPOCH, CHECKPOINT_MODEL, CHECKPOINT_OPT, epoch+1, model, optimizer)

# 学習結果のニューラルネットワークモデルをファイルに保存
model = model.to('cpu')
torch.save(model.state_dict(), MODEL_FILE)

# 損失関数の記録をファイルに保存
loss_viz.save(v_file=os.path.join(MODEL_DIR, 'loss_graph.png'), h_file=os.path.join(MODEL_DIR, 'loss_history.csv'))

##### テストデータセットの読み込み

In [9]:
import pickle
from torch.utils.data import DataLoader
from mylib.data_io import CSVBasedDataset


# CSVファイルを読み込み, テストデータセットを用意
with open(os.path.join(MODEL_DIR, 'fdicts.pkl'), 'rb') as fdicts_file:
    fdicts = pickle.load(fdicts_file)
test_dataset = CSVBasedDataset(
    filename = TEST_DATASET_CSV,
    items = [
        'File Path', # X
        'Class Name', # Y
    ],
    dtypes = [
        'image', # Xの型
        'label', # Yの型
    ],
    dirname = DATA_DIR,
    fdicts = fdicts,
    img_mode = 'color', # 強制的にカラー画像として読み込む
)
test_size = len(test_dataset)
rdict = test_dataset.reverse_dicts[1]

# 認識対象のクラス数を取得
n_classes = len(test_dataset.forward_dicts[1])

# テストデータをミニバッチに分けて使用するための「データローダ」を用意
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, pin_memory=True)

##### 学習済みニューラルネットワークモデルのロード

In [None]:
import torch


# ニューラルネットワークモデルとその学習済みパラメータをファイルからロード
model = SimpleViT(C=C, N=n_classes, image_size=112, patch_size=14, num_layers=6, num_heads=16, embed_dim=128, ffn_dim=128, dropout_ratio=0.1)
model.load_state_dict(torch.load(MODEL_FILE))

##### 単一画像に対するテスト処理の実行

In [None]:
import torch
from mylib.data_io import show_single_image


model = model.to(DEVICE)
model.eval()

# index 番目のテストデータをニューラルネットワークに入力してみる
while True:
    print('index?: ', end='')
    val = input()
    if val == 'exit': # 'exit' とタイプされたら終了
        break
    index = int(val)
    x, y = test_dataset[index]
    x = x.reshape(1, *x.size()).to(DEVICE)
    with torch.inference_mode():
        y_pred = model(x, testmode=True)
    y_pred = torch.argmax(y_pred, dim=1)
    print('')
    print('estimated:', rdict[int(y_pred)])
    print('ground truth:', rdict[int(y)])
    print('')
    show_single_image(x.to('cpu'), title='input image', sec=1)

##### 全ての画像に対するテスト処理の実行

In [None]:
import torch
from tqdm import tqdm


model = model.to(DEVICE)
model.eval()

# テストデータセットを用いて認識精度を評価
n_failed = 0
with torch.inference_mode():
    for X, Y in tqdm(test_dataloader):
        X = X.to(DEVICE)
        Y = Y.to(DEVICE)
        Y_pred = model(X, testmode=True)
        n_failed += torch.count_nonzero(torch.argmax(Y_pred, dim=1) - Y) # 推定値と正解値が一致していないデータの個数を数える
    accuracy = (test_size - n_failed) / test_size
    print('accuracy = {0:.2f}%'.format(100 * accuracy))
    print('')