# Mecanismos de Atención

In [1]:
import unicodedata
import re

def unicodeToAscii(s): #transforma letras especiales en normales
    return ''.join(
        c for c in unicodedata.normalize('NFD', s)
        if unicodedata.category(c) != 'Mn'
    )

def normalizeString(s):
    s = unicodeToAscii(s.lower().strip()) # elimina espacio en blanco y lo vuelve minuscula
    s = re.sub(r"([.!?])", r" \1", s)
    s = re.sub(r"[^a-zA-Z.!?]+", r" ", s) #transforma un caracter que no es ingles a un espacio en blanco
    return s

def read_file(file, reverse=False):
    # Leer el archivo y dividirlo en líneas
    lines = open(file, encoding='utf-8').read().strip().split('\n')

    # Dividir cada línea en pares y normalizar
    pairs = [[normalizeString(s) for s in l.split('\t')[:2]] for l in lines]

    return pairs

In [3]:

pairs = read_file('/content/drive/MyDrive/SIS421/SegundoParcial/Datasets/por.txt')

You can avoid this message in future by passing the argument `trust_remote_code=True`.
Passing `trust_remote_code=True` will be mandatory to load this dataset from the next major release of `datasets`.


Downloading builder script:   0%|          | 0.00/4.41k [00:00<?, ?B/s]

Downloading readme:   0%|          | 0.00/8.93k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/2.34M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/67782 [00:00<?, ? examples/s]

You can avoid this message in future by passing the argument `trust_remote_code=True`.
Passing `trust_remote_code=True` will be mandatory to load this dataset from the next major release of `datasets`.


In [4]:
import random

random.choice(pairs)

KeyError: "Invalid key: 0. Please first select a split. For example: `my_dataset_dictionary['train'][0]`. Available splits: ['train']"

In [None]:
SOS_token = 0
EOS_token = 1
PAD_token = 2

class Lang:
    def __init__(self, name):
        self.name = name #el nombre del lenguaje
        self.word2index = {"SOS": 0, "EOS": 1, "PAD": 2} #mapear palabras a indices
        self.word2count = {} #recuento de todas las palabras del corpus
        self.index2word = {0: "SOS", 1: "EOS", 2: "PAD"} # mapear los indices a palabras
        self.n_words = 3  #palabras especiales para el nlp

    def addSentence(self, sentence):
        for word in sentence.split(' '): #Agrega la sentencia y la separa por ''
            self.addWord(word)

    def addWord(self, word):
        if word not in self.word2index: #verifica si la palabra no existe en el vocabulario
            self.word2index[word] = self.n_words  #la agrega
            self.word2count[word] = 1 #inicializa la palabra en 1 ya que es la primera vez que se agrega al vocabulario
            self.index2word[self.n_words] = word #actualiza el vocabulario
            self.n_words += 1 #se aumenta en 1 la cantidad todal del vocabulario
        else:
            self.word2count[word] += 1 #se actualiza en 1 si ya existe la palabra

    def indexesFromSentence(self, sentence):
        return [self.word2index[word] for word in sentence.split(' ')] #convierte las sentencias a indices

    def sentenceFromIndex(self, index):
        return [self.index2word[ix] for ix in index] #convierte los indices a sentencias

Para poder aplicar la capa de `attention` necesitamos que nuestras frases tengan una longitud máxima definida.

In [None]:
MAX_LENGTH = 10

port_prefixes = (
    # Presente
    "eu sou ", "sou ",
    "eu estou ", "estou ",
    "ele é ", "é ",
    "ele está ", "está ",
    "ela é ", "ela está ",
    "ela está ", "ela está ",
    "você é ", "você está ",
    "vocês são ", "vocês estão ",
    "nós somos ", "somos ",
    "nós estamos ", "estamos ",
    "eles são ", "eles estão ",
    "elas são ", "elas estão ",
    "eu fui ", "fui ",
    "eu estive ", "estive ",
    "ele foi ", "foi ",
    "ele esteve ", "esteve ",
    "ela foi ", "ela esteve ",
    "ela esteve ", "ela esteve ",
    "você foi ", "você esteve ",
    "vocês foram ", "vocês estiveram ",
    "nós fomos ", "fomos ",
    "nós estivemos ", "estivemos ",
    "eles foram ", "eles estiveram ",
    "elas foram ", "elas estiveram ",
    "eu serei ", "serei ",
    "eu estarei ", "estarei ",
    "ele será ", "será ",
    "ele estará ", "estará ",
    "ela será ", "ela estará ",
    "ela estará ", "ela estará ",
    "você será ", "você estará ",
    "vocês serão ", "vocês estarão ",
    "nós seremos ", "seremos ",
    "nós estaremos ", "estaremos ",
    "eles serão ", "eles estarão ",
    "elas serão ", "elas estarão "
)


def filterPairs(pairs, filters, lang=0):
    return [p for p in pairs if p[lang].startswith(filters)] #Itera sobre cada par del vocab,
                                                             #y verifica si la palabra empieza
                                                             #por ese filtro y se añade a la lista

def trimPairs(pairs):
    return [p for p in pairs if len(p[0].split(' ')) < MAX_LENGTH and len(p[1].split(' ')) < MAX_LENGTH] #devuelve pares de palabras menores
                                                                                                         #al tamaño maximo

In [None]:
def prepareData(file, filters=None, reverse=False):

    pairs = read_file(file, reverse) #Leemos el dataset
    print(f"Tenemos {len(pairs)} pares de frases")

    pairs = trimPairs(pairs) # Devolvemos los pares con longitud menor a maxlenght
    print(f"Tenemos {len(pairs)} pares de frases con longitud menor de {MAX_LENGTH}")

    # Reverse pairs, make Lang instances
    if reverse: #Invierte los pares en caso de que el vocab este en un formato especial
        pairs = [list(reversed(p)) for p in pairs]
        input_lang = Lang('eng')
        output_lang = Lang('port')
    else: #Instanciamos los diferentes idiomas
        input_lang = Lang('port')
        output_lang = Lang('eng')

    for pair in pairs: #Iteramos por los pares de frases
        input_lang.addSentence(pair[0]) #Agregamos las palabras en eng
        output_lang.addSentence(pair[1]) #Agregamos las palabras en port

        # agregamos el EOS al final de la frase
        pair[0] += " EOS"
        pair[1] += " EOS"

    print("Longitud vocabularios:")
    print(input_lang.name, input_lang.n_words)
    print(output_lang.name, output_lang.n_words)

    return input_lang, output_lang, pairs

input_lang, output_lang, pairs = prepareData('/content/drive/MyDrive/SIS421/SegundoParcial/Datasets/por.txt')

random.choice(pairs)

In [None]:
output_lang.indexesFromSentence('quero ir ao teu sitio preferido .')

In [None]:
output_lang.sentenceFromIndex([601, 312, 145, 1491, 9329, 8816, 4])

In [None]:
import torch

device = "cuda" if torch.cuda.is_available() else "cpu"

class Dataset(torch.utils.data.Dataset):
    def __init__(self, input_lang, output_lang, pairs, max_length):
        self.input_lang = input_lang #Inicia el primer idioma
        self.output_lang = output_lang #Inicia el segundo idioma
        self.pairs = pairs #Los pares de frases de un idioma a otro
        self.max_length = max_length #tamaño maximo de frases

    def __len__(self):
        return len(self.pairs) #Obtenemos la longitud

    def __getitem__(self, ix):
        # Convierte la frase del idioma de entrada a índices numéricos
        inputs = torch.tensor(self.input_lang.indexesFromSentence(self.pairs[ix][0]), device=device, dtype=torch.long)
        # Convertir la frase del idioma de salida a índices numéricos
        outputs = torch.tensor(self.output_lang.indexesFromSentence(self.pairs[ix][1]), device=device, dtype=torch.long)
        # Agregamos relleno a las frases hasta la longitud máxima con el token de relleno correspondiente al idioma de entrada y salida
        return torch.nn.functional.pad(inputs, (0, self.max_length - len(inputs)), 'constant', self.input_lang.word2index['PAD']), \
            torch.nn.functional.pad(outputs, (0, self.max_length - len(outputs)), 'constant', self.output_lang.word2index['PAD'])


# separamos datos en train 80% -test 20%
train_size = len(pairs) * 80 // 100
train = pairs[:train_size]
test = pairs[train_size:]

dataset = {
    'train': Dataset(input_lang, output_lang, train, max_length=MAX_LENGTH),
    'test': Dataset(input_lang, output_lang, test, max_length=MAX_LENGTH)
}

len(dataset['train']), len(dataset['test'])

In [None]:
input_sentence, output_sentence = dataset['train'][1]

input_sentence, output_sentence

In [None]:
input_lang.sentenceFromIndex(input_sentence.tolist()), output_lang.sentenceFromIndex(output_sentence.tolist())

In [None]:
dataloader = {
    'train': torch.utils.data.DataLoader(dataset['train'], batch_size=64, shuffle=True),
    'test': torch.utils.data.DataLoader(dataset['test'], batch_size=256, shuffle=False),
}

inputs, outputs = next(iter(dataloader['train']))
inputs.shape, outputs.shape

## El modelo

En lo que se refiere al `encoder`, seguimos usando exactamente la misma arquitectura. La única diferencia es que, además del último estado oculto, necesitaremos todas sus salidas para que el `decoder` pueda usarlas.

In [None]:
class Encoder(torch.nn.Module):
    def __init__(self, input_size, embedding_size=100, hidden_size=100, n_layers=2):
        super().__init__()
        # Inicializamos la capa oculta
        self.hidden_size = hidden_size
        # Capa de embedding para convertir los índices de las palabras en vectores de embedding
        self.embedding = torch.nn.Embedding(input_size, embedding_size)
        # Procesamos los input_sizes, con el vector de embedings y el gru con el hidden_size
        self.gru = torch.nn.GRU(embedding_size, hidden_size, num_layers=n_layers, batch_first=True)

    def forward(self, input_sentences):
        # Convertimos los índices de las palabras en vectores de embedding
        embedded = self.embedding(input_sentences)
        # Pasar los embeddings a través de la capa GRU para obtener salidas y el último estado oculto
        # preparando para el decoder
        outputs, hidden = self.gru(embedded)
        return outputs, hidden

In [None]:
encoder = Encoder(input_size=input_lang.n_words)
encoder_outputs, encoder_hidden = encoder(torch.randint(0, input_lang.n_words, (64, 10)))

# [batch size, seq len, hidden size]
encoder_outputs.shape

In [None]:
# [num layers, batch size, hidden size]
encoder_hidden.shape

### El *decoder* con *attention*

In [None]:
class AttnDecoder(torch.nn.Module):
    def __init__(self, input_size, embedding_size=100, hidden_size=100, n_layers=2, max_length=MAX_LENGTH):
        super().__init__()

        # Capa de embedding para convertir los índices de las palabras en vectores de embedding
        self.embedding = torch.nn.Embedding(input_size, embedding_size)

        # Procesamos los input_sizes, con el vector de embedings y el gru con el hidden_size
        self.gru = torch.nn.GRU(embedding_size, hidden_size, num_layers=n_layers, batch_first=True)

        # Definimos una capa lineal que proyecta los estados ocultos del decoder,
        # con el objetivo de predecir la distribución de probabilidad de las palabras
        # de salida en el vocabulario de destino.
        self.out = torch.nn.Linear(hidden_size, input_size)

        # Toma la capa oculta concatenandolo con el embeding, que tendra una salida
        # de longitud maxima, que representa los pesos de atencion para cada elemento en la secuencia de entrada.
        self.attn = torch.nn.Linear(hidden_size + embedding_size, max_length)

        # [ vector oculto | contexto ponderado ], resultado-> Un nuevo vector oculto del tamaño hidensize,
        # sacando la prediccion final
        self.attn_combine = torch.nn.Linear(hidden_size * 2, hidden_size)


    def forward(self, input_words, hidden, encoder_outputs):
        # sacamos los embeddings
        embedded = self.embedding(input_words)
        # calculamos los pesos de la capa de atención
        attn_weights = torch.nn.functional.softmax(self.attn(torch.cat((embedded.squeeze(1), hidden[0]), dim=1)))
        # re-escalamos los outputs del encoder con estos pesos
        attn_applied = torch.bmm(attn_weights.unsqueeze(1), encoder_outputs)
        output = torch.cat((embedded.squeeze(1), attn_applied.squeeze(1)), 1)
        # aplicamos la capa de atención
        output = self.attn_combine(output)
        output = torch.nn.functional.relu(output)
        # a partir de aquí, como siempre. La diferencia es que la entrada a la RNN
        # no es directmanete el embedding sino una combinación del embedding
        # y las salidas del encoder re-escaladas
        output, hidden = self.gru(output.unsqueeze(1), hidden)
        output = self.out(output.squeeze(1))
        return output, hidden, attn_weights

In [None]:
decoder = AttnDecoder(input_size=output_lang.n_words)
decoder_output, decoder_hidden, attn_weights = decoder(torch.randint(0, output_lang.n_words, (64, 1)), encoder_hidden, encoder_outputs)

# [batch size, vocab size]
decoder_output.shape

In [None]:
# [num layers, batch size, hidden size]
decoder_hidden.shape

In [None]:
# [batch size, max_length]
attn_weights.shape

## Entrenamiento

Vamos a implementar el bucle de entrenamiento. En primer lugar, al tener ahora dos redes neuronales, necesitaremos dos optimizadores (uno para el `encoder` y otro para el `decoder`). Al `encoder` le pasaremos la frase en el idioma original, y obtendremos el estado oculto final. Este estado oculto lo usaremos para inicializar el `decoder` que, junto al token `<sos>`, generará la primera palabra de la frase traducida. Repetiremos el proceso, utilizando como entrada la anterior salida del decoder, hasta obtener el token `<eos>`.

In [None]:
from tqdm import tqdm
import numpy as np
import torch
import zipfile
import os

def fit(encoder, decoder, dataloader, epochs=10):
    encoder.to(device)
    decoder.to(device)
    encoder_optimizer = torch.optim.Adam(encoder.parameters(), lr=1e-3)
    decoder_optimizer = torch.optim.Adam(decoder.parameters(), lr=1e-3)
    criterion = torch.nn.CrossEntropyLoss()
    for epoch in range(1, epochs+1):
        encoder.train()
        decoder.train()
        train_loss = []
        bar = tqdm(dataloader['train'])
        for batch in bar:
            input_sentences, output_sentences = batch
            bs = input_sentences.shape[0]
            loss = 0
            encoder_optimizer.zero_grad()
            decoder_optimizer.zero_grad()
            # obtenemos el último estado oculto del encoder
            encoder_outputs, hidden = encoder(input_sentences)
            # calculamos las salidas del decoder de manera recurrente
            decoder_input = torch.tensor([[output_lang.word2index['SOS']] for b in range(bs)], device=device)
            for i in range(output_sentences.shape[1]):
                output, hidden, attn_weights = decoder(decoder_input, hidden, encoder_outputs)
                loss += criterion(output, output_sentences[:, i].view(bs))
                # el siguiente input será la palabra predicha
                decoder_input = torch.argmax(output, axis=1).view(bs, 1)
            # optimización
            loss.backward()
            encoder_optimizer.step()
            decoder_optimizer.step()
            train_loss.append(loss.item())
            bar.set_description(f"Epoch {epoch}/{epochs} loss {np.mean(train_loss):.5f}")

        val_loss = []
        encoder.eval()
        decoder.eval()
        with torch.no_grad():
            bar = tqdm(dataloader['test'])
            for batch in bar:
                input_sentences, output_sentences = batch
                bs = input_sentences.shape[0]
                loss = 0
                # obtenemos el último estado oculto del encoder
                encoder_outputs, hidden = encoder(input_sentences)
                # calculamos las salidas del decoder de manera recurrente
                decoder_input = torch.tensor([[output_lang.word2index['SOS']] for b in range(bs)], device=device)
                for i in range(output_sentences.shape[1]):
                    output, hidden, attn_weights = decoder(decoder_input, hidden, encoder_outputs)
                    loss += criterion(output, output_sentences[:, i].view(bs))
                    # el siguiente input será la palabra predicha
                    decoder_input = torch.argmax(output, axis=1).view(bs, 1)
                val_loss.append(loss.item())
                bar.set_description(f"Epoch {epoch}/{epochs} val_loss {np.mean(val_loss):.5f}")

     # Guardamos los pesos entrenados del encoder y decoder
    torch.save(encoder.state_dict(), 'encoder_weights.pth')
    torch.save(decoder.state_dict(), 'decoder_weights.pth')

    # Comprimimos los archivos de pesos en un archivo zip
    with zipfile.ZipFile('model_weights.zip', 'w') as zipf:
        zipf.write('encoder_weights.pth')
        zipf.write('decoder_weights.pth')

    # Eliminamos los archivos individuales de pesos
    os.remove('encoder_weights.pth')
    os.remove('decoder_weights.pth')

    print("Pesos del modelo guardados y comprimidos en 'model_weights.zip'.")

In [None]:
fit(encoder, decoder, dataloader, epochs=30)

## Generando traducciones

Una vez tenemos nuestro modelo entrenado, podemos utilizarlo para traducir frases del inglés al castellano de la siguiente manera.

In [None]:
import torch
import zipfile
import os

def load_model_weights(encoder, decoder, zip_path, device):
    # Descomprimir el archivo zip
    with zipfile.ZipFile(zip_path, 'r') as zipf:
        zipf.extractall()

    # Cargar los pesos en los modelos
    encoder.load_state_dict(torch.load('encoder_weights.pth', map_location=device))
    decoder.load_state_dict(torch.load('decoder_weights.pth', map_location=device))

    # Eliminar los archivos individuales de pesos
    os.remove('encoder_weights.pth')
    os.remove('decoder_weights.pth')

    encoder.to(device)
    decoder.to(device)

    print("Pesos del modelo cargados correctamente.")

In [None]:
load_model_weights(encoder, decoder, '/content/drive/MyDrive/SIS421/SegundoParcial/model_weights.zip', device)

In [None]:
input_sentence, output_sentence = dataset['train'][600]
input_lang.sentenceFromIndex(input_sentence.tolist()), output_lang.sentenceFromIndex(output_sentence.tolist())

In [None]:
def predict(input_sentence):
    # obtenemos el último estado oculto del encoder
    encoder_outputs, hidden = encoder(input_sentence.unsqueeze(0))
    # calculamos las salidas del decoder de manera recurrente
    decoder_input = torch.tensor([[output_lang.word2index['SOS']]], device=device)
    # iteramos hasta que el decoder nos de el token <eos>
    outputs = []
    decoder_attentions = torch.zeros(MAX_LENGTH, MAX_LENGTH)
    i = 0
    while True:
        output, hidden, attn_weights = decoder(decoder_input, hidden, encoder_outputs)
        decoder_attentions[i] = attn_weights.data
        i += 1
        decoder_input = torch.argmax(output, axis=1).view(1, 1)
        outputs.append(decoder_input.cpu().item())
        if decoder_input.item() == output_lang.word2index['EOS']:
            break
    return output_lang.sentenceFromIndex(outputs), decoder_attentions

In [None]:
output_words, attn = predict(input_sentence)
output_words