# Chapter 7: Text Translation Using Sequence-to-Sequence Neural Network (Notes)

- seq to seq modeling: takes in one sentence in one language and outputs the translation in another language
- using RNNs as part of a larger, more complex model to perform sequence-to-sequence translation (basically extending RNNs)

# Code

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import spacy
import numpy as np
import random
import math
import time
from torchtext.datasets import TranslationDataset, Multi30k
from torchtext.data import Field, BucketIterator

In [None]:
from torchtext.datasets import Multi30k
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from typing import Iterable, List

def yield_tokens(data_iter: Iterable, language: str) -> Iterable[List[str]]:
    """Lazily tokenize one language's text from every sample in ``data_iter``.

    This is a generator: it yields one tokenizer result per sample rather than
    returning a single list.

    Args:
        data_iter: Iterable of samples. Each sample is indexed with 0 for the
            ``SRC_LANGUAGE`` text and 1 for the ``TGT_LANGUAGE`` text.
        language: ``SRC_LANGUAGE`` or ``TGT_LANGUAGE``. Selects both the
            position inside each sample and the tokenizer stored in
            ``token_transform``.

    Yields:
        ``token_transform[language]`` applied to the selected text.

    Raises:
        KeyError: If ``language`` is neither of the two configured languages
            or has no tokenizer registered in ``token_transform``.
    """
    language_index = {SRC_LANGUAGE: 0, TGT_LANGUAGE: 1}

    # Both lookups are loop-invariant, so resolve them once up front.
    position = language_index[language]
    tokenizer = token_transform[language]

    for data_sample in data_iter:
        yield tokenizer(data_sample[position])

# Define source and target language
SRC_LANGUAGE = 'de'
TGT_LANGUAGE = 'en'

# Place holder for tokens and vocabulary
token_transform = {}
vocab_transform = {}

# Create source and target language tokenizer
token_transform[SRC_LANGUAGE] = get_tokenizer('spacy', language='de')
token_transform[TGT_LANGUAGE] = get_tokenizer('spacy', language='en')

# Build one torchtext Vocab object per language.
# The training iterator is created afresh before each pass: if it is a
# one-shot iterator, the first build_vocab_from_iterator call consumes it and
# a second pass over the same object would see no samples.
train_iter = Multi30k(split='train', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
vocab_transform[SRC_LANGUAGE] = build_vocab_from_iterator(yield_tokens(train_iter, SRC_LANGUAGE))

train_iter = Multi30k(split='train', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
vocab_transform[TGT_LANGUAGE] = build_vocab_from_iterator(yield_tokens(train_iter, TGT_LANGUAGE))


In [None]:
# Load the spaCy pipelines by their full package names. The short aliases
# 'de' / 'en' were shortcut links that spaCy 3 no longer supports, while the
# full names resolve whenever the model packages are installed.
spacy_german = spacy.load('de_core_news_sm')
spacy_english = spacy.load('en_core_web_sm')

In [None]:
def tokenize_german(text):
    """Return the token strings that ``spacy_german``'s tokenizer finds in ``text``."""
    doc = spacy_german.tokenizer(text)
    return [piece.text for piece in doc]
def tokenize_english(text):
    """Return the token strings of ``text`` in reverse order.

    Tokens come from ``spacy_english``'s tokenizer; the resulting list is
    reversed before it is returned, so the last token of ``text`` comes first.
    """
    pieces = [piece.text for piece in spacy_english.tokenizer(text)]
    pieces.reverse()
    return pieces

In [None]:
# Field describing the English side. It is tokenized with tokenize_english,
# which returns the tokens in reversed order. '<sos>' / '<eos>' are the
# start/end markers; lower = True presumably lowercases the text (confirm
# against the torchtext Field documentation).
SOURCE = Field(tokenize = tokenize_english, 
            init_token = '<sos>', 
            eos_token = '<eos>', 
            lower = True)

# Field describing the German side. It is tokenized with tokenize_german
# (token order is kept) and uses the same start/end markers and lower flag.
TARGET = Field(tokenize = tokenize_german, 
            init_token = '<sos>', 
            eos_token = '<eos>', 
            lower = True)

In [None]:

# Load the three Multi30k splits. exts and fields are given in the same order,
# so the '.en' text is handled by SOURCE and the '.de' text by TARGET.
train_data, valid_data, test_data = Multi30k.splits(exts = ('.en', '.de'), 
                                                    fields = (SOURCE, TARGET))

In [None]:

# Sanity check: show the first training example, source tokens then target tokens.
print(train_data.examples[0].src)
print(train_data.examples[0].trg)

In [None]:

# Report how many examples each split contains.
print(f"Training dataset size: {len(train_data.examples)}")
print(f"Validation dataset size: {len(valid_data.examples)}")
print(f"Test dataset size: {len(test_data.examples)}")

In [None]:

# Build both vocabularies from the training split only, passing min_freq = 2
# as the frequency threshold.
SOURCE.build_vocab(train_data, min_freq=2)
TARGET.build_vocab(train_data, min_freq=2)

# Report the resulting vocabulary sizes.
print(f"English (Source) Vocabulary Size: {len(SOURCE.vocab)}")
print(f"German (Target) Vocabulary Size: {len(TARGET.vocab)}")

In [None]:

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Batch size shared by the three iterators created below.
batch_size = 32

# One BucketIterator per split, returned in the same order as the datasets
# are passed in. `device` is forwarded so batches are presumably created on
# it (confirm against the BucketIterator documentation).
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_data, valid_data, test_data), 
    batch_size = batch_size, 
    device = device)

In [None]:

class Encoder(nn.Module):
    """Embedding + multi-layer LSTM that summarises a source sequence.

    Args:
        input_dims: Number of rows in the embedding table (source vocabulary size).
        emb_dims: Width of each embedding vector.
        hid_dims: LSTM hidden size.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability, used both on the embeddings and as the
            LSTM's own ``dropout`` argument.
    """

    def __init__(self, input_dims, emb_dims, hid_dims, n_layers, dropout):
        super().__init__()

        # Kept as plain attributes so other code can read the configuration.
        self.hid_dims, self.n_layers = hid_dims, n_layers

        self.embedding = nn.Embedding(num_embeddings=input_dims, embedding_dim=emb_dims)
        # batch_first is not set, so the LSTM uses its default input layout.
        self.rnn = nn.LSTM(emb_dims, hid_dims, n_layers, dropout=dropout)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, src):
        """Encode ``src`` and return the LSTM's final ``(hidden, cell)`` state.

        ``src`` is a tensor of token ids (assumed sequence-first, matching the
        LSTM default - confirm with the data pipeline). The per-step LSTM
        outputs are discarded; only the final state is returned.
        """
        token_vectors = self.embedding(src)
        rnn_input = self.dropout(token_vectors)

        _, final_state = self.rnn(rnn_input)
        hidden, cell_state = final_state

        return hidden, cell_state

In [None]:
class Decoder(nn.Module):
    """Single-step LSTM decoder that maps one token per sequence to vocabulary scores.

    Args:
        output_dims: Size of the target vocabulary (embedding rows and width of
            the final linear layer's output).
        emb_dims: Width of each embedding vector.
        hid_dims: LSTM hidden size.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability, used both on the embeddings and as the
            LSTM's own ``dropout`` argument.
    """

    def __init__(self, output_dims, emb_dims, hid_dims, n_layers, dropout):
        super().__init__()

        # output_dims is read by Seq2Seq to size its result tensor.
        self.output_dims = output_dims
        self.hid_dims, self.n_layers = hid_dims, n_layers

        self.embedding = nn.Embedding(num_embeddings=output_dims, embedding_dim=emb_dims)
        self.rnn = nn.LSTM(emb_dims, hid_dims, n_layers, dropout=dropout)
        self.fc_out = nn.Linear(in_features=hid_dims, out_features=output_dims)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, input, h, cell):
        """Run one decoding step.

        Args:
            input: Token ids for the current step (assumed one id per batch
                element - confirm with the caller).
            h: LSTM hidden state carried over from the previous step.
            cell: LSTM cell state carried over from the previous step.

        Returns:
            ``(pred, h, cell)``: the linear layer's scores for this step and the
            updated LSTM state.
        """
        # Add a leading axis of size one so the LSTM sees a one-step sequence.
        single_step = input.unsqueeze(0)

        step_vectors = self.dropout(self.embedding(single_step))

        rnn_out, (h, cell) = self.rnn(step_vectors, (h, cell))

        # Drop the leading one-step axis again before projecting to the vocabulary.
        pred = self.fc_out(rnn_out.squeeze(0))

        return pred, h, cell

In [None]:
class Seq2Seq(nn.Module):
    """Couples an encoder and a decoder into one sequence-to-sequence model.

    Args:
        encoder: Callable returning ``(h, cell)`` for a source batch.
        decoder: Callable taking ``(input, h, cell)`` and returning
            ``(scores, h, cell)``; must expose ``output_dims``.
        device: Device the result tensor is moved to.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_rate = 0.5):
        """Decode step by step and return the decoder scores for every target position.

        Args:
            src: Source batch handed to the encoder.
            trg: Target batch; axis 0 is read as the sequence length and axis 1
                as the batch size.
            teacher_forcing_rate: Probability, per step, of feeding the true
                target token instead of the decoder's own best guess.

        Returns:
            Tensor of shape ``(target_length, batch_size, decoder.output_dims)``.
            Row 0 is left as zeros because decoding starts at position 1.
        """
        target_length, batch_size = trg.shape[0], trg.shape[1]
        vocab_size = self.decoder.output_dims

        outputs = torch.zeros(target_length, batch_size, vocab_size).to(self.device)

        h, cell = self.encoder(src)

        # The first row of the target seeds the decoder.
        step_input = trg[0, :]

        for t in range(1, target_length):
            step_scores, h, cell = self.decoder(step_input, h, cell)
            outputs[t] = step_scores

            # Exactly one random draw per step decides the next decoder input.
            use_teacher = random.random() < teacher_forcing_rate
            if use_teacher:
                step_input = trg[t]
            else:
                step_input = step_scores.argmax(1)

        return outputs

In [None]:

# Vocabulary sizes determine the embedding table sizes of encoder and decoder.
input_dimensions = len(SOURCE.vocab)
output_dimensions = len(TARGET.vocab)
# Embedding width for each side.
encoder_embedding_dimensions = 256
decoder_embedding_dimensions = 256
# Hidden size and depth are shared by both LSTMs, which lets the encoder's
# final state be passed straight into the decoder.
hidden_layer_dimensions = 512
number_of_layers = 2
# Dropout probability for each side.
encoder_dropout = 0.5
decoder_dropout = 0.5

# Build the encoder and decoder from the settings above.
encod = Encoder(input_dimensions, encoder_embedding_dimensions,\
              hidden_layer_dimensions, number_of_layers, encoder_dropout)
decod = Decoder(output_dimensions, decoder_embedding_dimensions,\
              hidden_layer_dimensions, number_of_layers, decoder_dropout)

# Wrap both in the Seq2Seq model and move it to the selected device.
model = Seq2Seq(encod, decod, device).to(device)

In [None]:

def initialize_weights(m):
    """Overwrite every parameter reachable from ``m`` with uniform noise in [-0.1, 0.1].

    The parameters are modified in place through ``.data``; nothing is returned.
    """
    for weight in m.parameters():
        nn.init.uniform_(weight.data, a=-0.1, b=0.1)
        
# Re-initialize the model's parameters. nn.Module.apply invokes the function
# on each submodule as well as on the model itself.
model.apply(initialize_weights)

In [None]:

# Adam over all model parameters, using the optimizer's default settings.
optimizer = optim.Adam(model.parameters())

# Cross-entropy loss that skips positions whose target is the padding token,
# so padded positions do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index = TARGET.vocab.stoi[TARGET.pad_token])

In [None]:
def train(model, iterator, optimizer, criterion, clip):
    """Run one training pass over ``iterator`` and return the mean batch loss.

    Args:
        model: Called as ``model(src, trg)``; put into training mode first.
        iterator: Sized iterable of batches exposing ``.src`` and ``.trg``.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss taking ``(flattened scores, flattened targets)``.
        clip: Maximum gradient norm passed to ``clip_grad_norm_``.

    Returns:
        Sum of the per-batch losses divided by ``len(iterator)``.
    """
    model.train()

    running_loss = 0

    for batch in iterator:
        source_batch, target_batch = batch.src, batch.trg

        optimizer.zero_grad()

        scores = model(source_batch, target_batch)

        # Position 0 is skipped on both sides before flattening, because the
        # model leaves its first output row unfilled.
        vocab_size = scores.shape[-1]
        flat_scores = scores[1:].view(-1, vocab_size)
        flat_targets = target_batch[1:].view(-1)

        batch_loss = criterion(flat_scores, flat_targets)
        batch_loss.backward()

        # Cap the gradient norm before the update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

        optimizer.step()

        running_loss += batch_loss.item()

    return running_loss / len(iterator)

In [None]:
def evaluate(model, iterator, criterion):
    """Compute the mean batch loss over ``iterator`` without updating the model.

    Args:
        model: Called as ``model(src, trg, 0)``, i.e. with teacher forcing
            turned off; put into evaluation mode first.
        iterator: Sized iterable of batches exposing ``.src`` and ``.trg``.
        criterion: Loss taking ``(flattened scores, flattened targets)``.

    Returns:
        Sum of the per-batch losses divided by ``len(iterator)``.
    """
    model.eval()

    running_loss = 0

    # No gradients are needed for evaluation.
    with torch.no_grad():
        for batch in iterator:
            source_batch, target_batch = batch.src, batch.trg

            scores = model(source_batch, target_batch, 0)

            # Position 0 is skipped on both sides before flattening.
            vocab_size = scores.shape[-1]
            flat_scores = scores[1:].view(-1, vocab_size)
            flat_targets = target_batch[1:].view(-1)

            running_loss += criterion(flat_scores, flat_targets).item()

    return running_loss / len(iterator)

In [None]:
# Number of passes over the training data and the gradient-norm cap for train().
epochs = 10
grad_clip = 1

# Best validation loss seen so far; starts at infinity so the first epoch always wins.
lowest_validation_loss = float('inf')

for epoch in range(epochs):
    
    # Time one full train + validation cycle.
    start_time = time.time()
    
    train_loss = train(model, train_iterator, optimizer, criterion, grad_clip)
    valid_loss = evaluate(model, valid_iterator, criterion)
    
    end_time = time.time()
    
    # Checkpoint only when validation loss improves, so 'seq2seq.pt' always
    # holds the best weights seen so far.
    if valid_loss < lowest_validation_loss:
        lowest_validation_loss = valid_loss
        torch.save(model.state_dict(), 'seq2seq.pt')
    
    # Elapsed time is rounded to whole seconds for display.
    print(f'Epoch: {epoch+1:02} | Time: {np.round(end_time-start_time,0)}s')
    print(f'\tTrain Loss: {train_loss:.4f}')
    print(f'\t Val. Loss: {valid_loss:.4f}')

In [None]:
# Restore the weights stored in 'seq2seq.pt' (the checkpoint the training loop
# writes whenever validation loss improves).
model.load_state_dict(torch.load('seq2seq.pt'))

# Mean loss on the held-out test split.
test_loss = evaluate(model, test_iterator, criterion)

print(f'Test Loss: {test_loss:.4f}')

In [None]:
def translate(model, iterator, limit = 4):
    """Print source, reference and predicted tokens for the first ``limit`` batches.

    The model is put into evaluation mode and run with teacher forcing turned
    off (third positional argument ``0``). Iteration stops as soon as
    ``limit`` batches have been shown, so the rest of ``iterator`` is not
    consumed.

    Args:
        model: Called as ``model(src, trg, 0)``.
        iterator: Iterable of batches exposing ``.src`` and ``.trg``.
            NOTE(review): the per-step ``torch.argmax`` below reduces over the
            whole step, so this presumably expects batches of size 1 - confirm
            with the caller.
        limit: Maximum number of batches to print.

    Returns:
        None. The results are only printed.
    """
    model.eval()

    with torch.no_grad():

        for i, batch in enumerate(iterator):
            # Stop once enough examples were shown instead of draining the iterator.
            if i >= limit:
                break

            src = batch.src
            trg = batch.trg

            output = model(src, trg, 0)
            # One predicted token id per target position.
            preds = torch.tensor([[torch.argmax(x).item()] for x in output])

            # [1:-1] drops the first and last position of each sequence. The
            # English tokens are additionally flipped with [::-1] because
            # tokenize_english stores them in reverse order.
            print('English Input: ' + str([SOURCE.vocab.itos[x] for x in src][1:-1][::-1]))
            print('Correct German Output: ' + str([TARGET.vocab.itos[x] for x in trg][1:-1]))
            print('Predicted German Output: ' + str([TARGET.vocab.itos[x] for x in preds][1:-1]))
            print('\n')


In [None]:
# Build the iterators again with batch_size = 1 and keep only the third one
# (the test split); the train and validation iterators are discarded.
_, _, eval_iterator = BucketIterator.splits(
    (train_data, valid_data, test_data), 
    batch_size = 1, 
    device = device)

In [None]:

# translate() only prints its results and has no return statement, so
# `output` is None after this call.
output = translate(model, eval_iterator)