In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from torchtext.datasets import TranslationDataset, Multi30k
from torchtext.data import Field, BucketIterator

import spacy
import random
import math
import time

In [None]:
# Pin every random number generator to one seed so reruns are repeatable
SEED = 1234

# Python's own RNG (used for teacher forcing) and PyTorch's RNG share the seed
for _seed_fn in (random.seed, torch.manual_seed):
    _seed_fn(SEED)

# Ask cuDNN to stick to deterministic kernels
torch.backends.cudnn.deterministic = True

In [None]:

# Load spaCy pipelines for tokenization.
# The full package names are used instead of the 'en' / 'de' shortcut links:
# shortcuts were removed in spaCy 3, while the package names load on both
# spaCy 2.x and 3.x as long as the models are installed, e.g.
#   python -m spacy download en_core_web_sm
#   python -m spacy download de_core_news_sm
spacy_en = spacy.load('en_core_web_sm')
spacy_de = spacy.load('de_core_news_sm')

# Tokenization functions
def tokenize_en(text):
    """Split an English string into a list of token strings using `spacy_en`."""
    tokens = []
    for token in spacy_en.tokenizer(text):
        tokens.append(token.text)
    return tokens

def tokenize_de(text):
    """Split a German string into a list of token strings using `spacy_de`."""
    tokens = []
    for token in spacy_de.tokenizer(text):
        tokens.append(token.text)
    return tokens

In [None]:
# Source (German) and target (English) fields share the same preprocessing:
# lowercase the text and wrap every sentence in <sos> ... <eos>
_field_options = dict(init_token='<sos>', eos_token='<eos>', lower=True)
SRC = Field(tokenize=tokenize_de, **_field_options)
TRG = Field(tokenize=tokenize_en, **_field_options)

# Multi30k provides train/validation/test splits; '.de' feeds SRC, '.en' feeds TRG
train_data, valid_data, test_data = Multi30k.splits(
    exts=('.de', '.en'),
    fields=(SRC, TRG),
)

# Vocabularies are built from the training split only, with min_freq=2
for _field in (SRC, TRG):
    _field.build_vocab(train_data, min_freq=2)

In [None]:
# Train on the GPU when one is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Vocabulary sizes determine the embedding-table and output-layer sizes
INPUT_DIM, OUTPUT_DIM = len(SRC.vocab), len(TRG.vocab)

# Embedding and hidden widths for the encoder / decoder
ENC_EMB_DIM = DEC_EMB_DIM = 256
ENC_HID_DIM = DEC_HID_DIM = 512

# Dropout probabilities
ENC_DROPOUT = DEC_DROPOUT = 0.5

# Convolution stack: number of layers and kernel width (read by Encoder / Decoder)
N_LAYERS = 10
KERNEL_SIZE = 3

# presumably the max gradient norm handed to train(..., clip) -- confirm at the call site
CLIP = 1

In [None]:
# Define encoder
class Encoder(nn.Module):
    """Convolutional encoder that summarises a source sentence as one vector.

    Token ids are embedded, passed through a stack of same-padded 1-D
    convolutions with ReLU activations, and max-pooled over the time axis.

    Args:
        input_dim: size of the source vocabulary.
        emb_dim: width of the token embeddings.
        hid_dim: number of channels produced by every convolution.
        dropout: dropout probability applied to the pooled summary.
        n_layers: number of convolution layers; defaults to the notebook-level
            ``N_LAYERS`` when omitted.
        kernel_size: convolution kernel width; defaults to the notebook-level
            ``KERNEL_SIZE`` when omitted.

    Raises:
        ValueError: if ``n_layers`` is smaller than 1.
    """

    def __init__(self, input_dim, emb_dim, hid_dim, dropout, n_layers=None, kernel_size=None):
        super().__init__()
        # Fall back to the notebook-wide hyperparameters when not given explicitly
        if n_layers is None:
            n_layers = N_LAYERS
        if kernel_size is None:
            kernel_size = KERNEL_SIZE
        if n_layers < 1:
            raise ValueError("Encoder needs at least one convolution layer")
        self.input_dim = input_dim
        self.emb_dim = emb_dim
        self.hid_dim = hid_dim
        self.embedding = nn.Embedding(input_dim, emb_dim)
        # Only the first convolution sees emb_dim channels; every later layer
        # consumes the hid_dim channels produced by the layer before it.
        # (Using emb_dim for all layers fails as soon as emb_dim != hid_dim.)
        self.conv_layers = nn.ModuleList([
            nn.Conv1d(
                emb_dim if layer_idx == 0 else hid_dim,
                hid_dim,
                kernel_size,
                padding=(kernel_size - 1) // 2,
            )
            for layer_idx in range(n_layers)
        ])
        self.dropout = nn.Dropout(dropout)

    def forward(self, input):
        """Encode a batch of source sentences.

        Args:
            input: LongTensor of token ids, shape [src_len, batch_size].

        Returns:
            Tensor of shape [batch_size, hid_dim].
        """
        # input: [src_len, batch_size]
        embedded = self.embedding(input)
        # embedded: [src_len, batch_size, emb_dim]
        hidden = embedded.permute(1, 2, 0)
        # hidden: [batch_size, emb_dim, src_len]
        for conv_layer in self.conv_layers:
            hidden = F.relu(conv_layer(hidden))
        # hidden: [batch_size, hid_dim, src_len]
        # Pool once, after the whole stack: pooling inside the loop would shrink
        # the sequence to length 1 after the first layer and leave the remaining
        # convolutions with nothing to convolve over.
        pooled = F.max_pool1d(hidden, kernel_size=hidden.shape[2])
        # pooled: [batch_size, hid_dim, 1]
        pooled = pooled.squeeze(2)
        # pooled: [batch_size, hid_dim]
        return self.dropout(pooled)

# Define decoder
class Decoder(nn.Module):
    """Single-step convolutional decoder conditioned on the encoder summary.

    Each call consumes one target token per batch element plus the encoder
    output and returns unnormalised scores over the target vocabulary.

    Args:
        output_dim: size of the target vocabulary.
        emb_dim: width of the token embeddings.
        hid_dim: width of the residual stream inside the convolution stack.
        dropout: dropout probability applied to the token embeddings.
        n_layers: number of gated convolution layers; defaults to the
            notebook-level ``N_LAYERS`` when omitted.
        kernel_size: convolution kernel width; defaults to the notebook-level
            ``KERNEL_SIZE`` when omitted.
        enc_hid_dim: width of the encoder output; defaults to ``hid_dim``.
    """

    def __init__(self, output_dim, emb_dim, hid_dim, dropout,
                 n_layers=None, kernel_size=None, enc_hid_dim=None):
        super().__init__()
        # Fall back to the notebook-wide hyperparameters when not given explicitly
        if n_layers is None:
            n_layers = N_LAYERS
        if kernel_size is None:
            kernel_size = KERNEL_SIZE
        if enc_hid_dim is None:
            enc_hid_dim = hid_dim
        self.output_dim = output_dim
        self.emb_dim = emb_dim
        self.hid_dim = hid_dim
        self.embedding = nn.Embedding(output_dim, emb_dim)
        # Bring embeddings and the encoder summary into the hid_dim residual
        # stream so the residual additions and fc_out agree on one width.
        self.emb2hid = nn.Linear(emb_dim, hid_dim)
        self.enc2hid = nn.Linear(enc_hid_dim, hid_dim)
        # GLU halves the channel count, so each convolution emits 2 * hid_dim
        # channels to get hid_dim back after gating.
        self.conv_layers = nn.ModuleList([
            nn.Conv1d(hid_dim, 2 * hid_dim, kernel_size, padding=(kernel_size - 1) // 2)
            for _ in range(n_layers)
        ])
        self.fc_out = nn.Linear(hid_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, encoder_output):
        """Predict the next-token scores for one decoding step.

        Args:
            input: LongTensor of current target token ids, shape [batch_size].
            encoder_output: encoder summary, shape [batch_size, enc_hid_dim].

        Returns:
            Tensor of shape [batch_size, output_dim] with unnormalised scores.
        """
        # input: [batch_size]
        input = input.unsqueeze(1)
        # input: [batch_size, 1]
        embedded = self.dropout(self.embedding(input))
        # embedded: [batch_size, 1, emb_dim]
        # Inject the source-sentence summary; without it the prediction would
        # not depend on the source at all.
        hidden = self.emb2hid(embedded) + self.enc2hid(encoder_output).unsqueeze(1)
        # hidden: [batch_size, 1, hid_dim]
        hidden = hidden.transpose(1, 2)
        # hidden: [batch_size, hid_dim, 1]
        for conv_layer in self.conv_layers:
            gated = F.glu(conv_layer(hidden), dim=1)
            # gated: [batch_size, hid_dim, 1]
            # Scaled residual connection keeps the activation variance stable
            hidden = (gated + hidden) * math.sqrt(0.5)
        output = self.fc_out(hidden.squeeze(2))
        # output: [batch_size, output_dim]
        return output


In [None]:
class Seq2Seq(nn.Module):
    """Ties an encoder and a decoder together for sequence-to-sequence training.

    Args:
        encoder: module called as ``encoder(src)``.
        decoder: module called as ``decoder(input, encoder_output)``; must
            expose an ``output_dim`` attribute (target vocabulary size).
        device: device on which the output buffer is allocated.
    """

    # The constructor must be named __init__ (not `init`); otherwise
    # Seq2Seq(encoder, decoder, device) raises TypeError and the submodules
    # are never registered with nn.Module.
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Decode the target sequence one step at a time.

        Args:
            src: source token ids, shape [src_len, batch_size].
            trg: target token ids, shape [trg_len, batch_size].
            teacher_forcing_ratio: probability, drawn once per time step, of
                feeding the ground-truth token instead of the model's own
                prediction as the next decoder input.

        Returns:
            Tensor of shape [trg_len, batch_size, output_dim]. Row 0 is never
            written and stays all zeros, because decoding starts at step 1.
        """
        max_len, batch_size = trg.shape[0], trg.shape[1]
        trg_vocab_size = self.decoder.output_dim
        # Allocate directly on the target device instead of creating on CPU and copying
        outputs = torch.zeros(max_len, batch_size, trg_vocab_size, device=self.device)
        encoder_output = self.encoder(src)
        # First decoder input is the first row of trg (the <sos> tokens)
        input = trg[0, :]
        for t in range(1, max_len):
            output = self.decoder(input, encoder_output)
            outputs[t] = output
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input = trg[t] if teacher_force else top1
        return outputs

In [None]:
def train(model, iterator, optimizer, criterion, clip):
  """Run one pass over `iterator`, taking an optimizer step after every batch.

  Args:
    model: called as ``model(src, trg)``; its output is treated as
      [trg_len, batch_size, output_dim].
    iterator: yields batches exposing ``.src`` and ``.trg`` attributes.
    optimizer: optimizer whose gradients are zeroed and stepped per batch.
    criterion: loss called as ``criterion(output, trg)`` on the flattened tensors.
    clip: maximum gradient norm passed to ``clip_grad_norm_``.

  NOTE(review): no ``return`` is visible in this cell -- ``epoch_loss`` is
  accumulated but not returned here; confirm whether the definition continues
  beyond the visible source.
  """
  # Switch the model to training mode
  model.train()
  epoch_loss = 0
  for i, batch in enumerate(iterator):
    src = batch.src
    trg = batch.trg
    # Clear gradients left over from the previous batch
    optimizer.zero_grad()
    output = model(src, trg)
    # output: [trg_len, batch_size, output_dim]
    # Skip time step 0 (the model's forward loop starts writing at step 1)
    # and flatten so every remaining position becomes one row
    output = output[1:].view(-1, output.shape[-1])
    trg = trg[1:].view(-1)
    # trg: [(trg_len-1) * batch_size]
    loss = criterion(output, trg)
    loss.backward()
    # Rescale gradients so their total norm does not exceed `clip`
    torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
    optimizer.step()
    # Accumulate the scalar loss of this batch
    epoch_loss += loss.item()