In [3]:
import torch
from torch import nn as nn
import copy

In [27]:
class Transformer(nn.Module):
    """Encoder-decoder model assembled from caller-supplied components.

    The source tokens are embedded and encoded; the target tokens are
    embedded and decoded against the encoder result; the decoder result is
    projected by ``fc_layer`` and converted to log-probabilities.
    """

    def __init__(self, src_embed,trg_embed, encoder, decoder, fc_layer):
        """Store the five building blocks.

        Args:
            src_embed: callable applied to ``src`` before encoding.
            trg_embed: callable applied to ``trg`` before decoding.
            encoder: callable invoked as ``encoder(embedded_src, src_mask)``.
            decoder: callable invoked as
                ``decoder(embedded_trg, trg_mask, encoder_output, src_mask)``.
            fc_layer: callable applied to the decoder result before the
                log-softmax.
        """
        super().__init__()
        self.src_embed = src_embed
        self.trg_embed = trg_embed
        self.encoder = encoder
        self.decoder = decoder
        self.fc_layer = fc_layer

    def forward(self, src, trg, src_mask, trg_mask):
        """Run the full encode/decode pass.

        Returns:
            ``log_softmax`` over the last dimension of the projected decoder
            output.
        """
        memory = self.encoder(self.src_embed(src), src_mask)
        decoded = self.decoder(self.trg_embed(trg), trg_mask, memory, src_mask)
        projected = self.fc_layer(decoded)
        return F.log_softmax(projected, dim=-1)


In [5]:
class Encoder(nn.Module):
    """Stack of ``n_layer`` independent copies of one encoder layer."""

    def __init__(self, encoder_layer, n_layer):
        """Build the stack.

        Args:
            encoder_layer: prototype layer, invoked as ``layer(x, mask)``.
                It is deep-copied ``n_layer`` times so the copies do not
                share weights.
            n_layer: number of copies to stack.
        """
        super(Encoder, self).__init__()
        # nn.ModuleList (rather than a plain list) registers every copy as a
        # submodule, so its parameters appear in parameters()/state_dict()
        # and follow .to(device).
        self.layers = nn.ModuleList(
            [copy.deepcopy(encoder_layer) for _ in range(n_layer)]
        )

    def forward(self, x, mask):
        """Feed ``x`` through every layer in order, passing ``mask`` to each.

        Returns:
            The output of the last layer (``x`` itself when the stack is
            empty).
        """
        out = x
        for layer in self.layers:
            out = layer(out, mask)
        return out

In [13]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention then feed-forward, each wrapped in a
    ``ResidualConnectionLayer``."""

    def __init__(self, multi_head_attention_layer, position_wise_feed_forward_layer, norm_layer):
        """Store the two sub-layers and build one residual wrapper per sub-layer.

        Args:
            multi_head_attention_layer: module invoked with
                ``query``/``key``/``value``/``mask`` keyword arguments.
            position_wise_feed_forward_layer: module invoked with a single
                tensor.
            norm_layer: prototype normalisation module; each residual wrapper
                receives its own deep copy.
        """
        super(EncoderLayer, self).__init__()
        self.multi_head_attention_layer = multi_head_attention_layer
        self.position_wise_feed_forward_layer = position_wise_feed_forward_layer
        # nn.ModuleList so the wrappers (and their norm parameters) are
        # registered with this module instead of hiding in a plain list.
        self.residual_connection_layers = nn.ModuleList(
            [ResidualConnectionLayer(copy.deepcopy(norm_layer)) for _ in range(2)]
        )

    def forward(self, x, mask):
        """Apply self-attention and then the feed-forward sub-layer.

        Args:
            x: input tensor, used as query, key and value of the attention.
            mask: attention mask forwarded to the attention sub-layer.

        Returns:
            Output of the second residual wrapper.
        """
        out = self.residual_connection_layers[0](
            x,
            lambda t: self.multi_head_attention_layer(query=t, key=t, value=t, mask=mask),
        )
        # The second residual branch must start from the attention output,
        # not from the original x, otherwise the skip connection bypasses
        # the attention sub-layer.
        out = self.residual_connection_layers[1](out, self.position_wise_feed_forward_layer)

        return out


In [7]:
import numpy as np
from torch.nn import functional as F
import math
import pandas as pd

In [8]:
def calculate_attention(self, query, key, value, mask=None):
    """Scaled dot-product attention.

    Computes ``softmax(query @ key^T / sqrt(d_k)) @ value`` where ``d_k`` is
    the size of the last dimension of ``key``.

    Args:
        self: unused; kept so the function can be called with the same
            signature as a method.
        query: tensor whose last two dimensions are (query positions, d_k).
        key: tensor whose last two dimensions are (key positions, d_k).
        value: tensor whose last two dimensions are (key positions, d_v).
        mask: optional tensor broadcastable to the score matrix; positions
            where ``mask == 0`` are excluded from attention.

    Returns:
        Tensor whose last two dimensions are (query positions, d_v).
    """
    d_k = key.size(-1)
    attention_score = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
        # A large negative score becomes ~0 probability after the softmax.
        attention_score = attention_score.masked_fill(mask == 0, -1e9)
    # Normalise over the key axis (the last one) so each query's weights sum
    # to 1 regardless of how many leading batch/head dimensions there are.
    attention_prob = F.softmax(attention_score, dim=-1)
    return torch.matmul(attention_prob, value)

In [9]:
class MultiHeadAttentionLayer(nn.Module):
    """Multi-head attention with separate query/key/value projections.

    Note: a later cell in this notebook defines a class with the same name;
    whichever cell runs last is the definition in effect.
    """

    def __init__(self, d_model, h, qkv_fc_layer, fc_layer):
        """Set up the projections.

        Args:
            d_model: width of the projected query/key/value tensors; split
                evenly across the heads.
            h: number of attention heads.
            qkv_fc_layer: prototype projection; deep-copied three times so
                query, key and value each get their own weights.
            fc_layer: output projection applied after the heads are merged.

        Raises:
            ValueError: if ``d_model`` is not divisible by ``h``.
        """
        super(MultiHeadAttentionLayer, self).__init__()
        if d_model % h != 0:
            raise ValueError("d_model must be divisible by h")
        self.d_model = d_model
        self.h = h
        self.query_fc_layer = copy.deepcopy(qkv_fc_layer)
        self.key_fc_layer = copy.deepcopy(qkv_fc_layer)
        self.value_fc_layer = copy.deepcopy(qkv_fc_layer)
        self.fc_layer = fc_layer

    def calculate_attention(self, query, key, value, mask):
        """Scaled dot-product attention over the last two dimensions.

        Positions where ``mask == 0`` are excluded; the softmax is taken over
        the key axis (last dimension of the score matrix).
        """
        d_k = key.size(-1)
        attention_score = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            attention_score = attention_score.masked_fill(mask == 0, -1e9)
        attention_prob = F.softmax(attention_score, dim=-1)
        return torch.matmul(attention_prob, value)

    def forward(self, query, key, value, mask=None):
        """Project the inputs, attend per head, merge heads and project out.

        Args:
            query, key, value: tensors whose first dimension is the batch;
                after projection they are reshaped to
                ``(n_batch, -1, h, d_model // h)``.
            mask: optional mask; a head axis is inserted at dimension 1 so it
                broadcasts over all heads.

        Returns:
            ``fc_layer`` applied to the merged heads, which are shaped
            ``(n_batch, -1, d_model)``.
        """
        n_batch = query.shape[0]

        def transform(x, fc_layer):
            # (n_batch, seq, d_model) -> (n_batch, h, seq, d_k)
            out = fc_layer(x)
            out = out.view(n_batch, -1, self.h, self.d_model // self.h)
            return out.transpose(1, 2)

        query = transform(query, self.query_fc_layer)
        key = transform(key, self.key_fc_layer)
        value = transform(value, self.value_fc_layer)

        if mask is not None:
            mask = mask.unsqueeze(1)

        out = self.calculate_attention(query, key, value, mask)
        out = out.transpose(1, 2)
        # transpose() returns a non-contiguous view; view() needs contiguous memory.
        out = out.contiguous().view(n_batch, -1, self.d_model)
        out = self.fc_layer(out)
        return out
            

In [10]:
class MultiHeadAttentionLayer(nn.Module):
    """Multi-head attention with separate query/key/value projections."""

    def __init__(self, d_model, h, qkv_fc_layer, fc_layer):
        """Set up the projections.

        Args:
            d_model: width of the projected query/key/value tensors; split
                evenly across the heads.
            h: number of attention heads.
            qkv_fc_layer: prototype projection; deep-copied three times so
                query, key and value each get their own weights.
            fc_layer: output projection applied after the heads are merged.

        Raises:
            ValueError: if ``d_model`` is not divisible by ``h``.
        """
        super(MultiHeadAttentionLayer, self).__init__()
        if d_model % h != 0:
            raise ValueError("d_model must be divisible by h")
        self.d_model = d_model
        self.h = h
        self.query_fc_layer = copy.deepcopy(qkv_fc_layer)
        self.key_fc_layer = copy.deepcopy(qkv_fc_layer)
        self.value_fc_layer = copy.deepcopy(qkv_fc_layer)
        self.fc_layer = fc_layer

    def calculate_attention(self, query, key, value, mask):
        """Scaled dot-product attention over the last two dimensions.

        Args:
            query, key, value: per-head tensors as produced in ``forward``.
            mask: optional tensor broadcastable to the score matrix;
                positions where ``mask == 0`` are excluded.

        Returns:
            ``softmax(query @ key^T / sqrt(d_k)) @ value`` with the softmax
            taken over the key axis.
        """
        d_k = key.size(-1)
        attention_score = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            # A large negative score becomes ~0 probability after the softmax.
            attention_score = attention_score.masked_fill(mask == 0, -1e9)
        attention_prob = F.softmax(attention_score, dim=-1)
        return torch.matmul(attention_prob, value)

    def forward(self, query, key, value, mask=None):
        """Project the inputs, attend per head, merge heads and project out.

        Args:
            query, key, value: tensors whose first dimension is the batch;
                after projection they are reshaped to
                ``(n_batch, -1, h, d_model // h)``.
            mask: optional mask; a head axis is inserted at dimension 1 so it
                broadcasts over all heads.

        Returns:
            ``fc_layer`` applied to the merged heads, which are shaped
            ``(n_batch, -1, d_model)``.
        """
        n_batch = query.shape[0]

        def transform(x, fc_layer):
            # (n_batch, seq, d_model) -> (n_batch, h, seq, d_k)
            out = fc_layer(x)
            out = out.view(n_batch, -1, self.h, self.d_model // self.h)
            return out.transpose(1, 2)

        query = transform(query, self.query_fc_layer)
        key = transform(key, self.key_fc_layer)
        value = transform(value, self.value_fc_layer)

        if mask is not None:
            mask = mask.unsqueeze(1)

        out = self.calculate_attention(query, key, value, mask)
        out = out.transpose(1, 2)
        # transpose() returns a non-contiguous view; view() needs contiguous memory.
        out = out.contiguous().view(n_batch, -1, self.d_model)
        out = self.fc_layer(out)
        return out



In [11]:
class PositionWiseFeedForwadLayer(nn.Module):
    """Two-layer feed-forward block: ``second_fc(relu(first_fc(x)))``."""

    def __init__(self, first_fc_layer, second_fc_layer):
        """Store the two projection layers.

        Args:
            first_fc_layer: module applied to the input.
            second_fc_layer: module applied after the ReLU.
        """
        super(PositionWiseFeedForwadLayer, self).__init__()
        self.first_fc_layer = first_fc_layer
        self.second_fc_layer = second_fc_layer

    def forward(self, x):
        """Apply the first layer, a ReLU, then the second layer.

        No activation follows the second layer: a trailing ReLU would force
        the block's output to be non-negative before it is added to the
        residual stream, which is not part of the standard
        FFN(x) = max(0, x W1 + b1) W2 + b2 definition.
        """
        hidden = F.relu(self.first_fc_layer(x))
        return self.second_fc_layer(hidden)

In [12]:
class ResidualConnectionLayer(nn.Module):
    """Adds a skip connection around a sub-layer and normalises the sum."""

    def __init__(self, norm_layer):
        """Keep the normalisation module applied after the residual sum."""
        super().__init__()
        self.norm_layer = norm_layer

    def forward(self, x, sub_layer):
        """Return ``norm_layer(sub_layer(x) + x)``.

        Args:
            x: input tensor.
            sub_layer: callable taking ``x`` and returning a tensor that can
                be added to it.
        """
        summed = sub_layer(x) + x
        return self.norm_layer(summed)

In [15]:
from torch.autograd import Variable

In [16]:
def subsequent_mask(size):
    """Build a causal mask of shape ``(1, size, size)``.

    Entry ``[0, i, j]`` is ``True`` when ``j <= i`` (position ``i`` may look
    at position ``j``) and ``False`` for every later position.

    Args:
        size: sequence length.

    Returns:
        Boolean ``torch.Tensor`` of shape ``(1, size, size)``.
    """
    atten_shape = (1, size, size)
    # Ones strictly above the diagonal mark the "future" positions.
    future = np.triu(np.ones(atten_shape), k=1).astype('uint8')
    return torch.from_numpy(future) == 0

def make_std_mask(tgt, pad):
    """Combine a padding mask with the causal mask from ``subsequent_mask``.

    Args:
        tgt: target tensor; entries equal to ``pad`` are treated as padding.
            Its last dimension is used as the sequence length.
        pad: padding value compared against ``tgt``.

    Returns:
        Elementwise AND of ``(tgt != pad)`` (with an axis inserted at
        position -2) and the causal mask for that sequence length.
    """
    not_pad = (tgt != pad).unsqueeze(-2)
    causal = subsequent_mask(tgt.size(-1)).type_as(not_pad.data)
    return not_pad & Variable(causal)

In [19]:
class Decoder(nn.Module):
    """Stack of ``n_layer`` independent copies of one decoder layer."""

    def __init__(self, sub_layer, n_layer):
        """Build the stack.

        Args:
            sub_layer: prototype layer, invoked as
                ``layer(x, mask, encoder_output, encoder_mask)``. It is
                deep-copied ``n_layer`` times so the copies do not share
                weights.
            n_layer: number of copies to stack.
        """
        super(Decoder, self).__init__()
        # nn.ModuleList (rather than a plain list) registers every copy as a
        # submodule, so its parameters appear in parameters()/state_dict()
        # and follow .to(device).
        self.layers = nn.ModuleList(
            [copy.deepcopy(sub_layer) for _ in range(n_layer)]
        )

    def forward(self, x, mask, encoder_output, encoder_mask):
        """Feed ``x`` through every layer in order.

        ``mask``, ``encoder_output`` and ``encoder_mask`` are passed
        unchanged to each layer.

        Returns:
            The output of the last layer (``x`` itself when the stack is
            empty).
        """
        out = x
        for layer in self.layers:
            out = layer(out, mask, encoder_output, encoder_mask)

        return out

In [20]:
class DecoderLayer(nn.Module):
    """One decoder block: masked self-attention, attention over the encoder
    output, then feed-forward, each wrapped in a ``ResidualConnectionLayer``."""

    def __init__(self, masked_multi_head_attention_layer, multi_head_attention_layer, position_wise_feed_forward_layer, norm_layer):
        """Store the three sub-layers and build one residual wrapper for each.

        Args:
            masked_multi_head_attention_layer: attention module used for
                self-attention over the decoder input.
            multi_head_attention_layer: attention module used with the
                encoder output as key and value.
            position_wise_feed_forward_layer: module invoked with a single
                tensor.
            norm_layer: prototype normalisation module; each residual wrapper
                receives its own deep copy.
        """
        super(DecoderLayer, self).__init__()
        self.masked_multi_head_attention_layer = masked_multi_head_attention_layer
        self.multi_head_attention_layer = multi_head_attention_layer
        self.position_wise_feed_forward_layer = position_wise_feed_forward_layer
        # ResidualConnectionLayer takes only the norm module; the sub-layer is
        # supplied per call in forward(). nn.ModuleList registers the wrappers.
        self.residual_connection_layers = nn.ModuleList(
            [ResidualConnectionLayer(copy.deepcopy(norm_layer)) for _ in range(3)]
        )

    def forward(self, x, mask, encoder_output, encoder_mask):
        """Run the three residual sub-layers in sequence.

        Args:
            x: decoder input, used as query/key/value of the first attention.
            mask: mask passed to the first (self-)attention.
            encoder_output: key and value for the second attention.
            encoder_mask: mask passed to the second attention.

        Returns:
            Output of the third residual wrapper.
        """
        out = self.residual_connection_layers[0](
            x,
            lambda t: self.masked_multi_head_attention_layer(query=t, key=t, value=t, mask=mask),
        )
        out = self.residual_connection_layers[1](
            out,
            lambda t: self.multi_head_attention_layer(
                query=t, key=encoder_output, value=encoder_output, mask=encoder_mask
            ),
        )
        out = self.residual_connection_layers[2](out, self.position_wise_feed_forward_layer)
        return out

In [22]:
class TransformerEmbedding(nn.Module):
    """Chains a token embedding and a positional embedding."""

    def __init__(self, embedding, positional_embedding):
        """Compose the two modules into one ``nn.Sequential``.

        Args:
            embedding: module applied first.
            positional_embedding: module applied to the result of
                ``embedding``.
        """
        # super() needs the instance as well as the class; without it
        # nn.Module.__init__ never runs and assigning a submodule fails.
        super(TransformerEmbedding, self).__init__()
        self.embedding = nn.Sequential(embedding, positional_embedding)

    def forward(self, x):
        """Return ``positional_embedding(embedding(x))``."""
        out = self.embedding(x)
        return out

In [23]:
class Embedding(nn.Module):
    """Token embedding lookup scaled by ``sqrt(d_embed)``."""

    def __init__(self, d_embed, vocab):
        """Create an ``nn.Embedding`` with one row per entry of ``vocab``.

        Args:
            d_embed: width of each embedding vector.
            vocab: sized collection; only ``len(vocab)`` is used here, and
                the object itself is kept on ``self.vocab``.
        """
        super().__init__()
        self.embedding = nn.Embedding(len(vocab), d_embed)
        self.vocab = vocab
        self.d_embed = d_embed

    def forward(self, x):
        """Look up the indices in ``x`` and multiply by ``sqrt(d_embed)``."""
        scale = math.sqrt(self.d_embed)
        return self.embedding(x) * scale

In [24]:
class PositionalEmbedding(nn.Module):
    """Adds fixed sinusoidal position encodings to its input, then applies
    dropout."""

    def __init__(self, d_embed, max_seq_len = 5000, dropout = 0.1):
        """Precompute the encoding table.

        Args:
            d_embed: width of each position vector.
            max_seq_len: number of positions to precompute.
            dropout: dropout probability applied to the sum in ``forward``
                (inactive in ``eval()`` mode; pass 0 to disable).
        """
        super(PositionalEmbedding, self).__init__()
        encoding = torch.zeros(max_seq_len, d_embed)
        position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_embed, 2, dtype=torch.float) * -(math.log(10000.0) / d_embed)
        )
        angles = position * div_term
        encoding[:, 0::2] = torch.sin(angles)
        # With an odd d_embed there is one fewer odd column than even column.
        encoding[:, 1::2] = torch.cos(angles[:, : d_embed // 2])
        # A buffer follows the module across .to(device) and is saved in the
        # state_dict, but is not a trainable parameter.
        self.register_buffer('encoding', encoding)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x):
        """Add the first ``x.size(1)`` position vectors to ``x``.

        Args:
            x: tensor whose dimension 1 is the sequence length and whose
                last dimension is ``d_embed`` (presumably
                ``(batch, seq, d_embed)`` - confirm against the caller).

        Raises:
            ValueError: if the sequence is longer than ``max_seq_len``.
        """
        seq_len = x.size(1)
        if seq_len > self.encoding.size(0):
            raise ValueError("sequence length exceeds max_seq_len")
        # The table is (max_seq_len, d_embed): slice the position axis and
        # let broadcasting cover the batch dimension.
        out = x + self.encoding[:seq_len]
        out = self.dropout(out)
        return out

In [29]:
def make_model(src_vocab, trg_vocab, d_embed = 512, n_layer = 6, d_model = 512, h = 8, d_ff = 2048):
    """Assemble a ``Transformer`` from freshly constructed components.

    Args:
        src_vocab: source vocabulary; its length sizes the source embedding.
        trg_vocab: target vocabulary; its length sizes the target embedding
            and the final projection.
        d_embed: embedding width, also the width flowing between sub-layers.
        n_layer: number of layers in both the encoder and the decoder.
        d_model: width of the query/key/value projections inside attention.
        h: number of attention heads.
        d_ff: hidden width of the feed-forward block.

    Returns:
        The assembled ``Transformer``.
    """
    cp = copy.deepcopy

    # Prototype sub-layers; every use below takes its own deep copy so no
    # weights are shared between positions in the model.
    multi_head_attention_layer = MultiHeadAttentionLayer(
        d_model=d_model,
        h=h,
        qkv_fc_layer=nn.Linear(d_embed, d_model),
        fc_layer=nn.Linear(d_model, d_embed),
    )

    position_wise_feed_forward_layer = PositionWiseFeedForwadLayer(
        first_fc_layer=nn.Linear(d_embed, d_ff),
        second_fc_layer=nn.Linear(d_ff, d_embed),
    )

    norm_layer = nn.LayerNorm(d_embed, eps=1e-6)

    src_embed = TransformerEmbedding(
        embedding=Embedding(d_embed=d_embed, vocab=src_vocab),
        positional_embedding=PositionalEmbedding(d_embed=d_embed),
    )
    trg_embed = TransformerEmbedding(
        embedding=Embedding(d_embed=d_embed, vocab=trg_vocab),
        positional_embedding=PositionalEmbedding(d_embed=d_embed),
    )

    # n_layer belongs to the Encoder/Decoder stacks, not to Transformer.
    encoder = Encoder(
        encoder_layer=EncoderLayer(
            multi_head_attention_layer=cp(multi_head_attention_layer),
            position_wise_feed_forward_layer=cp(position_wise_feed_forward_layer),
            norm_layer=cp(norm_layer),
        ),
        n_layer=n_layer,
    )
    decoder = Decoder(
        sub_layer=DecoderLayer(
            masked_multi_head_attention_layer=cp(multi_head_attention_layer),
            multi_head_attention_layer=cp(multi_head_attention_layer),
            position_wise_feed_forward_layer=cp(position_wise_feed_forward_layer),
            norm_layer=cp(norm_layer),
        ),
        n_layer=n_layer,
    )

    model = Transformer(
        src_embed=src_embed,
        trg_embed=trg_embed,
        encoder=encoder,
        decoder=decoder,
        # The attention and feed-forward blocks above both project back to
        # d_embed, so that is the width the final projection receives.
        fc_layer=nn.Linear(d_embed, len(trg_vocab)),
    )
    return model
