In [3]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import math, copy, time
from torch.autograd import Variable
import matplotlib.pyplot as plt
import seaborn
seaborn.set_context(context="talk")
%matplotlib inline

Scaled Dot-product

In [1]:
#Decoder 부분에서는 masking 처리를 해야 하는 부분이 있기에, mask 부분 함께 구현
def scaled_dot_product_attention(self, q, k, v, mask=None):
  d_k = k.size()[-1] # key 벡터의 차원, scaling factor용
  k_transpose = torch.transpose(k, 3, 2)

  output = torch.matmul(q, k_transpose) # Attention score 계산
  output = output / math.sqrt(d_k) # scale
  if mask is not None: # mask 적용할지 말지
    output = output.masked_fill(mask.unsqueeze(1).unsqueeze(-1), 0)

# softmax로 확률화
  output = F.softmax(output, -1)
  output = torch.matmul(output, v) # Attention value 계산

  return output

## 1. Multi-Head Attention

q, k , v의 입력 모양은 보통 (B, S, D)
B: 배치 크기
S: 시퀀스 길이
D: dim_num, 임베딩 차원

In [4]:
class MultiHeadAttention(nn.Module):
    def __init__(self, dim_num=512, head_num=8):
        super().__init__()
        self.head_num = head_num
        self.dim_num = dim_num
        # Q, K, V, Output를 위한 Linear 레이어 4개 정의
        self.query_embed = nn.Linear(dim_num, dim_num)
        self.key_embed = nn.Linear(dim_num, dim_num)
        self.value_embed = nn.Linear(dim_num, dim_num)
        self.output_embed = nn.Linear(dim_num, dim_num)

    #Decoder 부분에서는 masking 처리를 해야 하는 부분이 있기에, mask 부분 함께 구현
    def scaled_dot_product_attention(self, q, k, v, mask=None):
      d_k = k.size()[-1] # key 벡터의 차원, scaling factor용
      k_transpose = torch.transpose(k, 3, 2)

      output = torch.matmul(q, k_transpose) # 내적으로 Attention score 계산
      output = output / math.sqrt(d_k) # scale
      if mask is not None: # mask 적용할지 말지
        output = output.masked_fill(mask.unsqueeze(1).unsqueeze(-1), 0)

    # softmax로 확률화
      output = F.softmax(output, -1)
      output = torch.matmul(output, v) # Attention value 계산

      return output

      def forward(self, q, k, v, mask=None):
        batch_size = q.size()[0]

        # 헤드 분할
        q = self.query_embed(q).view(batch_size, -1, self.head_num, self.d_k).transpose(1,2)
        k = self.key_embed(k).view(batch_size, -1, self.head_num, self.d_k).transpose(1,2)
        v = self.value_embed(v).view(batch_size, -1, self.head_num, self.d_k).transpose(1,2)
        # 어텐션 계산, 병렬적으로 계산
        attention_output = self.scaled_dot_product_attention(q, k, v, mask)
        # 결과 취합, transpose로 순서 맞추고, view로 물리적으로 이어붙
        attention_output = attention_output.transpose(1, 2).contiguous().view(batch_size, -1, self.dim_num)

        output = self.output_embed(attention_output)

        return output



## 2. Residual Add & Layer Norm

In [5]:
class AddLayerNorm(nn.Module):
  def __init__(self):
    super().__init__()

  def layer_norm(self, input): #평균을 빼고 표준편차로 나누는 정규화
    mean = torch.mean(input, dim=-1, keepdim=True)
    std = torch.std(input, dim=-1, keepdim=True)
    output = (input-mean) / std
    return output

  def forward (self, input, residual): #잔차 학
    return residual + self.layer_norm(input)

## 3. Feed Forward

In [6]:
class FeedForward(nn.Module):
    def __init__(self, dim_num=512):
        super().__init__()
        #확장 레이어: 입력 차원을 4배로 늘림
        self.layer1 = nn.Linear(dim_num, dim_num * 4)
        #축소 레이어: 확장된 차원을 다시 원래대로 줄임
        self.layer2 = nn.Linear(dim_num * 4, dim_num)

    def forward(self, input):
        output = self.layer1(input) #확장
        output = self.layer2(F.relu(output)) # ReLU 함수 적용 뒤, 축

        return output

## 4. Encoder

In [7]:
class Encoder(nn.Module):
    def __init__(self, dim_num=512):
        super().__init__()
        # 문맥을 정리하는 multi-head attention
        self.multihead = MultiHeadAttention(dim_num=dim_num)
        # 결과를 정리하는 Add & Norm
        self.residual_layer1 = AddLayerNorm()
        # 정보를 심화 처리하는 FeedForward
        self.feed_forward = FeedForward(dim_num=dim_num)
        # 최종 결과를 정리하는 Add & Norm
        self.residual_layer2 = AddLayerNorm()

    def forward(self, q, k, v):
        multihead_output = self.multihead(q, k, v)
        residual1_output = self.residual_layer1(multihead_output, q)
        feedforward_output = self.feed_forward(residual1_output)
        output = self.residual_layer2(feedforward_output, residual1_output)

        return output

## 5. Decoder

In [8]:
class Decoder(nn.Module):
    def __init__(self, dim_num=512):
        super().__init__()

        # Decoder의 구
        self.masked_multihead = MultiHeadAttention(dim_num=dim_num)
        self.residual_layer1 = AddLayerNorm()
        self.multihead = MultiHeadAttention(dim_num=dim_num)
        self.residual_layer2 = AddLayerNorm()
        self.feed_forward = FeedForward(dim_num=dim_num)
        self.residual_layer3 = AddLayerNorm()

    def forward(self, o_q, o_k, o_v, encoder_output, mask):
        #decoder니까 masking해야함
        masked_multihead_output = self.masked_multihead(o_q, o_k, o_v, mask)
        residual1_output = self.residual_layer1(masked_multihead_output, o_q)
        # 질문은 decoder가 답은 encdoer를 통해 도출
        multihead_output = self.multihead(residual1_output, encoder_output, encoder_output)
        residual2_output = self.residual_layer2(multihead_output, residual1_output)

        feedforward_output = self.feed_forward(residual2_output)
        output = self.residual_layer3(feedforward_output, residual2_output)

        return output

## 6. Transformer

In [9]:
# 이전에 정의한 Encoder, Decoder 등의 클래스가 있다고 가정합니다.

class Transformer(nn.Module):
    def __init__(self, encoder_num=6, decoder_num=6, hidden_dim=512,
                 src_vocab_size=10000, tgt_vocab_size=10000,
                 max_seq_length=100, dropout_ratio=0.1):
        super().__init__()

        self.hidden_dim = hidden_dim

        # 어휘 사전을 입력으로 받음
        self.input_data_embed = nn.Embedding(src_vocab_size, self.hidden_dim)
        self.output_data_embed = nn.Embedding(tgt_vocab_size, self.hidden_dim)

        # 위치 인코딩은 한번만 생성
        self.pos_encoding = self.position_encoding(max_seq_length, self.hidden_dim)
        self.dropout = nn.Dropout(dropout_ratio)

        # 일반 리스트 말고 nn.ModuleList 사용해야
        self.Encoders = nn.ModuleList([Encoder(dim_num=hidden_dim) for _ in range(encoder_num)])
        self.Decoders = nn.ModuleList([Decoder(dim_num=hidden_dim) for _ in range(decoder_num)])

        # 출력 차원을 어휘 사전 크기로 설정
        self.last_linear_layer = nn.Linear(self.hidden_dim, tgt_vocab_size)

    def position_encoding(self, max_seq_length, hidden_dim):
        pe = torch.zeros(max_seq_length, hidden_dim) # 최대 문장 길이, 임베딩 차원으로 빈 행렬 만들기
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1) # 위치 번호를 담은 벡터
        div_term = torch.exp(torch.arange(0, hidden_dim, 2).float() * (-math.log(10000.0) / hidden_dim )) # 차원마다 다른 주기의 파동을 만듦

        pe[:, 0::2] = torch.sin(position * div_term) #짝수는 사인
        pe[:, 1::2] = torch.cos(position * div_term) #홀수는 코사인

        pe = pe.unsqueeze(0) # 배치 차원 추가
        # register_buffer: 학습되지는 않지만 모델의 state로 저장되는 텐서
        self.register_buffer('pe', pe)
        return pe

    def forward(self, src, tgt, src_padding_mask, tgt_look_ahead_mask):
        # 인코더 과정
        # 임베딩 + 위치 인코딩
        # 단어를 벡터로 변환 후 위치 정보 더해줌
        src_embed = self.dropout(self.input_data_embed(src) + self.pe[:, :src.size(1), :])

        # 여러 개의 인코더 층을 순서대로 통과
        encoder_output = src_embed
        for encoder in self.Encoders:
            encoder_output = encoder(encoder_output, encoder_output, encoder_output)

        # 디코더 과정
        # 타겟 문장을 임베딩하고 위치 정보를 더함
        tgt_embed = self.dropout(self.output_data_embed(tgt) + self.pe[:, :tgt.size(1), :])

        # 여러 개의 디코더 층을 순서대로 통과
        decoder_output = tgt_embed
        for decoder in self.Decoders:
            decoder_output = decoder(decoder_output, decoder_output, decoder_output,
                                     encoder_output, tgt_look_ahead_mask)

        #최종 출력 과정
        #최종 단어 예측
        output = self.last_linear_layer(decoder_output)

        # Softmax는 보통 CrossEntropyLoss에 포함되어 있어 생략 가능
        return output