In [85]:
import datetime
import json
import os
import pandas as pd
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import tqdm

from itertools import product

from torch import Tensor
from torch import optim
from torch.utils.data import Dataset, DataLoader

from scipy.spatial.distance import cosine

# The computation is pinned to the CPU; the commented-out line below would pick CUDA when available.
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = 'cpu'

# Display the selected device as the cell output.
device

'cpu'

In [70]:
# Constants

# Number of rows in the encoder-output buffer and width of the decoder's attention layer.
# NOTE: the model-construction cell also passes this value as the embedding/output vocabulary size.
MAX_LENGTH = 71
SOS_TOKEN = 69  # id fed to the decoder as its very first input
EOS_TOKEN = 70  # id appended to every sequence by sentence_to_tensor
TEACHER_FORCING_RATIO = 0.5  # probability that a training sample is decoded with teacher forcing
LEARNING_RATE = 0.01  # learning rate of both SGD optimizers
HIDDEN_SIZE = 256  # hidden size handed to the encoder and decoder constructors
VOCABULARY_LIMIT = 1000  # rows [0, VOCABULARY_LIMIT) are trained on; evaluation reads the rows right after them

## Classes

In [71]:
class EncoderRNN(nn.Module):
    """Single-layer GRU encoder that consumes one token id per call.

    Args:
        input_size: number of distinct token ids (rows of the embedding table).
        hidden_size: width of the embedding and of the GRU hidden state.
    """

    def __init__(self, input_size: int, hidden_size: int) -> None:
        super().__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size)

    def forward(self, input: torch.Tensor, hidden: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """Advance the encoder by one token.

        Args:
            input: tensor holding a single token id.
            hidden: previous hidden state, shaped ``(1, 1, hidden_size)``.

        Returns:
            ``(output, hidden)`` as produced by the GRU for this single step.
        """
        # The GRU expects (seq_len, batch, features); we feed exactly one step of one sample.
        embedded = self.embedding(input).view(1, 1, -1)
        output, hidden = self.gru(embedded, hidden)
        return output, hidden

    def init_hidden(self) -> torch.Tensor:
        """Return an all-zero initial hidden state of shape ``(1, 1, hidden_size)``."""
        # Allocate on the module's own device so the state always matches the
        # parameters, instead of relying on a module-level `device` variable.
        return torch.zeros(1, 1, self.hidden_size, device=self.embedding.weight.device)

class DecoderRNN(nn.Module):
    """Single-layer GRU decoder with attention over a fixed-size encoder-output buffer.

    Args:
        hidden_size: width of the embedding and of the GRU hidden state.
        output_size: number of distinct output token ids.
        dropout_p: dropout probability applied to the embedded input token.
        max_length: number of encoder-output rows the attention layer scores.
    """

    def __init__(self, hidden_size, output_size, dropout_p=0.1, max_length=MAX_LENGTH) -> None:
        super().__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.dropout_p = dropout_p
        self.max_length = max_length

        self.embedding = nn.Embedding(self.output_size, self.hidden_size)
        self.attn = nn.Linear(self.hidden_size * 2, self.max_length)
        self.attn_combine = nn.Linear(self.hidden_size * 2, self.hidden_size)
        self.dropout = nn.Dropout(self.dropout_p)
        self.gru = nn.GRU(self.hidden_size, self.hidden_size)
        self.out = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, input_tensor, hidden, encoder_outputs):
        """Decode one step.

        Args:
            input_tensor: tensor holding the previous output token id (or the start token).
            hidden: previous hidden state, shaped ``(1, 1, hidden_size)``.
            encoder_outputs: encoder-output buffer, shaped ``(max_length, hidden_size)``.

        Returns:
            ``(output, hidden, attn_weights)`` where ``output`` holds log-probabilities
            over the output vocabulary and ``attn_weights`` the attention distribution
            over the ``max_length`` encoder positions.
        """
        embedded = self.embedding(input_tensor).view(1, 1, -1)
        embedded = self.dropout(embedded)

        # Score every encoder position from the current input embedding and hidden state.
        attn_weights = F.softmax(self.attn(torch.cat((embedded[0], hidden[0]), 1)), dim=1)
        # Weighted sum of encoder outputs: (1, 1, max_length) x (1, max_length, hidden_size).
        attn_applied = torch.bmm(attn_weights.unsqueeze(0), encoder_outputs.unsqueeze(0))

        output = torch.cat((embedded[0], attn_applied[0]), 1)
        output = self.attn_combine(output).unsqueeze(0)

        output = F.relu(output)
        output, hidden = self.gru(output, hidden)

        output = F.log_softmax(self.out(output[0]), dim=1)
        return output, hidden, attn_weights

    def init_hidden(self):
        """Return an all-zero initial hidden state of shape ``(1, 1, hidden_size)``."""
        # Allocate on the module's own device so the state always matches the
        # parameters, instead of relying on a module-level `device` variable.
        return torch.zeros(1, 1, self.hidden_size, device=self.embedding.weight.device)

In [72]:
def sentence_to_tensor(content, target_size = MAX_LENGTH) -> torch.Tensor:
    """Convert a sequence of token ids into a column tensor terminated by ``EOS_TOKEN``.

    Args:
        content: iterable of integer token ids. It is left untouched.
        target_size: currently unused; no padding is applied.

    Returns:
        A ``torch.long`` tensor of shape ``(len(content) + 1, 1)`` on ``device``.
    """
    # Build a new list rather than appending in place, so the caller's list
    # does not grow by one EOS token every time this function is called.
    token_ids = [*content, EOS_TOKEN]
    return torch.tensor(token_ids, dtype=torch.long, device=device).view(-1, 1)

In [73]:
class TranscriptionDataset(Dataset):
    """Dataset of (input, target) token-id tensors read from a CSV file.

    The rows are shuffled once with a fixed seed (``random_state=42``), so the
    order is the same on every run.

    Args:
        path_to_words: path of the CSV file to load.
    """

    def __init__(self, path_to_words: str = 'data/word-based/words.csv'):
        self.__words = pd.read_csv(path_to_words).sample(frac=1, random_state=42).reset_index(drop=True)

    def __len__(self):
        """Return the number of rows in the loaded table."""
        return self.__words.shape[0]

    def __getitem__(self, idx):
        """Return the ``(input_tensor, target_tensor)`` pair stored in row ``idx``.

        Raises:
            IndexError: if ``idx`` is not smaller than the dataset length.
        """
        if idx >= len(self):
            raise IndexError

        row = self.__words.iloc[idx]

        # Columns are addressed by position. `.iloc` makes that explicit; plain
        # `row[2]` on a labelled Series relies on a deprecated positional fallback.
        # Columns 2 and 3 are parsed as JSON lists of token ids.
        input_tensor = sentence_to_tensor(json.loads(row.iloc[2]))
        target_tensor = sentence_to_tensor(json.loads(row.iloc[3]))
        return input_tensor, target_tensor


In [74]:
class Trainer:
    """Runs single-sample training steps for an encoder/decoder pair.

    Each call to ``train`` encodes one input sequence, decodes it step by step
    against the target, backpropagates the summed loss and steps both optimizers.

    Args:
        encoder: encoder module; it is moved to ``device``.
        decoder: decoder module; it is moved to ``device``.
        encoder_optimizer: optimizer over the encoder parameters.
        decoder_optimizer: optimizer over the decoder parameters.
        max_length: number of rows of the encoder-output buffer.
    """

    def __init__(self, encoder: EncoderRNN, decoder: DecoderRNN,
                 encoder_optimizer: optim.Optimizer, decoder_optimizer: optim.Optimizer,
                 max_length: int = MAX_LENGTH):
        self.__encoder = encoder.to(device)
        self.__decoder = decoder.to(device)
        self.__encoder_optimizer = encoder_optimizer
        self.__decoder_optimizer = decoder_optimizer
        self.__max_length = max_length
        self.__loss = 0

    def __init_train(self):
        """Reset gradients and the loss; return a fresh hidden state and output buffer."""
        encoder_hidden = self.__encoder.init_hidden()

        self.__encoder_optimizer.zero_grad()
        self.__decoder_optimizer.zero_grad()

        encoder_outputs = torch.zeros(self.__max_length, self.__encoder.hidden_size, device=device)

        self.__loss = 0

        return encoder_hidden, encoder_outputs

    def __encoder_train(self, encoder_outputs, input_tensor, encoder_hidden):
        """Feed the input through the encoder one token at a time.

        Returns:
            ``(encoder_outputs, encoder_hidden)``: the filled output buffer and the
            hidden state after the last input token.
        """
        input_length = input_tensor.size(0)

        for ei in range(input_length):
            encoder_output, encoder_hidden = self.__encoder(input_tensor[ei], encoder_hidden)
            encoder_outputs[ei] = encoder_output[0, 0]

        # The final hidden state must be handed back: it seeds the decoder.
        return encoder_outputs, encoder_hidden

    def __optimizers_step(self):
        """Apply one update step of both optimizers."""
        self.__encoder_optimizer.step()
        self.__decoder_optimizer.step()

    def __decoder_train(self, decoder_input, decoder_hidden, encoder_outputs, target_tensor, criterion):
        """Decode against ``target_tensor`` and accumulate the per-step loss."""
        target_length = target_tensor.size(0)
        # One coin flip per sample decides whether the ground truth or the
        # decoder's own prediction is fed back as the next input.
        use_teacher_forcing = random.random() < TEACHER_FORCING_RATIO

        for di in range(target_length):
            decoder_output, decoder_hidden, decoder_attention = self.__decoder(
                    decoder_input, decoder_hidden, encoder_outputs)

            if use_teacher_forcing:
                decoder_input = target_tensor[di]
            else:
                topv, topi = decoder_output.topk(1)
                decoder_input = topi.squeeze().detach()

            self.__loss += criterion(decoder_output, target_tensor[di])

            # NOTE(review): with teacher forcing the input is the target token, so this
            # only stops at an EOS inside the target. Without teacher forcing a predicted
            # EOS does not stop decoding - confirm this is intended.
            if use_teacher_forcing and decoder_input.item() == EOS_TOKEN:
                break

    def train(self, input_tensor: Tensor, target_tensor: Tensor,
              criterion: nn.Module, max_length: int = MAX_LENGTH) -> float:
        """Run one optimisation step on a single (input, target) pair.

        Args:
            input_tensor: input token ids, one per row.
            target_tensor: target token ids, one per row.
            criterion: loss applied to each decoder step.
            max_length: unused; kept for interface compatibility.

        Returns:
            The summed loss divided by the number of target tokens.
        """
        # Encoder training
        encoder_hidden, encoder_outputs = self.__init_train()
        encoder_outputs, encoder_hidden = self.__encoder_train(encoder_outputs, input_tensor, encoder_hidden)

        # Decoder training: start from SOS and from the encoder's FINAL hidden state,
        # matching what `evaluate` does at inference time. Previously the initial
        # all-zero state was used because the encoded state was never returned.
        decoder_input = torch.tensor([[SOS_TOKEN]], device=device)
        decoder_hidden = encoder_hidden

        self.__decoder_train(decoder_input, decoder_hidden, encoder_outputs, target_tensor, criterion)

        # Optimizers step
        self.__loss.backward()
        self.__optimizers_step()

        return self.__loss.item() / target_tensor.size(0)

## Training

In [75]:
def train_loop(encoder: EncoderRNN, decoder: DecoderRNN, dataset: TranscriptionDataset,
               epochs: int, print_every: int = 100):
    """Train the encoder/decoder pair with plain SGD and an NLL loss.

    Every epoch visits dataset rows ``0 .. VOCABULARY_LIMIT - 1`` in order, one
    sample per optimisation step.

    Args:
        encoder: encoder to train.
        decoder: decoder to train.
        dataset: indexable source of ``(input_tensor, target_tensor)`` pairs.
        epochs: number of passes over the training rows.
        print_every: currently unused; no per-iteration progress is reported.
    """
    print(f'Training with {epochs=}, {TEACHER_FORCING_RATIO=}, {LEARNING_RATE=}, {HIDDEN_SIZE=}, {VOCABULARY_LIMIT=}')

    # One optimizer per network, both with the same learning rate.
    enc_optimizer = optim.SGD(encoder.parameters(), lr=LEARNING_RATE)
    dec_optimizer = optim.SGD(decoder.parameters(), lr=LEARNING_RATE)
    loss_fn = nn.NLLLoss()

    trainer = Trainer(encoder, decoder, enc_optimizer, dec_optimizer)

    for _ in range(epochs):
        # Accumulated for a per-epoch report, which is not emitted at the moment.
        epoch_loss = 0
        for sample_index in range(VOCABULARY_LIMIT):
            source, target = dataset[sample_index]
            epoch_loss += trainer.train(source, target, loss_fn)

In [76]:
# MAX_LENGTH is passed here as the vocabulary size (encoder input size / decoder output size).
encoder = EncoderRNN(MAX_LENGTH, HIDDEN_SIZE)
decoder = DecoderRNN(HIDDEN_SIZE, MAX_LENGTH)

In [77]:
# Load the word/transcription pairs from the default CSV path.
dataset = TranscriptionDataset()

In [78]:
# NOTE: print_every is accepted by train_loop but its reporting code is commented out,
# so only the header line is printed.
train_loop(encoder, decoder, dataset, epochs=1000, print_every=10000)

Training with epochs=1000, TEACHER_FORCING_RATIO=0.5, LEARNING_RATE=0.01, HIDDEN_SIZE=256, VOCABULARY_LIMIT=1000


In [80]:
# Make sure the target directory exists before writing the checkpoints.
os.makedirs('models', exist_ok=True)

# Take the timestamp once so the encoder and decoder files always share the same
# suffix; two separate now() calls can straddle a second boundary.
timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
torch.save(encoder.state_dict(), f'models/encoder{timestamp}-{VOCABULARY_LIMIT}.pt')
torch.save(decoder.state_dict(), f'models/decoder{timestamp}-{VOCABULARY_LIMIT}.pt')

## Evaluating

In [12]:
# map_location remaps the stored tensors onto `device`, so a checkpoint written on
# another device (e.g. CUDA) can still be loaded here.
encoder.load_state_dict(torch.load('models/encoder20230510-205652-10000.pt', map_location=device))
decoder.load_state_dict(torch.load('models/decoder20230510-205652-10000.pt', map_location=device))

<All keys matched successfully>

In [11]:
# Build id -> symbol lookup tables. In both CSV files the symbol is taken from the
# first column and its integer id from the second; the columns are addressed by
# position with `.iloc`, since plain `item[1]` on a labelled row relies on a
# deprecated positional fallback.
word_to_id = pd.read_csv('data/word-based/word_to_id.csv')
tensor_to_word_mapping = dict(zip(word_to_id.iloc[:, 1], word_to_id.iloc[:, 0]))
# The end-of-sequence marker decodes to nothing.
tensor_to_word_mapping[EOS_TOKEN] = ''

# NOTE: despite its name, this dictionary maps id -> transcription symbol.
transcription_to_id = pd.read_csv('data/word-based/transcription_to_id.csv')
transcription_to_id_mapping = dict(zip(transcription_to_id.iloc[:, 1], transcription_to_id.iloc[:, 0]))
transcription_to_id_mapping[EOS_TOKEN] = ''

def tensor_to_word(tensor: Tensor) -> str:
    """Decode a 1-D tensor of input token ids into the string it spells.

    Every id is looked up in ``tensor_to_word_mapping`` and the pieces are
    concatenated in order.
    """
    symbols = []
    for token in tensor:
        symbols.append(tensor_to_word_mapping[token.item()])
    return ''.join(symbols)

def tensor_to_transcription(tensor: Tensor) -> str:
    """Decode a 1-D tensor of transcription token ids into the string it spells.

    Every id is looked up in ``transcription_to_id_mapping`` and the pieces are
    concatenated in order.
    """
    symbols = []
    for token in tensor:
        symbols.append(transcription_to_id_mapping[token.item()])
    return ''.join(symbols)

In [81]:
def evaluate(encoder, decoder, input_tensor, max_length=MAX_LENGTH):
    """Greedily decode ``input_tensor`` and return the predicted token ids.

    Both networks are switched to evaluation mode for the duration of the call, so
    the decoder's dropout is disabled and decoding is deterministic. Their previous
    training/eval modes are restored afterwards.

    Args:
        encoder: trained encoder.
        decoder: trained decoder.
        input_tensor: input token ids, one per row.
        max_length: size of the encoder-output buffer and maximum number of decoding steps.

    Returns:
        List of predicted token ids. It ends with ``EOS_TOKEN`` when the decoder
        emitted it within ``max_length`` steps.
    """
    encoder_was_training = encoder.training
    decoder_was_training = decoder.training
    encoder.eval()
    decoder.eval()

    try:
        with torch.no_grad():
            input_length = input_tensor.size()[0]
            encoder_hidden = encoder.init_hidden()

            encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)

            for ei in range(input_length):
                encoder_output, encoder_hidden = encoder(input_tensor[ei],
                                                         encoder_hidden)
                encoder_outputs[ei] += encoder_output[0, 0]

            decoder_input = torch.tensor([[SOS_TOKEN]], device=device)  # SOS

            # The decoder starts from the encoder's final hidden state.
            decoder_hidden = encoder_hidden

            decoded_words = []

            for di in range(max_length):
                decoder_output, decoder_hidden, decoder_attention = decoder(
                    decoder_input, decoder_hidden, encoder_outputs)
                # Greedy choice: keep the single most likely token.
                topv, topi = decoder_output.topk(1)
                if topi.item() == EOS_TOKEN:
                    decoded_words.append(EOS_TOKEN)
                    break
                else:
                    decoded_words.append(topi.item())

                decoder_input = topi.squeeze().detach()

            return decoded_words
    finally:
        # Hand the modules back in the mode they arrived in, so that training
        # can continue after an evaluation pass.
        encoder.train(encoder_was_training)
        decoder.train(decoder_was_training)

In [13]:
def accuracy(target: Tensor, result: Tensor) -> float:
    """Position-wise match rate between two sequences.

    Counts the positions where both sequences hold an equal element and divides by
    the length of the longer sequence, so missing or extra positions count as misses.
    Any sized, iterable sequences are accepted (tensors or plain lists).

    Args:
        target: expected sequence.
        result: predicted sequence.

    Returns:
        A value in ``[0, 1]``; ``1.0`` when both sequences are empty.
    """
    longest = max(len(target), len(result))
    if longest == 0:
        # Two empty sequences are identical; avoid dividing by zero.
        return 1.0

    hits = sum(1 for expected, predicted in zip(target, result) if expected == predicted)
    return hits / longest

In [14]:
def edit_distance(word1, word2):
    """Return the Levenshtein distance between two sequences.

    The distance is the minimum number of single-element deletions, insertions
    and substitutions needed to turn ``word1`` into ``word2``. Only two rows of
    the dynamic-programming table are kept at any time.
    """
    rows = len(word1)
    cols = len(word2)

    # Distances from the empty prefix of word1 to every prefix of word2.
    previous = list(range(cols + 1))

    for i in range(1, rows + 1):
        # Column 0: turning the first i elements of word1 into nothing costs i deletions.
        current = [i] + [0] * cols
        for j in range(1, cols + 1):
            if word1[i - 1] == word2[j - 1]:
                current[j] = previous[j - 1]
            else:
                current[j] = 1 + min(previous[j],      # Deletion
                                     current[j - 1],   # Insertion
                                     previous[j - 1])  # Substitution
        previous = current

    return previous[cols]


In [33]:

def calculate_accuracy(encoder, decoder, dataset):
    """Mean position-wise accuracy on the rows that follow the training slice.

    Evaluates dataset rows ``VOCABULARY_LIMIT .. VOCABULARY_LIMIT + VOCABULARY_LIMIT / 4``
    (end exclusive) and averages ``accuracy(target, prediction)`` over them.

    Args:
        encoder: trained encoder.
        decoder: trained decoder.
        dataset: indexable source of ``(input_tensor, target_tensor)`` pairs.

    Returns:
        The average accuracy over the evaluated rows.
    """
    total_accuracy = 0
    amount_of_words = int(2 * VOCABULARY_LIMIT / 8)

    for i in range(VOCABULARY_LIMIT, VOCABULARY_LIMIT + amount_of_words):
        # Fetch the row once; every lookup re-reads the row and re-parses its JSON.
        input_tensor, target_tensor = dataset[i]
        result_tensor = evaluate(encoder, decoder, input_tensor)

        # No string decoding here: the score only needs the token ids.
        total_accuracy += accuracy(target_tensor, result_tensor)

    return total_accuracy / amount_of_words

In [82]:

def calculate_model_performance(encoder, decoder, dataset, criterion, is_showing_output=False, runs=1):
    """Average ``criterion(target, prediction)`` on the rows that follow the training slice.

    Evaluates dataset rows ``VOCABULARY_LIMIT .. VOCABULARY_LIMIT + VOCABULARY_LIMIT / 4``
    (end exclusive), ``runs`` times over, and averages the criterion over rows and runs.

    Args:
        encoder: trained encoder.
        decoder: trained decoder.
        dataset: indexable source of ``(input_tensor, target_tensor)`` pairs.
        criterion: callable ``(target_tensor, predicted_ids) -> number``.
        is_showing_output: when true, print the decoded input, target and prediction of every row.
        runs: number of passes over the evaluated rows.

    Returns:
        The criterion averaged over all evaluated rows and runs.
    """
    total_criterion = 0
    amount_of_words = int(2 * VOCABULARY_LIMIT / 8)

    for run in range(runs):
        for i in range(VOCABULARY_LIMIT, VOCABULARY_LIMIT + amount_of_words):
            # Fetch the row once; every lookup re-reads the row and re-parses its JSON.
            input_tensor, target_tensor = dataset[i]
            result_tensor = evaluate(encoder, decoder, input_tensor)

            pair_criterion = criterion(target_tensor, result_tensor)
            total_criterion += pair_criterion

            if is_showing_output:
                # Decode to text only when it is actually going to be printed.
                input_word = tensor_to_word(input_tensor.view(-1))
                target_word = tensor_to_transcription(target_tensor.view(-1))
                result_word = tensor_to_transcription(torch.tensor(result_tensor))
                print(f'Input: {input_word}, Target: {target_word}, Result: {result_word}, Criterion: {pair_criterion}')

    return total_criterion / amount_of_words / runs

In [None]:
# Mean edit distance over the evaluated rows, printing every decoded pair along the way.
calculate_model_performance(encoder, decoder, dataset, edit_distance, runs=1, is_showing_output=True)

In [None]:
# Train fresh models for each epoch budget and report the resulting accuracy.
epochs = [100, 1000]

for epoch in epochs:
    print(f'Running for {epoch} epochs')
    # The constructors take no `device` argument (passing one raises TypeError);
    # the Trainer created inside train_loop moves both models to `device`.
    encoder = EncoderRNN(MAX_LENGTH, HIDDEN_SIZE)
    decoder = DecoderRNN(HIDDEN_SIZE, MAX_LENGTH)

    train_loop(encoder, decoder, dataset, epochs=epoch, print_every=10000)

    current_accuracy = calculate_accuracy(encoder, decoder, dataset)

    print(f'Epochs: {epoch}, Accuracy: {current_accuracy}')

In [105]:
def run_encoder_only(input_tensor):
    """Encode ``input_tensor`` with the notebook-level ``encoder`` and return its final hidden state.

    Args:
        input_tensor: input token ids, one per row.

    Returns:
        The encoder hidden state after the last input token.
    """
    with torch.no_grad():
        encoder_hidden = encoder.init_hidden()

        # Only the final hidden state is needed, so the per-step outputs are not
        # collected. This also drops the former MAX_LENGTH cap on the input length.
        for ei in range(input_tensor.size()[0]):
            _, encoder_hidden = encoder(input_tensor[ei], encoder_hidden)

        return encoder_hidden

def calculate_cosine_between_two_vectors(input1, input2):
    """Cosine distance between the encoder states of two inputs.

    Each input is encoded with ``run_encoder_only`` and flattened to a vector of
    ``HIDDEN_SIZE`` elements. The result comes from ``scipy.spatial.distance.cosine``,
    which is a distance: 0 means the vectors point in the same direction.
    """
    first_vector, second_vector = [
        run_encoder_only(tokens).reshape(HIDDEN_SIZE).cpu().numpy()
        for tokens in (input1, input2)
    ]
    return cosine(first_vector, second_vector)

def cartesian_product_with_function_as_dataframe(list1, list2, f):
    """Apply ``f`` to every pair drawn from ``list1`` x ``list2`` and tabulate the results.

    Returns:
        A DataFrame with one row per pair: the two inputs decoded with
        ``tensor_to_word`` and the value of ``f`` for that pair.
    """
    records = []
    for left, right in product(list1, list2):
        records.append((tensor_to_word(left), tensor_to_word(right), f(left, right)))
    return pd.DataFrame(records, columns=['Word 1', 'Word 2', 'Result'])

In [112]:
# words = [dataset[i][0] for i in range(VOCABULARY_LIMIT, VOCABULARY_LIMIT + 10)]
# Three hard-coded dataset rows; only the input tensor (element 0 of each pair) is kept.
words = [dataset[i][0] for i in range(26691, 26694)]

# Raise the pandas display limits so the whole result table is rendered.
pd.set_option('display.max_rows', 1000)
pd.set_option('display.max_columns', 1000)

# Pairwise cosine distance between the encoder states of every pair of words.
cartesian_product_with_function_as_dataframe(words, words, calculate_cosine_between_two_vectors)

Unnamed: 0,Word 1,Word 2,Result
0,конфликтност,конфликтност,0.0
1,конфликтност,Кремена,1.012918
2,конфликтност,оперно-театрален,0.882621
3,Кремена,конфликтност,1.012918
4,Кремена,Кремена,0.0
5,Кремена,оперно-театрален,0.265607
6,оперно-театрален,конфликтност,0.882621
7,оперно-театрален,Кремена,0.265607
8,оперно-театрален,оперно-театрален,0.0
