# TrasnformerのEncoderの実装

## 1. セットアップ

In [1]:
# ライブラリの読み込み
import numpy as np
import random
import math
import torch
import torch.nn as nn
import torch.nn.functional as F 
import torchtext

In [2]:
# 乱数の固定
torch.manual_seed(1234)
np.random.seed(1234)
random.seed(1234)

## 2. モデルの実装と動作確認

### 2-1. 単語埋め込み

In [3]:
class Embedder(nn.Module):
    '''
    埋め込みベクトルの計算を行う
    idで示されている単語をベクトルに変換
    '''

    def __init__(self, text_embedding_vectors):
        super(Embedder, self).__init__()

        self.embeddings = nn.Embedding.from_pretrained(
            embeddings=text_embedding_vectors, freeze=True)
        # freeze=Trueに設定すると、逆伝播の際に更新されなくなる

    def forward(self, x):
        '''
        順伝播
        '''
        x_vec = self.embeddings(x)

        return x_vec

### 動作確認

In [4]:
# DataLoaderなどを取得
from utils.dataloader import get_IMDb_DataLoaders_and_TEXT

# ./data/aclImdb_v1.tar.gzがない場合は、ダウンロードに時間がかかる
# ./data/wiki-news-300d-1M.vec.zipがない場合は、ダウンロードに時間がかかる
train_dl, val_dl, test_dl, TEXT = get_IMDb_DataLoaders_and_TEXT(max_length=256, batch_size=24)

# ミニバッチの用意
batch = next(iter(train_dl))

# モデル構築
net1 = Embedder(TEXT.vocab.vectors)

# 入出力
x = batch.Text[0]
x1 = net1(x)  # 単語をベクトルに

print("入力のテンソルサイズ：", x.shape)
print("出力のテンソルサイズ：", x1.shape)

  0%|                                                                                                           | 0/999994 [00:00<?, ?it/s]Skipping token b'999994' with 1-dimensional vector [b'300']; likely a header
100%|███████████████████████████████████████████████████████████████████████████████████████████| 999994/999994 [01:27<00:00, 11375.48it/s]


入力のテンソルサイズ： torch.Size([24, 256])
出力のテンソルサイズ： torch.Size([24, 256, 300])


### 2-2. 位置エンコーディング

In [5]:
class PositionalEncoder(nn.Module):
    '''
    位置エンコーディングの計算
    入力された単語の位置を示す情報をベクトルに加える
    '''

    def __init__(self, d_model=300, max_seq_len=256):
        super().__init__()

        # 埋め込みベクトルの次元数
        self.d_model = d_model  

        # 単語の順番（pos）と埋め込みベクトルの次元の位置（i）によって一意に定まる値の表をpeとして作成
        pe = torch.zeros(max_seq_len, d_model)

        # GPUが使える場合はGPUへ送る（ここでは省略、学習時には使用する）
        # device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        # pe = pe.to(device)

        # 位置エンコーディングの値を計算
        for pos in range(max_seq_len):
            for i in range(0, d_model, 2):
                pe[pos, i] = math.sin(pos / (10000 ** ((2*i)/d_model)))
                pe[pos, i+1] = math.cos(pos / (10000 ** ((2*i)/d_model)))

        # peの先頭に、ミニバッチを表す次元を追加
        self.pe = pe.unsqueeze(0)

        # 勾配を計算しないようにする
        self.pe.requires_grad = False

    def forward(self, x):
        '''
        順伝播
        '''
        # 入力xとpeを足し算する
        # xがpeよりも小さいので、次元数の平方根を掛けて大きくする
        ret = math.sqrt(self.d_model)*x + self.pe
        return ret

### 動作確認

In [6]:
# モデル構築
net1 = Embedder(TEXT.vocab.vectors)
net2 = PositionalEncoder(d_model=300, max_seq_len=256)

# 入出力
x = batch.Text[0]
x1 = net1(x)  # 単語をベクトルに
x2 = net2(x1)

print("入力のテンソルサイズ：", x1.shape)
print("出力のテンソルサイズ：", x2.shape)

入力のテンソルサイズ： torch.Size([24, 256, 300])
出力のテンソルサイズ： torch.Size([24, 256, 300])


### 2-3. Attention

In [7]:
class Attention(nn.Module):
    '''
    本来のTransformerはマルチヘッドAttentionだが
    分かりやすさを優先してシングルAttentionで実装
    '''

    def __init__(self, d_model=300):
        super().__init__()

        # 全結合層を用いて特徴量をQ,K,Vに変換する
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        
        # 出力時に使用する全結合層
        self.out = nn.Linear(d_model, d_model)

        # Attentionの大きさ調整の変数
        self.d_k = d_model

    def forward(self, q, k, v, mask):
        '''
        順伝播
        '''
        
        # 全結合層で特徴量を変換
        q = self.q_linear(q)
        k = self.k_linear(k)
        v = self.v_linear(v)

        # Attentionの値を計算する
        # 各値を足し算すると大きくなりすぎるので、root(d_k)で割って調整
        weights = torch.matmul(q, k.transpose(1, 2)) / math.sqrt(self.d_k)

        # ここでmaskを計算
        mask = mask.unsqueeze(1)
        weights = weights.masked_fill(mask == 0, -1e9)

        # softmaxを用いて、Attentionの値を0〜1の範囲に収める
        normlized_weights = F.softmax(weights, dim=-1)

        # AttentionをValueとかけ算
        output = torch.matmul(normlized_weights, v)

        # 全結合層で特徴量を変換
        output = self.out(output)

        return output, normlized_weights

### 2-4. FeedForwardブロック

In [8]:
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff=1024, dropout=0.1):
        '''
        FeedForwardブロック
        Attention層からの出力を、全結合層2つで変換する
        '''
        super().__init__()

        self.linear_1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        '''
        順伝播
        '''
        x = self.linear_1(x)
        x = self.dropout(F.relu(x))
        x = self.linear_2(x)
        return x

### 2-5. Transformerブロック

In [9]:
class TransformerBlock(nn.Module):
    '''
    Transformerブロック
    レイヤー正規化・Attention・FeedFoward・Dropoutで構成される
    '''
    def __init__(self, d_model, dropout=0.1):
        super().__init__()

        # レイヤー正規化
        # https://pytorch.org/docs/stable/nn.html?highlight=layernorm
        self.norm_1 = nn.LayerNorm(d_model)
        self.norm_2 = nn.LayerNorm(d_model)

        # Attention層
        self.attn = Attention(d_model)

        # Attentionのあとの全結合層2つ
        self.ff = FeedForward(d_model)

        # Dropout
        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)

    def forward(self, x, mask):
        '''
        順伝播
        '''
        
        # レイヤー正規化
        x_normlized = self.norm_1(x)

        # Attentionの計算
        output, normlized_weights = self.attn(
            x_normlized, x_normlized, x_normlized, mask)
        
        # Dropoutを通したものを加える
        x2 = x + self.dropout_1(output)

        # レイヤー正規化
        x_normlized2 = self.norm_2(x2)
        
        # 全結合層とDropoutを通したものを加える
        output = x2 + self.dropout_2(self.ff(x_normlized2))

        return output, normlized_weights

### 動作確認

In [10]:
# モデル構築
net1 = Embedder(TEXT.vocab.vectors)
net2 = PositionalEncoder(d_model=300, max_seq_len=256)
net3 = TransformerBlock(d_model=300)

# maskの作成
x = batch.Text[0]
input_pad = 1  # 単語IDにおいて、'<pad>': 1 であるため
input_mask = (x != input_pad)
print(input_mask[0])

# 入出力
x1 = net1(x)  # 単語をベクトルに変換
x2 = net2(x1)  # 位置エンコーディングを加算
x3, normlized_weights = net3(x2, input_mask)  # Self-Attentionで特徴量を変換

print("入力のテンソルサイズ：", x2.shape)
print("出力のテンソルサイズ：", x3.shape)
print("Attentionのサイズ：", normlized_weights.shape)

tensor([ True,  True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True, False, False, 