# Transformer 실습 + MHA


In [68]:
torch.cuda.empty_cache()

## 1. 라이브러리 준비

In [69]:
!pip install datasets sacremoses



## 2. 데이터셋 준비

In [87]:
import torch
from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import BertTokenizerFast
from tokenizers import (
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
)


# ds = load_dataset("stanfordnlp/imdb")
train_ds = load_dataset("stanfordnlp/imdb", split="train[:5%]")
test_ds = load_dataset("stanfordnlp/imdb", split="test[:5%]")

tokenizer = torch.hub.load('huggingface/pytorch-transformers', 'tokenizer', 'bert-base-uncased')


from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
  max_len = 400
  texts, labels = [], []

  # 배치 데이터의 각 행마다 반복
  for row in batch:
    # truncation이 True -> max length가 넘어가면 자름. 끝에서 3번째 토큰을 label(정답)로 선택
    labels.append(tokenizer(row['text'], truncation=True, max_length=max_len).input_ids[-3])
    # 마지막 3개를 제외한 앞 부분의 토큰들을 텐서로 변환한 후 텍스트(입력)로 사용
    texts.append(torch.LongTensor(tokenizer(row['text'], truncation=True, max_length=max_len).input_ids[:-3]))

  # 패딩 토큰 추가 (아마 400자 기준이겠지?)
  texts = pad_sequence(texts, batch_first=True, padding_value=tokenizer.pad_token_id)
  # labels를 텐서로 변환
  labels = torch.LongTensor(labels)

  return texts, labels

# 각각 트레인 로더, 테스트 로더를 만들어준다. 배치사이즈는 64
train_loader = DataLoader(
    train_ds, batch_size=16, shuffle=True, collate_fn=collate_fn
)
test_loader = DataLoader(
    test_ds, batch_size=16, shuffle=False, collate_fn=collate_fn
)

Using cache found in /root/.cache/torch/hub/huggingface_pytorch-transformers_main


- `tokenizer` → BERT 모델(bert-base-uncased)의 토크나이저를 가져옴
    - bert : BERT 모델
    - base : 12층 레이어, 은닉층 사이즈 768
    - uncased  : 대소문자 구분 없이, 소문자로 바꿔서 처리함

- 데이터의 배치 사이즈 64개 → 64개씩 묶어서 진행
- label 값 = text 값(=리뷰 내용)의 토큰들 중 끝에서 3번째
    - 왜 뒤에서 3번째일까…? 문장부호에 해당하는 토큰이나 `[SEP]` 토큰을 거르기 위해서인 것 같다.
    - 아무래도 아래와 같이 토크나이즈 되는 경우가 많을 거라 생각된다.

    ```['this', 'film', 'does', "n't", 'have', 'much', 'of', 'a', 'plot', '.', '[SEP]']```
- 입력 값 = 처음부터 끝에서 3번째까지의 값

### input 차원 체크

In [88]:
text, label = next(iter(train_loader))
print(text.shape, label.shape)

torch.Size([16, 397]) torch.Size([16])


## 멀티 헤드 어텐션 구현

In [89]:
from torch import nn
from math import sqrt


class MultiHeadAttention(nn.Module):
  def __init__(self, input_dim, d_model, n_heads):
    super().__init__()

    # 입력 벡터 차원
    self.input_dim = input_dim

    # Q, K, V 및 최종 출력 차원
    self.d_model = d_model

    #n_heads 헤드의 개수?!
    self.n_heads = n_heads

    # Q, K, V에 곱할 가중치 행렬값
    self.wq = nn.Linear(input_dim, d_model)
    self.wk = nn.Linear(input_dim, d_model)
    self.wv = nn.Linear(input_dim, d_model)

    # 은닉층 한 번 선형 결과를 만들어서 정확도를 더 높여준다
    self.dense = nn.Linear(d_model, d_model)

    # 어텐션 스코어 계산에 필요한 소프트맥스. 확률 분포로 변환시켜준다. 0~1사이의 값
    self.softmax = nn.Softmax(dim=-1)

  def forward(self, x, mask):
    # Q, K, V 벡터 값 생성
    q, k, v = self.wq(x), self.wk(x), self.wv(x) # (B, S, D)

    # 배치 크기
    B = q.shape[0]
    # 시퀀스 길이
    S = q.shape[1]
    # 임베딩 벡터 차원 근데 얘는 D = H * D' | D' = D // 2 (정수로 나눠야함)
    D_HEAD = self.d_model // self.n_heads

    #Q, K, V를 (B, S, H, D')로 리쉐이프 + (B,H,S,D')로 순서 바꿔줘야함 이런
    q = q.reshape(B, S, self.n_heads, D_HEAD).transpose(1,2)
    k = k.reshape(B, S, self.n_heads, D_HEAD).transpose(1,2)
    v = v.reshape(B, S, self.n_heads, D_HEAD).transpose(1,2)


    # Q * K^T 쿼리와 키의 유사도를 구함
    score = torch.matmul(q, k.transpose(-1, -2)) # (B,H,S,D') * (B,H,D',S) = (B, H, S, S)
    # 스케일링 : 루트 d로 나누어서 크기 조정을 한다.
    score = score / sqrt(self.d_model)

    # 마스킹 - 패딩 토큰 무시. 0에 수렴하도록? 거의 안보이게 하기 위해 -1e9 더함
    # score의 (B, H, S, S) 쉐이프에 맞춰야함
    # 마스크 값의 shape는 (B, 1, S).... 행렬 덧셈 조건에 맞게 차원을 더 추가해서 (B, 1, X, S) 이런식으로 하면 될듯
    if mask is not None:
      mask = mask[:, None]
      score = score + (mask * -1e9)

    # 스코어를 소프트 맥스로 변환
    score = self.softmax(score)
    # 어텐션 스코어를 value의 벡터에 곱해서 문맥을 반영한 최종 벡터 꺼냄
    result = torch.matmul(score, v) # -> (B, S, H, D')

    # (B, S, D)로 리쉐이프
    result = result.transpose(1,2) #(B, H, S, D')
    result = result.reshape((B, S, -1)) #(B,S,D)

    # 최종 출력값 변환
    result = self.dense(result) # -> (B, S, D)

    return result

**멀티-헤드-어텐션**
- 어텐션을 병렬로 여러개 둬서 처리하는 방식
- 헤드 별로 집중하는 관점이 다르다?
- 헤드의 개수를 정해줘야할 필요가 있는듯


## 4. 트랜스포머 레이어 정의

In [90]:
#트랜스포머
class TransformerLayer(nn.Module):
  def __init__(self, input_dim, d_model, dff, n_heads):
    super().__init__()

    self.input_dim = input_dim # 입력 벡터 차원
    self.d_model = d_model # 모델 크기
    self.dff = dff #은닉층 크기

    self.mha = MultiHeadAttention(input_dim, d_model, n_heads) #멀티 헤드 어텐션 계산

    # 그냥 우리가 알고있는 MLP
    # 언어를 학습시키는 데 더 많은 가중치(weight)를 부여함
    # 비선형 처리 + 추가 가중치
    self.ffn = nn.Sequential(
      nn.Linear(d_model, dff),
      nn.ReLU(),
      nn.Linear(dff, d_model)
    )

    # residual + 레이어 정규화
    self.norm1 = nn.LayerNorm(d_model)
    self.dropout1 = nn.Dropout(0.1)
    self.norm2 = nn.LayerNorm(d_model)
    self.dropout2 = nn.Dropout(0.1)

  def forward(self, x, mask):
    x1 = self.mha(x, mask) #1. 멀티헤드 어텐션의 결과를 가져온다.
    x1 = self.dropout1(x1)
    x1 = self.norm1(x + x1)

    x2 = self.ffn(x1)
    x2 = self.dropout2(x2)
    x2 = self.norm2(x1 + x2)

    return x2

**residual 학습**
- 특정 레이어를 건너 뛰어서 복사된 값을 그대로 넣어줌
- mha -> add + norm -> ffn -> add + norm
- 첫번째 add + norm = 멀티헤드 레이어 적용 값(x1) + 적용안한 값 (x)
- 두번째 add + norm = 첫번째 add + norm 값(x1) + x1 값을 FFN에 적용 시킨 값

**Layer normalization**
- 출력값 안정화

## Positional encoding

이번에는 positional encoding을 구현합니다. Positional encoding의 식은 다음과 같습니다:
$$
\begin{align*} PE_{pos, 2i} &= \sin\left( \frac{pos}{10000^{2i/D}} \right), \\ PE_{pos, 2i+1} &= \cos\left( \frac{pos}{10000^{2i/D}} \right).\end{align*}
$$

이를 Numpy로 구현하여 PyTorch tensor로 변환한 모습은 다음과 같습니다:

In [91]:
import numpy as np


def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates
# 포지셔널 인코딩
# 트랜스포머를 사용하려면 순서 정보 벡터값을 따로 계산해서 입력해줘야한다.
def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, None], np.arange(d_model)[None, :], d_model)
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    pos_encoding = angle_rads[None, ...]

    return torch.FloatTensor(pos_encoding)


max_len = 400
print(positional_encoding(max_len, 256).shape)

torch.Size([1, 400, 256])


Positional encoding은 `angle_rads`를 구현하는 과정에서 모두 구현이 되었습니다. 여기서 `angle_rads`의 shape은 (S, D)입니다.
우리는 일반적으로 batch로 주어지는 shape이 (B, S, D)인 tensor를 다루기 때문에 마지막에 None을 활용하여 shape을 (1, S, D)로 바꿔주게됩니다.

위에서 구현한 `TransformerLayer`와 positional encoding을 모두 합친 모습은 다음과 같습니다

.... 이 부분 어려워서 패스하겠습니다 😞

In [92]:
class TextClassifier(nn.Module):
  def __init__(self, vocab_size, d_model, n_layers, dff, n_heads):
    super().__init__()

    self.vocab_size = vocab_size
    self.d_model = d_model
    self.n_layers = n_layers
    self.dff = dff

    # 워드 임베딩
    self.embedding = nn.Embedding(vocab_size, d_model)
    # 포지셔널 인코딩
    self.pos_encoding = nn.parameter.Parameter(positional_encoding(max_len, d_model), requires_grad=False)
    # rnn 대신 트랜스포머 레이어를 통과시킴.
    # 인코더, 디코더를 여러개 두고 쓰는 방식이라서 여러개를 만들어서 통과시키는듯
    self.layers = nn.ModuleList([TransformerLayer(d_model, d_model, dff, n_heads) for _ in range(n_layers)])
    # 마지막 토큰을 예측 -> 학습 데이터셋들의 모든 토큰들의 개수를 체크하는게 맞겠지 여기서 가장 유리한 것을 골라야하니까
    self.classification = nn.Linear(d_model, vocab_size)

  def forward(self, x):
    # x의 shape == (batch_size, seq_len, d_model)
    # 마스크 정의: 패딩 토큰이 있다면 true
    mask = (x == tokenizer.pad_token_id)
    mask = mask[:, None, :] # -> (B,1,S)?

    # 시퀀스 데이터의 길이
    seq_len = x.shape[1]

    # 워드 임베딩
    x = self.embedding(x)
    # 노멀라이즈?
    x = x * sqrt(self.d_model)
    # 포지셔널 인코딩 - 위치 정보 벡터 추가
    x = x + self.pos_encoding[:, :seq_len]

    # 트랜스포머의 레이어 갯수만큼 돌려라...
    for layer in self.layers:
      x = layer(x, mask)

    x = x[:, 0] # 첫번째 차원(배치)에서 모든 요소를 선택, 두번째 차원(시퀀스 데이터) 첫번째 요소만 선택해서 마지막 단어를 예측하게끔 하는듯
    x = self.classification(x)

    return x


model = TextClassifier(len(tokenizer), 32, 5, 32, 4)

기존과 다른 점들은 다음과 같습니다:
1. `nn.ModuleList`를 사용하여 여러 layer의 구현을 쉽게 하였습니다.
2. Embedding, positional encoding, transformer layer를 거치고 난 후 마지막 label을 예측하기 위해 사용한 값은 `x[:, 0]`입니다. 기존의 RNN에서는 padding token을 제외한 마지막 token에 해당하는 representation을 사용한 것과 다릅니다. 이렇게 사용할 수 있는 이유는 attention 과정을 보시면 첫 번째 token에 대한 representation은 이후의 모든 token의 영향을 받습니다. 즉, 첫 번째 token 또한 전체 문장을 대변하는 의미를 가지고 있다고 할 수 있습니다. 그래서 일반적으로 Transformer를 text 분류에 사용할 때는 이와 같은 방식으로 구현됩니다.

## 학습

In [93]:
from torch.optim import Adam

lr = 0.001
model = model.to('cuda')
# 교차 엔트로피로 수정
loss_fn = nn.CrossEntropyLoss()

optimizer = Adam(model.parameters(), lr=lr)

교차 엔트로피 손실 함수를 쓰는 이유?

- 마지막 단어 예측 → 데이터셋의 모든 단어들 중 하나일 가능성일 확률이 높음
- TextClassifier 클래스의 최종 출력 값의 shape vocab_size과 동일함
    - 즉, MNIST 분류 문제처럼 모든 토큰들 중 하나를 분류하는 것.
    - 교차 엔트로피는 분류 문제에서 예측값들의 정답 확률 분포를 알려줌
    - `argmax(logits, dim=-1)` 을 통해 가장 높은 확률을 선택

In [94]:
import numpy as np
import matplotlib.pyplot as plt


def accuracy(model, dataloader):
  cnt = 0
  acc = 0

  for data in dataloader:
    inputs, labels = data
    inputs, labels = inputs.to('cuda'), labels.to('cuda')

    preds = model(inputs)
    # 가장 큰 값이 곧 가장 정답에 가까운 예측값
    preds = torch.argmax(preds, dim=-1)

    cnt += labels.shape[0]
    acc += (labels == preds).sum().item()

  return acc / cnt

In [78]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [95]:

n_epochs = 50

for epoch in range(n_epochs):
  total_loss = 0.
  model.train()
  for data in train_loader:
    model.zero_grad()
    inputs, labels = data
    inputs, labels = inputs.to('cuda'), labels.to('cuda')

    # 수많은 값을 예측해서 가장 큰 값을 고르는게 포인트
    preds = model(inputs)
    loss = loss_fn(preds, labels)
    loss.backward()
    optimizer.step()

    total_loss += loss.item()

  print(f"Epoch {epoch:3d} | Train Loss: {total_loss}")

  with torch.no_grad():
    model.eval()
    train_acc = accuracy(model, train_loader)
    test_acc = accuracy(model, test_loader)
    print(f"=========> Train acc: {train_acc:.3f} | Test acc: {test_acc:.3f}")

Epoch   0 | Train Loss: 761.4955129623413
Epoch   1 | Train Loss: 544.5182375907898
Epoch   2 | Train Loss: 480.92755603790283
Epoch   3 | Train Loss: 468.1751036643982
Epoch   4 | Train Loss: 458.19014072418213
Epoch   5 | Train Loss: 442.2921471595764
Epoch   6 | Train Loss: 427.9643712043762
Epoch   7 | Train Loss: 400.3751504421234
Epoch   8 | Train Loss: 379.7343783378601
Epoch   9 | Train Loss: 350.517271399498
Epoch  10 | Train Loss: 324.5260741710663
Epoch  11 | Train Loss: 297.46112036705017
Epoch  12 | Train Loss: 269.4104583263397
Epoch  13 | Train Loss: 241.00907158851624
Epoch  14 | Train Loss: 213.49428260326385
Epoch  15 | Train Loss: 185.5610523223877
Epoch  16 | Train Loss: 165.20812964439392
Epoch  17 | Train Loss: 138.28752529621124
Epoch  18 | Train Loss: 116.92633771896362
Epoch  19 | Train Loss: 101.74012446403503
Epoch  20 | Train Loss: 80.04417407512665
Epoch  21 | Train Loss: 64.60628643631935
Epoch  22 | Train Loss: 52.2269169986248
Epoch  23 | Train Loss: 40.

배치 사이즈랑 데이터셋의 양을 줄였더니 이런 일이...

In [99]:
from torch.nn.functional import softmax

# 1. 테스트 샘플 1개 가져오기
sample = test_ds[5]['text']
tokens = tokenizer(sample, truncation=True, max_length=400).input_ids

# 2. 정답은 뒤에서 -3 인덱스인 토큰, 입력은 그 앞부분
target_ids = tokens[-3]         # 정답
input_ids = tokens[:-3]          # 입력

# 3. 텐서로 변환해서 배치처럼 만들기
input_tensor = torch.LongTensor([input_ids]).to(device)  # shape: (1, L)

# 4. 모델에 넣기
model.eval()
with torch.no_grad():
    logits = model(input_tensor)
    pred_ids = logits.argmax(dim=-1)[0]

# 5. 예측 결과 디코딩
pred_tokens = tokenizer.decode(pred_ids.tolist()) #토큰 ID -> 문자
target_tokens = tokenizer.decode(target_ids)

print("🟦 전체 입력 문장:", sample)
print("🟦 입력 문장:", tokenizer.decode(input_ids))
print("🟩 정답 토큰:", target_tokens)
print("🟥 예측 토큰:", pred_tokens)


🟦 전체 입력 문장: I had high hopes for this one until they changed the name to 'The Shepherd : Border Patrol, the lamest movie name ever, what was wrong with just 'The Shepherd'. This is a by the numbers action flick that tips its hat at many classic Van Damme films. There is a nice bit of action in a bar which reminded me of hard target and universal soldier but directed with no intensity or flair which is a shame. There is one great line about 'being p*ss drunk and carrying a rabbit' and some OK action scenes let down by the cheapness of it all. A lot of the times the dialogue doesn't match the characters mouth and the stunt men fall down dead a split second before even being shot. The end fight is one of the better Van Damme fights except the Director tries to go a bit too John Woo and fails also introducing flashbacks which no one really cares about just gets in the way of the action which is the whole point of a van Damme film.<br /><br />Not good, not bad, just average generic action.
