<a href="https://colab.research.google.com/github/kato1329/CATech/blob/main/Transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [23]:
import torch
import numpy as np
from torch import nn


class ScaleDotProductAttention(nn.Module):
  def __init__(self,d_k: int) -> None:
    super().__init__()
    self.d_k = d_k
  def forward(
      self,
      q:torch.Tensor,
      k:torch.Tensor,
      v:torch.Tensor,
      mask:torch.Tensor = None,
  ) -> torch.Tensor:
       ###Attentionを計算した後に値を正規化するために√dで割る。
       scalar = np.sqrt(self.d_k)
       ###Attentionの重みの宣言
       ###torch.transpose(input,dim0,dim1)→Tensor
       """
       transposeは入力した行列のdim0をdim1と転置する。
       例えば
       tensor = torch.Tensor([

        [[1 ,2 ,3 ],
        [4 ,5 ,6 ]],

        [[7 ,8 ,9 ],
        [10,11,12]],

        [[13,14,15],
        [16,17,18]]

        ])
        のサイズは(3,2,3)
        この時torch.transpose(tensor,1,2)を行うと
        2と3が入れ替わり、サイズは(3,3,2)となる。
        従って得られるテンソルは
        tensor([
          [[ 1.,  4.],
          [ 2.,  5.],
          [ 3.,  6.]],

          [[ 7., 10.],
          [ 8., 11.],
          [ 9., 12.]],

          [[13., 16.],
          [14., 17.],
          [15., 18.]]
          ])
          となる。
       """
       ###attentionの計算
       attention_weight = torch.matmul(q,torch.transpose(k,1,2))/scalar
       ###ここでattention_weightの次元はqの1次元目×kの1次元目となる。
       if mask is not None:
        if mask.dim() != attention_weight.dim():
          raise ValueError(
              "mask.dim != attention_weight_dim,mask.dim = {},attention_dim = {}".format(
                  mask.dim(),attention_weight.dim()
              )
          )
          """
          masked_fill_関数はboolで構成されたmask行列を受け取り、第一引数とする。受け取った
          mask行列は隠したいところにtrueがおいてある行列であり、attention_weightと同じ形を
          している。
          第二引数にはmask行列のTrueがあるところに当てる数字が入る。torch.finfo.maxは引数の
          型で表現することのできる最大の値を返し、それをマイナスにすることで-無限大を表現している
          """
          attention_weight = attention_weight.data.masked_fill_(
              mask,-torch.finfo(torch.float).max
          )
       attention_weight = nn.functional.softmax(attention_weight,dim=2)
       ###出力の次元はbatch_size×qの1次元目×vの2次元目となる。
       return torch.matmul(attention_weight,v)


In [24]:
k = torch.Tensor([[
    [1,-2,3,-4],
    [-8,7,6,-5],
    [10,9,12,11]
]])
k.shape

torch.Size([1, 3, 4])

In [27]:
q = torch.Tensor([
    [1,-2,3,-4],
    [-8,7,6,-5],
    [10,9,12,11],
    [1,-2,3,-4],
    [-8,7,6,-5],
    [10,9,12,11]
])
###size(1,4)
k = torch.Tensor([[
    [1,-2,3,-4],
    [-8,7,6,-5],
    [10,9,12,11]
]
])
###size(1,3,4)
v = k
sample_scaler = ScaleDotProductAttention(4)
print(sample_scaler.forward(q,k,v))

tensor([[[ 0.9918, -1.9918,  3.0027, -4.0009],
         [-8.0000,  7.0000,  6.0000, -5.0000],
         [10.0000,  9.0000, 12.0000, 11.0000],
         [ 0.9918, -1.9918,  3.0027, -4.0009],
         [-8.0000,  7.0000,  6.0000, -5.0000],
         [10.0000,  9.0000, 12.0000, 11.0000]]])


In [16]:
class MultiHeadAttention(nn.Module):
  def __init__(self,d_model:int,h:int) -> None:
    super().__init__()
    self.d_model = d_model
    self.h = h
    self.d_k = d_model//h
    self.d_v = d_model//h

    self.W_k = nn.Parameter(
        ###h個のd_model×d_k行列を作り、パラメータとする。
        torch.Tensor(h,d_model,self.d_k)
    )

    self.W_q = nn.Parameter(
        torch.Tensor(h,d_model,self.d_k)
    )

    self.W_v = nn.Parameter(
        torch.Tensor(h,d_model,self.d_v)
    )

    self.scaled_dot_product_attention = ScaleDotProductAttention(self.d_k)

    self.linear = nn.Linear(h*self.d_v,d_model)
  def forward(
      self,
      q:torch.Tensor,
      k:torch.Tensor,
      v:torch.Tensor,
      mask_3d:torch.Tensor = None,
  ) -> torch.Tensor:
     ###q.size(0)はTensorの行を表している。行→系列データの数よりbatch_sizeと一致する。
     ###q.size(1)はTensorの列を表している。列→系列データの大きさよりseq_lenと一致する。
     ###q.size(2)は系列一つのベクトルの長さを表している。従って単語の分散表現ベクトルに一致する。
     ###つまり元のqのサイズは[batch_size,seq_len,k(=d_model)]である。
     batch_size,seq_len = q.size(0),q.size(1)
     """
     tensor.repeatは指定された次元に沿って行列を複製する。今回の場合予め作られたqの行列
     の形を変えずにheadの数だけ複製し、新しい次元とする。
     k,vも同様である。
     """
     q = q.repeat(self.h,1,1,1)
     k = k.repeat(self.h,1,1,1)
     v = v.repeat(self.h,1,1,1)
     ##i×j行列に長さkのベクトルが押し込まれている。×head数。
     ##k×l行列のパラメータがある×head数
     """
     二つの行列を式に従って掛け算するとj,k行列とk,l行列の掛け算が行われ、hijl行列となる。
     従って掛け算によってk次元のベクトルの要素それぞれにパラメータの列の要素がかけられ、
     足されていく。
     下の操作によってQ,K,Vにはそれぞれ行列がかけられることになり、線形層の表現を獲得したといえる。
     """
     q = torch.einsum(
        ###k→d_model,l→d_kである。
        "hijk,hkl->hijl",(q,self.W_q)
     )
     k = torch.einsum(
        "hijk,hkl->hijl",(k,self.W_k)
     )
     v = torch.einsum(
        "hijk,hkl->hijl",(v,self.W_v)
     )
     """
     qのサイズは[h,batch_size,seq_len,d_k]であり、
     従って下の操作で次元を調整しても問題は生じない
     """
     q = q.view(self.h*batch_size,seq_len,self.d_k)
     k = k.view(self.h*batch_size,seq_len,self.d_k)
     v = v.view(self.h*batch_size,seq_len,self.d_v)

     if mask_3d is not None:
      mask_3d = mask_3d.repeat(self.h,1,1)

     ###内積注意の計算
     attention_output = self.scaled_dot_product_attention(
        q,k,v,mask_3d
     )
     ###線形層を通してd_model次元の出力を得る。
     output = self.linear(attention_output)
     ###outputはd_model次元のTensorとして出力される。
     return output

In [None]:
import numpy as np
import torch
from torch import nn


class AddPositionalEncoding(nn.Module):
    def __init__(
        self, d_model: int, max_len: int, device: torch.device = torch.device("cpu")
    ) -> None:
        super().__init__()
        self.d_model = d_model
        self.max_len = max_len
        positional_encoding_weight: torch.Tensor = self._initialize_weight().to(device)
        self.register_buffer("positional_encoding_weight", positional_encoding_weight)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        seq_len = x.size(1)
        return x + self.positional_encoding_weight[:seq_len, :].unsqueeze(0)

    def _get_positional_encoding(self, pos: int, i: int) -> float:
        w = pos / (10000 ** (((2 * i) // 2) / self.d_model))
        if i % 2 == 0:
            return np.sin(w)
        else:
            return np.cos(w)

    def _initialize_weight(self) -> torch.Tensor:
        positional_encoding_weight = [
            [self._get_positional_encoding(pos, i) for i in range(1, self.d_model + 1)]
            for pos in range(1, self.max_len + 1)
        ]
        return torch.tensor(positional_encoding_weight).float()


In [None]:
class FFN(nn.Module):
  def __init__(self,d_model:int,d_ff:int) -> None:
    super().__init__()
    self.linear1 = nn.Linear(d_model,d_ff)
    self.linear2 = nn.Linear(d_ff,d_model)

  def forward(self,x:torch,Tensor) -> torch.Tensor:
    return self.linear2(nn.functional.relu(self.linear1(x)))

In [None]:
import torch
from torch import nn

class TransformerEncoderLayer(nn.Module):
  def __init__(
      self,
      d_model:int,###multiheadattentionの次元数
      d_ff:int,###FFNの出力の次元数
      heads_num:int,###multiheadattentionのhead数
      dropout_rate:float,###dropout層のdropout確率
      layer_norm_eps:float,###LayerNorm層におけるepsの値。デフォルトは1e-05
  ) -> None:
    super().__init__()

    self.multi_head_attention = MultiHeadAttention(d_model,heads_num)
    self.dropout_self_attention = nn.Dropout(dropout_rate)
    self.layer_norm_self_attention = nn.LayerNorm(d_model,eps=layer_norm_eps)

    self.ffn = FFN(d_model,d_ff)
    self.dropout_ffn = nn.Dropout(dropout_rate)
    self.layer_norm_ffn = nn.LayerNorm(d_model,eps=layer_norm_eps)

  def forward(self,x:torch.Tensor,mask:torch.Tensor = None) -> torch.Tensor:
      x = self.layer_norm_self_attention(self.__self_attention_block(x,mask)+x)
      x = self.layer_norm_ffn(self.__feed_forward_block(x)+x)
      return x

  def __self_attention_block(self,x:torch.Tensor,mask:torch.Tensor) -> torch.Tensor:
    x = self.multi_head_attention(x,x,x,mask)
    return self.dropout_self_attention(x)

  def __feed_forward_block(self,x:torch.Tensor) -> torch.Tensor:
    return self.dropout_ffn(self.ffn(x))

class TransformerEncoder(nn.Module):
  def __init__(
      self,
      vocab_size:int,
      max_len:int,
      pad_idx:int,
      d_model:int,
      N:int,
      d_ff:int,
      heads_num:int,
      dropout_rate:float,
      layer_norm_eps:float,
      device:torch.device=torch.device("cpu")
  ) -> None:
    super().__init__()
    self.embedding = nn.Embedding(vocab_size,d_model,pad_idx)
    self.positional_encoding = AddPositionalEncoding(d_model,max_len,device)
    self.encoder_layers = nn.ModuleList(
        [
            TransformerEncoderLayer(
                d_model,d_ff,heads_num,dropout_rate,layer_norm_eps
            )
            for _ in range(N)
        ]
    )

  def forward(self,x:torch.Tensor,mask:torch.Tensor = None) -> torch.Tensor:
      x = self.embedding(x)
      x = self.positional_encoding(x)
      for encoder_layer in self.encoder_layers:
        x = encoder_layer(x,mask)
      return x


In [None]:
class TransformerDecoderLayer(nn.Module):
  def __init__(
      self,
      d_model:int,
      d_ff:int,
      heads_num:int,
      dropout_rate:float,
      layer_norm_eps:float,
  ):
    super().__init__()
    self.self_attention = MultiHeadAttention(d_model,heads_num)
    self.dropout_self_attention = nn.Dropout(dropout_rate)
    self.layer_norm_self_attention = LayerNorm(d_model,eps=layer_norm_eps)

    self.src_tgt_attention = MultiHeadAttention(d_model,heads_num)
    self.dropout_src_tgt_attention = nn.Dropout(dropout_rate)
    self.layer_norm_src_tgt_attention = LayerNorm(d_model,eps=layer_norm_eps)

    self.ffn = FFN(d_model,d_ff)
    self.dropout_ffn = nn.Dropout(dropout_rate)
    self.layer_norm_ffn = LayerNorm(d_model,eps=layer_norm_eps)

  def forward(
      self,
      tgt:torch.Tensor,
      src:torch.Tensor,
      mask_src_tgt:torch.Tensor,
      mask_self:torch.Tensor,
  ) -> torch.Tensor:
    tgt = self.layer_norm_self_attention(
        tgt + self.__self_attention_block(tgt,mask_self)
    )

    x = self.layer_norm_src_tgt_attention(
        tgt + self.__src_tgt_attention_block(src,tgt,mask_src_tgt)
    )

    x = self.layer_norm_ffn(x+self.__feed_forward_block(x))

    return x
  def __src_tgt_attention_block(
      self,src:torch.Tensor,tgt:torch.Tensor,mask:torch.Tensor
  ) -> torch.Tensor:
    return self.dropout_src_tgt_attention(
        self.src_tgt_attention(tgt,src,src,mask)
    )
  def __self_attention_block(
      self,x:torch.Tensor,mask:torch.Tensor
  ):
    return self.dropout_self_attention(self.self_attention(x,x,x,mask))
  def __feed_forward_block(self,x:torch.Tensor) -> torch.Tensor:
    return self.dropout_ffn(self.ffn(x))


In [None]:
class TransformerDecoder(nn.Module):
  def __init__(
      self,
      tgt_vocab_size:int,
      max_len:int,
      pad_idx:int,
      d_model:int,
      N:int,
      d_ff:int,
      heads_num:int,
      dropout_rate:float,
      layer_norm_eps:float,
      device:torch.device = torch.device('cpu')
  ) -> None:
    super().__init__()
    self.embedding = Embedding(tgt_vocab_size,d_model,pad_idx)
    self.positional_encoding = AddPositionalEncoding(d_model,max_len,device)
    self.decoder_layers = nn.ModuleList(
        [
            TransformerDecoderLayer(
                d_model,d_ff,heads_num,dropout_rate,layer_norm_eps
            )
            for _ in range(N)
        ]
    )
  def forward(
      self,
      tgt:torch.Tensor,
      src:torch.Tensor,
      mask_src_tgt:torch.Tensor,
      mask_self:torch.Tensor
  ) -> torch.Tensor:
    tgt = self.embedding(tgt)
    tgt = self.positional_encoding(tgt)
    for decoder_layer in self.decoder_layers:
      tgt = decoder_layer(
          tgt,
          src,
          mask_src_tgt,
          mask_self
      )
    return tgt

In [None]:
class Transformer(nn.Module):
  def __init__(
      self,
      src_vocab_size:int,
      tgt_vocab_size:int,
      max_len:int,
      d_model:int=512,
      heads_num:int=8,
      d_ff:int=2048,
      N:int=6,
      dropout_rate:float=0.1,
      layer_norm_eps:float=1e-5,
      pad_idx:int=0,
      device:torch.device = torch.device('cpu')
  ):
    super().__init__()
    self.src_vocab_size = src_vocab_size
    self.tgt_vocab_size = tgt_vocab_size
    self.d_model = d_model
    self.max_len = max_len
    self.heads_num = heads_num
    self.d_ff = d_ff
    self.N = N
    self.dropout_rate = dropout_rate
    self.layer_norm_eps = layer_norm_eps
    self.pad_idx = pad_idx
    self.device = device

    self.encoder = TransformerEncoder(
        src_vocab_size,
        max_len,
        pad_idx,
        d_model,
        N,
        d_ff,
        heads_num,
        dropout_rate,
        layer_norm_eps,
        device
    )

    self.decoder = TransformerDecoder(
        tgt_vocab_size,
        max_len,
        pad_idx,
        d_model,
        N,
        d_ff,
        heads_num,
        dropout_rate,
        layer_norm_eps,
        device
    )

    self.linear = nn.Linear(d_model,tgt_vocab_size)

    def forward(
        self,
        src:torch.Tensor,
        tgt:torch.Tensor
    ) -> torch.Tensor:
      pad_mask_src = self._pad_mask(src)
      src = self.encoder(src,pad_mask_src)

      mask_self_attn = torch.logical_or(
          self._subsequent_mask(tgt),self._pad_mask(tgt)
      )
      dec_output = self.decoder(tgt,src,pad_mask_src,mask_self_attn)

      return self.linear(dec_output)

    def _pad_mask(self,x:torch.Tensor) -> torch.Tensor:
      seq_len = x.size(1)
      ###これattention_maskつくるのめっちゃ楽じゃね
      mask = x.eq(self.pad_idx)
      mask = mask.unsqueeze(1)
      mask = mask.repeat(1,seq_len,1)
      return mask.to(self.device)

    def _subsequent_mask(self,x:torch.Tensor) -> torch.Tensor:
      batch_size = x.size(0)
      max_len = x.size(1)
      return (
          torch.trill(torch.ones(batch_size,max_len,max_Len)).eq(0.to(self.device))
      )