<a href="https://colab.research.google.com/github/jo1jun/Transformer/blob/main/Transformer_module.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import

In [742]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt

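# Unlike the earlier hand-written implementation, this version only works once every
# explicit device / dtype argument is removed; presumably that is now handled internally.
# The documentation says the modules accept a `device` parameter, but that seems to have
# changed. Training is also slower than before, which needs investigating.
# TODO: check whether device handling is implemented inside the built-in modules.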

# Token & Positional Embedding

In [743]:
class TokPosEmbedding(nn.Module):
    """Token embedding plus a learned positional embedding, followed by dropout.

    Args:
        input_dim: vocabulary size (number of rows in the token table).
        d_model: embedding width.
        dropout_ratio: dropout probability applied to the summed embedding.
        max_len: number of positions the positional table can represent.
            Defaults to ``input_dim``, which reproduces the original sizing.
    """

    def __init__(self, input_dim, d_model, dropout_ratio, max_len=None):
        super().__init__()
        self.tokEmbedding = nn.Embedding(input_dim, d_model)
        # The positional embedding is a learned table rather than a fixed function.
        # Its size bounds the longest sequence that can be embedded.
        self.posEmbedding = nn.Embedding(input_dim if max_len is None else max_len, d_model)
        self.d_model = d_model
        self.dropout = nn.Dropout(dropout_ratio)

    def forward(self, src):
        """Embed token ids.

        Args:
            src: integer tensor of token ids, [batch_size, src_len].

        Returns:
            Tensor [batch_size, src_len, d_model].

        Raises:
            IndexError: if ``src_len`` exceeds the positional table size.
        """
        src_len = src.shape[1]
        max_len = self.posEmbedding.num_embeddings
        if src_len > max_len:
            raise IndexError(
                f"sequence length {src_len} exceeds positional table size {max_len}"
            )

        # Positions 0 .. src_len-1, created on the same device as the input so the
        # module keeps working after .to(device). Shape [1, src_len]; the batch
        # dimension is filled in by broadcasting in the addition below.
        pos = torch.arange(src_len, device=src.device).unsqueeze(0)

        # Token embeddings are scaled by sqrt(d_model) before the positions are added.
        scale = float(np.sqrt(self.d_model))
        src = self.dropout(self.tokEmbedding(src) * scale + self.posEmbedding(pos))

        # src: [batch_size, src_len, d_model]
        return src

# Transformer


In [744]:
class Transformer(nn.Module):
    """Sequence-to-sequence Transformer built on PyTorch's encoder/decoder stacks.

    The token/position embeddings and the final projection to the output
    vocabulary are supplied here; the attention stacks are the built-in
    ``nn.TransformerEncoder`` / ``nn.TransformerDecoder`` modules.

    Args:
        input_dim: source vocabulary size.
        output_dim: target vocabulary size.
        d_model: model / embedding width.
        n_layers: number of encoder layers and of decoder layers.
        nhead: attention heads per layer.
        ff_dim: hidden width of each feed-forward sublayer.
        dropout_ratio: dropout probability.
    """

    def __init__(self, input_dim, output_dim, d_model, n_layers, nhead, ff_dim, dropout_ratio):
        super().__init__()

        # Embeddings and the last fully-connected layer live outside the built-in stacks.
        self.encEmbedding = TokPosEmbedding(input_dim, d_model, dropout_ratio)
        self.encoderLayer = nn.TransformerEncoderLayer(d_model, nhead, ff_dim, dropout_ratio)
        self.encoder = nn.TransformerEncoder(self.encoderLayer, n_layers)
        self.decEmbedding = TokPosEmbedding(output_dim, d_model, dropout_ratio)
        self.decoderLayer = nn.TransformerDecoderLayer(d_model, nhead, ff_dim, dropout_ratio)
        self.decoder = nn.TransformerDecoder(self.decoderLayer, n_layers)
        self.linear = nn.Linear(d_model, output_dim)

    @staticmethod
    def _causal_mask(size, device=None):
        """Return a [size, size] bool mask; True marks positions that must not be attended to.

        Row i is True for every column j > i, so a position never sees later ones.
        A bool mask is used so its type matches the bool key-padding masks.
        """
        return torch.ones(size, size, device=device).triu(diagonal=1).bool()

    def make_pad_mask(self, src, pad):
        """Return a bool mask [batch_size, src_len] that is True where ``src == pad``.

        ``pad`` may be a Python int or a tensor; a tensor is moved to the device
        of ``src`` before the comparison.
        """
        if torch.is_tensor(pad):
            pad = pad.to(src.device)
        return src.eq(pad)

    def _encode(self, src, pad):
        """Encode ``src``; returns (memory [src_len, batch_size, d_model], src_pad_mask)."""
        src_pad_mask = self.make_pad_mask(src, pad)
        # The built-in stacks expect [seq_len, batch_size, d_model], hence the transpose.
        memory = self.encoder(
            self.encEmbedding(src).transpose(0, 1),
            src_key_padding_mask=src_pad_mask,
        )
        return memory, src_pad_mask

    def _decode(self, tgt, memory, src_pad_mask, pad):
        """Run the decoder over ``tgt`` and project to vocabulary scores.

        Returns a tensor [batch_size, tgt_len, output_dim].
        """
        tgt_pad_mask = self.make_pad_mask(tgt, pad)
        # Look-ahead ("no cheating") mask: [tgt_len, tgt_len].
        tgt_sub_mask = self._causal_mask(tgt.shape[1], tgt.device)
        hidden = self.decoder(
            self.decEmbedding(tgt).transpose(0, 1),
            memory,
            tgt_mask=tgt_sub_mask,
            tgt_key_padding_mask=tgt_pad_mask,
            memory_key_padding_mask=src_pad_mask,
        )
        # hidden: [tgt_len, batch_size, d_model] -> scores: [batch_size, tgt_len, output_dim]
        return self.linear(hidden.transpose(0, 1))

    def forward(self, src, tgt, pad):
        """Teacher-forced pass.

        Args:
            src: source token ids, [batch_size, src_len].
            tgt: target token ids fed to the decoder, [batch_size, tgt_len].
            pad: id of the padding token.

        Returns:
            Scores over the output vocabulary, [batch_size, tgt_len, output_dim].
        """
        memory, src_pad_mask = self._encode(src, pad)
        return self._decode(tgt, memory, src_pad_mask, pad)

    def generate(self, src, start_id, sample_size, pad):
        """Greedily decode ``sample_size`` tokens after ``start_id``.

        Args:
            src: source token ids, [batch_size, src_len].
            start_id: id of the first decoder token (int or 0-dim tensor).
            sample_size: number of tokens to generate.
            pad: id of the padding token.

        Returns:
            LongTensor [batch_size, sample_size + 1]; column 0 is ``start_id``.
        """
        memory, src_pad_mask = self._encode(src, pad)

        sampled_tensor = torch.full(
            (src.shape[0], 1), int(start_id), dtype=torch.long, device=src.device
        )
        for _ in range(sample_size):
            # The same causal mask as in training is applied: with more than one
            # decoder layer, earlier positions would otherwise see later tokens
            # and feed different activations to the last position.
            scores = self._decode(sampled_tensor, memory, src_pad_mask, pad)

            # Keep only the prediction for the newest position and append it.
            pred_token = scores[:, -1].argmax(dim=-1, keepdim=True)
            sampled_tensor = torch.cat((sampled_tensor, pred_token), dim=1)

        return sampled_tensor

# Date format Dataset

In [745]:
# Mount Google Drive at /content/drive (uses the google.colab helper module).
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [746]:
import os
# Switch into the dataset directory before importing the `sequence` helper module.
os.chdir('/content/drive/MyDrive/dataset')
import sequence

# Mount Google Drive and store the dataset under '/content/drive/MyDrive/' before running this cell.

(x_train, t_train), (x_test, t_test) = sequence.load_data('date.txt')
# char -> id and id -> char dictionaries
char_to_id, id_to_char = sequence.get_vocab()

print(x_train.shape)
print(t_train.shape)
print(x_test.shape)
print(t_test.shape)
print()

# The data is already converted to ids.
print('question(id) : ', x_train[0])
print('correct(id)  : ',t_train[0])
print()

# Inspect one sequence as characters. ' ' is the pad character, '_' the start character.
print('question(char) : ', ' '.join([id_to_char[int(c)] for c in x_train[0]]))
print('correct(char)  : ', ' '.join([id_to_char[int(c)] for c in t_train[0]]))

vocab_size = len(char_to_id)
x_train = torch.LongTensor(x_train)
t_train = torch.LongTensor(t_train)
x_test = torch.LongTensor(x_test)
t_test = torch.LongTensor(t_test)
# Pad token id, looked up from the vocabulary instead of being hard-coded
# (the pad character ' ' has id 7 for this dataset).
pad = char_to_id[' ']

(45000, 29)
(45000, 11)
(5000, 29)
(5000, 11)

question(id) :  [ 8 22  9 22  9  8  7  7  7  7  7  7  7  7  7  7  7  7  7  7  7  7  7  7
  7  7  7  7  7]
correct(id)  :  [14 11 12  9  8 15 16  8 15 16  9]

question(char) :  2 / 7 / 7 2                                              
correct(char)  :  _ 1 9 7 2 - 0 2 - 0 7


# hyperparameter

In [747]:
batch_size = 128                     # samples per mini-batch (training and evaluation)
epoch = 10                           # number of passes over the training set
input_dim = output_dim = vocab_size  # source and target share one vocabulary
d_model = 32                         # embedding / model width
n_layers = 1                         # number of encoder layers and of decoder layers
nhead = 2                            # attention heads per layer
ff_dim = 1024                        # hidden width of the feed-forward sublayer
dropout_ratio = 0.1                  # dropout probability
learning_rate = 0.0025               # learning rate handed to the Adam optimizer

In [748]:
# Build the seq2seq model from the hyperparameters defined above.
model = Transformer(input_dim, output_dim, d_model, n_layers, nhead, ff_dim, dropout_ratio)

In [749]:
import torch.optim as optim

# Cross-entropy loss, applied to the model's output scores during training.
criterion = nn.CrossEntropyLoss()

# Adam over every model parameter, using the configured learning rate.
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)

# Trainer

In [750]:
def trainer(x, t, max_epoch, batch_size, model, optimizer, pad, criterion=None):
    """Train ``model`` with teacher forcing on (x, t) for ``max_epoch`` epochs.

    Args:
        x: source token ids, indexable as [num_samples, src_len].
        t: target token ids, [num_samples, tgt_len]; column 0 is the start token.
        max_epoch: number of passes over the data.
        batch_size: samples per mini-batch.
        model: module called as ``model(batch_x, decoder_input, pad)`` that
            returns scores [batch, tgt_len - 1, output_dim].
        optimizer: optimizer over the model's parameters.
        pad: padding token id, passed through to the model unchanged.
        criterion: loss taking (scores, targets); defaults to
            ``nn.CrossEntropyLoss()``.

    Every sample is used: a trailing batch smaller than ``batch_size`` is
    trained on rather than dropped.
    """
    if criterion is None:
        criterion = nn.CrossEntropyLoss()

    data_size = len(x)
    # Ceil division so the final partial batch is included.
    max_iters = (data_size + batch_size - 1) // batch_size

    model.train()
    for e in range(max_epoch):
        for iters in range(max_iters):
            start = iters * batch_size
            batch_x = x[start:start + batch_size]
            batch_t = t[start:start + batch_size]

            # Decoder input drops the last target token: there is no end token,
            # so nothing needs to be predicted after the final element.
            scores = model(batch_x, batch_t[:, :-1], pad)
            scores = scores.reshape(-1, scores.shape[-1])

            # Labels drop the first target token (the start token).
            targets = batch_t[:, 1:].reshape(-1)

            # scores  : [batch_size * (tgt_len - 1), output_dim]
            # targets : [batch_size * (tgt_len - 1)]
            loss = criterion(scores, targets)

            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 5.0)  # gradient clipping
            optimizer.step()

            if iters % 100 == 0:
                print('epoch[%d/%d] Iteration %d/%d, loss = %.4f' % (e+1, max_epoch, iters, max_iters, loss.item()))

# Train

In [751]:
# Train on the training split for `epoch` passes; the loss is printed every 100 iterations.
trainer(x_train, t_train, epoch, batch_size, model, optimizer, pad)

epoch[1/1] Iteration 0/351, loss = 4.1073
epoch[1/1] Iteration 100/351, loss = 0.9447
epoch[1/1] Iteration 200/351, loss = 0.6229
epoch[1/1] Iteration 300/351, loss = 0.4225


# Checker

In [752]:
def checker(x, t, batch_size, model, pad):
    """Count correctly generated target tokens over the whole of (x, t).

    Args:
        x: source token ids, [num_samples, src_len].
        t: target token ids, [num_samples, tgt_len]; column 0 is the start token.
        batch_size: samples per evaluation batch.
        model: module exposing ``generate(src, start_id, sample_size, pad)``.
        pad: padding token id, passed through to the model unchanged.

    Returns:
        Number of positions (start token excluded) where the greedy prediction
        equals the target: a 0-dim tensor, or the int 0 when ``x`` is empty.

    Every sample is evaluated, including a trailing batch smaller than
    ``batch_size``, so the count is consistent with dividing by
    ``t.shape[0] * (t.shape[1] - 1)``.
    """
    data_size = len(x)
    correct_num = 0

    model.eval()
    with torch.no_grad():
        for start in range(0, data_size, batch_size):
            batch_x = x[start:start + batch_size]
            batch_t = t[start:start + batch_size]

            # The start token is taken from the first sample of the batch.
            start_id = batch_t[0, 0]
            correct = batch_t[:, 1:]

            predict = model.generate(batch_x, start_id, correct.shape[1], pad)
            # Drop the start token column before comparing.
            predict = predict[:, 1:]

            correct_num += (predict == correct).sum()

    return correct_num

# Accuracy

In [753]:
# Report per-token accuracy on the training split, then on the test split.
# The denominator counts every target position except the start token.
for split_x, split_t, report in (
    (x_train, t_train, 'train accuracy %.3f%%'),
    (x_test, t_test, 'test accuracy %.3f%%'),
):
    correct_num = checker(split_x, split_t, batch_size, model, pad)
    total_tokens = split_t.shape[0] * (split_t.shape[1] - 1)
    acc = float(correct_num) / total_tokens
    print(report % (acc * 100))

UnboundLocalError: ignored

# Sampling

In [None]:
# Switch off dropout so the samples come from the deterministic inference path.
model.eval()

for i in range(10):
    # A one-element index list keeps the batch dimension on the selected rows.
    idx = [np.random.randint(0, len(x_test))]

    question = x_test[idx]
    correct = t_test[idx].flatten()

    # `pad` is passed to the model as-is. It is no longer re-wrapped in a tensor
    # on every iteration, which also overwrote the notebook-level `pad`.

    # The first target token is the start token that seeds generation.
    start_id = correct[0]
    correct = correct[1:]

    with torch.no_grad():
        predict = model.generate(question, start_id, len(correct), pad)
    # Drop the start token column.
    predict = predict[:, 1:]

    # Convert ids back to characters.
    question = [id_to_char[int(c)] for c in question.flatten()]
    correct = [id_to_char[int(c)] for c in correct.flatten()]
    predict = [id_to_char[int(c)] for c in predict.flatten()]

    print(f'question {i+1} : ', ' '.join(question))
    print(f'correct {i+1}  : ', ' '.join(correct))
    print(f'predict {i+1}  : ', ' '.join(predict))
    print()