In [7]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import math, copy, time
from torch.autograd import Variable
import matplotlib.pyplot as plt
import seaborn
seaborn.set_context(context="talk")
%matplotlib inline

In [8]:
class Transformer(nn.Module):
  """Minimal encoder-decoder wrapper.

  Holds one ``encoder`` and one ``decoder`` module and chains them: the
  encoder output is handed to the decoder as its second argument.
  """

  def __init__(self, encoder, decoder):
    super().__init__()
    self.encoder = encoder
    self.decoder = decoder

  def forward(self, x, z):
    """Encode ``x`` and decode ``z`` against the encoded result.

    Args:
      x: input passed to ``self.encoder``.
      z: input passed to ``self.decoder`` as its first argument.

    Returns:
      Whatever ``self.decoder(z, context)`` returns, where ``context`` is
      the encoder output.
    """
    context = self.encoder(x)
    return self.decoder(z, context)
    

Encoder는 Encoder Layer를 여러 겹 쌓은 구조로 이루어져 있다. Encoder Layer는 input과 output의 shape이 동일하다. 즉, 어떤 matrix를 input으로 받으면 Encoder Layer가 도출해내는 output도 input과 동일한 shape을 갖는다.


그렇다면 output의 크기가 input과 동일한데도 왜 층을 여러 개 쌓는가? 각 Encoder Layer는 input으로 들어오는 vector에 대해 더 높은 차원(수준)의 context를 담아내기 때문이다. 여기서 "더 높은 차원의 context"란 더 추상적인 정보를 의미한다.

In [9]:
class Encoder(nn.Module):
  """Stack of ``n_layer`` independent deep copies of ``encoder_layer``.

  Args:
    encoder_layer: module used as the template for every layer; it is
      deep-copied, so the copies do not share parameters.
    n_layer: number of copies to stack.
  """

  def __init__(self, encoder_layer,n_layer):
    super(Encoder,self).__init__()
    # nn.ModuleList (instead of a plain Python list) registers every copy as
    # a submodule, so its parameters show up in parameters()/state_dict()
    # and follow .to(device) / .train() / .eval().
    self.layers=nn.ModuleList(
      [copy.deepcopy(encoder_layer) for _ in range(n_layer)])

  def forward(self,x):
    """Feed ``x`` through every layer in order and return the final output."""
    out=x
    for layer in self.layers:
      out=layer(out)
    return out



In [10]:
class EncoderLayer(nn.Module):
  """One encoder layer: an attention sub-layer followed by a feed-forward sub-layer.

  Both sub-layers are called with a single positional argument.
  """

  def __init__(self,multi_head_attention_layer,position_wise_feed_forward_layer):
    super().__init__()
    self.multi_head_attention_layer = multi_head_attention_layer
    self.position_wise_feed_forward_layer = position_wise_feed_forward_layer

  def forward(self,x):
    """Apply the attention sub-layer to ``x``, then the feed-forward sub-layer."""
    attended = self.multi_head_attention_layer(x)
    return self.position_wise_feed_forward_layer(attended)
  
  

In [11]:
def calculate_attention(self,query,key,value,mask=None):
  """Scaled dot-product attention.

  Computes ``softmax(query @ key^T / sqrt(d_k)) @ value`` where ``d_k`` is
  the size of the last dimension of ``key``.

  Args:
    self: unused; kept so the existing call signature stays valid.
    query, key, value: tensors whose last two dimensions are
      (sequence, feature); any leading dimensions must be broadcastable
      for ``torch.matmul``.
    mask: optional tensor broadcastable to the score tensor; positions
      where ``mask == 0`` are excluded from attention.

  Returns:
    The attention-weighted combination of ``value``.
  """
  d_k=key.size(-1)
  attention_score=torch.matmul(query,key.transpose(-2,-1))
  attention_score=attention_score/math.sqrt(d_k)

  if mask is not None:
    # A large negative score becomes a ~0 probability after softmax.
    attention_score=attention_score.masked_fill(mask==0,-1e9)

  attention_prob=F.softmax(attention_score,dim=-1)
  out=torch.matmul(attention_prob,value)

  return out
  

In [18]:
class MultiHeadAttentionLayer(nn.Module):
  """Multi-head attention with separate query/key/value projections.

  Args:
    d_model: total width of the projected query/key/value; split evenly
      across the heads.
    h: number of attention heads; must divide ``d_model``.
    qkv_fc_layer: template projection module; deep-copied three times so
      query, key and value get independent parameters.
    fc_layer: output projection applied after the heads are merged.

  Raises:
    ValueError: if ``d_model`` is not divisible by ``h``.
  """
  def __init__(self,d_model,h,qkv_fc_layer,fc_layer):
    super(MultiHeadAttentionLayer,self).__init__()
    if d_model % h != 0:
      raise ValueError("d_model must be divisible by h")
    self.d_model=d_model
    self.h=h
    self.query_fc_layer=copy.deepcopy(qkv_fc_layer)
    self.key_fc_layer=copy.deepcopy(qkv_fc_layer)
    self.value_fc_layer=copy.deepcopy(qkv_fc_layer)
    self.fc_layer=fc_layer

  def calculate_attention(self,query,key,value,mask):
    """Scaled dot-product attention over the last two dimensions.

    Positions where ``mask == 0`` are filled with a large negative score
    before the softmax so they receive ~0 attention weight.
    """
    d_k=key.size(-1)
    attention_score=torch.matmul(query,key.transpose(-2,-1))
    attention_score=attention_score/math.sqrt(d_k)
    if mask is not None:
      attention_score=attention_score.masked_fill(mask==0,-1e9)
    attention_prob=F.softmax(attention_score,dim=-1)
    return torch.matmul(attention_prob,value)

  def forward(self,query,key,value,mask=None):
    """Project the inputs, attend per head, merge the heads and project out.

    Args:
      query, key, value: tensors whose first dimension is the batch; after
        projection they are reshaped to (n_batch, -1, h, d_model // h).
      mask: optional mask; a head dimension is inserted at index 1 so it
        broadcasts across all heads.

    Returns:
      Output of ``fc_layer`` applied to the merged heads, which are shaped
      (n_batch, -1, d_model).
    """
    n_batch=query.shape[0]

    def transform(x,fc_layer):
      # (n_batch, seq, d_model) -> (n_batch, h, seq, d_model // h)
      out=fc_layer(x)
      out=out.view(n_batch,-1,self.h,self.d_model//self.h)
      out=out.transpose(1,2)
      return out

    query=transform(query,self.query_fc_layer)
    key=transform(key,self.key_fc_layer)
    value=transform(value,self.value_fc_layer)

    if mask is not None:
      mask=mask.unsqueeze(1)

    out=self.calculate_attention(query,key,value,mask)
    out=out.transpose(1,2)
    # transpose() yields a non-contiguous tensor; view() requires contiguity.
    out=out.contiguous().view(n_batch,-1,self.d_model)
    out=self.fc_layer(out)

    return out


In [19]:
def calculate_attention(self,query,key,value,mask=None):
  """Scaled dot-product attention.

  Computes ``softmax(query @ key^T / sqrt(d_k)) @ value`` where ``d_k`` is
  the size of the last dimension of ``key``.

  Args:
    self: unused; kept so the existing call signature stays valid.
    query, key, value: tensors whose last two dimensions are
      (sequence, feature); any leading dimensions must be broadcastable
      for ``torch.matmul``.
    mask: optional tensor broadcastable to the score tensor; positions
      where ``mask == 0`` are excluded from attention.

  Returns:
    The attention-weighted combination of ``value``.
  """
  d_k=key.size(-1)
  scaled_score=torch.matmul(query,key.transpose(-2,-1))/math.sqrt(d_k)

  if mask is not None:
    # A large negative score becomes a ~0 probability after softmax.
    scaled_score=scaled_score.masked_fill(mask==0,-1e9)
  attention_prob=F.softmax(scaled_score,dim=-1)
  out=torch.matmul(attention_prob,value)

  return out

In [20]:
class EncoderLayer(nn.Module):
  """Encoder layer: masked self-attention followed by a feed-forward sub-layer."""

  def __init__(self,multi_head_attention_layer,position_wise_feed_forward_layer):
    super().__init__()
    self.multi_head_attention_layer = multi_head_attention_layer
    # Attribute name kept as-is; it is visible to code outside this class.
    self.multi_position_wise_feed_forward_layer = position_wise_feed_forward_layer

  def forward(self,x,mask):
    """Self-attend over ``x`` (used as query, key and value) under ``mask``,
    then apply the feed-forward sub-layer to the result."""
    attended = self.multi_head_attention_layer(query=x, key=x, value=x, mask=mask)
    return self.multi_position_wise_feed_forward_layer(attended)

In [21]:
class Encoder(nn.Module):
  """Stack of ``n_layer`` independent deep copies of ``encoder_layer``.

  Args:
    encoder_layer: template layer; each copy is called as ``layer(out, mask)``.
    n_layer: number of copies to stack.
  """

  def __init__(self,encoder_layer,n_layer):
    super(Encoder,self).__init__()
    # nn.ModuleList (instead of a plain Python list) registers every copy as
    # a submodule, so its parameters show up in parameters()/state_dict()
    # and follow .to(device) / .train() / .eval().
    self.layers=nn.ModuleList(
      [copy.deepcopy(encoder_layer) for _ in range(n_layer)])

  def forward(self,x,mask):
    """Feed ``x`` through every layer in order, passing the same ``mask`` to each."""
    out=x
    for layer in self.layers:
      out=layer(out,mask)
    return out
    

In [22]:
class Transformer(nn.Module):
	"""Encoder-decoder wrapper in which the encoder takes a mask."""

	def __init__(self, encoder, decoder):
		super().__init__()
		self.encoder = encoder
		self.decoder = decoder

	def forward(self, src, trg, mask):
		"""Encode ``src`` under ``mask`` and decode ``trg`` against the result.

		Returns whatever ``self.decoder(trg, memory)`` returns, where
		``memory`` is the encoder output.
		"""
		memory = self.encoder(src, mask)
		return self.decoder(trg, memory)

In [23]:
class PositionWiseFeedForwardLayer(nn.Module):
	"""Two-layer feed-forward block: fc -> ReLU -> dropout -> fc.

	Args:
		first_fc_layer: module applied to the input.
		second_fc_layer: module applied after the activation and dropout.
		dropout_rate: drop probability used between the two layers.
	"""
	def __init__(self, first_fc_layer, second_fc_layer, dropout_rate=0.1):
		# nn.Module must be initialised before submodules can be assigned.
		super(PositionWiseFeedForwardLayer, self).__init__()
		self.first_fc_layer = first_fc_layer
		self.second_fc_layer = second_fc_layer
		# forward() uses self.dropout, so it has to be created here.
		self.dropout = nn.Dropout(p=dropout_rate)

	def forward(self, x):
		"""Return ``second_fc_layer(dropout(relu(first_fc_layer(x))))``."""
		out = self.first_fc_layer(x)
		out = F.relu(out)
		out = self.dropout(out)
		out = self.second_fc_layer(out)
		return out

In [24]:
class ResidualConnectionLayer(nn.Module):
	"""Adds a skip connection around a sub-layer and normalises the sum."""

	def __init__(self, norm_layer):
		super().__init__()
		self.norm_layer = norm_layer

	def forward(self, x, sub_layer):
		"""Return ``norm_layer(sub_layer(x) + x)``.

		Args:
			x: input tensor.
			sub_layer: callable applied to ``x``; its result is added to ``x``.
		"""
		return self.norm_layer(sub_layer(x) + x)

In [25]:
class EncoderLayer(nn.Module):
	"""Encoder layer: self-attention and feed-forward, each wrapped in a
	residual connection followed by normalisation.

	Args:
		multi_head_attention_layer: attention module called as ``(query, key, value, mask)``.
		position_wise_feed_forward_layer: module applied to the attention block output.
		norm_layer: template normalisation module; deep-copied once per residual block.
	"""

	def __init__(self, multi_head_attention_layer, position_wise_feed_forward_layer, norm_layer):
		super(EncoderLayer, self).__init__()
		self.multi_head_attention_layer = multi_head_attention_layer
		self.position_wise_feed_forward_layer = position_wise_feed_forward_layer
		# nn.ModuleList so the norm layers' parameters are registered with this module.
		self.residual_connection_layers = nn.ModuleList(
			[ResidualConnectionLayer(copy.deepcopy(norm_layer)) for _ in range(2)])

	def forward(self, x, mask):
		"""Apply the attention block to ``x`` and the feed-forward block to its output."""
		out = self.residual_connection_layers[0](x, lambda h: self.multi_head_attention_layer(h, h, h, mask))
		# The second block must consume the first block's output, not the raw input.
		out = self.residual_connection_layers[1](out, lambda h: self.position_wise_feed_forward_layer(h))
		return out

In [26]:
def subsequent_mask(size):
	"""Build a boolean mask that hides later positions.

	Args:
		size: sequence length.

	Returns:
		Bool tensor of shape (1, size, size); entry [0, i, j] is True when
		j <= i (allowed) and False when j > i (masked).
	"""
	atten_shape = (1, size, size)
	mask = np.triu(np.ones(atten_shape), k=1).astype('uint8') # 1 strictly above the diagonal
	return torch.from_numpy(mask)==0 # invert: masked=False, visible=True

def make_std_mask(tgt, pad):
	"""Combine a padding mask with a subsequent-position mask for ``tgt``.

	Args:
		tgt: tensor of target token ids; the last dimension is the sequence.
		pad: id that marks padding positions.

	Returns:
		Bool tensor that is True only where the key position is not padding
		and is not later than the query position.
	"""
	tgt_mask = (tgt != pad) # pad masking
	tgt_mask = tgt_mask.unsqueeze(-2) # insert a broadcastable query dimension before the sequence dimension
	# Variable() and .data are legacy autograd API; plain tensors behave the same.
	causal_mask = subsequent_mask(tgt.size(-1)).type_as(tgt_mask)
	tgt_mask = tgt_mask & causal_mask # pad_masking & subsequent_masking
	return tgt_mask

In [27]:
class Transformer(nn.Module):
	"""Encoder-decoder wrapper with separate source and target masks."""

	def __init__(self, encoder, decoder):
		super().__init__()
		self.encoder = encoder
		self.decoder = decoder

	def forward(self, src, trg, src_mask, trg_mask):
		"""Encode ``src`` under ``src_mask`` and decode ``trg`` under ``trg_mask``.

		Returns whatever ``self.decoder(trg, trg_mask, memory)`` returns,
		where ``memory`` is the encoder output.
		"""
		memory = self.encoder(src, src_mask)
		return self.decoder(trg, trg_mask, memory)

In [28]:
class Decoder(nn.Module):
	"""Stack of ``n_layer`` independent deep copies of ``sub_layer``.

	Args:
		sub_layer: template layer; each copy is called as
			``layer(out, mask, encoder_output, encoder_mask)``.
		n_layer: number of copies to stack.
	"""
	def __init__(self, sub_layer, n_layer):
		super(Decoder, self).__init__()
		# nn.ModuleList (instead of a plain Python list) registers every copy as
		# a submodule, so its parameters show up in parameters()/state_dict()
		# and follow .to(device) / .train() / .eval().
		self.layers = nn.ModuleList(
			[copy.deepcopy(sub_layer) for _ in range(n_layer)])

	def forward(self, x, mask, encoder_output, encoder_mask):
		"""Feed ``x`` through every layer in order with the same masks and encoder output."""
		out = x
		for layer in self.layers:
			out = layer(out, mask, encoder_output, encoder_mask)
		return out

In [29]:
class DecoderLayer(nn.Module):
	"""Decoder layer: masked self-attention, attention over the encoder output,
	and feed-forward, each wrapped in a residual connection with normalisation.

	Args:
		masked_multi_head_attention_layer: attention module used for self-attention.
		multi_head_attention_layer: attention module used against ``encoder_output``.
		position_wise_feed_forward_layer: module applied last.
		norm_layer: template normalisation module; deep-copied once per residual block.
	"""
	def __init__(self, masked_multi_head_attention_layer, multi_head_attention_layer, position_wise_feed_forward_layer, norm_layer):
		super(DecoderLayer, self).__init__()
		self.masked_multi_head_attention_layer = masked_multi_head_attention_layer
		self.multi_head_attention_layer = multi_head_attention_layer
		self.position_wise_feed_forward_layer = position_wise_feed_forward_layer
		# ResidualConnectionLayer takes only a norm layer; the sub-layer is
		# supplied per call, so the sub-layers are stored separately above.
		self.residual_connection_layers = nn.ModuleList(
			[ResidualConnectionLayer(copy.deepcopy(norm_layer)) for _ in range(3)])

	def forward(self, x, mask, encoder_output, encoder_mask):
		"""Run the three residual blocks in sequence.

		Args:
			x: decoder input; used as query, key and value of the self-attention.
			mask: mask passed to the self-attention.
			encoder_output: used as key and value of the second attention.
			encoder_mask: mask passed to the second attention.
		"""
		out = self.residual_connection_layers[0](
			x, lambda h: self.masked_multi_head_attention_layer(query=h, key=h, value=h, mask=mask))
		out = self.residual_connection_layers[1](
			out, lambda h: self.multi_head_attention_layer(query=h, key=encoder_output, value=encoder_output, mask=encoder_mask))
		out = self.residual_connection_layers[2](
			out, lambda h: self.position_wise_feed_forward_layer(h))
		return out

In [30]:
class Transformer(nn.Module):
	"""Encoder-decoder wrapper whose decoder also receives the source mask."""

	def __init__(self, encoder, decoder):
		super().__init__()
		self.encoder = encoder
		self.decoder = decoder

	def forward(self, src, trg, src_mask, trg_mask):
		"""Encode ``src`` under ``src_mask``; decode ``trg`` under ``trg_mask``
		against the encoder output, passing ``src_mask`` along to the decoder."""
		memory = self.encoder(src, src_mask)
		return self.decoder(trg, trg_mask, memory, src_mask)

In [31]:
class TransformerEmbedding(nn.Module):
	"""Chains an embedding module and a positional-encoding module."""

	def __init__(self, embedding, positional_encoding):
		super().__init__()
		# Sequential applies ``embedding`` first, then ``positional_encoding``.
		pipeline = nn.Sequential(embedding, positional_encoding)
		self.embedding = pipeline

	def forward(self, x):
		"""Return ``positional_encoding(embedding(x))``."""
		return self.embedding(x)

In [32]:
class Embedding(nn.Module):
	"""Token embedding lookup scaled by ``sqrt(d_embed)``.

	Args:
		d_embed: width of each embedding vector.
		vocab: object supporting ``len()``; its length sets the table size.
	"""

	def __init__(self, d_embed, vocab):
		super().__init__()
		self.embedding = nn.Embedding(len(vocab), d_embed)
		self.vocab = vocab
		self.d_embed = d_embed

	def forward(self, x):
		"""Look up the embeddings for ``x`` and multiply them by ``sqrt(d_embed)``."""
		return self.embedding(x) * math.sqrt(self.d_embed)

In [33]:
class PositionalEncoding(nn.Module):
	"""Adds fixed sinusoidal position encodings to the input, then applies dropout.

	Args:
		d_embed: width of the embeddings the encoding is added to.
		max_seq_len: number of positions precomputed.
		dropout_rate: drop probability applied to the sum.
	"""
	def __init__(self, d_embed, max_seq_len=5000, dropout_rate=0.1):
		super(PositionalEncoding, self).__init__()
		encoding = torch.zeros(max_seq_len, d_embed)
		position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
		div_term = torch.exp(torch.arange(0, d_embed, 2, dtype=torch.float) * -(math.log(10000.0) / d_embed))
		encoding[:, 0::2] = torch.sin(position * div_term)
		# For odd d_embed there is one fewer odd column than even columns.
		encoding[:, 1::2] = torch.cos(position * div_term[:d_embed // 2])
		# Leading batch dimension -> (1, max_seq_len, d_embed), so forward()
		# slices the sequence axis. Registered as a non-persistent buffer so it
		# follows .to(device) without being added to the state_dict.
		self.register_buffer("encoding", encoding.unsqueeze(0), persistent=False)
		# forward() uses self.dropout, so it has to be created here.
		self.dropout = nn.Dropout(p=dropout_rate)

	def forward(self, x):
		"""Add the first ``x.size(1)`` position encodings to ``x`` and apply dropout.

		The slice on dimension 1 treats ``x`` as (batch, seq_len, d_embed).
		"""
		out = x + self.encoding[:, :x.size(1)]
		out = self.dropout(out)
		return out

In [34]:
class Transformer(nn.Module):
	"""Encoder-decoder wrapper that embeds source and target before use."""

	def __init__(self, src_embed, trg_embed, encoder, decoder):
		super().__init__()
		self.src_embed = src_embed
		self.trg_embed = trg_embed
		self.encoder = encoder
		self.decoder = decoder

	def forward(self, src, trg, src_mask, trg_mask):
		"""Embed and encode ``src``, then embed ``trg`` and decode it against
		the encoder output using both masks."""
		src_embedded = self.src_embed(src)
		memory = self.encoder(src_embedded, src_mask)
		trg_embedded = self.trg_embed(trg)
		return self.decoder(trg_embedded, trg_mask, memory, src_mask)

In [35]:
class Transformer(nn.Module):
	"""Full model: embeddings, encoder, decoder and a final projection
	followed by log-softmax over the last dimension."""

	def __init__(self, src_embed, trg_embed, encoder, decoder, fc_layer):
		super().__init__()
		self.src_embed = src_embed
		self.trg_embed = trg_embed
		self.encoder = encoder
		self.decoder = decoder
		self.fc_layer = fc_layer

	def forward(self, src, trg, src_mask, trg_mask):
		"""Return log-probabilities produced from the decoder output.

		``src`` is embedded and encoded under ``src_mask``; ``trg`` is embedded
		and decoded against the encoder output; the decoder output goes through
		``fc_layer`` and ``log_softmax(dim=-1)``.
		"""
		memory = self.encoder(self.src_embed(src), src_mask)
		decoded = self.decoder(self.trg_embed(trg), trg_mask, memory, src_mask)
		logits = self.fc_layer(decoded)
		return F.log_softmax(logits, dim=-1)

In [36]:
def make_model(
    src_vocab, 
    trg_vocab, 
    d_embed = 512, 
    n_layer = 6, 
    d_model = 512, 
    h = 8, 
    d_ff = 2048):
    """Assemble a complete ``Transformer`` from its building blocks.

    Args:
        src_vocab: source vocabulary; only ``len(src_vocab)`` is used to size
            the source embedding table.
        trg_vocab: target vocabulary; ``len(trg_vocab)`` sizes the target
            embedding table and the output projection.
        d_embed: embedding width, also the input/output width of every
            encoder and decoder layer.
        n_layer: number of encoder layers and of decoder layers.
        d_model: total width of the projected query/key/value inside attention.
        h: number of attention heads.
        d_ff: hidden width of the position-wise feed-forward block.

    Returns:
        The constructed ``Transformer``.
    """

    cp = lambda x: copy.deepcopy(x)

    # Template attention layer; every use below gets its own deep copy.
    multi_head_attention_layer = MultiHeadAttentionLayer(
                                    d_model = d_model,
                                    h = h,
                                    qkv_fc_layer = nn.Linear(d_embed, d_model),
                                    fc_layer = nn.Linear(d_model, d_embed))

    # Template feed-forward layer; every use below gets its own deep copy.
    position_wise_feed_forward_layer = PositionWiseFeedForwardLayer(
                                        first_fc_layer = nn.Linear(d_embed, d_ff),
                                        second_fc_layer = nn.Linear(d_ff, d_embed))

    # Template normalisation layer; every use below gets its own deep copy.
    norm_layer = nn.LayerNorm(d_embed, eps=1e-6)

    # Build the actual model.
    model = Transformer(
                src_embed = TransformerEmbedding(    # source embedding
                                embedding = Embedding(
                                                d_embed = d_embed, 
                                                vocab = src_vocab), 
                                positional_encoding = PositionalEncoding(
                                                d_embed = d_embed)), 

                trg_embed = TransformerEmbedding(    # target embedding
                                embedding = Embedding(
                                                d_embed = d_embed, 
                                                vocab = trg_vocab), 
                                positional_encoding = PositionalEncoding(
                                                d_embed = d_embed)),
                encoder = Encoder(                    # encoder stack
                                # Encoder names its template parameter ``encoder_layer``.
                                encoder_layer = EncoderLayer(
                                                multi_head_attention_layer = cp(multi_head_attention_layer),
                                                position_wise_feed_forward_layer = cp(position_wise_feed_forward_layer),
                                                norm_layer = cp(norm_layer)),
                                n_layer = n_layer),
                decoder = Decoder(                    # decoder stack
                                sub_layer = DecoderLayer(
                                                masked_multi_head_attention_layer = cp(multi_head_attention_layer),
                                                multi_head_attention_layer = cp(multi_head_attention_layer),
                                                position_wise_feed_forward_layer = cp(position_wise_feed_forward_layer),
                                                norm_layer = cp(norm_layer)),
                                n_layer = n_layer),
                # The decoder emits d_embed-wide vectors (the attention output
                # projection and the feed-forward block both end at d_embed),
                # so the generator must read d_embed features, not d_model.
                fc_layer = nn.Linear(d_embed, len(trg_vocab)))    # generator projection

    return model