# 4.2 - Mecanismos de atención

En este notebook, exploraremos cómo resolver un problema de predicción de palabras utilizando mecanismos de self-attention en lugar de LSTMs.

El mecanismo de self-attention permite que cada elemento de una secuencia preste atención a otros elementos de la misma secuencia. Esto es útil para capturar dependencias a **largo plazo** a diferencia de las redes recurrentes tradicionales.

Además, a diferencia de las redes LSTM o GRU, estas son **altamente paralelizables** dado que permiten tratar cada elemento de la secuencia de forma individual.


## Conjunto de datos

Utilizaremos el mismo conjunto de datos que en la práctica anterior: el libro "Trafalgar" de Benito Pérez Galdós. Nuestro objetivo será predecir la siguiente palabra en una secuencia de 5 palabras dadas.

### Descargar conjunto

In [None]:
import requests

# Descargar el texto de "Trafalgar" de Benito Pérez Galdós
url = "https://www.gutenberg.org/cache/epub/16961/pg16961.txt"
response = requests.get(url, timeout=30)
text = response.text

### Preprocesar texto

Normalmente los conjuntos de datos contienen errores o elementos no deseados, en esta parte los eliminaremos. Algunos ejemplos típicos son los números, los saltos de línea `\n` o los tabuladores `\t`. <br>
Si abres la url del libro en tu navegador verás que al inicio y al final nos aparece un aviso de copyright que tendremos que eliminar.

En este punto verás que también separamos el texto en frases haciendo uso de la librería de texto `nltk`.

In [None]:
import re
import time
import nltk
nltk.download('punkt')
nltk.download('punkt_tab')
from nltk.tokenize import sent_tokenize
from unidecode import unidecode

# Eliminar saltos de línea y retornos de carro
text = re.sub(r'[\n\r]+', ' ', text)
# Eliminamos cabecera y fin en inglés
text = text.split("FIN DE TRAFALGAR")[0]
text = text.split(" -I- ")[1]
# Dividir el texto en frases utilizando la librería nltk
sentences = sent_tokenize(text)
# Eliminar acentos utilizando la librería unidecode
sentences = [unidecode(s) for s in sentences]
# Pasar a minúsculas y eliminar resto de caracteres extraños (dos puntos, punto y coma, números...)
sentences = [re.sub(r'[^a-z\s]', '', s.lower()) for s in sentences]

### Obtener vocabulario

Una vez tenemos todas las frases, para codificar cada una de las palabras del texto es necesario obtener el llamado `corpus` o `vocabulario`. Este nos sirve para saber que palabras únicas (sin repetición) aparecen en nuestro texto, lo cual es útil para asignar un índice a cada una. Como verás, en este caso asignamos el índice en función de la frecuencia de aparición, por tanto, la palabra más frecuente tendrá el índice más bajo.

Como recordarás, nuestro objetivo es que, dado un texto de máximo 5 palabras, el modelo entrenado sea capaz de continuar la frase hasta que decida finalizarla. Para lograrlo tenemos que tener en cuenta varios escenarios:
* ¿Y si nos dan menos de 5 palabras?
* ¿Como hacemos para detectar que la palabra generada por el modelo es la última de la frase?

Para solventar estos problemas, vamos a añadir dos palabras nuevas o `tokens` nuevos a nuestro vocabulario:
* **\<pad\>**: Token que representa *padding*, es decir, si nos dan menos de 5 palabras, rellenaremos las que falten con este padding.
* **\<eos\>**: Token que representa *end-of-sequence*, es decir, fin de secuencia. Si el modelo predice este token, sabremos que ya se acabó la frase y podemos detener la generación.


In [None]:
from collections import Counter

# Crear una lista con todas las palabras del texto
words = " ".join(sentences).split()
# Obtener el corpus del texto (todas las palabras que aparecen y su frecuencia)
word_counts = Counter(words)
# Filtrar palabras que aparecen menos de 5 veces, dado que no aportan mucho
word_counts = {word: count for word, count in word_counts.items() if count >=5}
# Ordenamos el corpus por frecuencia
word_counts = dict(sorted(word_counts.items(), key=lambda x: x[1], reverse=True))
# Extraemos solo las claves del diccionario anterior para crear el corpus
vocab = list(word_counts.keys())
# Añadimos tokens especiales <pad> (padding), <eos> (end of sequence), y <unk> (unknown)
vocab = ["<pad>", "<unk>"] + vocab + ["<eos>"]
# Generamos un diccionario (y su inverso) donde asignamos un id a cada palabra según su frecuencia.
# La palabra más frecuente será la 1. El 0 lo dejamos para el padding
word_to_ix = {word: i for i, word in enumerate(vocab)}
ix_to_word = {i: word for i, word in enumerate(vocab)}
vocab_size = len(vocab)

print(f'Tamaño del vocabulario: {vocab_size}')
print(f'Word to Index mapping: {word_to_ix}')

### Análisis de datos
En todos los problemas de aprendizaje, es muy importante hacer un análisis de los datos con los que vamos a trabajar, lo cual nos puede servir para diagnosticar o evitar futuros problemas.<br>
En este caso realizamos un simple análisis de la frecuencia de las palabras. 

Como siempre suele suceder en estos casos, las palabras más comunes son las llamadas `stop-words`. Estas palabras son las que más solemos repetir en nuestros textos y las forman los artículos, las preposiciones, las conjunciones, ... 

In [None]:
import matplotlib.pyplot as plt

# Visualización de las 20 palabras más comunes
words_x = list(word_counts.keys())[:20]
counts_y = list(word_counts.values())[:20]

plt.figure(figsize=(10, 6))
plt.bar(words_x, counts_y)
plt.title('Palabras más comunes')
plt.xlabel('Palabras')
plt.ylabel('Frecuencia')
plt.show()

### Crear dataset Pytorch
Una vez tenemos nuestro conjunto limpio y analizado, podemos crear los ejemplos con los que alimentaremos nuestro modelo. Como habíamos comentado queremos intentar completar una frase dadas, como máximo 5 palabras.<br>
En la siguiente clase `SentenceDataset` crearemos los ejemplos a partir de las frases del texto descargado. Estos ejemplos han de reflejar todos los escenarios a los que luego queremos que nuestro modelo pueda enfrentarse.

Por tanto, para cada frase, nuestro `Dataset` creará los siguientes ejemplos:
* Frase: *Frase de ejemplo*
    * [\<pad\>, \<pad\>, \<pad\>, \<pad\>, frase], de
    * [\<pad\>, \<pad\>, \<pad\>,  frase, de], ejemplo
    * [\<pad\>, \<pad\>, frase,  de, ejemplo], \<eos\>

El primer vector es la secuencia de entrada y el elemento del final es la salida deseada. Ten en cuenta que al modelo no le damos el texto, le daremos los índices de cada palabra.


Si nuestro `word_to_ix` fuese este:
```python
word_to_ix = {"<pad>":0, "frase":1, "de":2, "ejemplo":3, "<eos>":4}
```
Nuestros ejemplos quedarían así:
* [0, 0, 0, 0, 1], 2
* [0, 0, 0, 1, 2], 3
* [0, 0, 1, 2, 3], 4

> **Nota:** La attention soporta la entrada de máscaras que sirven para indicar que elementos de la entrada son irrelevantes (\<pad\>). Se ha modificado el método `__get_item__` para que, además de la x y la y, también retorne un vector con la máscara de cada ejemplo.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

class SentenceDataset(Dataset):
    def __init__(self, sentences, word_to_ix, seq_length):
        self.data = []
        self.seq_length = seq_length
        self.word_to_ix = word_to_ix
        
        # El índice para <unk>
        unk_index = self.word_to_ix["<unk>"]
        
        # Para cada frase del conjunto de datos..
        for sentence in sentences:
            # Separamos la frase en una lista de palabras y al final se añadimos el token <eos>
            words = sentence.split() + ['<eos>']
            # Para cada palabra, creamos un ejemplo
            for i in range(len(words)-1):
                # Obtenemos las palabras de cada ejemplo
                seq_in = words[max(0, i+1 - seq_length):i+1]
                # Añadir padding si es necesario
                seq_in = ['<pad>'] * (seq_length - len(seq_in)) + seq_in
                # Obtenemos la salida deseada para la secuencia de entrada (la siguiente palabra)
                seq_out = words[i+1]
                # Convertir palabras a índices
                seq_in_indices = [self.word_to_ix.get(word, unk_index) for word in seq_in]
                seq_out_index = self.word_to_ix.get(seq_out, unk_index)
                
                # Almacenamos el ejemplo solo si seq_out no es <unk>
                if seq_out_index != unk_index:
                    self.data.append((torch.tensor(seq_in_indices, dtype=torch.long), torch.tensor(seq_out_index, dtype=torch.long)))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Obtenemos un ejemplo concreto (ya se han creado todos en el init)
        seq_in, seq_out = self.data[idx]
        # Crear la máscara de padding (Las posiciones de padding se marcan como True)
        mask = seq_in == self.word_to_ix["<pad>"]
        return seq_in, seq_out, mask
 
seq_length = 5
dataset = SentenceDataset(sentences, word_to_ix, seq_length)
print(len(dataset))

### Dividir el conjunto de datos
Dividimos el conjunto al completo en ``entrenamiento``, ``validación`` y ``test`` (80%,10%,10%) de forma aleatoria.

In [None]:
# Fijar la semilla para obtener reproducibilidad
seed = 42
torch.manual_seed(seed)  # Semilla para PyTorch

# Dividir el dataset en entrenamiento, validación y prueba
train_size = int(0.8 * len(dataset))
val_size = int(0.1 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

batch_size = 256

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

## 4.2.3. - Modelo
Para este problema crearemos el modelo más sencillo posible:
* **Capa Embedding**: Para cada elemento de la entrada, aprenderá un embedding que lo representará en un espacio de dimensión `embed_size`.
* **Capa MultiheadAttention**: Esta capa permite al modelo enfocarse en diferentes partes de la secuencia de entrada simultáneamente. Cada "cabeza" en la atención múltiple (multi-head) puede capturar distintos patrones de relación entre los elementos de la secuencia.
    * En la capa LSTM de la práctica anterior, el último elemento de la secuencia de salida contiene información de toda la secuencia, **en este caso no**.
    * El número de heads se configura mediante `num_heads`.
* **Capa Linear**: Proyecta el vector del último elemento de la salida de la LSTM en un espacio de tamaño `output_size`. En este caso `output_size` es igual al tamaño del vocabulario dado que estamos intentando resolver un problema de multi-clasificación.

Nótese que no aplicamos la función `softmax` dado que esta se aplica internamente en la `CrossEntropyLoss`.

In [None]:
import torch
import torch.nn as nn

class SelfAttentionModel(nn.Module):
    def __init__(self, vocab_size, embed_size, num_heads, seq_length):
        super(SelfAttentionModel, self).__init__()
        # Capa de embedding: Convierte los índices de las palabras en vectores densos de tamaño embed_size
        self.embedding = nn.Embedding(vocab_size, embed_size)
        # Capa de atención múltiple (MultiheadAttention): Permite al modelo enfocarse en diferentes partes de la secuencia
        self.attention = nn.MultiheadAttention(embed_size, num_heads, batch_first=True)
        # Capa de flatten para aplanar la salida de la atención
        self.flatten = nn.Flatten()
        # Capa lineal para proyección al espacio de salida
        self.fc = nn.Linear(embed_size * seq_length, vocab_size)

    def forward(self, sentence, mask=None, return_att_mtx=False):
        # Aplicación de la capa de embedding para convertir las palabras en embeddings
        embeds = self.embedding(sentence)
        # Aplicamos la atención sobre la secuencia de embeddings (en este caso query, key y value son los mismos)
        # La máscara se aplica a la capa de atención para controlar qué posiciones se deben ignorar (las <pad>)
        attention_out, attention_weights = self.attention(embeds, embeds, embeds, key_padding_mask=mask)
        # Aplanar la salida de la atención
        flattened_out = self.flatten(attention_out)
        # Proyectar al espacio de salida
        out = self.fc(flattened_out)
        
        if return_att_mtx: 
            return out, attention_weights
        else: 
            return out

In [None]:
# Hiperparámetros del modelo
embed_size = 8
num_heads = 1
num_epochs = 25
learning_rate = 0.005

# Inicializar los modelos, loss function y optimizer
model = SelfAttentionModel(vocab_size, embed_size, num_heads, seq_length)
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

## 4.2.4. - Entrenamiento

In [None]:
# Función para calcular la precisión (accuracy) en un conjunto de datos
def calculate_accuracy(model, data_loader, K=1):
    model.eval()  # Configuramos el modelo en modo evaluación (no se actualizan los pesos)
    correct = 0  # Contador para predicciones correctas
    total = 0  # Contador para el número total de ejemplos

    with torch.no_grad():  # Desactivamos el cálculo de gradientes
        for inputs, targets, masks in data_loader:
            inputs, targets, masks = inputs.to(device), targets.to(device), masks.to(device)
            outputs = model(inputs, masks)  # Hacemos una predicción con el modelo
            # Obtenemos las probabilidades de las clases (Top-K)
            topk = outputs.topk(K, dim=1)[1]
            # Creamos una máscara que indica si la etiqueta correcta está entre las top-K predicciones
            correct_mask = topk.eq(targets.view(-1, 1).expand_as(topk))
            correct += correct_mask.sum().item()  # Contamos las predicciones correctas
            total += targets.size(0)  # Contamos el número total de ejemplos

    accuracy = correct / total  # Calculamos la precisión como el porcentaje de predicciones correctas
    return accuracy

# Función de entrenamiento y validación
def train_model(model, optimizer, train_loader, val_loader, num_epochs):
    model.train()  # Configuramos el modelo en modo entrenamiento
    for epoch in range(num_epochs):
        start_time = time.time()  # Registramos el tiempo de inicio de la epoch

        total_train_loss = 0  # Inicializamos la variable para acumular la pérdida de entrenamiento
        total_val_loss = 0    # Inicializamos la variable para acumular la pérdida de validación

        # Entrenamiento
        model.train() # Configuramos el modelo en modo entrenamiento
        for inputs, targets, masks in train_loader:  # Iteramos sobre los datos de entrenamiento
            inputs, targets, masks = inputs.to(device), targets.to(device), masks.to(device)
            optimizer.zero_grad()  # Reseteamos los gradientes del optimizador
            outputs = model(inputs, masks)  # Hacemos una predicción con el modelo
            loss = criterion(outputs, targets)  # Calculamos la pérdida entre la predicción y el objetivo
            loss.backward()  # Calculamos los gradientes (backpropagation)
            optimizer.step()  # Actualizamos los pesos del modelo según el optimizador
            total_train_loss += loss.item()  # Acumulamos la pérdida de esta iteración

        # Calculamos la precisión para el conjunto de entrenamiento
        train_accuracy = calculate_accuracy(model, train_loader)
        
        # Validación
        model.eval()  # Configuramos el modelo en modo evaluación (no se actualizan los pesos)
        with torch.no_grad():  # Desactivamos el cálculo de gradientes para ahorrar memoria y mejorar la velocidad
            for inputs, targets, masks in val_loader:  # Iteramos sobre los datos de validación
                inputs, targets, masks = inputs.to(device), targets.to(device), masks.to(device)
                outputs = model(inputs, masks)  # Hacemos una predicción con el modelo
                loss = criterion(outputs, targets)  # Calculamos la pérdida para validación
                total_val_loss += loss.item()  # Acumulamos la pérdida de esta iteración

        # Calculamos la precisión para el conjunto de validación
        val_accuracy = calculate_accuracy(model, val_loader)
        val_accuracy_3 = calculate_accuracy(model, val_loader, K=3)

        # Calculamos el tiempo total de la epoch
        epoch_time = time.time() - start_time  

        # Imprimimos las estadísticas de la epoch: pérdida de entrenamiento, validación, precisión y tiempo
        print(f'Epoch [{epoch+1}/{num_epochs}], '
              f'Train Loss: {total_train_loss/len(train_loader):.4f}, '
              f'Val Loss: {total_val_loss/len(val_loader):.4f}, '
              f'Train T1 Accuracy: {train_accuracy:.4f}, '
              f'Val T1 Accuracy: {val_accuracy:.4f}, '
              f'Val T3 Accuracy: {val_accuracy_3:.4f}, '
              f'Time: {epoch_time:.2f} sec')

# Entrenar el modelo Attention
print(f"Entrenando modelo Attention en {device}...")
train_model(model, optimizer, train_loader, val_loader, num_epochs)

## 4.2.5. - Prueba

In [None]:
def predict(model, text, word_to_ix, ix_to_word, seq_length, max_len=20):
    # Asegurarse de que el modelo esté en modo de evaluación
    model.eval()

    # Preprocesar el texto de entrada
    text = unidecode(text).lower()
    text = re.sub(r'[^a-z\s]', '', text)
    words = text.split()

    # Convertir palabras a índices
    input_sequence = [word_to_ix.get(word, word_to_ix['<pad>']) for word in words]
    input_sequence = input_sequence[-seq_length:]  # Tomar solo las últimas seq_length palabras
    input_sequence = [0] * (seq_length - len(input_sequence)) + input_sequence

    # Convertir la secuencia a tensor
    input_tensor = torch.tensor([input_sequence], dtype=torch.long)

    # Inicializar la lista de palabras generadas
    generated_words = words
    predicted_word = ""

    with torch.no_grad():
        while len(generated_words)<=max_len and predicted_word!='<eos>':
            input_tensor = input_tensor.to(device)
            mask = input_tensor == word_to_ix['<pad>']
            # Hacer una predicción usando el modelo
            output = model(input_tensor, mask)
            
            # Obtener la palabra con la probabilidad más alta
            _, predicted_index = torch.max(output, 1)
            predicted_word = ix_to_word[predicted_index.item()]

            # Añadir la palabra generada a la lista
            generated_words.append(predicted_word)

            # Actualizar la secuencia de entrada para la siguiente predicción
            input_sequence = input_sequence[1:] + [predicted_index.item()]
            input_tensor = torch.tensor([input_sequence], dtype=torch.long)

    return ' '.join(generated_words)

# Ejemplo de texto de entrada
input_text = "La batalla de"
predicted_text = predict(model, input_text, word_to_ix, ix_to_word, seq_length)
print(predicted_text)

## 4.2.6. - Ejercicios

> **EJERCICIO:** ¿Que sucede si no se eliminan los ejemplos que predicen \<unk\> del dataset?

In [None]:
# Hay que eliminar la siguiente parte del generador de datos
if seq_out_index != unk_index:
    self.data.append((torch.tensor(seq_in_indices, dtype=torch.long), torch.tensor(seq_out_index, dtype=torch.long)))   
#Esto hará que el modelo, probablemente prediga casi siempre unk, dado que la mayoría de ejemplos, al haber eliminado muchas palabras poco frecuentes, tengan como salida unk

> **EJERCICIO:** Extrae y representa la matriz de atención del modelo para el texto: _El barco es muy antiguo_ .Puedes utilizar el siguiente código para visualizarla:


In [None]:
def plot_att_mtx(att_mtx, text):
    # Este método recibe la matriz de atención y un texto
    words = text.split()
    fig, ax = plt.subplots()
    cax = ax.matshow(att_mtx, cmap='viridis')
    fig.colorbar(cax)
    
    ax.set_xticks(torch.arange(len(words)))
    ax.set_yticks(torch.arange(len(words)))
    
    ax.set_xticklabels(words, rotation=90)
    ax.set_yticklabels(words)
    
    plt.show()

In [None]:
text = "El barco es muy antiguo"
# Asegurarse de que el modelo esté en modo de evaluación
model.eval()
# Preprocesar el texto de entrada
text = unidecode(text).lower()
text = re.sub(r'[^a-z\s]', '', text)
words = text.split()
# Convertir palabras a índices
input_sequence = [word_to_ix.get(word, word_to_ix['<pad>']) for word in words]
input_sequence = input_sequence[-seq_length:]  # Tomar solo las últimas seq_length palabras
input_sequence = [0] * (seq_length - len(input_sequence)) + input_sequence
# Convertir la secuencia a tensor
input_tensor = torch.tensor([input_sequence], dtype=torch.long)

with torch.no_grad():
 input_tensor = input_tensor.to(device)
 # Hacer una predicción usando el modelo y pedir matriz de atención
 output, att_mtx = model(input_tensor, return_att_mtx=True)
 # Eliminar dimensiones innecesarias de la matriz
 att_mtx = att_mtx.squeeze().cpu().numpy()
 
plot_att_mtx(att_mtx, text)

> **EJERCICIO:** Crea un nuevo modelo, sin eliminar el existente, donde se aprendan representaciones diferentes para cada palabra: Una para las **queries**, otra para las **keys** y otra para los **values**. Entrena el modelo de nuevo y compáralo con el anterior.

In [None]:
# Esto es una forma de hacerlo, otra sería aprendiendo 3 embeddings directamente para cada palabra de la entrada, el problema es que tendríamos que aprender muchos más pesos

class SelfAttentionModel_v2(nn.Module):
    def __init__(self, vocab_size, embed_size, num_heads, seq_length):
        super(SelfAttentionModel_v2, self).__init__()
        # Capa de embedding: Convierte los índices de las palabras en vectores densos de tamaño embed_size
        self.embedding = nn.Embedding(vocab_size, embed_size)
        
        # Capas de proyección para queries, keys y values
        self.query_projection = nn.Linear(embed_size, embed_size)
        self.key_projection = nn.Linear(embed_size, embed_size)
        self.value_projection = nn.Linear(embed_size, embed_size)
        
        # Capa de atención múltiple (MultiheadAttention): Permite al modelo enfocarse en diferentes partes de la secuencia
        self.attention = nn.MultiheadAttention(embed_size, num_heads, batch_first=True)
        
        # Capa de flatten para aplanar la salida de la atención
        self.flatten = nn.Flatten()
        
        # Capa lineal para proyección al espacio de salida
        self.fc = nn.Linear(embed_size * seq_length, vocab_size)

    def forward(self, sentence, mask=None):
        # Aplicación de la capa de embedding para convertir las palabras en embeddings
        embeds = self.embedding(sentence)
        
        # Proyección de embeddings para queries, keys y values
        queries = self.query_projection(embeds)
        keys = self.key_projection(embeds)
        values = self.value_projection(embeds)
        
        # Aplicamos la atención sobre la secuencia de embeddings
        # La máscara se aplica a la capa de atención para controlar qué posiciones se deben ignorar (las <pad>)
        attention_out, _ = self.attention(queries, keys, values, key_padding_mask=mask)
        
        # Aplanar la salida de la atención
        flattened_out = self.flatten(attention_out)
        
        # Proyectar al espacio de salida
        out = self.fc(flattened_out)
        
        return out

> **EJERCICIO:** Adapta el modelo para que resuelva un problema de análisis de sentimientos. El objetivo será predecir, a partir de un texto (tweets en este caso), el tono o sentimiento que refleja (positivo, neutro o negativo). A continuación tienes el código necesario para obtener el dataset.

In [None]:
import pandas as pd
data_url = "https://raw.githubusercontent.com/pablo-pnunez/datasets/master/texto/sentiment_analysis_twitter.csv"
twitter_data = pd.read_csv(data_url)
tweets, tweets_sentiment = twitter_data["text"].str.strip().values, twitter_data["sentiment"].values

In [None]:
# OJO, hay que eliminar algún nan del conjunto descargado y alguna review que no tiene nada. Estas últimas hacen que la attention sea nan.

import re
import time
import nltk
from unidecode import unidecode

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split

x_data = []
y_data = []

for idx, tweet in enumerate(tweets):
    try:
        # Eliminar saltos de línea y retornos de carro
        tweet = re.sub(r'[\n\r]+', ' ', tweet)
        # Eliminar acentos utilizando la librería unidecode
        tweet = unidecode(tweet)
        # Pasar a minúsculas y eliminar resto de caracteres extraños (dos puntos, punto y coma, números...)
        tweet = re.sub(r'[^a-z\s]', '', tweet.lower())
        # Eliminamos más de un espacio
        tweet = re.sub(r'(\s)+', ' ', tweet)
        # Filtrar secuencias de 1 palabra o menos
        if len(tweet.split()) > 1:
            x_data.append(tweet)
            y_data.append(tweets_sentiment[idx])
    except:
        print(f"Error con el ejemplo {idx}, no se añade.")

In [None]:
from collections import Counter

# Crear una lista con todas las palabras del texto
words = " ".join(x_data).split()
# Obtener el corpus del texto (todas las palabras que aparecen y su frecuencia)
word_counts = Counter(words)
# Filtrar palabras que aparecen menos de 5 veces, dado que no aportan mucho
word_counts = {word: count for word, count in word_counts.items() if count >=5}
# Ordenamos el corpus por frecuencia
word_counts = dict(sorted(word_counts.items(), key=lambda x: x[1], reverse=True))
# Extraemos solo las claves del diccionario anterior para crear el corpus
vocab = list(word_counts.keys())
# Añadimos tokens especiales <pad> (padding) y <unk> (unknown)
vocab = ["<pad>", "<unk>"] + vocab 
# Generamos un diccionario (y su inverso) donde asignamos un id a cada palabra según su frecuencia.
# La palabra más frecuente será la 1. El 0 lo dejamos para el padding
word_to_ix = {word: i for i, word in enumerate(vocab)}
ix_to_word = {i: word for i, word in enumerate(vocab)}
vocab_size = len(vocab)

print(f'Tamaño del vocabulario: {vocab_size}')
print(f'Word to Index mapping: {word_to_ix}')

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

class SentimentDataset(Dataset):
    def __init__(self, x_data, y_data, word_to_ix, seq_length):
        self.data = []
        self.seq_length = seq_length
        self.word_to_ix = word_to_ix
        
        # Un diccionario para realizar el One-Hot en la salida
        y_map = {"positive":0, "neutral":1, "negative":2}

        # El índice para <unk>
        unk_index = self.word_to_ix["<unk>"]
        
        # Para cada frase del conjunto de datos..
        for idx, sentence in enumerate(x_data):
            # Separamos la frase en una lista de palabras
            words = sentence.split()
            # Creamos un vector con el padding necesario hasta llegar a seq_length
            seq_in = ['<pad>'] * (seq_length - len(words)) 
            # Añadimos las palabras
            seq_in += words
            # Convertir palabras a índices
            seq_in_indices = [self.word_to_ix.get(word, unk_index) for word in seq_in]
            # Transformar la salida a One-Hot
            out = y_map[y_data[idx]]
            # Almacenamos el ejemplo
            self.data.append((torch.tensor(seq_in_indices, dtype=torch.long), torch.tensor(out, dtype=torch.long)))
            
    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Obtenemos un ejemplo concreto (ya se han creado todos en el init)
        seq_in, seq_out = self.data[idx]
        # Crear la máscara de padding (Las posiciones de padding se marcan como True)
        mask = seq_in == self.word_to_ix["<pad>"]
        return seq_in, seq_out, mask

# Obtener la longitud de la mayor secuencia
max_seq_length = max([len(s.split(" ")) for s in x_data])
dataset = SentimentDataset(x_data, y_data, word_to_ix, max_seq_length)
print(len(dataset))

# Fijar la semilla para obtener reproducibilidad
seed = 42
torch.manual_seed(seed)  # Semilla para PyTorch

# Dividir el dataset en entrenamiento, validación y prueba
train_size = int(0.8 * len(dataset))
val_size = int(0.1 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

batch_size = 256

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
class SentimentSelfAttentionModel(nn.Module):
    def __init__(self, vocab_size, embed_size, num_heads, seq_length):
        super().__init__()
        # Capa de embedding: Convierte los índices de las palabras en vectores densos de tamaño embed_size
        self.embedding = nn.Embedding(vocab_size, embed_size)
        # Capa de atención múltiple (MultiheadAttention): Permite al modelo enfocarse en diferentes partes de la secuencia
        self.attention = nn.MultiheadAttention(embed_size, num_heads, batch_first=True)
        # Capa de flatten para aplanar la salida de la atención
        self.flatten = nn.Flatten()
        # Capa lineal para proyección al espacio de salida
        self.fc = nn.Linear(embed_size * seq_length, 3)

    def forward(self, sentence, mask=None, return_att_mtx=False):
        # Aplicación de la capa de embedding para convertir las palabras en embeddings
        embeds = self.embedding(sentence)
        # Aplicamos la atención sobre la secuencia de embeddings (en este caso query, key y value son los mismos)
        # La máscara se aplica a la capa de atención para controlar qué posiciones se deben ignorar (las <pad>)
        attention_out, attention_weights = self.attention(embeds, embeds, embeds, key_padding_mask=mask)
        # Aplanar la salida de la atención
        flattened_out = self.flatten(attention_out)
        # Proyectar al espacio de salida
        out = self.fc(flattened_out)
        
        if return_att_mtx: 
            return out, attention_weights
        else: 
            return out
        
# Hiperparámetros del modelo
embed_size = 6
num_heads = 1
num_epochs = 50
learning_rate = 0.0005

# Inicializar los modelos, loss function y optimizer
model = SentimentSelfAttentionModel(vocab_size, embed_size, num_heads, max_seq_length)
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Entrenar el modelo Attention
print(f"Entrenando modelo Attention en {device}...")
train_model(model, optimizer, train_loader, val_loader, num_epochs)

In [None]:
def preprocess_text(text, word_to_ix, seq_length):
    # Eliminar saltos de línea y retornos de carro
    text = re.sub(r'[\n\r]+', ' ', text)
    # Eliminar acentos
    text = unidecode(text)
    # Pasar a minúsculas y eliminar resto de caracteres extraños
    text = re.sub(r'[^a-z\s]', '', text.lower())
    # Eliminamos más de un espacio
    text = re.sub(r'(\s)+', ' ', text)
    # Separamos en palabras
    words = text.split()
    # Creamos un vector con el padding necesario hasta llegar a seq_length
    seq_in = ['<pad>'] * (seq_length - len(words)) + words
    # Convertir palabras a índices
    seq_in_indices = [word_to_ix.get(word, word_to_ix["<unk>"]) for word in seq_in]
    return torch.tensor(seq_in_indices, dtype=torch.long).unsqueeze(0)  # Añadimos dimensión de batch

def predict(model, text, word_to_ix, seq_length, device):
    model.eval()  # Configuramos el modelo en modo evaluación
    with torch.no_grad():
        # Preprocesar y codificar el texto
        input_tensor = preprocess_text(text, word_to_ix, seq_length).to(device)
        mask = input_tensor == word_to_ix["<pad>"]
        # Obtener la predicción
        output = model(input_tensor, mask)
        # Obtener la clase con la mayor probabilidad
        _, predicted_class = torch.max(output, 1)
        return predicted_class.item()

# Ejemplo de uso
text_to_predict = "I see you"
predicted_class = predict(model, text_to_predict, word_to_ix,  max_seq_length, device)

# Mapear la clase predicha a su etiqueta
class_labels = {0: "positive", 1: "neutral", 2: "negative"}
predicted_label = class_labels[predicted_class]

print(f'Texto: "{text_to_predict}"')
print(f'Predicción: {predicted_label}')
