In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [2]:
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

In [3]:
from torch.utils.data import Dataset, DataLoader , Subset, random_split

In [4]:
import torchvision
import torchvision.transforms as transforms

In [5]:
import sys
import numpy as np
import random
import pandas as pd

In [6]:
import import_ipynb
import preprocessor

importing Jupyter notebook from preprocessor.ipynb


In [7]:
from tensorflow.keras.preprocessing.sequence import pad_sequences

In [8]:
from nltk.tokenize import word_tokenize

from konlpy.tag import *

In [9]:
import unittest

import copy

## Data

In [10]:
# Instantiate the Mecab tagger; the name is presumably provided by the
# `from konlpy.tag import *` wildcard import above.
mecab = Mecab()
# Alias for the tagger's `morphs` method. The cells visible in this notebook
# call `mecab.morphs` directly rather than this alias.
ko_tokenize = mecab.morphs

In [11]:
# Data directory, given relative to the notebook's working directory.
dir_path = '../../Data/'
# Load the Excel workbook into a DataFrame using the openpyxl engine.
data = pd.read_excel(dir_path + '한국어_대화체_번역.xlsx' , engine='openpyxl')

In [12]:
# Column '번역문' ("translated text") is used as the English side of each pair.
en_data = data['번역문']
# Column '원문' ("original text") is used as the Korean side of each pair.
kor_data = data['원문']

In [13]:
# English side: vocabulary set, token collection and encoder, all driven by
# NLTK's word_tokenize.
# NOTE(review): VocabSet / Encoder live in preprocessor.ipynb, which is not
# visible here — their exact contracts should be confirmed there.
en_vs = preprocessor.VocabSet(word_tokenize)
en_tokens = en_vs.tokens(en_data)
en_encoder = preprocessor.Encoder(en_data, word_tokenize, en_tokens)

In [14]:
# Korean side: same pipeline as the English one, but tokenised with the
# Mecab tagger's `morphs` method.
# NOTE(review): VocabSet / Encoder live in preprocessor.ipynb, which is not
# visible here — their exact contracts should be confirmed there.
kor_vs = preprocessor.VocabSet(mecab.morphs)
kor_tokens = kor_vs.tokens(kor_data)
kor_encoder = preprocessor.Encoder(kor_data, mecab.morphs, kor_tokens)

In [15]:
# Encode both corpora. The results are fed to pad_sequences below, so they are
# presumably lists of integer id sequences — confirm in preprocessor.ipynb.
en_encoded = en_encoder.encode()
kor_encoded = kor_encoder.encode()

In [16]:
# Target sequence length used when padding the encoded sentences.
max_len = 30

In [37]:
# The English array is padded to max_len + 1 because EncoderDecoderDataset
# slices the decoder side into [:, :-1] (input) and [:, 1:] (target), each of
# which then has max_len columns. Padding (value 0 by default) is appended.
# NOTE(review): `truncating` is left at its default; per the Keras docs that is
# 'pre', i.e. over-long sequences lose tokens from the START — confirm this is
# intended for the decoder side.
en_array = pad_sequences(en_encoded, maxlen=max_len+1, padding='post')
# The Korean array is the encoder input and is padded to exactly max_len.
kor_array = pad_sequences(kor_encoded, maxlen=max_len, padding='post')

### DataSet

In [38]:
class EncoderDecoderDataset(Dataset) :
    """Paired encoder / decoder sequences for sequence-to-sequence training.

    The decoder array is split into a shifted pair: the decoder input is every
    column except the last, the decoder target is every column except the first.

    Args:
        en_encoded: 2-D array of ENCODER-side sequences (`en` here means
            "encoder", not "English").
        de_encoded: 2-D array of DECODER-side sequences; it must have one more
            column than the desired decoder length.
        val_ratio: fraction of the samples that `split_dataset` assigns to the
            validation subset.
            NOTE(review): with the default of 0.9, 90% of the data goes to
            validation and only 10% to training — confirm this is intended.
    """

    def __init__(self , en_encoded , de_encoded , val_ratio=0.9) :
        super().__init__()

        self.en_in = en_encoded
        # Shift by one step: the target at position t is the input at t + 1.
        self.de_in , self.de_out = de_encoded[:,:-1] , de_encoded[:,1:]

        self.val_ratio = val_ratio

    def __len__(self) :
        """Number of samples (rows of the encoder array)."""
        return len(self.en_in)

    def __getitem__(self , idx) :
        """Return one sample as a dict of encoder input, decoder input and decoder target."""
        return {'encoder_in' : self.en_in[idx],
                'decoder_in' : self.de_in[idx],
                'decoder_out' : self.de_out[idx]}

    def split_dataset(self) :
        """Randomly split the dataset and return `(train_set, val_set)`."""
        total = len(self)
        val_count = int(total * self.val_ratio)
        train_count = total - val_count

        train_set, val_set = random_split(self, [train_count, val_count])
        return train_set, val_set

In [39]:
# Korean -> English: the Korean array is the encoder input and the English
# array supplies the decoder input / target pair.
dataset = EncoderDecoderDataset(kor_array, en_array)

In [40]:
# Shuffled mini-batches of 256 samples loaded by 4 worker processes; the last
# incomplete batch is dropped so every batch has the same size.
data_loader = DataLoader(dataset ,
                         batch_size = 256,
                         num_workers = 4,
                         shuffle = True,
                         drop_last = True)

In [41]:
# Use the GPU when one is available, otherwise fall back to the CPU.
# `use_cuda` is also passed to the model as its `cuda_flag`.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu") 

## Model

In [42]:
class PaddingMask(nn.Module) :
    """Build a padding mask from a batch of token-id sequences.

    Positions whose id equals 0 are flagged with 1.0, all others with 0.0.

    Args:
        sen_size: nominal sentence length. Kept as an attribute for reference;
            the mask width itself is taken from the input so that batches of
            any length can be masked.
    """

    def __init__(self, sen_size) :
        super(PaddingMask , self).__init__()
        self.sen_size = sen_size

    def forward(self, in_tensor) :
        """Return the padding mask for `in_tensor`.

        Args:
            in_tensor: token ids, expected as (batch_size, seq_len).

        Returns:
            Floating-point tensor of shape (batch_size, 1, 1, seq_len) holding
            1.0 where the id is 0 and 0.0 elsewhere.
        """
        batch_size = in_tensor.shape[0]
        seq_len = in_tensor.shape[-1]

        # A boolean comparison cast to float avoids the scalar/scalar overload
        # of torch.where, which older torch releases do not provide.
        flag_tensor = (in_tensor == 0).to(torch.get_default_dtype())
        # shape : (batch_size, 1, 1, seq_len) so it broadcasts over heads and queries
        flag_tensor = torch.reshape(flag_tensor , (batch_size, 1, 1, seq_len))

        return flag_tensor

In [43]:
class LookAheadMask(nn.Module) :
    """Combine a padding mask with a causal (look-ahead) mask.

    The causal mask has 1.0 strictly above the diagonal (future positions) and
    0.0 elsewhere. It is registered as a non-persistent buffer, so it follows
    the module through `.to(device)` / `.cuda()` without appearing in
    `state_dict()`.

    Args:
        sen_size: side length of the square causal mask.
        cuda_flag: when truthy, the mask is created on the CUDA device.
    """

    def __init__(self, sen_size, cuda_flag) :
        super(LookAheadMask, self).__init__()
        self.sen_size = sen_size

        mask_tensor = self.get_mask(sen_size)
        if cuda_flag :
            mask_tensor = mask_tensor.cuda()
        self.register_buffer('mask_tensor', mask_tensor, persistent=False)

    def get_mask(self, sen_size) :
        """Return the causal mask as a float32 tensor of shape (1, sen_size, sen_size)."""
        ones = torch.ones(sen_size, sen_size, dtype=torch.float32)
        # diagonal=1 keeps only the entries strictly above the main diagonal
        mask_tensor = torch.triu(ones, diagonal=1)

        return mask_tensor.unsqueeze(0)

    def forward(self, in_tensor) :
        """Merge a padding mask with the causal mask.

        Args:
            in_tensor: padding mask that broadcasts against
                (1, seq_len, seq_len), e.g. (batch_size, 1, 1, seq_len).

        Returns:
            Element-wise maximum of both masks: a position is masked when it is
            padding or lies in the future.
        """
        seq_len = in_tensor.shape[-1]
        # Slicing lets inputs shorter than sen_size reuse the same buffer.
        causal_tensor = self.mask_tensor[:, :seq_len, :seq_len]

        return torch.maximum(in_tensor, causal_tensor)

In [44]:
class PositionalEncoding(nn.Module) :
    """Scale embeddings by sqrt(d_model) and add sinusoidal positional encodings.

    Both the scale `w` and the encoding table `pe` are registered as
    non-persistent buffers: they move with the module on `.to(device)` and are
    not written to `state_dict()`.

    Args:
        pos_len: number of positions in the pre-computed table.
        d_model: embedding dimension.
        cuda_flag: when truthy, the buffers are created on the CUDA device.
    """

    def __init__(self, pos_len, d_model, cuda_flag) :

        super(PositionalEncoding , self).__init__()
        self.pos_len = pos_len
        self.d_model = d_model

        # w : embedding scale, sqrt(d_model)
        # pe : encoding table, shape (1, pos_len, d_model)
        w = torch.sqrt(torch.tensor(d_model, dtype=torch.float32, requires_grad=False))
        pe = self.get_embedding(pos_len, d_model)

        if cuda_flag :
            w = w.cuda()
            pe = pe.cuda()

        self.register_buffer('w', w, persistent=False)
        self.register_buffer('pe', pe, persistent=False)

    def get_embedding(self, pos_len, d_model) :
        """Build the sinusoidal table of shape (1, pos_len, d_model).

        Even feature indices hold sin(pos / 10000^(2i/d_model)) and odd indices
        hold the cosine at the same frequency.
        """
        pos_vec = torch.arange(pos_len).float()
        pos_vec = pos_vec.unsqueeze(1) # (pos_len, 1)

        # floor(j / 2) * 2 maps each feature pair (2i, 2i+1) to the exponent 2i
        i_vec = torch.arange(d_model).float() / 2
        i_vec = torch.floor(i_vec) * 2
        i_vec = i_vec.unsqueeze(0) / d_model
        i_vec = 1 / torch.pow(1e+4 , i_vec) # (1, d_model)

        em = torch.matmul(pos_vec, i_vec) # (pos_len, d_model) angles
        pe = torch.zeros(pos_len, d_model, requires_grad=False)

        sin_em = torch.sin(em)
        cos_em = torch.cos(em)

        pe[:,::2] = sin_em[:,::2]
        pe[:,1::2] = cos_em[:,1::2]

        pe = pe.unsqueeze(0)

        return pe

    def forward(self, in_tensor) :
        """Return `in_tensor * sqrt(d_model) + pe`.

        Args:
            in_tensor: embeddings of shape (batch_size, seq_len, d_model) with
                seq_len <= pos_len.
        """
        seq_len = in_tensor.shape[1]
        # Only the first seq_len rows of the table are needed.
        en_tensor = in_tensor * self.w + self.pe[:, :seq_len]

        return en_tensor

In [45]:
class MultiHeadAttention(nn.Module) :
    """Multi-head scaled dot-product attention.

    Args:
        sen_size: nominal sentence length. Kept as an attribute; the actual
            sequence lengths are read from the tensors, so query and key/value
            sequences may differ in length (cross-attention).
        d_model: embedding dimension; must be divisible by `num_heads`.
        num_heads: number of attention heads.

    Raises:
        ValueError: if `d_model` is not divisible by `num_heads`.
    """

    def __init__(self, sen_size,  d_model, num_heads) :

        super(MultiHeadAttention , self).__init__()

        if d_model % num_heads != 0 :
            raise ValueError('d_model (%d) must be divisible by num_heads (%d)' % (d_model, num_heads))

        self.sen_size = sen_size # sen_size
        self.d_model = d_model # embedding_dim
        self.num_heads = num_heads # head_size
        self.depth = d_model // num_heads # embedding_dim / num_heads

        self.q_layer = nn.Linear(d_model , d_model)
        self.k_layer = nn.Linear(d_model , d_model)
        self.v_layer = nn.Linear(d_model , d_model)
        self.o_layer = nn.Linear(d_model , d_model)

        # sqrt(depth); a non-persistent buffer follows the module across devices
        scale = torch.sqrt(torch.tensor(self.depth , dtype=torch.float32 , requires_grad=False))
        self.register_buffer('scale', scale, persistent=False)

        self.init_param()

    def split(self, tensor) :
        """(batch_size, seq_len, d_model) -> (batch_size, num_heads, seq_len, depth)."""
        batch_size = tensor.shape[0]
        # Fix the batch dimension and infer seq_len; inferring the batch
        # dimension instead would silently mix batch and time for inputs whose
        # length differs from sen_size.
        tensor = torch.reshape(tensor , (batch_size , -1 , self.num_heads , self.depth))
        tensor = torch.transpose(tensor , 2 , 1)

        return tensor

    def merge(self, tensor) :
        """(batch_size, num_heads, seq_len, depth) -> (batch_size, seq_len, d_model)."""
        batch_size = tensor.shape[0]
        tensor = torch.transpose(tensor , 2 , 1) # (batch_size, seq_len, num_heads, depth)
        tensor = torch.reshape(tensor , (batch_size , -1 , self.d_model))

        return tensor

    def scaled_dot_production(self, q_tensor, k_tensor, v_tensor, m_tensor) :
        """Compute softmax(Q K^T / sqrt(depth) - mask * 1e6) V per head.

        Args:
            q_tensor, k_tensor, v_tensor: projected tensors of shape
                (batch_size, seq_len, d_model).
            m_tensor: optional mask with 1.0 at positions to ignore,
                broadcastable to (batch_size, num_heads, q_len, k_len);
                `None` disables masking.

        Returns:
            Attention output of shape (batch_size, num_heads, q_len, depth).
        """
        q_tensor = self.split(q_tensor)
        k_tensor = self.split(k_tensor)
        v_tensor = self.split(v_tensor)

        k_tensor_T = torch.transpose(k_tensor , 3 , 2) # (batch_size, num_heads, depth, k_len)

        qk_tensor = torch.matmul(q_tensor , k_tensor_T) # (batch_size, num_heads, q_len, k_len)
        qk_tensor = qk_tensor / self.scale

        # pad mask tensor shape : (batch_size, 1, 1, k_len)
        # lookahead mask tensor shape : (batch_size, 1, q_len, k_len)
        # `is not None` is required: comparing a tensor with `!=` is an
        # element-wise operation, not an identity test.
        if m_tensor is not None :
            # a large negative logit drives the softmax weight to zero
            qk_tensor = qk_tensor - (m_tensor * 1e+6)

        qk_tensor = F.softmax(qk_tensor , dim = -1)
        att = torch.matmul(qk_tensor , v_tensor) # (batch_size, num_heads, q_len, depth)

        return att

    # Xavier Initialization
    def init_param(self) :
        """Xavier-normal weights and zero biases for every linear layer."""
        for m in self.modules() :
            if isinstance(m,nn.Linear) :
                nn.init.xavier_normal_(m.weight)
                nn.init.zeros_(m.bias)

    def forward(self, q_in, k_in, v_in, m_in) :
        """Project the inputs, attend, merge the heads and apply the output projection.

        Args:
            q_in: queries, (batch_size, q_len, d_model).
            k_in, v_in: keys and values, (batch_size, k_len, d_model).
            m_in: optional mask (see `scaled_dot_production`).

        Returns:
            Tensor of shape (batch_size, q_len, d_model).
        """
        q_tensor = self.q_layer(q_in)
        k_tensor = self.k_layer(k_in)
        v_tensor = self.v_layer(v_in)

        att_tensor = self.scaled_dot_production(q_tensor , k_tensor , v_tensor , m_in)
        att_tensor = self.merge(att_tensor)

        o_tensor = self.o_layer(att_tensor)

        return o_tensor



In [46]:
class FeedForward(nn.Module) :
    """Position-wise feed-forward network: Linear -> ReLU -> Linear.

    Args:
        hidden_size: width of the inner layer.
        d_model: input and output dimension.
    """

    def __init__(self, hidden_size, d_model) :

        super(FeedForward , self).__init__()
        self.hidden_size = hidden_size
        self.d_model = d_model

        # relu activation and input, output dim are same
        self.ff = nn.Sequential(nn.Linear(d_model , hidden_size),
                                nn.ReLU(),
                                nn.Linear(hidden_size , d_model))

        self.init_param()

    # He Initialization
    def init_param(self) :
        """He-normal weights (std = sqrt(2 / fan_in)) and zero biases for every linear layer."""
        for m in self.modules() :
            if isinstance(m , nn.Linear) :
                # The second positional argument of kaiming_normal_ is `a`, the
                # leaky-ReLU negative slope, not a gain. Passing sqrt(2) there
                # yields std = sqrt(2 / 3 / fan_in); selecting the ReLU
                # non-linearity gives the intended gain of sqrt(2).
                nn.init.kaiming_normal_(m.weight , mode='fan_in' , nonlinearity='relu')
                nn.init.zeros_(m.bias)

    def forward(self , in_tensor) :
        """Apply the network to the last dimension of `in_tensor`."""
        o_tensor = self.ff(in_tensor)

        return o_tensor



In [47]:
class Encoder(nn.Module) :
    """Stack of `layer_size` encoder blocks (self-attention + feed-forward).

    One Dropout module and one LayerNorm module are shared by every sub-layer
    of every block.

    Args:
        layer_size: number of encoder blocks.
        sen_size: sentence length handed to each attention layer.
        d_model: embedding dimension.
        num_heads: number of attention heads.
        hidden_size: inner width of the feed-forward layers.
        drop_rate: dropout probability.
        norm_rate: epsilon of the layer normalisation.
        cuda_flag: accepted but not used by this class.
    """

    def __init__(self, layer_size, sen_size, d_model, num_heads, hidden_size, drop_rate, norm_rate, cuda_flag) :

        super().__init__()
        self.layer_size = layer_size
        self.sen_size = sen_size
        self.d_model = d_model
        self.num_heads = num_heads

        self.hidden_size = hidden_size

        # Sub-layers are created block by block (attention, then feed-forward)
        # so the order of random initialisation matches the layer order.
        attention_blocks = []
        feedforward_blocks = []
        for _ in range(layer_size) :
            attention_blocks.append(MultiHeadAttention(sen_size , d_model , num_heads))
            feedforward_blocks.append(FeedForward(hidden_size , d_model))

        self.mha_layer = nn.ModuleList(attention_blocks)
        self.ff_layer = nn.ModuleList(feedforward_blocks)
        self.drop_layer = nn.Dropout(drop_rate)
        self.norm_layer = nn.LayerNorm(d_model , eps=norm_rate)

    def forward_block(self, i, in_tensor, pad_tensor) :
        """Run encoder block `i`.

        Self-attention uses `in_tensor` as query, key and value with
        `pad_tensor` as mask; both sub-layers are followed by dropout, a
        residual connection and layer normalisation.
        """
        attended = self.drop_layer(self.mha_layer[i](in_tensor , in_tensor , in_tensor , pad_tensor))
        hidden = self.norm_layer(in_tensor + attended)

        transformed = self.drop_layer(self.ff_layer[i](hidden))

        return self.norm_layer(hidden + transformed)

    def forward(self , in_tensor , pad_tensor) :
        """Feed `in_tensor` through all blocks in order and return the final output."""
        hidden = in_tensor

        for block_idx in range(self.layer_size) :
            hidden = self.forward_block(block_idx , hidden , pad_tensor)

        return hidden



In [48]:
class Decoder(nn.Module) :

    def __init__(self, layer_size, sen_size, d_model, num_heads, hidden_size, drop_rate, norm_rate, cuda_flag) :

        super(Decoder , self).__init__()
        self.layer_size = layer_size
        self.sen_size = sen_size
        self.d_model = d_model
        self.num_heads = num_heads

        self.hidden_size = hidden_size

        self.m_mha_layer = nn.ModuleList()
        self.mha_layer = nn.ModuleList()
        self.ff_layer = nn.ModuleList()

        self.drop_layer = nn.Dropout(drop_rate)
        self.norm_layer = nn.LayerNorm(d_model , eps=norm_rate)

        self.la_mask = LookAheadMask(sen_size , cuda_flag)

        for i in range(layer_size) :
            m_mha_idx = MultiHeadAttention(sen_size , d_model , num_heads)
            # masked multihead attention layer
            self.m_mha_layer.append(m_mha_idx)

            mha_idx = MultiHeadAttention(sen_size , d_model , num_heads)
            # multihead attention layer
            self.mha_layer.append(mha_idx)

            ff_idx = FeedForward(hidden_size , d_model)
            # feedforward layer
            self.ff_layer.append(ff_idx)

    def forward_block(self, i, in_tensor, en_out_tensor, pad_tensor, mask_tensor) :
        # query : in_tensor
        # key : in_tensor 
        # value : in_tensor 
        # mask ; look ahead mask
        m_mha_tensor = self.m_mha_layer[i](in_tensor , in_tensor , in_tensor , mask_tensor)
        m_mha_tensor = self.drop_layer(m_mha_tensor)
        h_tensor = self.norm_layer(in_tensor + m_mha_tensor)

        # query : output of masked multihead attention
        # key : encoder output , 
        # value : encoder output , 
        # mask ; pad_tensor of decoder input
        mha_tensor = self.mha_layer[i](h_tensor , en_out_tensor , en_out_tensor , pad_tensor)
        mha_tensor = self.drop_layer(mha_tensor)
        a_tensor = self.norm_layer(mha_tensor + h_tensor)

        ff_tensor = self.ff_layer[i](a_tensor)
        ff_tensor = self.drop_layer(ff_tensor)
        o_tensor = self.norm_layer(a_tensor + ff_tensor)

        return o_tensor

    def forward(self , in_tensor , en_out_tensor , pad_tensor) :
        mask_tensor = self.la_mask(pad_tensor)

        tensor_ptr = in_tensor
        for i in range(self.layer_size) :
            tensor_ptr = self.forward_block(i , tensor_ptr , en_out_tensor , pad_tensor , mask_tensor)
        
        return tensor_ptr



In [49]:
class Transformer(nn.Module) :

    def __init__(self, layer_size, sen_size, en_vocabs, de_vocabs,
                 d_model , num_heads , hidden_size , drop_rate , norm_rate , cuda_flag) :

        super(Transformer , self).__init__()

        self.en_em = nn.Embedding(num_embeddings=en_vocabs, embedding_dim=d_model, padding_idx=0)
        self.de_em = nn.Embedding(num_embeddings=de_vocabs, embedding_dim=d_model, padding_idx=0)
        
        self.en_pos = PositionalEncoding(sen_size , d_model , cuda_flag)
        self.de_pos = PositionalEncoding(sen_size , d_model , cuda_flag)

        self.en_pad = PaddingMask(sen_size)
        self.de_pad = PaddingMask(sen_size)

        self.en = Encoder(layer_size , sen_size , d_model , num_heads , hidden_size , drop_rate , norm_rate , cuda_flag)
        self.de = Decoder(layer_size , sen_size , d_model , num_heads , hidden_size , drop_rate , norm_rate , cuda_flag)

        self.o_layer = nn.Linear(d_model , de_vocabs)

        self.init_param()

    def init_param(self) :

        nn.init.xavier_normal_(self.o_layer.weight)
        nn.init.zeros_(self.o_layer.bias)

    def get_encoder(self) :

        return {'embedding' : self.en_em, 
                'encoding' : self.en_pos, 
                'padding' : self.en_pad,
                'encoder' : self.en}

    def get_decoder(self) :

        return {'embedding' : self.de_em, 
                'encoding' : self.de_pos, 
                'padding' : self.de_pad,
                'encoder' : self.de}

    def get_output(self) :

        return self.o_layer

    def forward(self , en_in_tensor , de_in_tensor) :
        en_pad_tensor = self.en_pad(en_in_tensor) # padding
        en_em_tensor = self.en_em(en_in_tensor) # embedding
        en_pos_tensor = self.en_pos(en_em_tensor) # positional encoding

        de_pad_tensor = self.de_pad(de_in_tensor) # padding
        de_em_tensor = self.de_em(de_in_tensor) # embedding
        de_pos_tensor = self.de_pos(de_em_tensor) # positional encoding

        en_out = self.en(en_pos_tensor , en_pad_tensor) # encoder output
        de_out = self.de(de_pos_tensor , en_out , de_pad_tensor) # deocder output

        pred_tensor = self.o_layer(de_out) # linear layer 
        prob_tensor = F.softmax(pred_tensor , dim = -1) # calcuate probablity

        return prob_tensor

## Model Parameter

In [50]:
# Sizes reported by the two encoders; they are used below as the embedding
# table sizes and, for English, as the width of the output layer.
# NOTE(review): get_size() is defined in preprocessor.ipynb (not visible here);
# presumably it counts the padding id 0 as well — confirm.
en_v_size = en_encoder.get_size()
kor_v_size = kor_encoder.get_size()

# number of encoder blocks and of decoder blocks
layer_size = 6
# sentence length seen by the model; must equal max_len used for padding above
sen_size = 30
# embedding dimension
d_model = 512
# attention heads (d_model must be divisible by this)
num_heads = 8
# inner width of the feed-forward layers
h_size = 2048
# dropout probability
drop_rate = 1e-1
# epsilon passed to nn.LayerNorm
norm_rate = 1e-6

In [51]:
# Korean is the encoder (source) vocabulary, English the decoder (target) one.
transformer = Transformer(layer_size, sen_size, kor_v_size, en_v_size,
                          d_model, num_heads, h_size, drop_rate, norm_rate, use_cuda)

# Move the registered parameters and buffers to the selected device.
transformer = transformer.to(device)

## Test

In [54]:
class ModelTest(unittest.TestCase): 
    
    def setUp(self) :
        self.dataset = dataset
        self.loader = data_loader
        self.model = transformer
        self.device = device
        
        self.sen_size = sen_size
        self.en_v_size = en_v_size
        self.kor_v_size = kor_v_size
        
        self.opt = optim.SGD(self.model.parameters() , 1e-3)
        self.ce_loss = nn.CrossEntropyLoss().to(self.device)

    def loss_fn(self, y_output , y_label) :
        y_label = y_label.unsqueeze(2)
        y_prob = torch.gather(y_output, -1, y_label)
    
        y_loss = -torch.log(y_prob+1e-30)

        y_loss = torch.mean(y_loss , dim=-1)
        y_loss = torch.mean(y_loss)

        return y_loss
        
    # input type , dtype test
    def test_type(self) :
        sample_data = self.dataset[0]
        sample_en_in = sample_data['encoder_in']
        sample_de_in = sample_data['decoder_in']
        
        element_en_in = str(sample_en_in[0].dtype)
        element_de_in = str(sample_de_in[0].dtype)
   
        self.assertIsInstance(sample_en_in , np.ndarray)
        self.assertIsInstance(sample_de_in , np.ndarray)
        
        self.assertEqual(element_en_in , 'int32')
        self.assertEqual(element_de_in , 'int32')
        
    # input dimension test
    def test_input_dim(self) :
        sample_data = self.dataset[0]
        sample_en_in = sample_data['encoder_in']
        sample_de_in = sample_data['decoder_in']
        
        tensor_en_dim = sample_en_in.shape
        tensor_de_dim = sample_de_in.shape
            
        self.assertEqual(tensor_en_dim[0] , sen_size)
        self.assertEqual(tensor_de_dim[0] , sen_size)
    
    # input array range test
    def test_input_range(self) :
        for data in self.loader :
            break
        
        en_idx = torch.max(data['encoder_in'])
        de_idx = torch.max(data['decoder_in'])
        
        self.assertTrue(en_idx > torch.tensor(0, dtype=torch.int32))
        self.assertTrue(en_idx < torch.tensor(self.kor_v_size, dtype=torch.int32))
        
        self.assertTrue(de_idx > torch.tensor(0, dtype=torch.int32))
        self.assertTrue(de_idx < torch.tensor(self.en_v_size, dtype=torch.int32))
        
    # output dimension test
    def test_output_dim(self) :
        
        for data in self.loader :
            break
            
        en_idx = data['encoder_in'].long().to(device)
        de_idx = data['decoder_in'].long().to(device)
    
        de_out = self.model(en_idx , de_idx)

        output_dim = de_out.shape
        
        self.assertEqual(len(output_dim), 3)
        self.assertEqual(output_dim[1], self.sen_size)
        self.assertEqual(output_dim[2], self.en_v_size)
        
    # training test
    def test_train(self) :
        
        # parameters before training
        prev_param = [copy.deepcopy(m) for m in self.model.parameters()]
 
        self.opt.zero_grad()
            
        for data in self.loader :
            break
            
        en_idx = data['encoder_in'].long().to(device)
        de_idx = data['decoder_in'].long().to(device)
        de_label = data['decoder_out'].long().to(device)
    
        de_out = self.model(en_idx, de_idx)
        
        loss = self.loss_fn(de_out, de_label)
        
        loss.backward()
        self.opt.step()
        
        idx = 0
        
        # check if parameters are updated
        for m in self.model.parameters() :
            
            self.assertFalse(torch.equal(prev_param[idx] , m))
            idx += 1
        


In [55]:
# Run the tests inside the notebook: argv=[''] keeps unittest from parsing the
# kernel's command-line arguments, exit=False stops it from calling sys.exit().
unittest.main(argv=[''], verbosity=2, exit=False)

test_input_dim (__main__.ModelTest) ... ok
test_input_range (__main__.ModelTest) ... ok
test_output_dim (__main__.ModelTest) ... ok
test_train (__main__.ModelTest) ... ok
test_type (__main__.ModelTest) ... ok

----------------------------------------------------------------------
Ran 5 tests in 1.022s

OK


<unittest.main.TestProgram at 0x7f782269ea90>