# GPT로 영화 리뷰 생성 모델 학습하기

이번 실습에서는 GPT를 imdb에서 제공하는 영화리뷰들로 학습하여 영화 리뷰를 만들어낼 수 있는 모델을 구현합니다.
먼저 필요한 library들을 import합니다.

In [None]:
!pip install datasets

Collecting datasets
  Downloading datasets-2.21.0-py3-none-any.whl.metadata (21 kB)
Collecting pyarrow>=15.0.0 (from datasets)
  Downloading pyarrow-17.0.0-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (3.3 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Downloading datasets-2.21.0-py3-none-any.whl (527 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m527.3/527.3 kB[0m [31m11.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m6.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pyarrow-17.0.0-cp310-cp310-manylinux_2_28_x86_64.whl (39.9 MB)
[2

이후 저번 주차와 같은 코드로 tokenizer를 준비합니다.

In [None]:
import torch
import numpy as np

from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import BertTokenizerFast
from tokenizers import (
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
)

# Hugging Face datasets 라이브러리에서 IMDB 데이터셋을 로드
ds = load_dataset("stanfordnlp/imdb")

# WordPiece: BERT에서 사용하는 토크나이징 방식
# [UNK]: 알 수 없는 토큰을 표시하는 특수 토큰
# 새로운 WordPiece 토크나이저 초기화
tokenizer = Tokenizer(models.WordPiece(unk_token="[UNK]"))

# BERT와 유사한 정규화 설정 (소문자 변환 및 기타 텍스트 조정)
tokenizer.normalizer = normalizers.BertNormalizer(lowercase=True)

# BERT와 유사한 사전 토크나이징 설정 (텍스트를 단어로 분리)
tokenizer.pre_tokenizer = pre_tokenizers.BertPreTokenizer()

def get_training_corpus():
    # 데이터셋에서 텍스트 데이터의 청크를 생성하는 제너레이터 함수
    for i in range(0, len(ds['train']), 1000):
        yield ds['train'][i: i + 1000]['text']

# 토크나이저를 위한 특수 토큰 정의
special_tokens = ["[UNK]", "[PAD]", "[CLS]", "[SEP]"]

# 지정된 어휘 크기와 특수 토큰으로 토크나이저를 훈련시키기 위한 WordPieceTrainer 초기화
trainer = trainers.WordPieceTrainer(vocab_size=10000, special_tokens=special_tokens)

# 훈련된 토크나이저를 사용하여 텍스트 데이터로 훈련
tokenizer.train_from_iterator(get_training_corpus(), trainer=trainer)

# Hugging Face Transformers 라이브러리와 호환되는 형식으로 훈련된 토크나이저로 변환
tokenizer = BertTokenizerFast(tokenizer_object=tokenizer)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Downloading readme:   0%|          | 0.00/7.81k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/21.0M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/20.5M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/42.0M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating unsupervised split:   0%|          | 0/50000 [00:00<?, ? examples/s]

이번에는 next word prediction을 위한 data를 준비합니다. 다음과 같이 `collate_fn`을 수정합니다.

In [None]:
from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    # 최대 길이 설정
    max_len = 400
    texts, labels = [], []

    for row in batch:
        # 텍스트를 토크나이징하고, input_ids를 얻어와서 라벨로 사용
        # [1:]은 [CLS] 토큰을 제거하기 위함
        labels.append(torch.LongTensor(tokenizer(row['text'], truncation=True, max_length=max_len).input_ids[1:]))

        # [:-1]은 [SEP] 토큰을 제거하기 위함
        texts.append(torch.LongTensor(tokenizer(row['text'], truncation=True, max_length=max_len).input_ids[:-1]))

    # 배치 내 모든 텍스트 시퀀스를 패딩하여 같은 길이로 맞춤
    # padding_value는 [PAD] 토큰의 ID
    texts = pad_sequence(texts, batch_first=True, padding_value=tokenizer.pad_token_id)

    # 배치 내 모든 라벨 시퀀스를 패딩하여 같은 길이로 맞춤
    # padding_value는 [PAD] 토큰의 ID
    labels = pad_sequence(labels, batch_first=True, padding_value=tokenizer.pad_token_id)

    return texts, labels


이전 과제인 last word prediction과 달라진 점은 label은 `input_ids[1:]`를, text는 `input_ids[:-1]`를 사용한다는 것입니다.
이렇게 바꾸어 모든 가능한 input text의 segment에 대해 다음 단어를 예측할 수 있도록 합니다.

다음은 data loader와 모델을 구현합니다.

In [None]:
train_loader = DataLoader(
    ds['train'], batch_size=64, shuffle=True, collate_fn=collate_fn
)
test_loader = DataLoader(
    ds['test'], batch_size=64, shuffle=False, collate_fn=collate_fn
)

In [None]:
from torch import nn
from math import sqrt


class MultiHeadAttention(nn.Module):
  def __init__(self, input_dim, d_model, n_heads):
    super().__init__()

    self.input_dim = input_dim
    self.d_model = d_model
    self.n_heads = n_heads

    self.wq = nn.Linear(input_dim, d_model)
    self.wk = nn.Linear(input_dim, d_model)
    self.wv = nn.Linear(input_dim, d_model)
    self.dense = nn.Linear(d_model, d_model)

    self.softmax = nn.Softmax(dim=-1)

  def forward(self, x, mask):
    q, k, v = self.wq(x), self.wk(x), self.wv(x)
    B, S, D = q.shape[0], q.shape[1], self.d_model // self.n_heads

    q = q.reshape((B, S, self.n_heads, D)).transpose(1, 2)
    k = k.reshape((B, S, self.n_heads, D)).transpose(1, 2)
    v = v.reshape((B, S, self.n_heads, D)).transpose(1, 2)

    score = torch.matmul(q, k.transpose(-1, -2)) # (B, H, S, D) * (B, H, D, S) = (B, H, S, S)
    score = score / sqrt(self.d_model)

    if mask is not None:
      score = score + (mask[:, None] * -1e9)

    score = self.softmax(score)
    result = torch.matmul(score, v)  # (B, H, S, D)

    result = result.transpose(1, 2).reshape((B, S, -1))
    result = self.dense(result)

    return result


class TransformerLayer(nn.Module):
  def __init__(self, input_dim, d_model, n_heads, dff):
    super().__init__()

    self.input_dim = input_dim
    self.d_model = d_model
    self.n_heads = n_heads
    self.dff = dff

    self.sa = MultiHeadAttention(input_dim, d_model, n_heads)
    self.ffn = nn.Sequential(
      nn.Linear(d_model, dff),
      nn.ReLU(),
      nn.Linear(dff, d_model)
    )

    self.norm1 = nn.LayerNorm(d_model)
    self.dropout1 = nn.Dropout(0.1)

    self.norm2 = nn.LayerNorm(d_model)
    self.dropout2 = nn.Dropout(0.1)

  def forward(self, x, mask):
    x1 = self.sa(x, mask)
    x1 = self.dropout1(x1)
    x1 = self.norm1(x + x1)

    x2 = self.ffn(x1)
    x2 = self.dropout2(x2)
    x2 = self.norm2(x1 + x2)

    return x


def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, None], np.arange(d_model)[None, :], d_model)
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    pos_encoding = angle_rads[None, ...]

    return torch.FloatTensor(pos_encoding)

이번에는 Transformer를 가지고 GPT를 구현합니다.

In [None]:
max_len = 400


class GPT(nn.Module):
  def __init__(self, vocab_size, d_model, n_heads, n_layers, dff):
    super().__init__()

    self.vocab_size = vocab_size
    self.d_model = d_model
    self.n_heads = n_heads
    self.n_layers = n_layers
    self.dff = dff

    self.embedding = nn.Embedding(vocab_size, d_model)
    self.pos_encoding = nn.parameter.Parameter(positional_encoding(max_len, d_model), requires_grad=False)
    self.layers = nn.ModuleList([TransformerLayer(d_model, d_model, n_heads, dff) for _ in range(n_layers)])
    self.classification = nn.Linear(d_model, len(tokenizer))

  def forward(self, x):
    seq_len = x.shape[1]

    # 패딩 마스크: 패딩된 부분을 attention 계산에서 무시하기 위함
    mask1 = (x == tokenizer.pad_token_id)[..., None] # (B, S, 1)
    # 인과적 마스크: 각 위치에서 이전 위치의 토큰들만 참조할 수 있도록 자기회귀적 속성을 구현한다.
    # GPT와 같은 언어 모델에서 미래의 토큰을 보지 못하게 하는 역할을 한다.
    mask2 = torch.tril(torch.ones(seq_len, seq_len)).type(torch.ByteTensor).to(x.device)[None]  # (B, S, S)
    mask = mask1 & mask2

    x = self.embedding(x)
    x = x * sqrt(self.d_model)
    x = x + self.pos_encoding[:, :seq_len]

    for layer in self.layers:
      x = layer(x, mask)

    x = self.classification(x)
    return x


model = GPT(len(tokenizer), 32, 4, 5, 32)

이전에 구현한 `TextClassifier`와 유사하지만 다음 두 가지 차이점이 있습니다.

1. `mask`가 다르게 설정됩니다. 기존의 `mask`는 padding token만을 걸러냈다면, 이번에는 생성모델의 취지에 맞춰 예측하고자 하는 token의 representation을 결정할 때, 미래의 token들에는 영향을 받지 않도록 mask를 계산합니다. 이는 `mask2`에 해당합니다.
2. Classifier를 첫 번째 token 가지고 하는 것이 아닌 모든 token에 대해서 진행합니다.

이제 GPT를 학습해보고, 예측 결과를 이전과 같은 코드를 사용하여 확인해봅시다.

In [None]:
import numpy as np
import matplotlib.pyplot as plt

from torch.optim import Adam


lr = 0.001
model = model.to('cuda')
# 배치 내 각 샘플의 손실값을 그대로 유지하기 위해 reduction='none'
loss_fn = nn.CrossEntropyLoss(reduction='none')

optimizer = Adam(model.parameters(), lr=lr)
n_epochs = 10

for epoch in range(n_epochs):
  total_loss = 0.
  model.train()
  for data in train_loader:
    model.zero_grad()
    inputs, labels = data
    inputs, labels = inputs.to('cuda'), labels.to('cuda')

    preds = model(inputs)

    # reshape에 -1을 사용하면 해당 차원이 자동으로 계산된다.
    # CrossEntropyLoss 함수의 입력 형식에 맞추기 위함이다.
    preds = preds.reshape((-1, len(tokenizer)))
    labels = labels.reshape(-1)
    mask = (inputs == tokenizer.pad_token_id)
    mask = mask.reshape(-1)

    loss = loss_fn(preds, labels)
    # 패딩된 위치의 손실은 무시하고 실제 토큰 위치의 손실만 계산하기 불리언 값을 반전시킨다.
    loss = (loss * ~mask).sum() / (~mask).sum()

    loss.backward()
    optimizer.step()

    total_loss += loss.item()

  print(f"Epoch {epoch:3d} | Train Loss: {total_loss}")

Epoch   0 | Train Loss: 3159.6602907180786
Epoch   1 | Train Loss: 2308.5477228164673
Epoch   2 | Train Loss: 2184.219573497772
Epoch   3 | Train Loss: 2128.4375467300415
Epoch   4 | Train Loss: 2092.940026283264
Epoch   5 | Train Loss: 2068.1226391792297
Epoch   6 | Train Loss: 2049.7196555137634
Epoch   7 | Train Loss: 2035.257968902588
Epoch   8 | Train Loss: 2023.613287448883
Epoch   9 | Train Loss: 2013.75026512146


In [None]:
input_text = "I am "
tokens_org = tokenizer(input_text).input_ids
tokens = torch.LongTensor(tokens_org)[None].to('cuda')

last_token_pred = model(tokens)[0, -1].argmax()
tokenizer.decode(tokens_org + [last_token_pred.item()])

'i am not'

보시다시피 loss도 잘 떨어지고 있고, "i am "이라는 문장을 넣었을 때 "not"이라는 token을 예측하여 말이 되는 생성 결과를 내놓는 것을 알 수 있습니다.