In [1]:
import math
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import TransformerEncoder, TransformerEncoderLayer   # 트랜스포머에 특화된 모듈
import torchtext   # torchtext.datasets에서 사용 가능한 데이터셋 다운로드를 위한 모듈
from torchtext.data.utils import get_tokenizer

# 트랜스포머 모델 아키텍처

 <img src="https://wikidocs.net/images/page/159310/img_original_paper-726x1030.png" width="400" height="200" align="left"/>


**임베딩 계층**<br>
이 계층은 임베딩, 즉 시퀀스의 각 입력 단어를 숫자 벡터로 변환하는 전형적인 작업을 수행한다.<br>
- torch.nn.Embedding 모듈을 사용한다.

**위치 인코더(PosEnc)** <br>
트랜스포머 아키텍처에는 순환 계층이 없지만 시퀀스 작업에서 순환 네트워크보다 성능이 뛰어나다.<br>
어떻게 가능할까? <br>
- 위치 인코딩이라는 트릭으로 모델이 데이터의 순서에 대해 감을 잡을 수 있다.
1. 특정 순차 패턴을 따르는 벡터가 입력 단어 임베딩에 추가된다. <br>
2. 이러한 벡터는 모델에서 첫 번째 단어 뒤에 두 번째 단어가 따라 나오는 것을 이해할 수 있게 하는 방식으로 생성된다.<br> 
3. 벡터는 후속 단어 사이의 규칙적인 주기성과 거리를 나타내기 위해 각각 사인,코사인 곡선 함수를 사용해 생성된다.

**멀티-헤드 어텐션**
1. 시퀀스의 각 단어 임베딩은 셀프-어텐션 계층을 통과해 단어 임베딩과 똑같은 길이의 개별 출력을 만들어낸다.
2. scaled-dot attention 과정을 거쳐서 attention value 값을 구한다.
3. concat 한다.
- 이렇게 셀프-어텐션 헤드를 여러 개 두면 여러 개의 헤드가 시퀀스 단어의 다양한 관점에 집중하도록 도와준다.<br>
이는 합성곱 신경망에서 여러 개의 특징 맵이 다양한 패턴을 학습하는 방법과 유사하다.
<br><br>
- 디코더 유닛의 마스킹된 멀티-헤드 어텐션 계층이 추가 됐다는 점을 제외하면 이전과 동일한 방식으로 작동한다.<br>
디코더는 출력 시퀀스를 입력으로 한 번에 받기 때문에, 현재 시점의 단어를 예측하고자 할 때 입력 시퀀스 행렬로부터<br> 미래 시점의 단어까지도 참고할 수 있는 현상이 발생하여 이를 방지하기 위하여 **마스킹**을 사용한다

 <img src="https://production-media.paperswithcode.com/methods/multi-head-attention_l1A3G7a.png" width="300" height="300" align="left"/>

**Add&Norm 계층**<br>
각 인스턴스에서 입력단어 임베딩 벡터를 멀티-헤드 어텐션 계층의 출력 벡터에 바로 더함으로써 잔차 연결이 설정된다.<br>
이렇게 하면 네트워크 전체에서 경사를 전달하기 더 쉽고, 경사가 폭발하거나 소실하는 문제를 피할 수 있다.

 <img src="https://wikidocs.net/images/page/31379/transformer22.PNG" width="200" height="200" align="left"/>

**Feed Forward 계층**<br>
인코더와 디코더 유닛 모두에서 시퀀스의 모든 단어에 대해 정규화된 잔차 출력 벡터가 공통 feed forward 계층을 통해 전달된다.<br>
단어 전체에 공통 매개변수 세트가 있기 때문에 이 계층은 시퀀스 전체에서 더 광범위한 패턴을 학습하는 데 도움이 된다.

**선형 및 소프트맥스 계층**<br>
선형 계층은 벡터 시퀀스를 단어 사전의 길이와 똑같은 크기를 갖는 벡터로 변환한다.<br>
소프트맥스 계층은 이 출력을 확률 벡터로 변환한다. 
<br> >> 이 확률은 사전에서 각 단어가 시퀀스의 다음 단어로 등장할 확률을 의미한다. 

In [3]:
class PosEnc(nn.Module):
    def __init__(self, d_m, dropout=0.2, size_limit=5000):

        # d_m은 임베딩 차원과 동일
        super(PosEnc, self).__init__()
        self.dropout = nn.Dropout(dropout)
        p_enc = torch.zeros(size_limit, d_m)
        pos = torch.arange(0, size_limit, dtype=torch.float).unsqueeze(1)
        
        # divider는 라디안 리스트로 여기에 단어 위치 인덱스를 곱하여
        # sin,cos 함수에 제공한다
        divider = torch.exp(torch.arange(0, d_m, 2).float() * (-math.log(10000.0) / d_m))
        p_enc[:, 0::2] = torch.sin(pos * divider)
        p_enc[:, 1::2] = torch.cos(pos * divider)
        p_enc = p_enc.unsqueeze(0).transpose(0, 1)
        self.register_buffer('p_enc', p_enc)

    def forward(self, x):
        return self.dropout(x + self.p_enc[:x.size(0), :])

# 트랜스포머 모델 정의

In [2]:
class Transformer(nn.Module):
    def __init__(self, num_token, num_inputs, num_heads, num_hidden, num_layers, dropout=0.3):
        super(Transformer, self).__init__()
        self.model_name = 'transformer'
        self.mask_source = None
        self.position_enc = PosEnc(num_inputs, dropout) # 위치 인코딩
        layers_enc = TransformerEncoderLayer(num_inputs, num_heads, num_hidden, dropout) # TransformerEncoderLayer
        self.enc_transformer = TransformerEncoder(layers_enc, num_layers) # TransformerEncoder
        self.enc = nn.Embedding(num_token, num_inputs)
        self.num_inputs = num_inputs
        self.dec = nn.Linear(num_inputs, num_token)
        self.init_params()

    def _gen_sqr_nxt_mask(self, size):
        msk = (torch.triu(torch.ones(size, size)) == 1).transpose(0, 1)
        msk = msk.float().masked_fill(msk == 0, float('-inf'))
        msk = msk.masked_fill(msk == 1, float(0.0))
        return msk

    def init_params(self):
        initial_rng = 0.12
        self.enc.weight.data.uniform_(-initial_rng, initial_rng)
        self.dec.bias.data.zero_()
        self.dec.weight.data.uniform_(-initial_rng, initial_rng)
    
    # forward 메서드에서 입력은 위치적으로 인코딩된 다음 인코더를 통과한 후 디코더를 통과한다
    def forward(self, source):
        if self.mask_source is None or self.mask_source.size(0) != len(source):
            dvc = source.device
            msk = self._gen_sqr_nxt_mask(len(source)).to(dvc)
            self.mask_source = msk

        source = self.enc(source) * math.sqrt(self.num_inputs)
        source = self.position_enc(source)
        op = self.enc_transformer(source, self.mask_source)
        op = self.dec(op)
        return op

sin,cos 함수는 순차 패턴을 제공하기 위해 번갈아 사용된다.

# 데이터셋 로딩 및 처리
위키피디아 텍스트 사용

In [4]:
# 데이터셋을 다운로드하고, 사전을 토큰화하고 데이터셋을 훈련,검증,테스트셋으로 분할
TEXT = torchtext.data.Field(tokenize=get_tokenizer("basic_english"), lower=True, eos_token='<eos>', init_token='<sos>')
training_text, validation_text, testing_text = torchtext.datasets.WikiText2.splits(TEXT)
TEXT.build_vocab(training_text)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# 훈련과 검증에 사용할 배치 크기를 정의하고 다음과 같이 배치 생성 함수를 선언
def gen_batches(text_dataset, batch_size):
    text_dataset = TEXT.numericalize([text_dataset.examples[0].text])
    # 텍스트 데이터셋을 batch_size와 동일한 크기의 부분으로 나눔
    num_batches = text_dataset.size(0) // batch_size
    
    # 배치 밖에 위치한 데이터 포인트(나머지에 해당하는 부분)를 제거
    text_dataset = text_dataset.narrow(0, 0, num_batches * batch_size)
    
    # 데이터셋을 배치에 균등하게 배포
    text_dataset = text_dataset.view(batch_size, -1).t().contiguous()
    
    return text_dataset.to(device)

training_batch_size = 32
evaluation_batch_size = 16

training_data = gen_batches(training_text, training_batch_size)
validation_data = gen_batches(validation_text, evaluation_batch_size)
testing_data = gen_batches(testing_text, evaluation_batch_size)

downloading wikitext-2-v1.zip


.data\wikitext-2\wikitext-2-v1.zip: 100%|█████████████████████████████████████████| 4.48M/4.48M [00:02<00:00, 1.99MB/s]


extracting


In [5]:
# 최대 시퀀스 길이를 정의 
max_seq_len = 64

# 그에 따라 입력 시퀀스와 각 배치에 대한 출력 타깃을 생성하는 함수를 생성
def return_batch(src, k):
    sequence_length = min(max_seq_len, len(src) - 1 - k)
    sequence_data = src[k:k+sequence_length]
    sequence_label = src[k+1:k+1+sequence_length].view(-1)
    return sequence_data, sequence_label

# 트랜스포머 모델 훈련
모델 매개변수 정의 및 인스턴스화

In [6]:
num_tokens = len(TEXT.vocab.stoi) # 사전 크기
embedding_size = 256 # 임베딩 계층의 차원
num_hidden_params = 256 # 트랜스포머 인코더의 은닉 계층 차원
num_layers = 2 # 트랜스포머 인코더 내부의 트랜스포머 인코더 계층 개수
num_heads = 2 # 어텐션 모델의 헤드 개수(멀티-헤드)
dropout = 0.25 
loss_func = nn.CrossEntropyLoss()
lrate = 4.0 # 학습률
transformer_model = Transformer(num_tokens, embedding_size, num_heads, num_hidden_params, num_layers, dropout).to(device)
optim_module = torch.optim.SGD(transformer_model.parameters(), lr=lrate)
sched_module = torch.optim.lr_scheduler.StepLR(optim_module, 1.0, gamma=0.88)

In [7]:
def train_model():
    transformer_model.train()
    loss_total = 0.
    time_start = time.time()
    num_tokens = len(TEXT.vocab.stoi)
    for b, i in enumerate(range(0, training_data.size(0) - 1, max_seq_len)):
        train_data_batch, train_label_batch = return_batch(training_data, i)
        optim_module.zero_grad()
        op = transformer_model(train_data_batch)
        loss_curr = loss_func(op.view(-1, num_tokens), train_label_batch)
        loss_curr.backward()
        torch.nn.utils.clip_grad_norm_(transformer_model.parameters(), 0.6)
        optim_module.step()

        loss_total += loss_curr.item()
        interval = 100
        if b % interval == 0 and b > 0:
            loss_interval = loss_total / interval
            time_delta = time.time() - time_start
            print(f"epoch {ep}, {b}/{len(training_data)//max_seq_len} batches, training loss {loss_interval:.2f},
                  training perplexity {math.exp(loss_interval):.2f}")
            loss_total = 0
            time_start = time.time()

def eval_model(eval_model_obj, eval_data_source):
    eval_model_obj.eval() 
    loss_total = 0.
    num_tokens = len(TEXT.vocab.stoi)
    with torch.no_grad():
        for j in range(0, eval_data_source.size(0) - 1, max_seq_len):
            eval_data, eval_label = return_batch(eval_data_source, j)
            op = eval_model_obj(eval_data)
            op_flat = op.view(-1, num_tokens)
            loss_total += len(eval_data) * loss_func(op_flat, eval_label).item()
    return loss_total / (len(eval_data_source) - 1)

In [8]:
min_validation_loss = float("inf")
eps = 5
best_model_so_far = None

for ep in range(1, eps + 1):
    ep_time_start = time.time()
    train_model()
    validation_loss = eval_model(transformer_model, validation_data)
    print()
    print(f"epoch {ep:}, validation loss {validation_loss:.2f}, validation perplexity {math.exp(validation_loss):.2f}")
    print()

    if validation_loss < min_validation_loss:
        min_validation_loss = validation_loss
        best_model_so_far = transformer_model

    sched_module.step()

epoch 1, 100/1018 batches, training loss 8.63, training perplexity 5623.58
epoch 1, 200/1018 batches, training loss 7.19, training perplexity 1324.49
epoch 1, 300/1018 batches, training loss 6.79, training perplexity 887.43
epoch 1, 400/1018 batches, training loss 6.56, training perplexity 703.80
epoch 1, 500/1018 batches, training loss 6.46, training perplexity 637.17
epoch 1, 600/1018 batches, training loss 6.32, training perplexity 556.63
epoch 1, 700/1018 batches, training loss 6.25, training perplexity 515.82
epoch 1, 800/1018 batches, training loss 6.13, training perplexity 458.09
epoch 1, 900/1018 batches, training loss 6.10, training perplexity 444.86
epoch 1, 1000/1018 batches, training loss 6.07, training perplexity 430.96

epoch 1, validation loss 5.87, validation perplexity 355.73

epoch 2, 100/1018 batches, training loss 5.98, training perplexity 396.37
epoch 2, 200/1018 batches, training loss 5.90, training perplexity 366.02
epoch 2, 300/1018 batches, training loss 5.82, 

**perplexity(혼란도)**는 자연어 처리에서 **확률분포(여기서는 언어 모델)**가 샘플에 얼마나 잘 맞는지를 나타내기 위해 사용되는 지표이다.<br>
-> 작을수록 좋음

모델 훈련이후 테스트셋에서 모델 성능평가

In [9]:
testing_loss = eval_model(best_model_so_far, testing_data)
print(f"testing loss {testing_loss:.2f}, testing perplexity {math.exp(testing_loss):.2f}")

testing loss 5.15, testing perplexity 172.64
