In [1]:
import torch
import torch.nn as nn
import torch.optim as optim

import re

import random

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
criterion = nn.CrossEntropyLoss()

In [3]:
source = torch.tensor([[1, 2, 3, 1], [1, 1, 2, 3]], dtype=torch.float32)
target = torch.tensor([1, 1], dtype=torch.long)
criterion(source, target)

tensor(1.9938)

In [4]:
source.shape, target.shape

(torch.Size([2, 4]), torch.Size([2]))

In [5]:
class Dictionary():
    def __init__(self, name):
        self.name = name
        self.word2idx = {'<sos>': 0, '<eos>': 1, '<unk>': 2}
        self.word2count = {'<sos>': 1, '<eos>': 1, '<unk>': 1}
        self.idx2word = {0: '<sos>', 1: '<eos>', 2: '<unk>'}
        self.num_words = 3
        
    def addSentence(self, sentence):
        for word in sentence.split():
            self.addWord(word)
        
    def addWord(self, word):
        if not word in self.word2idx:
            self.word2idx[word] = self.num_words
            self.word2count[word] = 1
            self.idx2word[self.num_words] = word
            self.num_words += 1
        else:
            self.word2count[word] += 1

    def __len__(self):
        
        return self.num_words

In [6]:
class Corpus():
    def __init__(self, source_name, target_name):
        self.source_lang = Dictionary(source_name)
        self.target_lang = Dictionary(target_name)
    
    def get_data(self, path):
        pairs = []

        with open(path, 'r') as lines:
            for line in lines:
                pair = line.replace('\n', '').split(',')
                pairs.append(pair)
                self.source_lang.addSentence(pair[0])
                self.target_lang.addSentence(pair[1])

        for num, pair in enumerate(pairs):
            for word in pair[0].split():
                if self.source_lang.word2count[word] <= 1:
                    self.source_lang.num_words -= 1
                    pairs[num][0] = re.sub(fr"\b{word}\b", "<unk>", pairs[num][0])
                
            for word in pair[1].split():
                if self.target_lang.word2count[word] <= 1:
                    self.target_lang.num_words -= 1
                    pairs[num][1] = re.sub(fr"\b{word}\b", "<unk>", pairs[num][1])

        return pairs, self.source_lang, self.target_lang
    
corpus = Corpus('en', 'de')
pairs, source_lang, target_lang = corpus.get_data('/content/text (1).txt')
random.choice(pairs)

['<sos> two young kids are playing some kind of a game',
 '<sos> zwei kleine kinder spielen irgendein spiel <eos>']

In [7]:
LEN_SOURCE_VOCAB = source_lang.num_words
LEN_TARGET_VOCAB = target_lang.num_words
EMBEDDING_SIZE = 10
HIDDEN_SIZE = 10
N_LAYERS = 1
DROPOUT = 0.5
LEN_SOURCE_VOCAB, LEN_TARGET_VOCAB

(5866, 7767)

In [8]:
def pair2tensor(pair):
    source = torch.tensor([source_lang.word2idx[word] for word in pair[0].split()], dtype=torch.long)
    target = torch.tensor([target_lang.word2idx[word] for word in pair[1].split()], dtype=torch.long)

    return source, target

In [9]:
pairs[32]

['<sos> a old man having a beer alone <eos>',
 '<sos> ein alter mann der allein ein bier trinkt <eos>']

In [10]:
class Encoder(nn.Module):
    def __init__(self, input_size, embedding_size, hidden_size, n_layers, dropout):
        super().__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(input_size,
                                      embedding_size)
        
        self.rnn = nn.LSTM(embedding_size,
                           hidden_size,
                           n_layers,
                           dropout=dropout)
        
        self.dropout = nn.Dropout(dropout)

    def forward(self, source):
        embed = self.dropout(self.embedding(source))

        output, (hidden, cell) = self.rnn(embed)

        return hidden, cell

encoder = Encoder(LEN_SOURCE_VOCAB,
                  EMBEDDING_SIZE,
                  HIDDEN_SIZE,
                  N_LAYERS,
                  DROPOUT).to(device)



In [11]:
class Decoder(nn.Module):
    def __init__(self, output_size, embedding_size, hidden_size, n_layers, dropout):
        super().__init__()

        self.output_size = output_size
        self.embedding_size = embedding_size
        self.n_layers = n_layers

        self.embedding = nn.Embedding(output_size,
                                      embedding_size)
        
        self.rnn = nn.LSTM(embedding_size,
                           hidden_size,
                           n_layers,
                           dropout=dropout)
        
        self.fc1 = nn.Linear(hidden_size,
                             output_size)
        
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, cell):
        embed = self.dropout(self.embedding(input))

        output, (hidden, cell) = self.rnn(embed, (hidden, cell))

        prediction = self.fc1(output)

        return prediction, hidden, cell

decoder = Decoder(LEN_TARGET_VOCAB,
                  EMBEDDING_SIZE,
                  HIDDEN_SIZE,
                  N_LAYERS,
                  DROPOUT).to(device)

In [12]:
pair = ['<sos> a bride and groom make small talk with a guest as the groom shakes the guest s hand <eos>',
 '<sos> braut und brautigam beim <unk> mit einem gast und der brautigam schuttelt dem gast die hand <eos>']

In [13]:
class Seq2seq(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        
        hidden, cell = self.encoder(source)
        
        word = target[0].view(-1)

        outputs = torch.zeros(len(target) - 1, decoder.output_size).to(device)

        for idx in range(0, len(target) - 1):
            output, hidden, cell = decoder.forward(word.view(-1), hidden, cell)

            outputs[idx] = output

            teacher_force = random.random() < teacher_forcing_ratio

            prediction = output.argmax(1)

            word = target[idx + 1] if teacher_forcing_ratio else prediction

        return outputs

seq2seq = Seq2seq(encoder, decoder).to(device)

In [14]:
def init_weights(m):
    for name, param in m.named_parameters():
        nn.init.uniform_(param.data, -0.08, 0.08)
        
seq2seq.apply(init_weights)

Seq2seq(
  (encoder): Encoder(
    (embedding): Embedding(5866, 10)
    (rnn): LSTM(10, 10, dropout=0.5)
    (dropout): Dropout(p=0.5, inplace=False)
  )
  (decoder): Decoder(
    (embedding): Embedding(7767, 10)
    (rnn): LSTM(10, 10, dropout=0.5)
    (fc1): Linear(in_features=10, out_features=7767, bias=True)
    (dropout): Dropout(p=0.5, inplace=False)
  )
)

In [15]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(seq2seq):,} trainable parameters')

The model has 223,527 trainable parameters


In [16]:
optimizer_enc = optim.Adam(encoder.parameters(), lr=0.01)
optimizer_dec = optim.Adam(decoder.parameters(), lr=0.01)
teacher_forcing_ratio = 0.5

In [19]:
max_epoch = 10

for epoch in range(max_epoch):
    encoder.train()
    decoder.train()

    epoch_loss = 0

    for pair in pairs:
        if (len(pair[0].split()) and len(pair[1].split()) <= 17):
            source, target = pair2tensor(pair)
            source, target = source.to(device), target.to(device)

            trg = target

            optimizer_enc.zero_grad()
            optimizer_dec.zero_grad()

            hidden, cell = encoder(source)
            
            word = trg[0].view(-1)

            outputs = torch.zeros(len(trg) - 1, decoder.output_size).to(device)

            for idx in range(0, len(trg) - 1):
                output, hidden, cell = decoder.forward(word.view(-1), hidden, cell)

                outputs[idx] = output

                teacher_force = random.random() < teacher_forcing_ratio

                prediction = output.argmax(1)

                word = trg[idx + 1] if teacher_forcing_ratio else prediction

            output = outputs

            loss = criterion(output, target[1:])    

            loss.backward()

            torch.nn.utils.clip_grad_norm_(encoder.parameters(), 1)
            torch.nn.utils.clip_grad_norm_(decoder.parameters(), 1)
            
            optimizer_enc.step()
            optimizer_dec.step()
            
            epoch_loss += loss.item()   

RuntimeError: ignored

In [None]:
max_epoch = 10

for epoch in range(max_epoch):
    seq2seq.train()

    epoch_loss = 0
    
    for pair in pairs:
        if (len(pair[0].split()) and len(pair[1].split()) <= 17):
            source, target = pair2tensor(pair)
            source, target = source.to(device), target.to(device)

            optimizer.zero_grad()

            output = seq2seq.forward(source, target).to(device)


            loss = criterion(output, target[1:])

            loss.backward()

            torch.nn.utils.clip_grad_norm_(seq2seq.parameters(), 1)
            
            optimizer.step()
            
            epoch_loss += loss.item()

    print(epoch_loss)

In [None]:
pair

In [None]:
seq2seq.eval()

In [None]:
pos = 20
source, target = pair2tensor(pairs[pos])
print(pairs[pos][0])
print(pairs[pos][1])
source, target

In [None]:
outputs = seq2seq.forward(source.to(device), target[:-1].to(device))

In [None]:
outputs.argmax(1)