In [2]:
import os
import pandas as pd
import re
import random as rd
import numpy as np

# Directory the kernel was started from. It is not referenced again in the
# visible cells; the files this notebook writes use relative paths.
current_dir = os.getcwd()

#path_dataset = "/Users/tommasoancilli/Downloads/ita-eng/ita.txt"

def Text_creation (path_dataset, epsilon, max_length = 10):
    """Sample short sentence pairs from a tab-separated corpus into two files.

    The corpus is read without a header row; column 0 is written to
    ``eng.txt`` and column 1 to ``ita.txt`` (both in the current working
    directory, overwriting any previous content), one sentence per line and
    with the two files kept line-aligned.

    Args:
        path_dataset: path of the tab-separated source file.
        epsilon: probability with which each eligible pair is kept
            (``random.random() < epsilon``).
        max_length: maximum number of whitespace-separated words allowed in
            either sentence of a pair.
    """
    data = pd.read_csv(path_dataset, header=None, delimiter = "\t")

    # Context managers guarantee that both outputs are flushed and closed even
    # when a row raises half-way through the corpus.
    with open("eng.txt", "w") as trans_file_eng, open("ita.txt", "w") as trans_file_ita:
        for text_eng, text_it in zip(data[0], data[1]):
            too_long = (len(text_it.split()) > max_length
                        or len(text_eng.split()) > max_length)
            if too_long:
                continue

            # The random draw happens only for pairs that passed the length
            # filter, so epsilon is the kept fraction of *eligible* pairs.
            if rd.random() < epsilon:
                trans_file_eng.write(text_eng + "\n")
                trans_file_ita.write(text_it + "\n")

class Language ():
    """Vocabulary and sentence statistics for one language of the corpus.

    Attributes:
        path_name: name of the raw text file (one sentence per line).
        word2index / index2word: the two directions of the vocabulary map.
            Index 0 is reserved for "EOS" and index 1 for "SOS".
        n_words: number of vocabulary entries (starts at 2 for the markers).
        n_sentences: number of sentences passed to ``add_string``.
        n_max_length: length, in tokens, of the longest sentence seen so far
            (the trailing "EOS" token is counted).
    """

    def __init__(self, path_name) -> None:
        self.path_name = path_name

        self.word2index = {"EOS":0, "SOS":1}
        self.index2word = {0:"EOS", 1:"SOS"}
        self.n_words = 2
        self.n_sentences = 0
        self.n_max_length = 0

    def normalize_string (self, s) -> str:
        """Replace punctuation with spaces, lowercase, trim and append " EOS".

        Punctuation is replaced (not removed), so the result may contain runs
        of several spaces, e.g. "Hello, World!" -> "hello  world EOS".
        """
        s = re.sub(r"([.!? \, \' \" \% \-])", r" ", s)  # punctuation -> space
        s = s.lower()
        s = s.strip()  # also drops the trailing newline of a file line
        s = s + " " "EOS"
        return s

    def add_string(self, s):
        """Register every word of the normalized sentence ``s``.

        Splitting is done on any whitespace run: the normalized text can hold
        consecutive spaces, and splitting on a single " " would add an empty
        string to the vocabulary and inflate ``n_max_length``.
        """
        words = s.split()
        for word in words:
            self.add_word(word)

        self.n_sentences = self.n_sentences + 1
        self.n_max_length = max(self.n_max_length, len(words))

    def add_word(self, word):
        """Give ``word`` the next free index unless it is already known."""
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.index2word[self.n_words] = word
            self.n_words = self.n_words + 1

    def string_processing(self):
        """Normalize ``path_name`` line by line into ``processed-<path_name>``.

        Every normalized sentence is also fed to ``add_string`` so that the
        vocabulary and the length statistics are built in the same pass.
        """
        # Both handles live in one `with` so neither leaks if a line fails.
        with open(str(self.path_name)) as file, \
                open("processed-"+self.path_name, "w") as processed_file:
            for s in file:
                s = self.normalize_string(s)
                processed_file.write(s +"\n")
                self.add_string(s)

    def string_translation(self, input_string):
        """Encode a free-text sentence as a vector of vocabulary indices.

        Returns:
            A float NumPy vector of length ``n_max_length``; unused trailing
            positions stay 0 (the "EOS" index). The vector is also saved to
            ``matrix-processed.npy`` in the working directory.

        Raises:
            KeyError: if the sentence has more tokens than ``n_max_length`` or
                contains a word that is not in the vocabulary.
        """
        tokens = self.normalize_string(input_string).split()
        if len(tokens) > self.n_max_length:
            raise KeyError("LENGTH OF THE INPUT SENTENCE IS TOO LONG!!")

        matrix  = np.zeros( (self.n_max_length) )

        for idx, word in enumerate(tokens):
            if word in self.word2index:
                matrix[idx] = self.word2index[word]
            else:
                raise KeyError("WORD NOT FOUND, IMPOSSIBLE TO TRANSLATE")

        np.save(f'matrix-processed.npy', matrix)

        return matrix


   #TODO #2:


def Dataset_creation(lang_input, lang_output, training_test_ratio:tuple = (0.85,0.15)):
    """Encode both processed corpora as index matrices and split them.

    Args:
        lang_input: ``Language`` of the source side (already processed).
        lang_output: ``Language`` of the target side (already processed).
        training_test_ratio: forwarded unchanged to ``split_train_test``.

    Raises:
        KeyError: if the two languages hold a different number of sentences.
    """
    # Both matrices share one width: the longest sentence of either language
    # plus one extra column of slack.
    longest_sentence = max(lang_input.n_max_length, lang_output.n_max_length)
    width = longest_sentence + 1

    if lang_input.n_sentences != lang_output.n_sentences:
        raise KeyError ("Error, the number of examples does not match")

    #TODO #1:
    encoded = [Matrix_creation(lang, width) for lang in (lang_input, lang_output)]

    split_train_test(encoded[0], encoded[1], training_test_ratio)


def Matrix_creation(lang, MAX_LENGTH):
    """Encode ``processed-<lang.path_name>`` as a matrix of word indices.

    Row ``r`` holds the vocabulary indices of the words on line ``r`` of the
    processed file; unused trailing cells stay 0. The matrix is saved to
    ``matrix-<name>.npy`` (``<name>`` is ``lang.path_name`` without a trailing
    ".txt") and returned.

    Encoding is best-effort: on any error (missing file, unknown word, line
    longer than ``MAX_LENGTH``) the problem is printed and whatever has been
    filled so far is still saved and returned.

    Args:
        lang: object exposing ``n_sentences``, ``path_name`` and ``word2index``.
        MAX_LENGTH: number of columns of the matrix.
    """
    matrix  = np.zeros( (lang.n_sentences, MAX_LENGTH) )
    path_input = "processed-"+lang.path_name

    # Pre-bind the diagnostics: if open() itself fails, the handler below must
    # still be able to format them instead of raising UnboundLocalError.
    words = row = col = None

    try:
        with open(str(path_input), 'r') as file:
            for row, line in enumerate(file):
                words = line.split()
                col = None
                for col, word in enumerate(words):
                    matrix[row][col] = lang.word2index[word]

    except Exception as e: #TODO #3
        print(f"{e},{words},{row},{col}")

    # Strip only a literal, trailing ".txt"; an unescaped "." would match any
    # character and the pattern could fire in the middle of the name.
    stem = re.sub(r"\.txt$", "", lang.path_name)
    np.save(f'matrix-{stem}.npy', matrix)
    return matrix


def main (path_dataset = "/Users/tommasoancilli/Downloads/ita-eng/ita.txt", epsilon = 0.3, max_length = 10):
    """Run the whole text pipeline: sample, normalize, encode and split.

    The body previously read ``epsilon`` without defining it, so it only worked
    when a global of that name happened to exist. All run settings are now
    explicit parameters; calling ``main()`` with no arguments still works.

    Args:
        path_dataset: tab-separated corpus handed to ``Text_creation``.
        epsilon: probability of keeping each eligible sentence pair.
        max_length: maximum number of words per sentence.
    """
    Text_creation (path_dataset, epsilon, max_length = max_length)

    # File names are the ones Text_creation writes.
    input_lang = Language(path_name = "eng.txt")
    output_lang = Language(path_name = "ita.txt")

    input_lang.string_processing()
    output_lang.string_processing()

    Dataset_creation(input_lang, output_lang,training_test_ratio= (0.85,0.15))


def split_train_test(matrix_input, matrix_target, training_test_ratio:tuple = (0.85,0.15)):
    """Shuffle the example rows once and save a train/test partition.

    The same random permutation is applied to both matrices, so row ``k`` of
    every saved input file still pairs with row ``k`` of the matching output
    file. Only ``training_test_ratio[0]`` is used: the first
    ``int(n_rows * ratio)`` shuffled rows go to training, the rest to test.

    Writes ``input_train.npy``, ``input_test.npy``, ``output_train.npy`` and
    ``output_test.npy`` in the working directory and returns 0.
    """
    n_rows = matrix_input.shape[0]

    # A full-length sample of range(n_rows) is a random permutation of it.
    order = rd.sample(range(n_rows), n_rows)
    cut = int(n_rows * training_test_ratio[0])

    train_rows = [order[pos] for pos in range(cut)]
    test_rows = [order[pos] for pos in range(cut, n_rows)]

    input_train = [matrix_input[row] for row in train_rows]
    output_train = [matrix_target[row] for row in train_rows]
    input_test = [matrix_input[row] for row in test_rows]
    output_test = [matrix_target[row] for row in test_rows]

    np.save('input_train.npy', input_train)
    np.save('input_test.npy', input_test)

    np.save('output_train.npy', output_train)
    np.save('output_test.npy', output_test)

    return 0

"""
if __name__ == "__main__":
   main()
"""

'\nif __name__ == "__main__":\n   main()\n'

In [3]:
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
import numpy as np
from tqdm import tqdm
from torch.utils.data import TensorDataset, DataLoader, RandomSampler
import matplotlib.pyplot as plt


# Token indices of the sentence markers. They match the initial
# Language.word2index entries {"EOS": 0, "SOS": 1}; 0 is also the fill value
# of the np.zeros matrices, so padded cells are EOS.
SOS = 1
EOS = 0
class EncoderRNN(nn.Module):
    """Embedding -> dropout -> multi-layer GRU encoder (batch-first).

    Args:
        input_size: number of rows of the embedding table.
        device: device every sub-module is moved to.
        hidden_size: embedding width and GRU hidden width.
        num_layers: number of stacked GRU layers.
        dropout_p: dropout probability applied to the embeddings.
        emb: accepted but not used by this class.
    """

    def __init__(self, input_size, device, hidden_size, num_layers, dropout_p=0.1, emb = True):
        super().__init__()
        self.hidden_size = hidden_size
        self.device = device

        # Sub-modules are created in a fixed order: embedding, GRU, dropout.
        token_table = nn.Embedding(input_size, hidden_size)
        self.embedding = token_table.to(self.device)

        recurrent = nn.GRU(hidden_size, hidden_size, num_layers=num_layers,  batch_first=True)
        self.gru = recurrent.to(self.device)

        self.dropout = nn.Dropout(dropout_p).to(self.device)

    def forward(self, input):
        """Return the ``(output, hidden)`` pair produced by the GRU for ``input``."""
        vectors = self.embedding(input)
        vectors = self.dropout(vectors)
        return self.gru(vectors)


class DecoderRNN(nn.Module):
    """GRU decoder that emits one token distribution per step (batch-first).

    Args:
        hidden_size: embedding width and GRU hidden width.
        output_size: size of the output vocabulary.
        num_layers: number of stacked GRU layers.
        device: device on which the initial SOS input tensor is created.
        max_length: optional sentence length used to bound free-running
            decoding (``max_length + 1`` steps). When left as ``None`` the
            module-level ``max_length`` global is used, as before.
    """

    def __init__(self, hidden_size, output_size, num_layers, device, max_length=None):
        super(DecoderRNN, self).__init__()
        self.device = device
        self.max_length = max_length

        self.embedding = nn.Embedding(output_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size, num_layers=num_layers, batch_first=True)
        self.out = nn.Linear(hidden_size, output_size)

    def forward(self, encoder_outputs, encoder_hidden, target_tensor=None):
        """Decode a batch, with teacher forcing when ``target_tensor`` is given.

        Returns:
            ``(log_probs, hidden, None)``: ``log_probs`` concatenates the
            per-step outputs along dim 1 after a log-softmax over the last
            dim; the trailing ``None`` keeps the 3-tuple shape callers unpack.
        """
        # Every row of the batch is decoded over the full number of steps.
        batch_size = encoder_outputs.size(0)
        decoder_input = torch.empty(batch_size, 1, dtype=torch.long, device=self.device).fill_(SOS)
        decoder_hidden = encoder_hidden
        decoder_outputs = []

        if target_tensor is not None:
            n_steps = target_tensor.shape[1]
        elif self.max_length is not None:
            n_steps = self.max_length + 1
        else:
            # Legacy behaviour: fall back to the module-level global.
            n_steps = max_length + 1

        for i in range(n_steps):
            decoder_output, decoder_hidden  = self.forward_step(decoder_input, decoder_hidden)
            decoder_outputs.append(decoder_output)

            if target_tensor is not None:
                # Teacher forcing: feed the target as the next input
                decoder_input = target_tensor[:, i].unsqueeze(1)
            else:
                # Without teacher forcing: use its own prediction as the next input
                _, topi = decoder_output.topk(1)
                decoder_input = topi.squeeze(-1).detach()  # detach from history as input

        decoder_outputs = torch.cat(decoder_outputs, dim=1)
        decoder_outputs = F.log_softmax(decoder_outputs, dim=-1)
        return decoder_outputs, decoder_hidden, None # `None` kept for consistency in the training loop

    def forward_step(self, input, hidden):
        """Run one decoding step: embed, ReLU, GRU, project to the vocabulary."""
        output = self.embedding(input)
        output = F.relu(output)
        output, hidden = self.gru(output, hidden)
        output = self.out(output)
        return output, hidden

def numpy2torch(numpy_path):
    """
    Load the array stored at ``numpy_path`` with ``np.load`` and return it.

    Despite the name, no tensor conversion happens here: the returned object
    is whatever ``np.load`` yields.
    """
    return np.load(numpy_path)

def torch_format(batch_size, input_train, output_train, device):
    """Wrap paired input/output arrays in a sequential ``DataLoader``.

    Both arrays are converted with ``torch.LongTensor`` and moved to
    ``device``. Batches are served in storage order: no sampler is attached
    and shuffling is left at the ``DataLoader`` default.
    """
    inputs = torch.LongTensor(input_train).to(device)
    targets = torch.LongTensor(output_train).to(device)
    paired = TensorDataset(inputs, targets)

    return DataLoader(paired, batch_size=batch_size)


def train_epoch(encoder, decoder, n_elem_batch, learning_rate, train_dataloader, loss_function, device):
    """
    Train encoder and decoder for one pass over ``train_dataloader``.

    Args:
        encoder, decoder: modules to optimise; the decoder is called with the
            target batch (teacher forcing).
        n_elem_batch: accepted for interface compatibility; not used here.
        learning_rate: learning rate of the two Adam optimisers.
        train_dataloader: iterable of ``(input_tensor, output_tensor)`` batches.
        loss_function: callable ``(predictions, targets) -> scalar loss``.
        device: device the flattened predictions/targets are moved to.

    Returns:
        The mean of the per-batch losses as a float, or NaN when the
        dataloader yields no batch.
    """
    # NOTE(review): the optimisers are rebuilt on every call, so Adam's moment
    # estimates restart each epoch; keeping them alive would need an interface
    # change in the caller.
    encoder_optimizer = optim.Adam(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.Adam(decoder.parameters(), lr=learning_rate)

    total_loss = 0.0
    n_batches = 0

    for input_tensor, output_tensor in train_dataloader:
        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()

        encoder_outputs, encoder_hidden = encoder(input_tensor)
        decoder_outputs, _, _ = decoder(encoder_outputs, encoder_hidden, output_tensor)

        # Flatten (batch, step, vocab) -> (batch*step, vocab) to match the
        # flattened targets.
        loss = loss_function(
            decoder_outputs.view(-1, decoder_outputs.size(-1)).to(device),
            output_tensor.view(-1).to(device))

        loss.backward()
        total_loss += loss.item()
        n_batches += 1

        encoder_optimizer.step()
        decoder_optimizer.step()

    if n_batches == 0:
        return float("nan")

    # Average over the number of batches. Dividing by the size of the last
    # batch (as before) made the value depend on how the data happened to
    # split into batches.
    return total_loss / n_batches


def train(encoder, decoder, n_elem_batch, learning_rate, train_dataloader, n_epochs, device):
    """Train for ``n_epochs`` epochs with NLL loss and plot the loss curve.

    Args:
        encoder, decoder: modules to train (switched to training mode here).
        n_elem_batch, learning_rate, train_dataloader, device: forwarded to
            ``train_epoch``.
        n_epochs: number of passes over the dataloader.

    Returns:
        The list of per-epoch values returned by ``train_epoch`` (the same
        values that are plotted).
    """
    loss_function = nn.NLLLoss()
    total_train_loss_plot = []

    # test() leaves both modules in eval mode; make sure dropout is active
    # again whenever training (re)starts.
    encoder.train()
    decoder.train()

    for _ in tqdm(range(n_epochs), desc="number of epochs"):
        error = train_epoch(encoder, decoder, n_elem_batch, learning_rate, train_dataloader, loss_function, device)
        total_train_loss_plot.append(error)

    plt.plot(total_train_loss_plot)
    return total_train_loss_plot

def test(encoder, decoder, test_dataloader, device):
    encoder.eval()
    decoder.eval()

    loss_function = nn.NLLLoss()
    test_loss = 0

    with torch.no_grad():
      for data in test_dataloader:
          input_tensor, output_tensor = data
          encoder_outputs, encoder_hidden = encoder(input_tensor)
          decoder_outputs, _, _ = decoder(encoder_outputs, encoder_hidden, output_tensor)

          test_loss += loss_function(
          decoder_outputs.view(-1, decoder_outputs.size(-1)).to(device),
          output_tensor.view(-1).to(device))

          #print(input_tensor[0],output_tensor[0])
      #print(test_loss)
      return test_loss / decoder_outputs.shape[0]

def translation(encoder, decoder, input_lang, output_lang,device):
    """Ask for a sentence on stdin, decode it greedily and print the result.

    The sentence is encoded with ``input_lang.string_translation`` and fed to
    the encoder as a single batch item of shape ``(1, n_tokens)``. Adding the
    extra axis at position 1 (as before) turned every token into its own
    one-token sentence, so the printed "solutions" were decodes of the first
    three individual tokens rather than translations of the sentence.

    Returns:
        The list of decoded words; decoding stops at the first EOS, which is
        reported as '<EOS>'.

    Raises:
        KeyError: propagated from ``string_translation`` (sentence too long or
            unknown word).
    """
    input_sentence = input("Type the sentence you want to translate:")
    vector = input_lang.string_translation(input_sentence)
    input_tensor = torch.tensor(vector,dtype=torch.long).to(device).unsqueeze(0)

    # Inference must not apply dropout.
    encoder.eval()
    decoder.eval()

    with torch.no_grad():
        encoder_outputs, encoder_hidden = encoder(input_tensor)
        decoder_outputs, _, _ = decoder(encoder_outputs, encoder_hidden)

        _, topi = decoder_outputs.topk(1)
        # (1, steps, 1) -> (steps,): drop the top-k axis, then the batch axis.
        decoded_ids = topi.squeeze(-1).squeeze(0)

    decoded_words = []
    for idx in decoded_ids:
        if idx.item() == EOS:
            decoded_words.append('<EOS>')
            break
        decoded_words.append(output_lang.index2word[idx.item()])

    print(f"Original sentence = {input_sentence} --> Translated sentence = {decoded_words}")
    return decoded_words


def evaluation(encoder, decoder, n_elem_batch, learning_rate, n_epochs, train_dataloader, device, test_dataloader):
    """Train the model, then score it on the test set.

    Returns:
        The value returned by ``test`` for ``test_dataloader``. It was
        previously computed and discarded, which left the evaluation with no
        observable result.
    """
    train(encoder, decoder, n_elem_batch, learning_rate, train_dataloader, n_epochs,device)
    return test(encoder, decoder, test_dataloader, device)



In [4]:
# Data-preparation cell: sample the corpus, build both vocabularies and write
# the train/test .npy matrices to the working directory.
#from text_process import *
#from RNN import *

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

path_dataset = "/content/ita2eng.txt"
# max_length: maximum number of words per sentence kept by Text_creation.
# It is also read as a module-level global by DecoderRNN.forward when no
# target tensor is supplied.
max_length = 10
# epsilon: probability of keeping each sentence pair that passes the length filter.
epsilon = 0.3

# Writes eng.txt / ita.txt, which the two Language objects below read.
Text_creation(path_dataset, epsilon = epsilon, max_length = max_length)
input_lang = Language(path_name = "eng.txt")
output_lang = Language(path_name = "ita.txt")

# Normalise each file into processed-<name> and build the vocabularies.
input_lang.string_processing()
output_lang.string_processing()

# Encodes both corpora and saves the input/output train/test .npy files.
Dataset_creation(input_lang, output_lang,training_test_ratio= (0.8,0.2))



cpu


In [None]:
# Training cell: hyper-parameters, data loading, model construction and the
# train-then-test run.
hidden_size = 512
batch_size = 64
number_of_epochs = 128
number_layers = 4

# Despite the helper's name these are NumPy arrays; torch_format converts them.
input_train = numpy2torch("input_train.npy")
output_train = numpy2torch("output_train.npy")

input_test = numpy2torch("input_test.npy")
output_test = numpy2torch("output_test.npy")

# Vocabulary sizes come from the Language objects built in the previous cell.
Encoder = EncoderRNN(input_size=input_lang.n_words, device=device, hidden_size=hidden_size, num_layers=number_layers).to(device)
Decoder = DecoderRNN(hidden_size=hidden_size, device = device, output_size=output_lang.n_words, num_layers=number_layers).to(device)

train_dataloader = torch_format(batch_size, input_train, output_train, device)
test_dataloader = torch_format(batch_size, input_test, output_test, device)

evaluation(encoder=Encoder, decoder=Decoder, n_elem_batch=batch_size, learning_rate=0.001, n_epochs=number_of_epochs,
           train_dataloader = train_dataloader, test_dataloader = test_dataloader, device= device)


number of epochs:  12%|█▎        | 16/128 [56:44<6:28:03, 207.89s/it]

In [None]:
# Interactive loop: translate sentences typed by the user until they answer
# "N". The answer is trimmed and compared case-insensitively so that "n" or
# " N " also stop the loop, as the [Y/N] prompt suggests.
while True:
  translation(encoder=Encoder, decoder=Decoder, input_lang=input_lang, output_lang=output_lang, device=device)
  response = input("do you want to translate other sentences? [Y/N]: ")
  if response.strip().upper() == "N":
    break