In [1]:
# Packages
import torch
import torch.nn as nn
import torch.optim as optim

import torchtext
from torchtext.datasets import TranslationDataset, Multi30k
from torchtext.data import Field, BucketIterator

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

import spacy
import numpy as np

import random
import math
import time

In [2]:
# ! python -m spacy download de

In [3]:
# ! python -m spacy download en

In [4]:
# Load the spaCy pipelines whose tokenizers are used for the German and English text
spacy_de = spacy.load('de')
spacy_en = spacy.load('en')

In [5]:
def tokenize_de(text):
    """Split a German string into a list of token strings using the spaCy German tokenizer."""
    tokens = []
    for token in spacy_de.tokenizer(text):
        tokens.append(token.text)
    return tokens

def tokenize_en(text):
    """Split an English string into a list of token strings using the spaCy English tokenizer."""
    tokens = []
    for token in spacy_en.tokenizer(text):
        tokens.append(token.text)
    return tokens

In [6]:
# The model expects data to be fed in with batch dimension first, so we use batch_first=True

In [7]:
# German source field: lower-cased, wrapped in <sos>/<eos>, batch dimension first
SRC = Field(tokenize=tokenize_de, init_token='<sos>', eos_token='<eos>', lower=True, batch_first=True)

# English target field: the end-of-sequence token must be '<eos>', distinct from the
# '<sos>' start token, so that the end of a target sentence has its own marker
TRG = Field(tokenize=tokenize_en, init_token='<sos>', eos_token='<eos>', lower=True, batch_first=True)

In [8]:
# We load the Multi30k dataset and build the vocabulary

In [9]:
# '.de' files are processed with the SRC field, '.en' files with the TRG field
train_data, valid_data, test_data = Multi30k.splits(
    exts=('.de', '.en'),
    fields=(SRC, TRG),
)

In [10]:
# Build both vocabularies from the training split only, with the same min_freq
for field in (SRC, TRG):
    field.build_vocab(train_data, min_freq=2)

In [11]:
# Finally we define the device and the data iterator

In [12]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [13]:
BATCH_SIZE = 64

# One BucketIterator per split, all using the same batch size and device
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size=BATCH_SIZE,
    device=device,
)

# Building the model

## Encoder

In [14]:
class Encoder(nn.Module):
    """Transformer encoder: scaled token embeddings plus learned positional
    embeddings, followed by a stack of ``EncoderLayer`` blocks.

    Args:
        input_dim: number of rows in the token embedding (source vocabulary size).
        hid_dim: embedding / hidden dimension used by every layer.
        n_layers: number of ``EncoderLayer`` blocks to stack.
        n_heads: forwarded to each ``EncoderLayer``.
        pf_dim: forwarded to each ``EncoderLayer``.
        dropout: dropout probability for the summed embeddings; also forwarded
            to each ``EncoderLayer``.
        device: device on which the position indices and scale tensor are placed.
        max_length: number of rows in the positional embedding table, i.e. the
            longest sequence that can be embedded.
    """

    def __init__(self, input_dim, hid_dim, n_layers, n_heads, pf_dim, dropout, device, max_length=100):
        super().__init__()

        self.device = device

        # Both embeddings must produce hid_dim-sized vectors so they can be summed
        self.tok_embedding = nn.Embedding(input_dim, hid_dim)
        self.pos_embedding = nn.Embedding(max_length, hid_dim)

        self.layers = nn.ModuleList(
            [EncoderLayer(hid_dim, n_heads, pf_dim, dropout, device) for _ in range(n_layers)]
        )

        self.dropout = nn.Dropout(dropout)

        # Token embeddings are multiplied by sqrt(hid_dim) before the positions are added
        self.scale = torch.sqrt(torch.FloatTensor([hid_dim])).to(device)

    def forward(self, src, src_mask):
        """Encode a batch of source token indices.

        Args:
            src: token indices, [batch_size, src_len].
            src_mask: mask handed unchanged to every encoder layer.

        Returns:
            Tensor of shape [batch_size, src_len, hid_dim].
        """
        batch_size = src.shape[0]
        src_len = src.shape[1]

        # pos = [batch_size, src_len]; row i is 0..src_len-1
        pos = torch.arange(0, src_len, device=self.device).unsqueeze(0).repeat(batch_size, 1)

        # src = [batch_size, src_len, hid_dim]
        src = self.dropout((self.tok_embedding(src) * self.scale) + self.pos_embedding(pos))

        for layer in self.layers:
            src = layer(src, src_mask)

        # src = [batch_size, src_len, hid_dim]
        return src

## Encoder Layer

In [15]:
class EncoderLayer(nn.Module):
    """One encoder block: multi-head self-attention, then a position-wise feedforward.

    The output of each sub-layer goes through dropout, is added to that
    sub-layer's input (residual connection) and is then layer-normalised.
    The same ``LayerNorm`` instance is applied after both sub-layers.
    """

    def __init__(self, hid_dim, n_heads, pf_dim, dropout, device):
        super().__init__()
        self.layer_norm = nn.LayerNorm(hid_dim)
        self.self_attention = MultiHeadAttentionLayer(hid_dim, n_heads, dropout, device)
        self.positionwise_feedforward = PositionwiseFeedforwardLayer(hid_dim, pf_dim, dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, src_mask):
        """Run one encoder block.

        Args:
            src: input of shape [batch_size, src_len, hid_dim].
            src_mask: mask passed straight through to the self-attention layer.

        Returns:
            Tensor of shape [batch_size, src_len, hid_dim].
        """
        # Self-attention sub-layer: the sequence attends to itself
        attended, _ = self.self_attention(src, src, src, src_mask)
        after_attention = self.layer_norm(src + self.dropout(attended))

        # Feedforward sub-layer, with the same dropout / residual / norm pattern
        fed_forward = self.positionwise_feedforward(after_attention)
        return self.layer_norm(after_attention + self.dropout(fed_forward))

## MultiHead Attention Layer

In [17]:
class MultiHeadAttentionLayer(nn.Module):
    """Multi-head scaled dot-product attention.

    Queries, keys and values are linearly projected, split into ``n_heads``
    heads of size ``head_dim = hid_dim // n_heads``, attended per head, then
    concatenated and projected back to ``hid_dim``.

    Args:
        hid_dim: model dimension; must be divisible by ``n_heads``.
        n_heads: number of attention heads.
        dropout: dropout probability applied to the attention weights.
        device: device on which the scale tensor is placed.
    """

    def __init__(self, hid_dim, n_heads, dropout, device):
        super().__init__()

        assert hid_dim % n_heads == 0, "hid_dim must be divisible by n_heads"

        self.hid_dim = hid_dim
        self.n_heads = n_heads
        self.head_dim = hid_dim // n_heads

        self.fc_q = nn.Linear(hid_dim, hid_dim)
        self.fc_k = nn.Linear(hid_dim, hid_dim)
        self.fc_v = nn.Linear(hid_dim, hid_dim)

        self.fc_o = nn.Linear(hid_dim, hid_dim)

        self.dropout = nn.Dropout(dropout)

        # The dot products are taken over head_dim-sized vectors, so they are
        # scaled by sqrt(head_dim), not sqrt(hid_dim)
        self.scale = torch.sqrt(torch.FloatTensor([self.head_dim])).to(device)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` positions to ``key``/``value`` positions.

        Args:
            query: [batch_size, query_len, hid_dim]
            key: [batch_size, key_len, hid_dim]
            value: [batch_size, value_len, hid_dim]
            mask: optional tensor broadcastable to
                [batch_size, n_heads, query_len, key_len]; positions where it
                equals 0 are excluded from attention.

        Returns:
            Tuple ``(x, attention)`` where ``x`` is
            [batch_size, query_len, hid_dim] and ``attention`` is
            [batch_size, n_heads, query_len, key_len].
        """
        batch_size = query.shape[0]

        Q = self.fc_q(query)
        K = self.fc_k(key)
        V = self.fc_v(value)

        # Split hid_dim into (n_heads, head_dim) and move the head axis forward
        Q = Q.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        K = K.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        V = V.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        # Q = [batch_size, n_heads, query_len, head_dim]
        # K = [batch_size, n_heads, key_len, head_dim]
        # V = [batch_size, n_heads, value_len, head_dim]

        energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale
        # energy = [batch_size, n_heads, query_len, key_len]

        if mask is not None:
            # A large negative energy becomes ~0 probability after the softmax;
            # a value near zero would leave masked positions with real weight
            energy = energy.masked_fill(mask == 0, -1e10)

        attention = torch.softmax(energy, dim=-1)
        # attention = [batch_size, n_heads, query_len, key_len]

        x = torch.matmul(self.dropout(attention), V)
        # x = [batch_size, n_heads, query_len, head_dim]

        # contiguous() is required before view() because permute only changes strides
        x = x.permute(0, 2, 1, 3).contiguous()
        # x = [batch_size, query_len, n_heads, head_dim]

        x = x.view(batch_size, -1, self.hid_dim)
        # x = [batch_size, query_len, hid_dim]

        x = self.fc_o(x)
        # x = [batch_size, query_len, hid_dim]

        return x, attention

## Position-wise Feedforward Layer

In [25]:
class PositionwiseFeedforwardLayer(nn.Module):
    """Two-layer feedforward applied to the last dimension of its input.

    Expands ``hid_dim`` to ``pf_dim`` with a ReLU and dropout, then projects
    back down to ``hid_dim``.
    """

    def __init__(self, hid_dim, pf_dim, dropout):
        super().__init__()
        self.fc_1 = nn.Linear(hid_dim, pf_dim)
        self.fc_2 = nn.Linear(pf_dim, hid_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Map x of shape [batch_size, seq_len, hid_dim] to a tensor of the same shape."""
        # hidden = [batch_size, seq_len, pf_dim]
        hidden = self.fc_1(x)
        hidden = torch.relu(hidden)
        hidden = self.dropout(hidden)

        # back to [batch_size, seq_len, hid_dim]
        return self.fc_2(hidden)

## Decoder

In [26]:
class Decoder(nn.Module):
    """Transformer decoder: scaled token embeddings plus learned positional
    embeddings, a stack of ``DecoderLayer`` blocks, and a final linear
    projection to ``output_dim``.

    Args:
        output_dim: number of rows in the token embedding and size of the
            output projection (target vocabulary size).
        hid_dim: embedding / hidden dimension used by every layer.
        n_layers: number of ``DecoderLayer`` blocks to stack.
        n_heads: forwarded to each ``DecoderLayer``.
        pf_dim: forwarded to each ``DecoderLayer``.
        dropout: dropout probability for the summed embeddings; also forwarded
            to each ``DecoderLayer``.
        device: device on which the position indices and scale tensor are placed.
        max_length: number of rows in the positional embedding table, i.e. the
            longest sequence that can be embedded.
    """

    def __init__(self, output_dim, hid_dim, n_layers, n_heads, pf_dim, dropout, device, max_length=100):
        super().__init__()

        self.device = device

        self.tok_embedding = nn.Embedding(output_dim, hid_dim)
        self.pos_embedding = nn.Embedding(max_length, hid_dim)

        self.layers = nn.ModuleList(
            [DecoderLayer(hid_dim, n_heads, pf_dim, dropout, device) for _ in range(n_layers)]
        )

        # Projects each hidden state to one score per target vocabulary entry
        self.fc_out = nn.Linear(hid_dim, output_dim)

        self.dropout = nn.Dropout(dropout)

        # Token embeddings are multiplied by sqrt(hid_dim) before the positions are added
        self.scale = torch.sqrt(torch.FloatTensor([hid_dim])).to(device)

    def forward(self, trg, enc_src, trg_mask, src_mask):
        """Decode a batch of target token indices against the encoded source.

        Args:
            trg: target token indices, [batch_size, trg_len].
            enc_src: encoder output, [batch_size, src_len, hid_dim].
            trg_mask: mask handed unchanged to every decoder layer.
            src_mask: mask handed unchanged to every decoder layer.

        Returns:
            Tuple ``(output, attention)``: ``output`` is
            [batch_size, trg_len, output_dim]; ``attention`` is whatever the
            last decoder layer returned, or ``None`` when there are no layers.
        """
        batch_size = trg.shape[0]
        trg_len = trg.shape[1]

        # pos = [batch_size, trg_len]; row i is 0..trg_len-1
        pos = torch.arange(0, trg_len, device=self.device).unsqueeze(0).repeat(batch_size, 1)

        # trg = [batch_size, trg_len, hid_dim]
        trg = self.dropout((self.tok_embedding(trg) * self.scale) + self.pos_embedding(pos))

        # Defined up front so the return below is valid even with zero layers
        attention = None
        for layer in self.layers:
            trg, attention = layer(trg, enc_src, trg_mask, src_mask)

        # trg = [batch_size, trg_len, hid_dim]

        output = self.fc_out(trg)
        # output = [batch_size, trg_len, output_dim]

        return output, attention

## Decoder Layer

In [27]:
class DecoderLayer(nn.Module):
    """One decoder block: self-attention over the target, attention over the
    encoder output, then a position-wise feedforward.

    The output of each sub-layer goes through dropout, is added to that
    sub-layer's input (residual connection) and is then layer-normalised.
    The same ``LayerNorm`` instance is applied after all three sub-layers.
    """

    def __init__(self, hid_dim, n_heads, pf_dim, dropout, device):
        super().__init__()

        self.layer_norm = nn.LayerNorm(hid_dim)
        self.self_attention = MultiHeadAttentionLayer(hid_dim, n_heads, dropout, device)
        self.encoder_attention = MultiHeadAttentionLayer(hid_dim, n_heads, dropout, device)
        # Class name must match the definition: PositionwiseFeedforwardLayer (lower-case "w")
        self.positionwise_feedforward = PositionwiseFeedforwardLayer(hid_dim, pf_dim, dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, trg, enc_src, trg_mask, src_mask):
        """Run one decoder block.

        Args:
            trg: [batch_size, trg_len, hid_dim]
            enc_src: [batch_size, src_len, hid_dim]
            trg_mask: mask for the self-attention; Seq2Seq builds it as
                [batch_size, 1, trg_len, trg_len].
            src_mask: mask for the encoder attention; Seq2Seq builds it as
                [batch_size, 1, 1, src_len].

        Returns:
            Tuple ``(trg, attention)``: ``trg`` is
            [batch_size, trg_len, hid_dim] and ``attention`` is the weight
            tensor returned by the encoder-attention sub-layer.
        """
        # self attention: the target attends to itself under trg_mask
        _trg, _ = self.self_attention(trg, trg, trg, trg_mask)

        # dropout, residual connection and layer norm
        trg = self.layer_norm(trg + self.dropout(_trg))
        # trg = [batch_size, trg_len, hid_dim]

        # encoder attention: queries from the target, keys/values from the encoder output
        _trg, attention = self.encoder_attention(trg, enc_src, enc_src, src_mask)

        # dropout, residual connection and layer norm
        trg = self.layer_norm(trg + self.dropout(_trg))
        # trg = [batch_size, trg_len, hid_dim]

        # positionwise feedforward
        _trg = self.positionwise_feedforward(trg)

        # dropout, residual connection and layer norm
        trg = self.layer_norm(trg + self.dropout(_trg))
        # trg = [batch_size, trg_len, hid_dim]

        return trg, attention

## Seq2Seq

In [42]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, src_pad_idx, trg_pad_idx, device):
        super().__init__()
        self.encoder=encoder
        self.decoder=decoder
        self.src_pad_idx=src_pad_idx
        self.trg_pad_idx=trg_pad_idx
        self.device=device
    
    def make_src_mask(self, src):
        # src=[batch_size, src_len]
        
        src_mask=(src!=self.src_pad_idx).unsqueeze(1).unsqueeze(2)
        # src_mask=[batch_size, 1, 1, src_len]
        
        return src_mask
    
    def make_trg_mask(self, trg):
        #trg=[batch_size, trg_len]
        
        trg_pad_mask=(trg!=self.trg_pad_idx).unsqueeze(1).unsqueeze(3)
        # trg_pad_mask=[batch_size, 1, trg_len, 1]
        
        trg_len=trg.shape[1]
        
        trg_sub_mask = torch.tril(torch.ones((trg_len, trg_len), device = self.device)).bool()
        # trg_mask=[trg_len, trg_len]
        
        trg_mask=trg_pad_mask & trg_sub_mask
        # trg_mask=[batch_size, 1, trg_len, trg_len]
        
        return trg_mask
    
    def forward(self, src, trg):
        # src=[batch_size, src_len]
        # trg=[batch_size, trg_len]
        
        src_mask=self.make_src_mask(src)
        trg_mask=self.make_trg_mask(trg)
        # src_mask=[batch_size, 1, 1, src_len]
        # trg_mask=[batch_size, 1, trg_len, trg_len]
        
        enc_src=self.encoder(src, src_mask)
        # enc_src=[batch_size, src_len, hid_dim]
        
        output