## vision Transformerの実装
- 参考にした記事 https://qiita.com/zisui-sukitarou/items/d990a9630ff2c7f4abf2
- 今回も例によってcifar-10によって実行する

### vision transformernモデルを実装して理解することを目標にする。<br>
<div align="center">
<img src="https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.ap-northeast-1.amazonaws.com%2F0%2F585587%2F8ce3dea7-0287-85c6-4461-2a085185ed95.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&w=1400&fit=max&s=12ce97077cfddec29ed1d0ed55be3dd9" width="40%">
</div>

#### ViTとは
- Transformerのエンコーダー部分を画像認識に応用したもの

#### 特徴
1. SoTAを上回る精度
1. 畳み込みを行わないモデル
1. それまでのSoTAよりも大幅に小さい計算コスト

#### モデル概要
1. 画像がパッチに分割される
1. 各パッチがベクトルに変換される
1. その先頭に[class]トークンを付与したものに位置エンコーディングが加算される
1. それがTransformer Encoderによって処理される
1. その出力の0番目のベクトルがMLP headで処理されてクラスが出力される


In [23]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.autograd as autograd
import torch.nn.functional as F
from torchvision import datasets, transforms, models
import matplotlib.pyplot as plt
import os
from sklearn.utils import shuffle
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from einops import repeat
from einops.layers.torch import Rearrange

In [24]:
config={
    'BatchSize':128,
    'seed':42,
    'n_epochs' : 50,
    'lr' : 0.001
}

In [25]:

trainval_dataset = datasets.CIFAR10('../data/cifar10', train=True,download=True,transform=transforms.ToTensor())

# 前処理を定義
transform = transforms.Compose([transforms.ToTensor()])

trainval_dataset = datasets.CIFAR10('../data/cifar10', train=True, transform=transform)

# trainとvalidに分割
train_dataset, val_dataset = torch.utils.data.random_split(trainval_dataset, [len(trainval_dataset)-10000, 10000],generator=torch.Generator().manual_seed(config['seed']))

dataloader_train = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=config['BatchSize'],
    shuffle=True
)

dataloader_valid = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=config['BatchSize'],
    shuffle=True
)

print("Train data number:{}, Valid data number: {}".format(len(train_dataset), len(val_dataset)))

Files already downloaded and verified
Train data number:40000, Valid data number: 10000


- H(image_size) : 画像の縦の長さ
- W(image_size) : 画像の横の長さ（今回はH=W）
- B(batch_size) : バッチサイズ
- P(patch_size) : パッチサイズ（縦の長さと、横の長さ）
- C(channels) : チャンネル数（RGBの場合C=3）
- D(dim) : パッチベクトル変数後のベクトルの長さ
- N(n_patches) : パッチの数

In [26]:
class ViT(nn.Module):
    def __init__(self, image_size, patch_size, n_classes, dim, depth, n_heads, channels = 3, mlp_dim = 256):
        """ [input]
            - image_size (int) : 画像の縦の長さ（= 横の長さ）
            - patch_size (int) : パッチの縦の長さ（= 横の長さ）
            - n_classes (int) : 分類するクラスの数
            - dim (int) : 各パッチのベクトルが変換されたベクトルの長さ（参考[1] (1)式 D）
            - depth (int) : Transformer Encoder の層の深さ（参考[1] (2)式 L）
            - n_heads (int) : Multi-Head Attention の head の数
            - chahnnels (int) : 入力のチャネル数（RGBの画像なら3）
            - mlp_dim (int) : MLP の隠れ層のノード数
        """

        super().__init__()
        
        # Params
        n_patches = (image_size // patch_size) ** 2
        patch_dim = channels * patch_size * patch_size
        self.depth = depth

        # Layers
        self.patching = Patching(patch_size = patch_size)
        self.linear_projection_of_flattened_patches = LinearProjection(patch_dim = patch_dim, dim = dim)
        self.embedding = Embedding(dim = dim, n_patches = n_patches)
        self.transformer_encoder = TransformerEncoder(dim = dim, n_heads = n_heads, mlp_dim = mlp_dim, depth = depth)
        self.mlp_head = MLPHead(dim = dim, out_dim = n_classes)


    def forward(self, img):
        """ [input]
            - img (torch.Tensor) : 画像データ
                - img.shape = torch.Size([batch_size, channels, image_height, image_width])
        """

        x = img

        # 1. パッチに分割
        # x.shape : [batch_size, channels, image_height, image_width] -> [batch_size, n_patches, channels * (patch_size ** 2)]
        x = self.patching(x)

        # 2. 各パッチをベクトルに変換
        # x.shape : [batch_size, n_patches, channels * (patch_size ** 2)] -> [batch_size, n_patches, dim]
        x = self.linear_projection_of_flattened_patches(x)

        # 3. [class] トークン付加 + 位置エンコーディング 
        # x.shape : [batch_size, n_patches, dim] -> [batch_size, n_patches + 1, dim]
        x = self.embedding(x)

        # 4. Transformer Encoder
        # x.shape : No Change
        x = self.transformer_encoder(x)

        # 5. 出力の0番目のベクトルを MLP Head で処理
        # x.shape : [batch_size, n_patches + 1, dim] -> [batch_size, dim] -> [batch_size, n_classes]
        x = x[:, 0]
        x = self.mlp_head(x)

        return x

### 1. パッチに分割
<div align="center">
<img src="https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.ap-northeast-1.amazonaws.com%2F0%2F585587%2F272cb2b1-9c9c-1c28-cf4c-d58927226109.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&w=1400&fit=max&s=394c70144fdeafec0d1a45d107ce5faf" width="60%">
</div>

- 一枚の画像をパッチに分割する。画像を９個に切り分けて、左上から横に並べていくだけ
- 元の画像は[C,H,W]の三次元配列であったが、切り分けた後はC*P^2の一次元配列になっているので、xのサイズは<br>
    [B,C,H,W]→[B,N,C*P^2]

In [27]:
class Patching(nn.Module):
    def __init__(self, patch_size):
        """ [input]
            - patch_size (int) : パッチの縦の長さ（=横の長さ）
        """
        super().__init__()
        self.net = Rearrange("b c (h ph) (w pw) -> b (h w) (ph pw c)", ph = patch_size, pw = patch_size)
    
    def forward(self, x):
        """ [input]
            - x (torch.Tensor) : 画像データ
                - x.shape = torch.Size([batch_size, channels, image_height, image_width])
        """
        x = self.net(x)
        return x

### 2.各パッチをベクトルに変換

<div align="center">
<img src="https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.ap-northeast-1.amazonaws.com%2F0%2F585587%2F993cc372-2e26-817a-21d8-55911af9403a.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&w=1400&fit=max&s=0300afbe0e313c4d2d5f123b5e7414b8" width="60%">
</div>

- 各パッチのベクトルを別サイズのベクトルに変換する。各パッチのベクトルの長さ(patch_dim)はC*P^2になる
- nn.Linearの部分の行列も学習可能なパラメーター

In [28]:
class LinearProjection(nn.Module):
    def __init__(self, patch_dim, dim):
        """ [input]
            - patch_dim (int) : 一枚あたりのパッチの次元（= channels * (patch_size ** 2)）
            - dim (int) : パッチが変換されたベクトルの次元 
        """
        super().__init__()
        self.net = nn.Linear(patch_dim, dim)

    def forward(self, x):
        """ [input]
            - x (torch.Tensor) 
                - x.shape = torch.Size([batch_size, n_patches, patch_dim])
        """
        x = self.net(x)
        return x

### 3.[class]トークン付与、位置エンコーディング
<div align="center">
<img src="https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.ap-northeast-1.amazonaws.com%2F0%2F585587%2F134cffbb-2684-a452-8865-72d7a2dc3fba.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&w=1400&fit=max&s=2866cff924c0a6dfa5e8b46f667a89cf" width="60%">
</div>
「2. 各パッチをベクトルに変換」によって作られた 
（コード中「n_patches」）個のパッチのベクトル達の先頭に [class] トークンを付加する。<br>
これは学習可能なパラメータで、Transformer Encoder によって処理された後の [class] トークンに対応する部分（正確にはそれを nn.Linear(dim, n_classes) で処理したもの）が、予測結果を返してくれる。

この時点で、x のサイズは [B,N+1,D] となる。（[class] トークンの分）。

その後、位置エンコーディングを行う。後ほど説明する Transformer Encoder では、入力トークンの位置情報を把握することができないため、位置情報をあらかじめ付加する必要あり。<br>
実装としては、(N+1)*Dの行列 を加算します。これは、学習可能なパラメータ



In [29]:
class Embedding(nn.Module):
    def __init__(self, dim, n_patches):
        """ [input]
            - dim (int) : パッチが変換されたベクトルの次元
            - n_patches (int) : パッチの枚数
        """
        super().__init__()
        # class token
        self.cls_token = nn.Parameter(torch.randn(1, 1, dim))

        # position embedding
        self.pos_embedding = nn.Parameter(torch.randn(1, n_patches + 1, dim))
    
    def forward(self, x):
        """[input]
            - x (torch.Tensor)
                - x.shape = torch.Size([batch_size, n_patches, dim])
        """
        # バッチサイズを抽出
        batch_size, _, __ = x.shape

        # [class] トークン付加
        # x.shape : [batch_size, n_patches, patch_dim] -> [batch_size, n_patches + 1, patch_dim]
        cls_tokens = repeat(self.cls_token, "1 1 d -> b 1 d", b = batch_size)
        x = torch.concat([cls_tokens, x], dim = 1)

        # 位置エンコーディング加算
        x += self.pos_embedding

        return x

### 4.Transformer Encoder
<div align="center">
<img src="https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.ap-northeast-1.amazonaws.com%2F0%2F585587%2Fe8ecf326-2b6c-9f38-8030-7d5b38017c0b.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&w=1400&fit=max&s=bcb0dee645ba5221eb27085fddb38459" width="20%">
</div>

Transformer Encoderの構成要素は四つに分かれている
- 残差接続
- Layer Normalization(Norm)
- Multi-Head Self-Atteintion
- Multi Layer Perceptron

In [30]:
class TransformerEncoder(nn.Module):
    def __init__(self, dim, n_heads, mlp_dim, depth):
        """ [input]
            - dim (int) : 各パッチのベクトルが変換されたベクトルの長さ（参考[1] (1)式 D）
            - depth (int) : Transformer Encoder の層の深さ（参考[1] (2)式 L）
            - n_heads (int) : Multi-Head Attention の head の数
            - mlp_dim (int) : MLP の隠れ層のノード数
        """
        super().__init__()

        # Layers
        self.norm = nn.LayerNorm(dim)
        self.multi_head_attention = MultiHeadAttention(dim = dim, n_heads = n_heads)
        self.mlp = MLP(dim = dim, hidden_dim = mlp_dim)
        self.depth = depth

    def forward(self, x):
        """[input]
            - x (torch.Tensor)
                - x.shape = torch.Size([batch_size, n_patches + 1, dim])
        """
        for _ in range(self.depth):
            x = self.multi_head_attention(self.norm(x)) + x
            x = self.mlp(self.norm(x)) + x

        return x

In [31]:
class MultiHeadAttention(nn.Module):
    def __init__(self, dim, n_heads):
        """ [input]
            - dim (int) : パッチのベクトルが変換されたベクトルの長さ
            - n_heads (int) : heads の数
        """
        super().__init__()
        self.n_heads = n_heads
        self.dim_heads = dim // n_heads

        self.W_q = nn.Linear(dim, dim)
        self.W_k = nn.Linear(dim, dim)
        self.W_v = nn.Linear(dim, dim)

        self.split_into_heads = Rearrange("b n (h d) -> b h n d", h = self.n_heads)

        self.softmax = nn.Softmax(dim = -1)

        self.concat = Rearrange("b h n d -> b n (h d)", h = self.n_heads)

    def forward(self, x):
        """[input]
            - x (torch.Tensor)
                - x.shape = torch.Size([batch_size, n_patches + 1, dim])
        """
        q = self.W_q(x)
        k = self.W_k(x)
        v = self.W_v(x)

        q = self.split_into_heads(q)
        k = self.split_into_heads(k)
        v = self.split_into_heads(v)

        # Logit[i] = Q[i] * tK[i] / sqrt(D) (i = 1, ... , n_heads)
        # AttentionWeight[i] = Softmax(Logit[i]) (i = 1, ... , n_heads)
        logit = torch.matmul(q, k.transpose(-1, -2)) * (self.dim_heads ** -0.5)
        attention_weight = self.softmax(logit)

        # Head[i] = AttentionWeight[i] * V[i] (i = 1, ... , n_heads)
        # Output = concat[Head[1], ... , Head[n_heads]]
        output = torch.matmul(attention_weight, v)
        output = self.concat(output)
        return output

In [32]:
class MLP(nn.Module):
    def __init__(self, dim, hidden_dim):
        """ [input]
            - dim (int) : パッチのベクトルが変換されたベクトルの長さ
            - hidden_dim (int) : 隠れ層のノード数
        """
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(dim, hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, dim)
        )

    def forward(self, x):
        """[input]
            - x (torch.Tensor)
                - x.shape = torch.Size([batch_size, n_patches + 1, dim])
        """
        x = self.net(x)
        return x

### 5.MLP Head
<div align="center">
<img src="https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.ap-northeast-1.amazonaws.com%2F0%2F585587%2Ff461422f-4f4e-7ab2-bad1-017d3100abf3.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&w=1400&fit=max&s=ea2944078c2fd27bb3e2076635883139" width="50%">
</div>

Transformer Encoder で処理された後の [class] トークンに対応する部分を MLP Head で処理。具体的には、最初に Layer Norm で処理し、その後、クラスの数の長さのベクトルに線形で変換。

In [33]:
class MLPHead(nn.Module):
    def __init__(self, dim, out_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.LayerNorm(dim),
            nn.Linear(dim, out_dim)
        )
    
    def forward(self, x):
        x = self.net(x)
        return x

In [34]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


In [35]:

model = ViT(
    image_size=32,
    patch_size=4,
    n_classes=10,
    dim=256,
    depth=3,
    n_heads=4,
    mlp_dim = 256
).to(device)


loss_function = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

In [36]:
for epoch in range(config['n_epochs']):
    losses_train = []
    losses_valid = []

    model.train()
    n_train = 0
    acc_train = 0
    for x, t in dataloader_train:
        n_train += t.size()[0]

        model.zero_grad()  # 勾配の初期化

        x = x.to(device)  # テンソルをGPUに移動
        t = t.to(device)

        y = model.forward(x)  # 順伝播


        loss = loss_function(y, t)  # 誤差(クロスエントロピー誤差関数)の計算

        loss.backward()  # 誤差の逆伝播

        optimizer.step()  # パラメータの更新

        pred = y.argmax(1)  # 最大値を取るラベルを予測ラベルとする

        acc_train += (pred == t).float().sum().item()
        losses_train.append(loss.tolist())

    model.eval()
    n_val = 0
    acc_val = 0
    for x, t in dataloader_valid:
        n_val += t.size()[0]

        x = x.to(device)  # テンソルをGPUに移動
        t = t.to(device)

        y = model.forward(x)  # 順伝播

        loss = loss_function(y, t)  # 誤差(クロスエントロピー誤差関数)の計算

        pred = y.argmax(1)  # 最大値を取るラベルを予測ラベルとする

        acc_val += (pred == t).float().sum().item()
        losses_valid.append(loss.tolist())

    print('EPOCH: {}, Train [Loss: {:.3f}, Accuracy: {:.3f}], Valid [Loss: {:.3f}, Accuracy: {:.3f}]]'.format(
        epoch+1,
        np.mean(losses_train),
        acc_train/n_train,
        np.mean(losses_valid),
        acc_val/n_val,
    ))

EPOCH: 1, Train [Loss: 2.069, Accuracy: 0.235], Valid [Loss: 1.945, Accuracy: 0.300]]
EPOCH: 2, Train [Loss: 1.785, Accuracy: 0.360], Valid [Loss: 1.750, Accuracy: 0.374]]
EPOCH: 3, Train [Loss: 1.683, Accuracy: 0.396], Valid [Loss: 1.660, Accuracy: 0.407]]
EPOCH: 4, Train [Loss: 1.596, Accuracy: 0.430], Valid [Loss: 1.560, Accuracy: 0.443]]
EPOCH: 5, Train [Loss: 1.531, Accuracy: 0.451], Valid [Loss: 1.538, Accuracy: 0.450]]
EPOCH: 6, Train [Loss: 1.475, Accuracy: 0.469], Valid [Loss: 1.442, Accuracy: 0.489]]
EPOCH: 7, Train [Loss: 1.422, Accuracy: 0.489], Valid [Loss: 1.436, Accuracy: 0.483]]
EPOCH: 8, Train [Loss: 1.365, Accuracy: 0.508], Valid [Loss: 1.390, Accuracy: 0.506]]
EPOCH: 9, Train [Loss: 1.314, Accuracy: 0.527], Valid [Loss: 1.362, Accuracy: 0.511]]
EPOCH: 10, Train [Loss: 1.273, Accuracy: 0.540], Valid [Loss: 1.326, Accuracy: 0.526]]
EPOCH: 11, Train [Loss: 1.232, Accuracy: 0.557], Valid [Loss: 1.291, Accuracy: 0.543]]
EPOCH: 12, Train [Loss: 1.192, Accuracy: 0.571], Val

In [37]:
import torchvision
testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transforms.ToTensor() )
testloader = torch.utils.data.DataLoader(testset, batch_size=4, shuffle=True, num_workers=2)

correct = 0
total = 0
# 勾配を記憶せず（学習せずに）に計算を行う
with torch.no_grad():
    for data in testloader:
        images, labels = data[0].to(device),data[1].to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
print('Accuracy of the network on the 10000 test images: %d %%' % (100 * correct / total))

Files already downloaded and verified
Accuracy of the network on the 10000 test images: 54 %
