# Aladdin Persson의 scratch부터 구현하는 Transformer를 보면서 그대로 작성한 코드이다.



장점:
- 논문 모델 figure의 어느부분에 해당하는지 설명하면서 코드 구현을 진행하여 이해가 쉬웁. 직관적으로 코드를 논문과 비교하며 보기가 수월했다고 생각함.

단점:
- 논문을 그대로 구현하지 않음. 예를 들어, Positional encoding의 경우는 sin, cos 기반의 PE를 적용한게 아니라 그냥 Embedding 레이어 사용함.
- 실제 데이터로 모델을 돌리기까진 안갔음.
- torch.einsum 과 같은, 바로 와닿지 않는 함수가 있음 (하지만 이는 코딩 공부겸 장점이 될 수도 있다.)


일단 시간이 넉넉하면 구현해보는 것도 괜찮다고 생각함.

개인적으로, `논문(이론)으로 이해 - 적당한 구현 - 구체적인 구현` 에서 중간단계인 적당한 구현 정도로 생각한다.

In [None]:
import torch
import torch.nn as nn

## Self Attention

In [None]:
class SelfAttention(nn.Module):
  def __init__(self, embed_size, heads):
    super(SelfAttention. self).__init__()
    self.embed_size = embed_size
    self.heads = heads
    self.head_dim = embed_size//heads

    assert (self.head_dim * heads == embed_size), "Embedsize needs to be div by heads"

    self.values = nn.Linear(self.head_dim, self.head_dim, bias = False)
    self.keys = nn.Linear(self.head_dim, self.head_dim, bias = False)
    self.queries = nn.Linear(self.head_dim, self.head_dim, bias = False)
    # 이 셋을 정의해놓고 사용은 안함. 원래 input word하나에 대해서 input -> value, input -> key, input -> query 이런식으로 변환 하는건데, 여기선 애초에 query, key, value를 처음부터 받는다고 가정하고 전개함.


    self.fc_out = nn.Linear(heads*self.head_dim, embed_size)
  
  def forward(self, values, keys, query, mask):

    ############################## 영상에 없는 것 추가한 부분 ***

    N = query.shape[0] # training example의 개수, 배치 사이즈로 보면 될듯. https://youtu.be/U0s0f995w14?t=940
    value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]

    # 이제 multi head 각각의 head들로 values, keys, queries를 나눈다.
    values = values.reshape(N, value_len, self.heads, self.head_dim) # 원래는 나눠지기 전의 상태로 입력을 받는다 가정하는듯 하다.
    keys = keys.reshape(N, key_len, self.heads, self.head_dim)
    queries = query.reshape(N, query_len, self.heads, self.head_dim)
    
    values = self.values(values)
    keys = self.keys(keys)
    query = self.query(query)

    energy = torch.einsum("nqhd,nkhd -> nhqk", [queries, keys]) # 각각 query를 keys와 곱해서 target word인 query를 다른 word들인 keys에 얼마나 attention 시킬건지. dot product attention을 여기선 사용함.
    # einsum은 batch를 고려한 matrix multiplication을 해야하는 상황에서 복잡하게 안하고 einsum으로 간단히 처리하는 것이다.
    # 의미는 다음과 같다.
    # queries의 shape는 (N, query_len, heads, heads_dim), einsum에선 이 각각을 n, q, h, d로 임의로 표현하여 nqhd로 작성한다.
    # keys의 shape는 (N, key_len, heads, heads_dim), einsum에선 이 각각을 n, k, h, d로 임의로 표현하여 nqhd로 작성한다.
    # -> nhqk는 저 shape를 nhqk와 같은 shape로 연산하고 싶다는 의미이다.
      # 연산의 의미는 N은 배치니까 없다고 치고 (query_len, heads, heads_dim). (key_len, heads, heads_dim)만 고려하자.
      # 여기서, multiplication이 (heads, heads_dim) (heads,heads_dim)끼리 된다면 각각의 query, key에 대해서 될 것이다.
      # (첫번째 쿼리(word)에 대한, head들의 query, 각 head의 dim), (첫번째 key(word)에 대한, head들의 key, 각 head의 dim)의 행렬곱이 될거다.
      


    if mask is not None: # 만약 mask 적용을 하면
      energy = energy.masked_fill(mask == 0, float("-1e20")) # 여기선 패딩을 0으로 표현했다. 즉, 패딩된 부분은 값을 모두 -infinit로 하여, 해당 부분에 대한 attention은 0으로 만든다.

    attention = torch.softmax(energy/(self.embed_size**(1/2)), dim=3) # (N,H,Q,K) shape이다 즉, K부분에 대한 softmax이고, 이는 하나의 query에 대한 모든 key에 대해서 softmax를 구하겠다는 의미이다.

    out = torch.einsum("nhql, nlhd -> nqhd", [attention, values]).reshape(
        N, query_len, self.heads*self.head_dim
    )

    out = self.fc_out(out)

    return out


class TransformerBlock(nn.Module):
  def __init__(self, embed_size, heads, dropout, forward_expansion):
    super(TransformerBlock, self).__init__()
    self.attention = SelfAttention(embed_size, heads)

    self.norm1 = nn.LayerNorm(embed_size)
    self.norm2 = nn.LayerNorm(embed_size)

    self.feed_forward = nn.Sequential(
        nn.Linear(embed_size, forward_expansion*embed_size), # Feed forward 파트보면, 중간 layer에서 노드가 더 많아지는 파트있음. 그거 의미하는거. 
        nn.ReLU(),
        nn.Linear(forward_expansion*embed_size, embed_size)
    )
    self.dropout= nn.Dropout(dropout)

  def forward(self, value, key, query, mask):
    attention = self.attention(value, key, query, mask)

    x = self.dropout(self.norm1(attention + query)) # 이 부분은 오류인듯하다, 원래 input받고 query, key, value로 변환하는 식으로 하고 이거자체는 그냥 onput을 줘야되는데 여기선 그렇게 안했다.
    forwards = self.feed_forward(x)
    out = self.dropout(self.norm2(forwards+x))
    return out

class Encoder(nn.Module):
  def __init__(
      self,
      src_vocab_size,
      embed_size,
      num_layers,
      heads,
      device,
      forward_expansion,
      dropout,
      max_length
  ):
    super(Encoder, self).__init__()
    self.embed_size = embed_size
    self.device = device
    self.word_embedding = nn.Embedding(src_vocab_size, embed_size)
    self.position_embedding = nn.Embedding(max_length, embed_size) # 여기선 sin, cos을 사용 안했다.

    self.layers = nn.ModuleList(
        [
            TransformerBlock(
                embed_size,
                heads,
                dropout = dropout,
                forward_expansion = forward_expansion
            )
        for _ in range(num_layers)]
    )

    self.dropout = nn.Dropout(dropout)


    def forward(self, x, mask):
      N, seq_length = x.shape
      positions = torch.arange(0, seq_length).exapnd(N, seq_length).to(self.device)
      out = self.dropout(self.word_embedding(x) + self.position_embedding(positions))

      for layer in self.layers:
        out = layer(out, out, out, mask) # 이 코드에서 구현한 Transformer안에선 각각을 key query value로 변환하는 과정을 안거친다. -> 원래 거치게 하면 out out out 넣는게 맞다, 댓글보니까 query, key, value 변환 코드를 넣어야 하는듯

      return out

class DecoderBlock(nn.Module)::
  def __init__(self, embed_size, heads, forward_expansion, dropout, device):
    super(DecoderBlock,self).__init__()
    self.attention = SelfAttention(embed_size, heads)
    self.norm = nn.LayerNorm(embed_size)
    self.transformer_block = TransformerBlock(
        embed_size, heads, dropout, forward_expansion
    )
    self.dropout = nn.Dropout(dropout)

  def forwar(self, x, value, key, src_mask, trg_mask):  # src mask는 옵션이지만, trg mask는 반드시 decoder에서 가져야한다. src mask는 예제들 몇개 넣었을 때 모두 같은 길이 가지도록 패딩함. 그 후에 그 패딩된 애들에 대해서는 계산 딱히 안하도록 src mask 사용하는거 (https://youtu.be/U0s0f995w14?t=2438)
    attention = self.attention(x,x,x, trg_mask) # decoder에선 mask 필요함.
    query = self.dropout(self.norm(attention + x))
    out = self.transformer_block(value, key, query, src_mask)

    return out


class Decoder(nn.Module):
  def __init__(self,
               trg_vocal_size,
               embed_size,
               num_layers,
               heads,
               forward_expansion,
               dropout,
               device,
               max_length):
    super(Decoder,self).__init__() # 상속받고자 하는 상위 모듈의 init를 그대로 불러오는 역할을 한다.  
    self.device = device
    self.word_embedding = nn.Embedding(tag_vocab_size, embed_size)
    self.position_embedding = nn.Embeding(max_length, embed_size)

    self.layers = nn.ModuleList(
        [DecoderBlock(embed_size, heads, forward_expansion, dropout, device) for _ in range(num_layers)]
    )
    self.fc_out = nn.Linear(embed_size, trg_vocab_size)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x, enc_out, src_mask, trg_mask):
    N, seq_length = x.shape
    positions = torch.arange(0, seq_length).expand(N, seq_length).to(self.device)
    X = self.dropout(self.word_embedding(x) + self.position_embedding(x))

    for layer in self.layers:
      x = layer(x, enc_out, enc_out)
    
    out = self.fc_out(x)
    return out

class Transformer(nn.Module):
  def __init__(
      self,
      src_vocab_size,
      trg_vocab_size,
      src_pad_idx,
      trg_pad_idx,
      embed_size=256,
      num_layers=6,
      forward_expansion=4,
      heads=8,
      dropout=0,
      device="cuda",
      max_length=100      
  ):
    super(Transformer, self).__init__()
    self.encoder = Encoder(
        src_vocab_size,
        embed_size,
        num_layers,
        heads,
        device,
        forward_expansion,
        dropout,
        max_length
    )

    self.decoder = Decoder(
        trg_vocab_size,
        embed_size,
        num_layers,
        heads,
        forward_expansion,
        dropout,
        device,
        max_length
    )

    self.src_pad_idx = src_pad_idx
    self.trg_pad_idx = trg_pad_idx
    self.device = device

  def make_src_mask(self, src):
    src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)
    # (N, 1, 1, src_len)
    return src_mask.to(self.device)
 
  def make_trg_mask(self,trg):
    N, trg_len = trg.shape
    trg_mask = torch.trill(torch.ones((trg_len, trg_len))).expand(
        N, 1, trg_len, trg_len
    )
    return trg_mask.to(self.device)

  def forward(self, src, trg):
    src_mask = self.make_src_mask(src)
    trg_mask = self.make_trg_mask(trg)
    enc_src = self.encoder(src, src_mask)
    out = self.decoder(trg, enc_src, src_mask, trg_mask)
    return out

