In [1]:
import torch

if torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

train_model = False  # 실제 훈련을 할 시 True 로 변경
device

'mps'

In [2]:
%%capture
# capture는 주렁주렁 출력 보여주지 않게끔
# Font 설치
!brew tap homebrew/cask-fonts
!brew install --cask font-nanum
!fc-cache -fv
!rm -rf ~/Library/Caches/matplotlib

In [3]:
import matplotlib.pyplot as plt

plt.rc("font", family="NanumBarunGothic")
plt.rcParams["axes.unicode_minus"] = False

## 모델 구현

* encoder, decoder 의 self attention 을 수행하는 모듈

### einops
* 장점 : 텐서 변환을 매우 쉽게 이해하고 변환
* 단점 : 디버깅 힘들고 속도 느리다

In [4]:
from einops import rearrange

# 차원 재배치
x = torch.randn(2, 3, 4)
y = rearrange(x, "a b c -> c a b")  # 차원의 이름을 임의로 지정할 수 있다

y.shape

torch.Size([4, 2, 3])

In [5]:
# 차원 나누기
x = torch.randn(6, 4)
y = rearrange(
    x, "(a b) c -> a b c", a=2, b=3
)  # x 의 첫 번쨰 차원이 a 와 b 로 이루어져 있다

y.shape

torch.Size([2, 3, 4])

### Multi Head Attention (MHA) 구현

In [7]:
from torch import nn


class MHA(nn.Module):

    def __init__(self, d_model=512, n_heads=8):
        """
        d_model : dimension of embedding vector
        n_heads : num of heads in MHA
        """
        super().__init__()

        self.n_heads = n_heads

        # Q, K, V 벡터를 위한 linear layer
        self.fc_q = nn.Linear(in_features=d_model, out_features=d_model)
        self.fc_k = nn.Linear(in_features=d_model, out_features=d_model)
        self.fc_v = nn.Linear(in_features=d_model, out_features=d_model)

        # 최종 출력을 위한 linear layer
        self.fc_o = nn.Linear(in_features=d_model, out_features=d_model)

        # scaling for attention score
        self.scale = torch.sqrt(
            torch.tensor(d_model / n_heads)
        )  # GPU 에서의 연산을 위해 tensor 로 변환

    def forward(self, Q, K, V, mask=None):
        """
        Q, K, V : embedding vector of each word, (batch_size, max_len, dim)
        mask : mask for attention score (optional) => init as None
        """

        # linear transformation on embedding vector
        Q = self.fc_q(Q)
        K = self.fc_k(K)
        V = self.fc_v(V)

        # MHA 를 위해 embedding dim 을 n_head 개로 분할
        # ex. (32, 128, 512) -> (32, 8, 128, 64)
        Q = rearrange(Q, "N t (h dk) -> N h t dk", h=self.n_heads)
        K = rearrange(K, "N t (h dk) -> N h t dk", h=self.n_heads)
        V = rearrange(V, "N t (h dk) -> N h t dk", h=self.n_heads)

        # attention score before softmax
        attention_score = (
            Q @ K.transpose(-2, 1) / self.scale
        )  # (N, h, t, d_k) @ (N, h, d_k, t) -> (N, h, t(Query 길이), t(Key 길이))

        # masking on padding
        if mask is not None:
            attention_score[mask] = -1e10

        # softmax on Key
        energy = torch.softmax(attention_score, dim=-1)

        attention = energy @ V  # (N, h, t, t) @ (N, h, t, dk) -> (N, h, t, dk)

        # head 차원을 연결해서 원래의 차원으로 되돌리기
        x = rearrange(attention, "N h t dk -> N t (h dk)")  # (N, h, t, dk) -> (N, t, D)

        # 최종 출력값에 대해 linear transformation (모든 head 의 분석 결과를 종합)
        x = self.fc_o(x)

        return x, energy

### Feed Forward Network (FNN)
* encoder, decoder 의 MHA 결과를 하나로 합쳐준다

In [8]:
class FeedForward(nn.Module):
    def __init__(
        self, d_model=512, d_ff=2048, drop_p=0.1
    ):  # dropout : 차원 확장에 의한 overfitting 방지
        super().__init__()
        self.linear = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Dropout(drop_p),
            nn.Linear(d_ff, d_model),
        )

    def forward(self, mha_output):
        out = self.linear(mha_output)
        return out

## Encoder

In [10]:
# MHA - FFN 연결 과정, skip connection, LN
class EncoderLayer(nn.Module):
    def __init__(self, d_model, d_ff, n_heads, drop_p):
        """
        d_model : 임베딩 벡터 차원
        d_ff : feed forward 은닉층 차원
        n_heads : MHA head 갯수
        drop_p : dropout 비율
        """
        super().__init__()

        # MHA (Self Attention)
        self.self_atten = MHA(d_model=d_model, n_heads=n_heads)

        # MHA 에 대한 layer normalization
        self.self_atten_LN = nn.LayerNorm(normalized_shape=d_model)

        # feed forward network
        self.FF = FeedForward(d_model=d_model, d_ff=d_ff, drop_p=drop_p)

        # feed forward 출력에 대한 LN
        self.FF_LN = nn.LayerNorm(normalized_shape=d_model)

        self.dropout = nn.Dropout(drop_p)

    def forward(self, x, enc_mask):
        """
        x : input tensor (batch_size, seq_len, d_model)
        enc_mask : input mask (batch_size, 1, seq_len)

        return : encoder output, energy
        """

        # MHA block
        residual, atten_enc = self.self_atten(Q=x, K=x, V=x, mask=enc_mask)
        residual = self.dropout(residual)

        # Skip Connection & LN
        encoder_self_attention_output = self.self_atten_LN(x + residual)

        # FFN block
        residual = self.FF(encoder_self_attention_output)
        residual = self.dropout(residual)

        # Skip Connection & LN
        encoder_ffn_output = self.FF_LN(encoder_self_attention_output + residual)

        return encoder_ffn_output, atten_enc

In [13]:
class Encoder(nn.Module):

    def __init__(
        self, input_embedding, max_len, n_layers, d_model, d_ff, n_heads, drop_p
    ):
        """
        input_embedding : 입력 임베딩 레이어 (nn.Embedding)
        max_len : input sequence 의 최대 길이 (int)
        n_layers : encoder layer 의 개수
        d_model : embedding vector 의 차원
        d_ff : feed forward의 은닉층의 차원
        n_heads : MHA의 head 개수
        drop_p : dropout 비율
        """

        # d_model 의 제곱근 값으로 scale -> embedding vector 크기 조정
        self.scale = torch.sqrt(torch.tensor(d_model))

        # input embedding layer
        self.input_embedding = input_embedding

        # 위치 embedding layer : 위치 정보를 학습하기 위한 layer
        self.pos_embedding = nn.Embedding(num_embeddings=max_len, embedding_dim=d_model)

        self.dropout = nn.Dropout(drop_p)

        # 여러 개의 encoder layer 를 쌓기 위해 ModuleList 활용
        self.layers = nn.ModuleList(
            [EncoderLayer(d_model, d_ff, n_heads, drop_p) for _ in range(n_layers)]
        )

        self.device = device

    def forward(self, src, mask, atten_map_save=False):
        """
        src : input sequence (batch_size, seq_len)
        mask : mask for padding (batch_size, 1, seq_len)
        atten_map_save : attention map 을 저장할지 여부
        """
        # 위치 index tensor 생성 : 각 batch 에서 sequence 길이만큼 위치 index 반복
        pos = torch.arange(src.shape[1]).repeat(src.shape[0], 1).to(self.device)

        # input embedding 과 위치 embedding 을 합해 input tensor 생성
        x_embedding = self.input_embedding(src) + self.pos_embedding(pos)

        x_embedding = self.dropout(x_embedding)

        # 첫 입력
        encoder_output = x_embedding

        # energy 를 저장할 텐서
        atten_encs = torch.tensor([]).to(self.device)

        # 각 encoder layer 를 순차적으로 통과
        for layer in self.layers:
            encoder_output, atten_enc = layer(encoder_output, mask)

            if atten_map_save:
                atten_encs = torch.cat([atten_encs, atten_enc[0].unsqueeze()])

        return encoder_output, atten_encs