<a href="https://colab.research.google.com/github/ktldud/AI-1-ktldud/blob/main/Chapter2_1_%EA%B3%BC%EC%A0%9CA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Transformer 실습

이번 실습에서는 감정 분석 task에 RNN 대신 Transformer를 구현하여 적용해 볼 것입니다.
Library import나 dataloader 생성은 RNN 실습 때와 똑같기 때문에 설명은 넘어가도록 하겠습니다.

In [74]:
!pip install datasets



In [75]:
pip install sacremoses




In [76]:
import torch
from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import BertTokenizerFast
from tokenizers import (
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
)


# IMDB review dataset from the Hugging Face hub; the loaders further down read
# its 'train' and 'test' splits.
ds = load_dataset("stanfordnlp/imdb")

# Load the tokenizer through the BertTokenizerFast class this file already
# imports instead of torch.hub.load(): the hub entry point fetches and executes
# the repository's hubconf from its main branch, which adds a code download and
# can drift from the installed `transformers` version.
tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')


def collate_fn(batch):
    """Turn a list of dataset rows into a pair of LongTensors.

    Args:
        batch: Iterable of rows, each indexable by 'text' and 'label'.

    Returns:
        Tuple ``(texts, labels)``: the token ids produced by the module-level
        ``tokenizer`` (padding enabled, truncated at 400 tokens) and the
        labels, both as ``torch.LongTensor``.
    """
    max_len = 400

    raw_texts = [row['text'] for row in batch]
    raw_labels = [row['label'] for row in batch]

    # The tokenizer pads within the batch, so input_ids is rectangular and can
    # be converted to a tensor directly.
    encoded = tokenizer(
        raw_texts, padding=True, truncation=True, max_length=max_len
    )

    return torch.LongTensor(encoded.input_ids), torch.LongTensor(raw_labels)


# Both loaders batch 64 rows through collate_fn; only the training split is
# shuffled so evaluation order stays fixed.
train_loader = DataLoader(
    dataset=ds['train'], collate_fn=collate_fn, batch_size=64, shuffle=True
)
test_loader = DataLoader(
    dataset=ds['test'], collate_fn=collate_fn, batch_size=64, shuffle=False
)

Using cache found in /root/.cache/torch/hub/huggingface_pytorch-transformers_main


In [100]:
from torch import nn
from math import sqrt


class SelfAttention(nn.Module):
    """Single-head scaled dot-product self-attention with an output projection.

    Args:
        input_dim: Size of the last dimension of the input.
        d_model: Size of the query/key/value projections and of the output.
    """

    def __init__(self, input_dim, d_model):
        super().__init__()

        self.input_dim = input_dim
        self.d_model = d_model

        # Query, key and value projections, followed by the output projection.
        self.wq = nn.Linear(input_dim, d_model)
        self.wk = nn.Linear(input_dim, d_model)
        self.wv = nn.Linear(input_dim, d_model)
        self.dense = nn.Linear(d_model, d_model)

        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x, mask):
        """Attend over the sequence dimension of ``x``.

        Args:
            x: Input whose last dimension is ``input_dim``; the original shape
                comment treats it as (B, S, D).
            mask: ``None`` or a tensor broadcastable to the (B, S, S) score
                matrix. Entries equal to 1/True receive a -1e9 penalty before
                the softmax, which effectively removes them.

        Returns:
            Tensor with the same leading dimensions as ``x`` and last
            dimension ``d_model``.
        """
        queries = self.wq(x)
        keys = self.wk(x)
        values = self.wv(x)

        # (B, S, D) @ (B, D, S) -> (B, S, S), scaled by sqrt(d_model).
        logits = (queries @ keys.transpose(-2, -1)) / sqrt(self.d_model)

        if mask is not None:
            logits = logits + (mask * -1e9)

        weights = self.softmax(logits)
        return self.dense(weights @ values)

In [110]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Each head owns its own query/key/value ``nn.Linear`` projections of width
    ``d_model // n_heads``; the head outputs are concatenated and passed
    through a final ``d_model -> d_model`` projection.

    Args:
        input_dim: Size of the last dimension of the input.
        d_model: Total width across all heads; must be divisible by
            ``n_heads``.
        n_heads: Number of attention heads.
    """

    def __init__(self, input_dim, d_model, n_heads):
        super().__init__()

        self.input_dim = input_dim
        self.d_model = d_model
        self.n_heads = n_heads

        assert (
            d_model % n_heads == 0
        ), f"d_model ({d_model}) must be a multiple of n_heads ({n_heads})"

        # Per-head projection width (D' in the shape comments below).
        self.d_prime = self.d_model // self.n_heads

        heads = []
        for _ in range(self.n_heads):
            projections = {
                name: nn.Linear(input_dim, self.d_prime)
                for name in ("wq", "wk", "wv")
            }
            heads.append(nn.ModuleDict(projections))
        self.wqkvs = nn.ModuleList(heads)

        self.dense = nn.Linear(d_model, d_model)

        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x, mask):
        """Run every head on ``x`` and merge the results.

        Args:
            x: Input whose last dimension is ``input_dim``; the shape comments
                treat it as (B, S, input_dim).
            mask: ``None`` or a tensor broadcastable to the (B, S, S) score
                matrix. Entries equal to 1/True receive a -1e9 penalty before
                the softmax.

        Returns:
            Tensor with the leading dimensions of ``x`` and last dimension
            ``d_model``.
        """
        scale = sqrt(self.d_prime)
        # The additive penalty is identical for every head.
        penalty = None if mask is None else mask * -1e9

        per_head = []
        for head in self.wqkvs:
            queries = head["wq"](x)
            keys = head["wk"](x)
            values = head["wv"](x)

            # (B, S, D') @ (B, D', S) -> (B, S, S)
            logits = (queries @ keys.transpose(-2, -1)) / scale
            if penalty is not None:
                logits = logits + penalty

            # (B, S, S) @ (B, S, D') -> (B, S, D')
            per_head.append(self.softmax(logits) @ values)

        # Concatenate heads along the feature axis: (B, S, D' * n_heads).
        merged = torch.cat(per_head, dim=-1)
        return self.dense(merged)

In [111]:
class TransformerLayer(nn.Module):
    """One post-norm encoder layer: attention and feed-forward sub-blocks.

    Each sub-block is followed by dropout, a residual addition and a
    ``LayerNorm`` over ``d_model`` features.

    Args:
        input_dim: Size of the last dimension of the input. The residual
            ``x + attention(x)`` adds it to a ``d_model``-wide tensor, so it
            needs to be compatible with ``d_model``.
        d_model: Width of the attention output and of both layer norms.
        dff: Hidden width of the feed-forward network.
        n_heads: Number of attention heads.
        dropout_prob: Dropout probability shared by both sub-blocks.
    """

    def __init__(self, input_dim, d_model, dff, n_heads, dropout_prob=0.3):
        super().__init__()

        self.input_dim = input_dim
        self.d_model = d_model
        self.dff = dff

        self.mha = MultiHeadAttention(input_dim, d_model, n_heads)
        self.ffn = nn.Sequential(
            nn.Linear(d_model, dff),
            nn.ReLU(),
            nn.Linear(dff, d_model),
        )
        self.dropout = nn.Dropout(dropout_prob)
        self.layer_norm_mha = nn.LayerNorm(normalized_shape=d_model)
        self.layer_norm_ffn = nn.LayerNorm(normalized_shape=d_model)

    def forward(self, x, mask):
        """Apply the attention sub-block and then the feed-forward sub-block.

        Args:
            x: Input tensor passed to the attention module.
            mask: Attention mask forwarded unchanged to ``self.mha``.

        Returns:
            The layer-normalised output of the feed-forward sub-block.
        """
        # Attention sub-block: dropout -> residual -> layer norm.
        attended = self.dropout(self.mha(x, mask))
        hidden = self.layer_norm_mha(x + attended)

        # Feed-forward sub-block with the same dropout/residual/norm pattern.
        transformed = self.dropout(self.ffn(hidden))
        return self.layer_norm_ffn(hidden + transformed)


## Positional encoding

이번에는 positional encoding을 구현합니다. Positional encoding의 식은 다음과 같습니다:
$$
\begin{aligned} PE_{pos, 2i} &= \sin\left( \frac{pos}{10000^{2i/D}} \right), \\ PE_{pos, 2i+1} &= \cos\left( \frac{pos}{10000^{2i/D}} \right). \end{aligned}
$$

이를 NumPy로 구현한 뒤 PyTorch tensor로 변환하는 코드는 다음과 같습니다:

In [112]:
import numpy as np


def get_angles(pos, i, d_model):
    """Return the positional-encoding angles ``pos / 10000^(2*(i//2)/d_model)``.

    Args:
        pos: Position index or array of indices.
        i: Feature index or array of indices; ``i // 2`` makes each even/odd
            pair of features share one frequency.
        d_model: Total number of features.

    Returns:
        The product of ``pos`` and the per-feature angle rate, following NumPy
        broadcasting of ``pos`` against ``i``.
    """
    exponent = (2 * (i // 2)) / np.float32(d_model)
    rate = 1 / np.power(10000, exponent)
    return pos * rate

def positional_encoding(position, d_model):
    """Build the sinusoidal positional-encoding table as a float tensor.

    Args:
        position: Number of positions (rows) to generate.
        d_model: Number of features (columns) per position.

    Returns:
        ``torch.FloatTensor`` of shape (1, position, d_model): sine on the
        even feature columns, cosine on the odd ones.
    """
    row_idx = np.arange(position)[:, None]
    col_idx = np.arange(d_model)[None, :]
    table = get_angles(row_idx, col_idx, d_model)

    # Overwrite the raw angles in place: even columns -> sin, odd -> cos.
    table[:, ::2] = np.sin(table[:, ::2])
    table[:, 1::2] = np.cos(table[:, 1::2])

    # Leading axis of size 1 so the table broadcasts over a batch dimension.
    return torch.FloatTensor(table[None, ...])


# Longest sequence the positional table covers; TextClassifier reads this
# module-level value, and collate_fn truncates to the same length.
max_len = 400
# Sanity check: expect (1, max_len, 256).
print(positional_encoding(position=max_len, d_model=256).shape)

torch.Size([1, 400, 256])


In [113]:
class TextClassifier(nn.Module):
    """Transformer encoder that maps token ids to one logit per sequence.

    Token ids are embedded, scaled by ``sqrt(d_model)``, summed with a fixed
    sinusoidal positional table, passed through ``n_layers`` TransformerLayer
    blocks, and the representation at sequence position 0 is projected to a
    single logit.

    Args:
        vocab_size: Number of rows in the embedding table.
        d_model: Embedding width and width of every encoder layer.
        n_layers: Number of stacked TransformerLayer blocks.
        dff: Hidden width of each layer's feed-forward network.
        n_heads: Number of attention heads per layer.
        max_seq_len: Number of positions in the positional table. Defaults to
            the module-level ``max_len`` so existing call sites are unchanged.
        pad_token_id: Token id treated as padding when building the attention
            mask. Defaults to ``tokenizer.pad_token_id`` from the module-level
            tokenizer.
    """

    def __init__(
        self,
        vocab_size,
        d_model,
        n_layers,
        dff,
        n_heads,
        max_seq_len=None,
        pad_token_id=None,
    ):
        super().__init__()

        # Fall back to the notebook globals only when the caller did not pass
        # explicit values, so the model no longer has to depend on them.
        if max_seq_len is None:
            max_seq_len = max_len
        if pad_token_id is None:
            pad_token_id = tokenizer.pad_token_id

        self.vocab_size = vocab_size
        self.d_model = d_model
        self.n_layers = n_layers
        self.dff = dff
        self.max_seq_len = max_seq_len
        self.pad_token_id = pad_token_id

        self.embedding = nn.Embedding(vocab_size, d_model)
        # The positional table is a constant, not a learnable weight: a buffer
        # still follows .to(device) and is saved in state_dict under the same
        # key, but is not handed to the optimizer via parameters().
        self.register_buffer(
            "pos_encoding", positional_encoding(max_seq_len, d_model)
        )
        self.layers = nn.ModuleList(
            [TransformerLayer(d_model, d_model, dff, n_heads) for _ in range(n_layers)]
        )
        self.classification = nn.Linear(d_model, 1)

    def forward(self, x):
        """Compute one logit per input sequence.

        Args:
            x: Integer token ids indexed as (batch, sequence); the sequence
                length must not exceed the positional table length.

        Returns:
            Tensor of shape (batch, 1) holding raw (pre-sigmoid) logits.
        """
        # True where the token is padding; the extra axis lets the mask
        # broadcast across the query dimension of the attention scores.
        mask = (x == self.pad_token_id)[:, None, :]
        seq_len = x.shape[1]

        x = self.embedding(x)
        x = x * sqrt(self.d_model)
        x = x + self.pos_encoding[:, :seq_len]

        for layer in self.layers:
            x = layer(x, mask)

        # Classify from the first position only (presumably the tokenizer's
        # leading special token -- confirm against the tokenizer in use).
        x = x[:, 0]
        x = self.classification(x)

        return x


# Small 5-layer encoder: 32-wide embeddings and feed-forward, 4 heads, with
# one embedding row per tokenizer entry.
model = TextClassifier(
    vocab_size=len(tokenizer),
    d_model=32,
    n_layers=5,
    dff=32,
    n_heads=4,
)

## 학습

학습하는 코드는 기존 실습들과 동일하기 때문에 마지막 결과만 살펴보도록 하겠습니다.

In [114]:
from torch.optim import Adam

# Training configuration: the model emits raw logits, so the loss applies the
# sigmoid itself (BCEWithLogitsLoss).
lr = 0.001
loss_fn = nn.BCEWithLogitsLoss()

# Move the model to the GPU before handing its parameters to the optimizer.
model = model.to('cuda')
optimizer = Adam(params=model.parameters(), lr=lr)

In [115]:
def accuracy(model, dataloader):
    """Return the fraction of samples in ``dataloader`` classified correctly.

    The model is expected to return one logit per sample in its last
    dimension (index 0); a logit above 0 counts as class 1, otherwise class 0.

    The function is self-contained: it evaluates without gradient tracking,
    switches the model to eval mode for the duration of the call (restoring
    the previous mode afterwards), and moves each batch to the device of the
    model's parameters instead of assuming a GPU.

    Args:
        model: ``nn.Module`` mapping a batch of inputs to logits whose last
            dimension has the logit at index 0.
        dataloader: Iterable yielding ``(inputs, labels)`` tensor pairs.

    Returns:
        Accuracy as a float in [0, 1]; ``0.0`` when the dataloader yields no
        samples.
    """
    # Follow the model's device; a parameter-free model leaves batches as-is.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else None

    was_training = model.training
    model.eval()

    n_seen = 0
    n_correct = 0
    try:
        with torch.no_grad():
            for inputs, labels in dataloader:
                if device is not None:
                    inputs, labels = inputs.to(device), labels.to(device)

                # Thresholding the logit at 0 equals thresholding the sigmoid
                # probability at 0.5.
                preds = (model(inputs) > 0).long()[..., 0]

                n_seen += labels.shape[0]
                n_correct += (labels == preds).sum().item()
    finally:
        # Leave the model in whatever mode the caller had it in.
        model.train(was_training)

    return n_correct / n_seen if n_seen else 0.0


In [116]:
n_epochs = 50

for epoch in range(n_epochs):
    # --- training pass ---------------------------------------------------
    model.train()
    total_loss = 0.
    for data in train_loader:
        inputs, labels = data
        inputs = inputs.to('cuda')
        # loss_fn is fed float targets, so the integer labels are cast here.
        labels = labels.to('cuda').float()

        model.zero_grad()
        preds = model(inputs)[..., 0]  # drop the trailing logit axis
        loss = loss_fn(preds, labels)
        loss.backward()
        optimizer.step()

        # Sum (not mean) of the per-batch losses over the epoch.
        total_loss += loss.item()

    print(f"Epoch {epoch:3d} | Train Loss: {total_loss}")

    # --- evaluation pass: eval mode, no gradient tracking ------------------
    model.eval()
    with torch.no_grad():
        train_acc = accuracy(model, train_loader)
        test_acc = accuracy(model, test_loader)
    print(f"=========> Train acc: {train_acc:.3f} | Test acc: {test_acc:.3f}")

Epoch   0 | Train Loss: 216.03940320014954
Epoch   1 | Train Loss: 148.91142337024212
Epoch   2 | Train Loss: 120.28375335037708
Epoch   3 | Train Loss: 98.30352552235126
Epoch   4 | Train Loss: 77.29225611686707
Epoch   5 | Train Loss: 59.42878085374832
Epoch   6 | Train Loss: 45.21806052606553
Epoch   7 | Train Loss: 35.76661195605993
Epoch   8 | Train Loss: 30.827936668880284
Epoch   9 | Train Loss: 24.556845027022064
Epoch  10 | Train Loss: 21.593633562792093
Epoch  11 | Train Loss: 19.947684588842094
Epoch  12 | Train Loss: 18.860585012938827
Epoch  13 | Train Loss: 15.965383396018296
Epoch  14 | Train Loss: 17.193625897634774
Epoch  15 | Train Loss: 15.497269954532385
Epoch  16 | Train Loss: 15.233356156386435
Epoch  17 | Train Loss: 14.603013001382351
Epoch  18 | Train Loss: 13.146664925618097
Epoch  19 | Train Loss: 14.155213425634429
Epoch  20 | Train Loss: 13.048466647276655
Epoch  21 | Train Loss: 11.882441127905622
Epoch  22 | Train Loss: 12.47113403188996
Epoch  23 | Train

학습이 안정적으로 진행되며 RNN보다 빨리 수렴하는 것을 확인할 수 있습니다.
하지만 test 정확도가 RNN보다 낮은 것을 보았을 때, overfitting에 취약하다는 것을 알 수 있습니다.