<a href="https://colab.research.google.com/github/sewon-31/aiplus/blob/master/Transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Transformer 실습

이번 실습에서는 감정 분석 task에 RNN 대신 Transformer를 구현하여 적용해 볼 것입니다.
Library import나 dataloader 생성은 RNN 실습 때와 똑같기 때문에 설명은 넘어가도록 하겠습니다.

In [1]:
!pip install datasets sacremoses

Collecting datasets
  Downloading datasets-3.5.0-py3-none-any.whl.metadata (19 kB)
Collecting sacremoses
  Downloading sacremoses-0.1.1-py3-none-any.whl.metadata (8.3 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.12.0,>=2023.1.0 (from fsspec[http]<=2024.12.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.12.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.5.0-py3-none-any.whl (491 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m491.2/491.2 kB[0m [31m10.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading sacremoses-0.1.1-py3-none-any.whl (897 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m897.5/897.5 k

In [2]:
import torch
from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import BertTokenizerFast
from tokenizers import (
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
)


# ds = load_dataset("stanfordnlp/imdb")
train_ds = load_dataset("stanfordnlp/imdb", split="train")
test_ds = load_dataset("stanfordnlp/imdb", split="test")

tokenizer = torch.hub.load('huggingface/pytorch-transformers', 'tokenizer', 'bert-base-uncased')

def collate_fn_0(batch):
  max_len = 400
  texts, labels = [], []
  for row in batch:
    labels.append(row['label'])
    texts.append(row['text'])

  texts = torch.LongTensor(tokenizer(texts, padding=True, truncation=True, max_length=max_len).input_ids) # (64, 최대 400)
  labels = torch.LongTensor(labels) # (64,)

  return texts, labels

from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
  max_len = 400
  texts, labels = [], []
  for row in batch:
    labels.append(tokenizer(row['text'], truncation=True, max_length=max_len).input_ids[-3])
    texts.append(torch.LongTensor(tokenizer(row['text'], truncation=True, max_length=max_len).input_ids[:-3]))

  texts = pad_sequence(texts, batch_first=True, padding_value=tokenizer.pad_token_id) # (64, 최대 397)
  labels = torch.LongTensor(labels) # (64,)

  return texts, labels

train_loader = DataLoader(
    train_ds, batch_size=64, shuffle=True, collate_fn=collate_fn
)
test_loader = DataLoader(
    test_ds, batch_size=64, shuffle=False, collate_fn=collate_fn
)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/7.81k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/21.0M [00:00<?, ?B/s]

test-00000-of-00001.parquet:   0%|          | 0.00/20.5M [00:00<?, ?B/s]

unsupervised-00000-of-00001.parquet:   0%|          | 0.00/42.0M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating unsupervised split:   0%|          | 0/50000 [00:00<?, ? examples/s]

Downloading: "https://github.com/huggingface/pytorch-transformers/zipball/main" to /root/.cache/torch/hub/main.zip


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

### collate_fn (기본)
DataLoader가 배치를 구성하는 방식을 사용자 정의 <br/>
batch_size=4일 때, batch는 [샘플1, 샘플2, ..., 샘플4] 형식의 dictionary list
```
batch = [
    {"text": "This movie was amazing! The story was gripping.", "label": 1},
    {"text": "Terrible film. Worst acting I've seen.", "label": 0},
    {"text": "A decent movie with some flaws, but still enjoyable.", "label": 1},
    {"text": "Not my cup of tea. The pacing was too slow.", "label": 0}
]
```
collate_fn은 batch에서 text와 label을 각각 모아서 텐서로 변환

```
texts = [
    "This movie was amazing! The story was gripping.",
    "Terrible film. Worst acting I've seen.",
    "A decent movie with some flaws, but still enjoyable.",
    "Not my cup of tea. The pacing was too slow."
]

labels = [1, 0, 1, 0]
```
tokenizer
```
texts = torch.LongTensor(tokenizer(texts, padding=True, truncation=True, max_length=max_len).input_ids)
```
- texts의 각 문장을 tokenize한 후
  - 짧은 문장은 0으로 padding
  - 긴 문장은 token 개수가 max_len=400개를 넘지 않도록 truncation
  - 즉, 모든 문장이 동일한 max_len의 길이를 가지도록
- input_ids는 각 토큰을 정수 index로 변환한 값
- LongTensor를 통해 64비트 정수(int64)로 변환

### collate_fn (과제)
```
labels.append(tokenizer(row['text'], truncation=True, max_length=max_len).input_ids[-3])
texts.append(torch.LongTensor(tokenizer(row['text'], truncation=True, max_length=max_len).input_ids[:-3]))
# input_ids[-1] : special token ([SEP])
# input_ids[-2] : 마침표
# input_ids[-3] : 마지막 단어
```
texts = 마지막 단어를 제외한 문장<br/>
labels = 마지막 단어
```
texts = pad_sequence(texts, batch_first=True, padding_value=tokenizer.pad_token_id)
```
batch_first=True : (batch_size, seq_len) 형태로 변환 (기본값은 (seq_len, batch_size))<br/>
tokenizer.pad_token_id : 패딩 토큰의 ID

## Self-attention

이번에는 self-attention을 구현해보겠습니다.
Self-attention은 shape이 (B, S, D)인 embedding이 들어왔을 때 attention을 적용하여 새로운 representation을 만들어내는 module입니다.
여기서 B는 batch size, S는 sequence length, D는 embedding 차원입니다.
구현은 다음과 같습니다.

In [3]:
from torch import nn
from math import sqrt


class SelfAttention(nn.Module):
  def __init__(self, input_dim, d_model):
    super().__init__()

    self.input_dim = input_dim    # 입력의 크기 = (B, S, input_dim)
    self.d_model = d_model

    # 각 토큰의 embedding 벡터를 새로운 벡터로 변환
    self.wq = nn.Linear(input_dim, d_model)   # 현재 토큰이 다른 토큰과 얼마나 관련 있는지를 찾는 데 사용
    self.wk = nn.Linear(input_dim, d_model)   # 다른 토큰이 현재 토큰과 얼마나 관련 있는지를 결정하는 키
    self.wv = nn.Linear(input_dim, d_model)   # 최종적으로 업데이트할 정보
    self.dense = nn.Linear(d_model, d_model)

    self.softmax = nn.Softmax(dim=-1)

  def forward(self, x, mask):
    q, k, v = self.wq(x), self.wk(x), self.wv(x)  # (B, S, D)

    score = torch.matmul(q, k.transpose(-1, -2))  # (B, S, D) * (B, D, S) = (B, S, S)
                                                  # score가 클수록 높은 유사도(attention)
    score = score / sqrt(self.d_model)

    if mask is not None:
      score = score + (mask * -1e9)               # 패딩 토큰에 대한 가중치를 제거하여 의미 없는 값이 영향을 주지 않도록

    score = self.softmax(score)                   # score를 확률 값(0~1)으로 변경
    result = torch.matmul(score, v)               # (B, S, S) * (B, S, D) = (B, S, D)
    result = self.dense(result)                   # attention값을 한 번 더 가공

    return result

대부분은 Transformer 챕터에서 배운 수식들을 그대로 구현한 것에 불과합니다.
차이점은 `mask`의 존재여부입니다.
이전 챕터에서 우리는 가변적인 text data들에 padding token을 붙여 하나의 matrix로 만든 방법을 배웠습니다.
**실제 attention 계산에서는 이를 무시**해주기 위해 mask를 만들어 제공해주게 됩니다.
여기서 mask의 shape은 (B, S, 1)로, 만약 `mask[i, j] = True`이면 그 변수는 padding token에 해당한다는 뜻입니다.
이러한 값들을 무시해주는 방법은 shape이 (B, S, S)인 `score`가 있을 때(수업에서 배운 $A$와 동일) `score[i, j]`에 아주 작은 값을 더해주면 됩니다. 아주 작은 값은 예를 들어 `-1000..00 = -1e9` 같은 것이 있습니다.
이렇게 작은 값을 더해주고 나면 softmax를 거쳤을 때 0에 가까워지기 때문에 weighted sum 과정에서 padding token에 해당하는 `v` 값들을 무시할 수 있게 됩니다.

다음은 self-attention과 feed-forward layer를 구현한 모습입니다.

In [4]:
class TransformerLayer(nn.Module):
  def __init__(self, input_dim, d_model, dff):
    super().__init__()

    self.input_dim = input_dim
    self.d_model = d_model
    self.dff = dff

    self.sa = SelfAttention(input_dim, d_model)
    self.ffn = nn.Sequential(
      nn.Linear(d_model, dff),  # 입력 벡터 크기를 d_model → dff로 확장 (차원 확장)
      nn.ReLU(),                # 비선형성을 추가해서 더 복잡한 패턴을 학습
      nn.Linear(dff, d_model)   # 다시 원래 크기인 dff → d_model로 축소 (차원 복원)
    )

  def forward(self, x, mask):
    x = self.sa(x, mask)    # x = (B, S, d_model)
    x = self.ffn(x)         # x = (B, S, d_model)
                            # SelfAttention 이후 더 복잡한 변환을 수행하기 위해 d_model보다 더 큰 차원을 사용했다가 다시 줄이는 구조

    return x

보시다시피 self-attention의 구현이 어렵지, Transformer layer 하나 구현하는 것은 수업 때 다룬 그림과 크게 구분되지 않는다는 점을 알 수 있습니다.

## Positional encoding

이번에는 positional encoding을 구현합니다. Positional encoding의 식은 다음과 같습니다:
$$
\begin{align*} PE_{pos, 2i} &= \sin\left( \frac{pos}{10000^{2i/D}} \right), \\ PE_{pos, 2i+1} &= \cos\left( \frac{pos}{10000^{2i/D}} \right).\end{align*}
$$

이를 Numpy로 구현하여 PyTorch tensor로 변환한 모습은 다음과 같습니다: <br/>

```
np.arange(4)[:, None]
# array([[0],   # 단어 1번 위치
#        [1],   # 단어 2번 위치
#        [2],   # 단어 3번 위치
#        [3]])  # 단어 4번 위치
shape = (4, 1)

np.arange(4)[None, :]
# array([[0, 1, 2, 3]])
shape = (1, 4)

```

In [5]:
import numpy as np

# 위치 별 각도 계산
def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

    # pos = (position, 1)
    # i = (1, d_model)
    # return = (position, d_model)

def positional_encoding(position, d_model): # position = 최대 문장 길이 (seq_len)
    angle_rads = get_angles(np.arange(position)[:, None], np.arange(d_model)[None, :], d_model)
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])   # 짝수 차원에는 sin 적용
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])   # 홀수 차원에는 cos 적용
    pos_encoding = angle_rads[None, ...]    # (1, position, d_model)

    return torch.FloatTensor(pos_encoding)

max_len = 400
print(positional_encoding(max_len, 256).shape)

torch.Size([1, 400, 256])


Positional encoding은 `angle_rads`를 구현하는 과정에서 모두 구현이 되었습니다. 여기서 `angle_rads`의 shape은 (S, D)입니다.
우리는 일반적으로 batch로 주어지는 shape이 (B, S, D)인 tensor를 다루기 때문에 마지막에 None을 활용하여 shape을 (1, S, D)로 바꿔주게됩니다.

위에서 구현한 `TransformerLayer`와 positional encoding을 모두 합친 모습은 다음과 같습니다:

In [6]:
len(tokenizer)

30522

In [6]:
class TextClassifier(nn.Module):
  def __init__(self, vocab_size, d_model, n_layers, dff):
    super().__init__()

    self.vocab_size = vocab_size  # 단어 사전의 총 단어 개수 (len(tokenizer)=30522)
    self.d_model = d_model        # 단어를 표현할 벡터 차원 (32)
    self.n_layers = n_layers      # encoder layer 개수 (2)
    self.dff = dff                # hidden layer size (32)

    self.embedding = nn.Embedding(vocab_size, d_model)
    self.pos_encoding = nn.parameter.Parameter(positional_encoding(max_len, d_model), requires_grad=False)
    self.layers = nn.ModuleList([TransformerLayer(d_model, d_model, dff) for _ in range(n_layers)])

    # self.classification = nn.Linear(d_model, 1)
    self.classification = nn.Linear(d_model, len(tokenizer))

  def forward(self, x):

    # x.shape = (B, S)

    mask = (x == tokenizer.pad_token_id)  # tokenizer.pad_token_id = 0
                                          # padding token이면 True
                                          # mask.shape = (B, S) (boolean 행렬)
    mask = mask[:, None, :]               # mask.shape = (B, 1, S) 로 차원 확장
    seq_len = x.shape[1]        # S = 400 (배치마다 달라질 수도 있음)

    x = self.embedding(x)       # x.shape = (B, S, d_model)
                                # 각 토큰의 정수 index를 벡터로 변환하여 의미를 가질 수 있도록
    x = x * sqrt(self.d_model)  # embedding한 벡터 값이 너무 작으면 self-attention 연산 시 기울기 소실 문제 발생할 수 있으므로 scale 조정
                                # 즉, 벡터 크기를 조정하여 안정적인 학습이 가능하도록
    x = x + self.pos_encoding[:, :seq_len]  # (B, S, d_model) + (1, S, d_model)

    for layer in self.layers:   # 2개의 TransformerLayer 통과
      x = layer(x, mask)        # 각 레이어마다 Self-Attention & FFN 연산

    x = x[:, 0]                 # x.shape = (B, d_model)
                                # 첫 번째 토큰만 선택
    x = self.classification(x)  # x.shape = (B, 1) - 1(긍정)/0(부정) 예측

    return x


model = TextClassifier(len(tokenizer), 32, 2, 32)

기존과 다른 점들은 다음과 같습니다:
1. `nn.ModuleList`를 사용하여 여러 layer의 구현을 쉽게 하였습니다.
2. Embedding, positional encoding, transformer layer를 거치고 난 후 마지막 label을 예측하기 위해 사용한 값은 `x[:, 0]`입니다. 기존의 RNN에서는 padding token을 제외한 마지막 token에 해당하는 representation을 사용한 것과 다릅니다. 이렇게 사용할 수 있는 이유는 attention 과정을 보시면 첫 번째 token에 대한 representation은 이후의 모든 token의 영향을 받습니다. 즉, 첫 번째 token 또한 전체 문장을 대변하는 의미를 가지고 있다고 할 수 있습니다. 그래서 일반적으로 Transformer를 text 분류에 사용할 때는 이와 같은 방식으로 구현됩니다.

## 학습

학습하는 코드는 기존 실습들과 동일하기 때문에 마지막 결과만 살펴보도록 하겠습니다.

In [22]:
from torch.optim import Adam

lr = 5e-4
model = model.to('cuda')
# loss_fn = nn.BCEWithLogitsLoss()
loss_fn = nn.CrossEntropyLoss()

optimizer = Adam(model.parameters(), lr=lr)

In [23]:
import numpy as np
import matplotlib.pyplot as plt


def accuracy(model, dataloader):
  cnt = 0
  acc = 0

  for data in dataloader:
    inputs, labels = data
    inputs, labels = inputs.to('cuda'), labels.to('cuda')

    preds = model(inputs)
    preds = torch.argmax(preds, dim=-1)
    # preds = (preds > 0).long()[..., 0]

    cnt += labels.shape[0]
    acc += (labels == preds).sum().item()

  return acc / cnt

In [24]:
n_epochs = 20

for epoch in range(n_epochs):
  total_loss = 0.
  model.train()
  for data in train_loader:
    model.zero_grad()
    inputs, labels = data

    # inputs = (64, max_len) : 토큰화된 문장
    # labels = (64,) : 마지막 단어의 정수 index

    inputs, labels = inputs.to('cuda'), labels.to('cuda')

    # preds = model(inputs)[..., 0] # 마지막 차원의 첫 번째 원소, 즉 (64,)
    preds = model(inputs)

    # preds = (64, len(tokenizer))

    loss = loss_fn(preds, labels)
    loss.backward()
    optimizer.step()

    total_loss += loss.item()

  print(f"Epoch {epoch:3d} | Train Loss: {total_loss}")

  with torch.no_grad():
    model.eval()
    # train_acc = accuracy(model, train_loader)
    test_acc = accuracy(model, test_loader)
    print(f"=========> Train acc: {train_acc:.3f} | Test acc: {test_acc:.3f}")

Epoch   0 | Train Loss: 2600.5678119659424
Epoch   1 | Train Loss: 2572.3334884643555
Epoch   2 | Train Loss: 2555.952506542206
Epoch   3 | Train Loss: 2540.2408170700073
Epoch   4 | Train Loss: 2524.5821285247803
Epoch   5 | Train Loss: 2507.238224029541
Epoch   6 | Train Loss: 2486.990801334381
Epoch   7 | Train Loss: 2465.5504879951477
Epoch   8 | Train Loss: 2441.1080026626587
Epoch   9 | Train Loss: 2417.1552863121033
Epoch  10 | Train Loss: 2394.829668045044
Epoch  11 | Train Loss: 2370.080690383911
Epoch  12 | Train Loss: 2346.5314631462097
Epoch  13 | Train Loss: 2321.385977268219
Epoch  14 | Train Loss: 2297.3982257843018
Epoch  15 | Train Loss: 2270.5549545288086
Epoch  16 | Train Loss: 2247.055193901062
Epoch  17 | Train Loss: 2222.3680510520935
Epoch  18 | Train Loss: 2195.43754863739
Epoch  19 | Train Loss: 2171.6512360572815


학습이 안정적으로 진행되며 RNN보다 빨리 수렴하는 것을 확인할 수 있습니다.
하지만 test 정확도가 RNN보다 낮은 것을 보았을 때, overfitting에 취약하다는 것을 알 수 있습니다.