### 모델 정의하기
* 언어 모델링 작업에서 nn.TransformerEncoder 모델을 학습
* 언어 모델링 작업의 경우 현재보다 미래에 위치해 있는 모든 토큰을 마스킹해야 함
* 출력 단어에 대한 확률 분포를 생성하기 위해 nn.TransformerEncoder 모델의 출력은 선형 레이어를 통과하여 정규화되지 않은 로짓(logits)을 출력
* 나중에 입력이 정규화되지 않은 로짓 값이어야 하는 CrossEntropyLoss를 사용

In [1]:
import math
import os
from tempfile import TemporaryDirectory
from typing import Tuple

import torch
from torch import nn, Tensor
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch.utils.data import dataset

class TransformerModel(nn.Module):
    def __init__(self, ntoken: int, d_model: int, nhead: int, d_hid: int, nlayers: int, dropout: float = 0.5):
        """
        시퀀스 대 시퀀스 작업을 위한 트랜스포머 모델입니다.

        인자:
            ntoken(int): 토큰의 개수(어휘의 크기).
            d_model (int): 모델의 치수(임베딩 크기).
            nhead (int): 어텐션 헤드의 개수.
            d_hid (int): Feedforward 네트워크 모델의 차원.
            nlayers (int): 트랜스포머 인코더 레이어 수입니다.
            dropout(실수, 선택 사항): 드롭아웃 비율. 기본값은 0.5입니다.
        """
        super().__init__()
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = TransformerEncoderLayer(d_model, nhead, d_hid, dropout, batch_first=True)
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)
        self.embedding = nn.Embedding(ntoken, d_model)
        self.d_model = d_model
        self.linear = nn.Linear(d_model, ntoken)
        self.init_weights()

    def init_weights(self) -> None:
        """
        임베딩 및 선형 레이어의 가중치를 초기화합니다.
        """
        initrange = 0.1
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.linear.bias.data.zero_()
        self.linear.weight.data.uniform_(-initrange, initrange)

    def forward(self, src: Tensor, src_mask: Tensor = None) -> Tensor:
        """
        인자:
            src: 텐서, 모양 ``[seq_len, batch_size]``
            src_mask: 텐서, 모양 ``[seq_len, seq_len]``

        출력:
            ``[seq_len, batch_size, ntoken]`` 형태의 Tensor를 반환합니다.
        """
        src = self.embedding(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, src_mask)
        output = self.linear(output)
        return output

* PositionalEncoding 모듈은 시퀀스 내 토큰의 상대적 또는 절대적 위치에 대한 정보를 삽입
* 위치 인코딩은 임베딩과 동일한 차원을 가지므로 둘을 합산할 수 있음
* 주파수가 다른 sine과 cosine 함수를 사용

In [2]:
class PositionalEncoding(nn.Module):
    """
    트랜스포머 모델용 포지셔널 인코딩 클래스입니다.
    이 클래스는 시퀀스에서 각 토큰의 위치에 대한 정보를 추가하는 방법을 제공합니다.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        """
        포지셔널 인코딩 모듈을 초기화합니다.

        인자:
            d_model (int): 모델의 치수(임베딩 크기).
            dropout(float, 선택 사항): 드롭아웃 비율. 기본값은 0.1입니다.
            max_len (int, 선택 사항): 시퀀스의 최대 길이. 기본값은 5,000입니다.
        """
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        # 위치 [0, max_len]에 대한 텐서를 생성
        position = torch.arange(max_len).unsqueeze(1)
        
        # 사인(sine) 인코딩을 위한 div_term 계산
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        
        # 위치 인코딩 행렬을 0으로 초기화
        pe = torch.zeros(max_len, 1, d_model)
        
        # 텐서의 짝수 인덱스에 사인(sine) 인코딩 적용; 2i
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        
        # 텐서의 홀수 인덱스에 코사인 인코딩 적용; 2i+1
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        
        # 모델 파라미터로 간주해서는 안 되는 버퍼로 'pe'를 등록합니다.
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        포지셔널 인코딩 모듈의 forward 전달

        인자:
            x (Tensor): [seq_len, batch_size, embedding_dim] 구조의 입력 텐서

        리턴:
            텐서: 입력 텐서에 포지셔널 인코딩이 추가된 출력 텐서
        """
        # 입력 텐서에 포지셔널 인코딩 추가
        x = x + self.pe[:x.size(0)]
        
        # 드롭아웃 적용
        return self.dropout(x)

### 데이터 로드 및 배치

In [3]:
from torchtext.datasets import WikiText2
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

# WikiText-2 학습 데이터셋 불러오기
train_iter = WikiText2(split='train')

# 텍스트를 토큰으로 변환하는 토크나이저 정의
tokenizer = get_tokenizer('basic_english')

# 학습데이터셋에서 어휘(vocab) 구축
vocab = build_vocab_from_iterator(map(tokenizer, train_iter), specials=['<unk>'])
vocab.set_default_index(vocab['<unk>'])

def data_process(raw_text_iter: dataset.IterableDataset) -> Tensor:
    """
    원시(raw) 텍스트를 플랫(flat) 텐서로 변환합니다.

    인자:
        raw_text_iter(dataset.IterableDataset): 원시 텍스트의 이터러블 데이터셋입니다.

    리턴:
        텐서: 토큰 인덱스의 플랫 텐서입니다.
    """
    # 원시 텍스트를 토큰화하고, 토큰을 인덱스로 변환하고, 각 줄에 대한 텐서를 생성합니다.
    data = [torch.tensor(vocab(tokenizer(item)), dtype=torch.long) for item in raw_text_iter]
    
    # 모든 텐서를 연결(Concatenate)하고 빈 텐서는 걸러냅니다.
    return torch.cat(tuple(filter(lambda t: t.numel() > 0, data)))

# 어휘 구축 중에 학습 이터레이터가 소모되었으므로 다시 생성합니다.
train_iter, val_iter, test_iter = WikiText2()

# 원시 텍스트 데이터를 텐서로 처리
train_data = data_process(train_iter)
val_data = data_process(val_iter)
test_data = data_process(test_iter)

# 사용할 디바이스 정의(사용 가능한 경우 GPU, 그렇지 않은 경우 CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def batchify(data: Tensor, bsz: int) -> Tensor:
    """
    데이터를 분리된 시퀀스로 나누고, 깔끔하게 맞지 않는 초과 요소를 제거합니다.

    인자:
        data(텐서): [N] 구조의 텐서입니다.
        bsz (int): 배치 크기.

    리턴:
        텐서: [N // bsz, bsz] 구조의 텐서.
    """
    # 형성할 수 있는 전체 시퀀스의 수를 계산합니다.
    seq_len = data.size(0) // bsz
    
    # 전체 시퀀스에 정확히 맞도록 데이터 다듬기
    data = data[:seq_len * bsz]
    
    # 데이터 구조를 변경하고 지정된 장치로 옮기기
    data = data.view(bsz, seq_len).t().contiguous()
    return data.to(device)

# 학습 및 평가를 위한 배치 크기 정의
batch_size = 20
eval_batch_size = 10

# 데이터 batchify
train_data = batchify(train_data, batch_size)  # [seq_len, batch_size] 구조
val_data = batchify(val_data, eval_batch_size)
test_data = batchify(test_data, eval_batch_size)

#### 입력 및 타겟 시퀀스를 생성하는 함수
* get_batch()는 트랜스포머 모델에 대한 입력-타겟 시퀀스 쌍을 생성
* 이 함수는 소스 데이터를 bptt 길이의 말뭉치(chunk)로 세분화

In [4]:
bptt = 35
def get_batch(source: Tensor, i: int) -> Tuple[Tensor, Tensor]:
    """
    인자:
        source: Tensor, shape ``[full_seq_len, batch_size]``
        i: int

    리턴:
        tuple (data, target), where data has shape ``[seq_len, batch_size]`` and
        target has shape ``[seq_len * batch_size]``
    """
    seq_len = min(bptt, len(source) - 1 - i)
    data = source[i:i+seq_len]
    target = source[i+1:i+1+seq_len].reshape(-1)
    return data, target

### 인스턴스 초기 설정하기

In [5]:
ntokens = len(vocab)  # 단어의 크기
emsize = 200  # 임베딩 차원
d_hid = 200  # 피드포워드 네트워크 모델의 차원을 "nn.TransformerEncoder"로 설정합니다.
nlayers = 2  # "nn.TransformerEncoder"의 "nn.TransformerEncoderLayer" 개수입니다.
nhead = 2  # "nn.MultiheadAttention"의 헤드 수
dropout = 0.2  # 드롭 아웃 확률
model = TransformerModel(ntokens, emsize, nhead, d_hid, nlayers, dropout).to(device)

### 모델 실행
* SGD(확률적 경사 하강법) 옵티마이저와 함께 CrossEntropyLoss를 사용
* 학습율는 처음에 5.0으로 설정되며 StepLR 스케줄을 따름
* 학습 중에는 nn.utils.clip_grad_norm_을 사용하여 그래디언트가 폭발하는 것을 방지

In [6]:
import time

criterion = nn.CrossEntropyLoss()
lr = 5.0  # 학습률
optimizer = torch.optim.SGD(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.95)

def train(model: nn.Module) -> None:
    model.train()  # 학습모드 켜기
    total_loss = 0.
    log_interval = 200
    start_time = time.time()

    num_batches = len(train_data) // bptt
    for batch, i in enumerate(range(0, train_data.size(0) - 1, bptt)):
        data, targets = get_batch(train_data, i)
        output = model(data)
        output_flat = output.view(-1, ntokens)
        loss = criterion(output_flat, targets)

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()

        total_loss += loss.item()
        if batch % log_interval == 0 and batch > 0:
            lr = scheduler.get_last_lr()[0]
            ms_per_batch = (time.time() - start_time) * 1000 / log_interval
            cur_loss = total_loss / log_interval
            ppl = math.exp(cur_loss)
            print(f'| epoch {epoch:3d} | {batch:5d}/{num_batches:5d} batches | '
                  f'lr {lr:02.2f} | ms/batch {ms_per_batch:5.2f} | '
                  f'loss {cur_loss:5.2f} | ppl {ppl:8.2f}')
            total_loss = 0
            start_time = time.time()

def evaluate(model: nn.Module, eval_data: Tensor) -> float:
    model.eval()  # 평가모드 켜기
    total_loss = 0.
    with torch.no_grad():
        for i in range(0, eval_data.size(0) - 1, bptt):
            data, targets = get_batch(eval_data, i)
            seq_len = data.size(0)
            output = model(data)
            output_flat = output.view(-1, ntokens)
            total_loss += seq_len * criterion(output_flat, targets).item()
    return total_loss / (len(eval_data) - 1)

* 주의! 아래코드를 실행하면 매우 많은 시간이 소모 됨

In [7]:
# best_val_loss = float('inf')
# epochs = 3
# 
# with TemporaryDirectory() as tempdir:
#     best_model_params_path = os.path.join(tempdir, "best_model_params.pt")
# 
#     for epoch in range(1, epochs + 1):
#         epoch_start_time = time.time()
#         train(model)
#         val_loss = evaluate(model, val_data)
#         val_ppl = math.exp(val_loss)
#         elapsed = time.time() - epoch_start_time
#         print('-' * 89)
#         print(f'| end of epoch {epoch:3d} | time: {elapsed:5.2f}s | '
#             f'valid loss {val_loss:5.2f} | valid ppl {val_ppl:8.2f}')
#         print('-' * 89)
# 
#         if val_loss < best_val_loss:
#             best_val_loss = val_loss
#             torch.save(model.state_dict(), best_model_params_path)
# 
#         scheduler.step()
#     model.load_state_dict(torch.load(best_model_params_path)) # 가장 좋은 모델 state 불러오기

### 테스트셋 데이터로 가장 좋은 모델을 평가하기

In [8]:
test_loss = evaluate(model, test_data)
test_ppl = math.exp(test_loss)
print('=' * 89)
print(f'| End of training | test loss {test_loss:5.2f} | '
      f'test ppl {test_ppl:8.2f}')
print('=' * 89)

| End of training | test loss 10.67 | test ppl 42965.40
