In [23]:
from google.colab import drive
# Mount Google Drive at /content/gdrive. save_checkpoint()'s default path lives
# under this mount point, so this cell must run before any checkpoint is written.
drive.mount("/content/gdrive")

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [6]:
import random
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import spacy
from torch.utils.tensorboard import SummaryWriter  # to print to tensorboard
from torchtext.datasets import Multi30k
from torchtext.data import Field, BucketIterator

#from utils import translate_sentence, bleu, save_checkpoint, load_checkpoint
#!python -m spacy download en
#!python -m spacy download de

In [7]:
# Load the German and English spaCy pipelines used by the tokenizer functions.
# Full package names are used instead of the "de"/"en" shortcut links: the
# shortcuts were removed in spaCy v3, while the package names load on v2 and v3.
# The packages must be installed first, e.g.
#   python -m spacy download de_core_news_sm
#   python -m spacy download en_core_web_sm
spacy_ger = spacy.load("de_core_news_sm")
spacy_eng = spacy.load("en_core_web_sm")

In [8]:
def tokenize_german(text):
    """Return the token strings produced by the ``spacy_ger`` tokenizer for ``text``."""
    pieces = []
    for tok in spacy_ger.tokenizer(text):
        pieces.append(tok.text)
    return pieces

def tokenize_eng(text):
    """Return the token strings produced by the ``spacy_eng`` tokenizer for ``text``."""
    pieces = []
    for tok in spacy_eng.tokenizer(text):
        pieces.append(tok.text)
    return pieces

In [9]:
# One Field per language. Both lowercase their text and use "<sos>" / "<eos>"
# as the start- and end-of-sequence tokens; only the tokenizer differs.
german = Field(
    tokenize=tokenize_german,
    lower=True,
    init_token="<sos>",
    eos_token="<eos>",
)
english = Field(
    tokenize=tokenize_eng,
    lower=True,
    init_token="<sos>",
    eos_token="<eos>",
)

In [10]:
# Load the Multi30k train/validation/test splits. The ".de" files are paired
# with the `german` field and the ".en" files with the `english` field.
train_data, valid_data, test_data = Multi30k.splits(
    fields=(german, english),
    exts=(".de", ".en"),
)


In [11]:
# Build both vocabularies from the training split only, with identical limits:
# max_size=10000 and min_freq=2.
german.build_vocab(
    train_data,
    max_size=10000,
    min_freq=2,
)
english.build_vocab(
    train_data,
    max_size=10000,
    min_freq=2,
)

In [12]:
class Encoder(nn.Module):
    """Bidirectional LSTM encoder that also produces initial decoder states.

    Args:
        input_size: Number of rows in the embedding table (source vocabulary size).
        embedding_size: Width of each token embedding.
        hidden_size: LSTM hidden width per direction.
        num_layers: Number of stacked LSTM layers.
        p: Dropout probability applied to the embeddings.

    Note:
        ``forward`` merges only state indices 0 and 1 of the LSTM's final
        hidden/cell tensors, so the returned states always have a leading
        dimension of 1 regardless of ``num_layers``.
    """

    def __init__(self, input_size, embedding_size, hidden_size, num_layers, p):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.embedding = nn.Embedding(input_size, embedding_size)
        self.rnn = nn.LSTM(
            embedding_size,
            hidden_size,
            num_layers,
            bidirectional=True,
        )

        # Project the concatenated pair of direction states (2 * hidden_size)
        # back down to hidden_size.
        self.fc_hidden = nn.Linear(2 * hidden_size, hidden_size)
        self.fc_cell = nn.Linear(2 * hidden_size, hidden_size)

        self.dropout = nn.Dropout(p)

    def forward(self, x):
        """Encode a batch of token indices.

        Args:
            x: Integer tensor of token indices fed to the embedding layer.

        Returns:
            Tuple ``(encoder_states, hidden, cell)``: the per-step LSTM outputs
            and the projected final hidden and cell states.
        """
        embedded = self.dropout(self.embedding(x))
        encoder_states, (h_n, c_n) = self.rnn(embedded)

        # Join state slices 0 and 1 along the feature axis, then project.
        joined_hidden = torch.cat([h_n[0:1], h_n[1:2]], dim=2)
        joined_cell = torch.cat([c_n[0:1], c_n[1:2]], dim=2)

        return encoder_states, self.fc_hidden(joined_hidden), self.fc_cell(joined_cell)


In [13]:
class Decoder(nn.Module):
    """Single-step LSTM decoder with attention over the encoder states.

    Args:
        input_size: Number of rows in the embedding table (target vocabulary size).
        embedding_size: Width of each token embedding.
        hidden_size: LSTM hidden width; encoder states are expected to be
            ``2 * hidden_size`` wide.
        output_size: Number of output scores produced per step.
        num_layers: Number of stacked LSTM layers.
        p: Dropout probability applied to the embeddings.
    """

    def __init__(
        self, input_size, embedding_size, hidden_size, output_size, num_layers, p
    ):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.embedding = nn.Embedding(input_size, embedding_size)
        # LSTM input is the context vector (2 * hidden_size) joined with the embedding.
        self.rnn = nn.LSTM(2 * hidden_size + embedding_size, hidden_size, num_layers)

        # Scores one (decoder hidden, encoder state) pair: hidden_size + 2 * hidden_size in.
        self.energy = nn.Linear(3 * hidden_size, 1)

        self.fc = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(p)
        # Normalises over dim 0, the encoder-step axis of the score tensor.
        self.softmax = nn.Softmax(dim=0)
        self.relu = nn.ReLU()

    def forward(self, x, encoder_states, hidden, cell):
        """Run one decoding step.

        Args:
            x: 1-D tensor of previous token indices, one per batch element.
            encoder_states: Encoder outputs, indexed by source step on dim 0.
            hidden: Current LSTM hidden state.
            cell: Current LSTM cell state.

        Returns:
            Tuple ``(predictions, hidden, cell)`` with the per-batch output
            scores and the updated LSTM states.
        """
        step_input = x.unsqueeze(0)
        embedded = self.dropout(self.embedding(step_input))

        # Tile the hidden state across every encoder step so they can be paired.
        num_steps = encoder_states.shape[0]
        tiled_hidden = hidden.repeat(num_steps, 1, 1)

        scores = self.energy(torch.cat([tiled_hidden, encoder_states], dim=2))
        attention = self.softmax(self.relu(scores))

        # Attention-weighted sum of encoder states over the step axis.
        context_vector = torch.einsum("snk,snl->knl", attention, encoder_states)
        lstm_input = torch.cat([context_vector, embedded], dim=2)

        outputs, (hidden, cell) = self.rnn(lstm_input, (hidden, cell))
        return self.fc(outputs).squeeze(0), hidden, cell




In [14]:
class Seq2Seq(nn.Module):
    """Encoder/decoder wrapper that decodes a target sequence step by step.

    Args:
        encoder: Module called as ``encoder(source)`` returning
            ``(encoder_states, hidden, cell)``.
        decoder: Module called as ``decoder(x, encoder_states, hidden, cell)``
            returning ``(output, hidden, cell)``. It must expose an ``fc``
            linear layer whose ``out_features`` is the number of output scores.
    """

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source, target, ratio=0.5):
        """Decode ``target.shape[0] - 1`` steps with optional teacher forcing.

        Args:
            source: Source tensor; dim 1 is read as the batch size.
            target: Target tensor; dim 0 is read as the target length and
                ``target[0]`` is used as the first decoder input.
            ratio: Probability, per step, of feeding the ground-truth token
                ``target[t]`` as the next input instead of the model's argmax.

        Returns:
            Tensor of shape ``(target_len, batch_size, vocab_size)``. Row 0 is
            left as zeros because decoding starts at step 1.
        """
        batch_size = source.shape[1]
        target_len = target.shape[0]
        # Read the output width from the decoder itself rather than from the
        # notebook-level `english` vocabulary, so the module does not depend on
        # globals defined in other cells.
        target_vocab_size = self.decoder.fc.out_features

        # Allocate on the same device as the input instead of relying on a
        # global `device` variable.
        outputs = torch.zeros(
            target_len, batch_size, target_vocab_size, device=source.device
        )
        encoder_states, hidden, cell = self.encoder(source)

        # First input will be <SOS> token
        x = target[0]

        for t in range(1, target_len):
            output, hidden, cell = self.decoder(x, encoder_states, hidden, cell)
            outputs[t] = output
            best_guess = output.argmax(1)

            # Teacher forcing: use the ground truth with probability `ratio`.
            x = target[t] if random.random() < ratio else best_guess

        return outputs


In [15]:
# Use CUDA when torch reports it as available; otherwise run on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# When True, the inference cell restores model/optimizer state from a checkpoint.
load_model = False
# When True, the training loop writes a checkpoint every epoch.
save_model = True

# Training hyperparameters
num_epochs = 100
learning_rate = 3e-4  # passed to the Adam optimizer
batch_size = 32  # passed to BucketIterator.splits

# Model hyperparameters
# Embedding-table sizes follow the vocabularies built from the training data.
input_size_encoder = len(german.vocab)
input_size_decoder = len(english.vocab)
# The decoder emits one score per English vocabulary entry.
output_size = len(english.vocab)
encoder_embedding_size = 300
decoder_embedding_size = 300
hidden_size = 1024
# NOTE(review): Encoder.forward merges only state slices 0 and 1, so values
# other than 1 look unsupported end to end — confirm before changing.
num_layers = 1
# Dropout is disabled in both networks.
enc_dropout = 0.0
dec_dropout = 0.0

In [16]:
# TensorBoard writer for the training-loss curve; `step` is the global step
# counter that the training loop increments once per batch.
# (Plain string literal: the original f-string had no placeholders.)
writer = SummaryWriter("runs/loss_plot")
step = 0

In [17]:
# Build one batch iterator per split, all using the configured batch size and
# placing tensors on `device`.
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size=batch_size,
    # Examples are ordered by source-sentence length inside each batch.
    sort_within_batch=True,
    sort_key=lambda x: len(x.src),
    device=device
)

In [18]:
# Instantiate the encoder from the configured hyperparameters and move it to `device`.
encoder_net = Encoder(
    input_size=input_size_encoder,
    embedding_size=encoder_embedding_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    p=enc_dropout,
).to(device)

In [19]:
# Instantiate the decoder from the configured hyperparameters and move it to `device`.
decoder_net = Decoder(
    input_size=input_size_decoder,
    embedding_size=decoder_embedding_size,
    hidden_size=hidden_size,
    output_size=output_size,
    num_layers=num_layers,
    p=dec_dropout,
).to(device)

In [20]:

# Combine encoder and decoder into the full model and optimise all of its
# parameters with Adam.
model = Seq2Seq(encoder=encoder_net, decoder=decoder_net).to(device)
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)

# Target positions holding the English "<pad>" index are ignored by the loss.
pad_idx = english.vocab.stoi["<pad>"]
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)

In [21]:
def save_checkpoint(state, filename="/content/gdrive/MyDrive/YOLOV4/my_checkpoint.pth.tar"):
    """Print a notice, then write ``state`` to ``filename`` via ``torch.save``.

    Args:
        state: Object to serialize (the training loop passes a dict holding
            the model and optimizer state dicts).
        filename: Destination path; defaults to a file on the mounted Drive.
    """
    print("=> Saving checkpoint")
    torch.save(obj=state, f=filename)


def load_checkpoint(checkpoint, model, optimizer):
    """Print a notice, then restore ``model`` and ``optimizer`` from ``checkpoint``.

    Args:
        checkpoint: Mapping with a ``"state_dict"`` entry for the model and an
            ``"optimizer"`` entry for the optimizer.
        model: Object whose ``load_state_dict`` receives ``checkpoint["state_dict"]``.
        optimizer: Object whose ``load_state_dict`` receives ``checkpoint["optimizer"]``.
    """
    print("=> Loading checkpoint")
    # Model first, then optimizer.
    for receiver, key in ((model, "state_dict"), (optimizer, "optimizer")):
        receiver.load_state_dict(checkpoint[key])

In [22]:
# Train for `num_epochs` passes over the training iterator, logging the loss of
# every batch to TensorBoard and checkpointing once per epoch.
for epoch in range(num_epochs):
    print(f"[Epoch {epoch} / {num_epochs}]")

    model.train()
    for batch in train_iterator:
        inp_data = batch.src.to(device)
        target = batch.trg.to(device)
        output = model(inp_data, target)

        # Output is of shape (trg_len, batch_size, output_dim) but Cross Entropy Loss
        # doesn't take input in that form, so flatten the first two dimensions.

        # Let's also remove the start token while we're at it
        output = output[1:].reshape(-1, output.shape[2])
        target = target[1:].reshape(-1)

        optimizer.zero_grad()
        loss = criterion(output, target)

        # Back prop
        loss.backward()

        # Clip to avoid exploding gradient issues, makes sure grads are
        # within a healthy range
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)

        # Gradient descent step
        optimizer.step()

        # Plot to tensorboard; log a plain float rather than the loss tensor.
        writer.add_scalar("Training loss", loss.item(), global_step=step)
        step += 1

    # Checkpoint AFTER the epoch's updates. Saving at the top of the loop stored
    # the weights from before that epoch's training, so the final epoch's
    # progress was never written to disk.
    if save_model:
        checkpoint = {
            "state_dict": model.state_dict(),
            "optimizer": optimizer.state_dict(),
        }
        save_checkpoint(checkpoint)

[Epoch 0 / 100]
=> Saving checkpoint
[Epoch 0 / 100]
=> Saving checkpoint
[Epoch 1 / 100]
=> Saving checkpoint
[Epoch 1 / 100]
=> Saving checkpoint
[Epoch 2 / 100]
=> Saving checkpoint
[Epoch 2 / 100]
=> Saving checkpoint
[Epoch 3 / 100]
=> Saving checkpoint
[Epoch 3 / 100]
=> Saving checkpoint
[Epoch 4 / 100]
=> Saving checkpoint
[Epoch 4 / 100]
=> Saving checkpoint
[Epoch 5 / 100]
=> Saving checkpoint
[Epoch 5 / 100]
=> Saving checkpoint
[Epoch 6 / 100]
=> Saving checkpoint
[Epoch 6 / 100]
=> Saving checkpoint
[Epoch 7 / 100]
=> Saving checkpoint
[Epoch 7 / 100]
=> Saving checkpoint
[Epoch 8 / 100]
=> Saving checkpoint
[Epoch 8 / 100]
=> Saving checkpoint
[Epoch 9 / 100]
=> Saving checkpoint
[Epoch 9 / 100]
=> Saving checkpoint
[Epoch 10 / 100]
=> Saving checkpoint
[Epoch 10 / 100]
=> Saving checkpoint
[Epoch 11 / 100]
=> Saving checkpoint
[Epoch 11 / 100]
=> Saving checkpoint
[Epoch 12 / 100]
=> Saving checkpoint
[Epoch 12 / 100]
=> Saving checkpoint
[Epoch 13 / 100]
=> Saving check

In [24]:
def translate_sentence(model, sentence, german, english, device, max_length=50):
    """Greedily translate one German sentence into a list of English tokens.

    Args:
        model: Object exposing ``encoder(tensor)`` and
            ``decoder(prev_token, encoder_states, hidden, cell)``.
        sentence: Either a raw string (tokenized with spaCy and lowercased) or
            an iterable of token strings (lowercased as-is).
        german: Source field; ``init_token``, ``eos_token`` and ``vocab.stoi``
            are used to build the input indices.
        english: Target field; ``vocab.stoi`` and ``vocab.itos`` are used to
            seed decoding and map predictions back to tokens.
        device: Device the input tensors are moved to.
        max_length: Maximum number of decoding steps.

    Returns:
        The predicted tokens without the leading "<sos>"; the list ends with
        "<eos>" if the model produced it within ``max_length`` steps.
    """
    if isinstance(sentence, str):
        # Load the spaCy pipeline only when there is raw text to tokenize;
        # pre-tokenized input no longer pays for (or requires) the model load.
        # The full package name is used because the "de" shortcut was removed
        # in spaCy v3.
        spacy_ger = spacy.load("de_core_news_sm")
        tokens = [token.text.lower() for token in spacy_ger(sentence)]
    else:
        tokens = [token.lower() for token in sentence]

    # Add <SOS> and <EOS> in beginning and end respectively
    tokens = [german.init_token, *tokens, german.eos_token]

    text_to_indices = [german.vocab.stoi[token] for token in tokens]

    # Shape (sequence_length, 1): a batch containing this single sentence.
    sentence_tensor = torch.LongTensor(text_to_indices).unsqueeze(1).to(device)

    eos_index = english.vocab.stoi["<eos>"]
    outputs = [english.vocab.stoi["<sos>"]]

    with torch.no_grad():
        outputs_encoder, hiddens, cells = model.encoder(sentence_tensor)

        for _ in range(max_length):
            previous_word = torch.LongTensor([outputs[-1]]).to(device)
            output, hiddens, cells = model.decoder(
                previous_word, outputs_encoder, hiddens, cells
            )
            best_guess = output.argmax(1).item()
            outputs.append(best_guess)

            # Stop as soon as the model emits end-of-sequence.
            if best_guess == eos_index:
                break

    translated_sentence = [english.vocab.itos[idx] for idx in outputs]
    return translated_sentence[1:]



In [36]:
if load_model:
    # Load from the same location save_checkpoint() writes to by default; the
    # bare "my_checkpoint.pth.tar" resolved against the working directory,
    # where no checkpoint is ever saved. map_location keeps the load working
    # when the checkpoint was written on a different device than the current one.
    checkpoint = torch.load(
        "/content/gdrive/MyDrive/YOLOV4/my_checkpoint.pth.tar",
        map_location=device,
    )
    load_checkpoint(checkpoint, model, optimizer)

# Translate one example sentence with the model in evaluation mode.
sentence = "Mein Name ist Tauseef"
model.eval()
translated_sentence = translate_sentence(
    model, sentence, german, english, device, max_length=50
)
print(f"Translated example sentence: \n {translated_sentence}")


Translated example sentence: 
 ['<unk>', 'is', 'covered', 'in', 'the', '<unk>', 'of', 'human', '<eos>']
