In [None]:
import re
import nltk
import torch
import fasttext
import unicodedata
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

from os import putenv
from tqdm import trange
from nltk import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer
from torch.nn.functional import one_hot
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

# Needed to use AMD GPUs with ROCm ("ROC" in the original note).
# NOTE(review): os.putenv does not update os.environ, and this runs after
# `import torch`; presumably the variable is still read when the GPU runtime
# initialises - confirm on the target machine.
putenv("HSA_OVERRIDE_GFX_VERSION", "10.3.0")

# EXPERIMENTAR gates the training cells below (they only run when it is True).
EXPERIMENTAR = True
# Number of target classes: used as the network output size and for one-hot encoding.
NUM_CLASES = 3
# Single seed shared by NumPy, PyTorch and the train/test/validation splits.
RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)
torch.manual_seed(RANDOM_STATE)

# Fetch the NLTK stop-word corpus and keep the Spanish list for preprocessing.
nltk.download('stopwords')
STOPWORDS = stopwords.words("spanish")

<h1>Red Neuronal con PyTorch</h1>

In [None]:
class FeedForwardNeuralNetwork(nn.Module):
    """Fully connected network: Linear layers with a ReLU after each hidden layer.

    The last Linear layer is not activated, so `forward` returns raw scores
    (the original note points out this is what a CrossEntropy loss needs).
    """

    def __init__(self, input_size: int, hiden_sizes: list[int], output_size: int):
        """Build the layers.

        Args:
            input_size: number of input features.
            hiden_sizes: width of each hidden layer, in order (may be empty).
            output_size: number of output units.
        """
        super().__init__()
        self.fcl = nn.ModuleList()
        self.act = nn.ModuleList()

        # Consecutive widths define the layers: N widths give N - 1 Linear layers.
        anchos = [input_size] + hiden_sizes + [output_size]
        for entradas, salidas in zip(anchos[:-1], anchos[1:]):
            capa = nn.Linear(entradas, salidas)
            # Xavier-uniform weights and zero biases replace the default init.
            nn.init.xavier_uniform_(capa.weight)
            nn.init.zeros_(capa.bias)
            self.fcl.append(capa)

        # One ReLU per hidden layer, i.e. one activation fewer than Linear layers.
        self.act.extend([nn.ReLU() for _ in anchos[1:-1]])

    def forward(self, X):
        """Return the un-activated output of the last layer for the batch `X`."""
        salida = X
        # zip stops at the shorter list (self.act), leaving the last Linear for below.
        for lineal, activacion in zip(self.fcl, self.act):
            salida = activacion(lineal(salida))
        return self.fcl[-1](salida)

<h1>Funciones de utilidad</h1>

In [None]:
def normaliza_texto(input_str,
                    punct=False,
                    accents=False,
                    num=False,
                    max_dup=2):
    """
    Normalize a text: drop unwanted characters and squeeze repeated ones.

    Args:
        input_str: text to normalize.
        punct: False removes punctuation and symbols; True leaves them intact.
        accents: False removes accents (Unicode combining marks); True leaves
            them intact.
        num: False removes the digits 0-9; True leaves them intact.
        max_dup: maximum number of consecutive repetitions kept for a
            character (with 2, "rrrrr" => "rr").

    Returns:
        The filtered text in composed (NFKC) form, stripped at both ends and
        with every run of whitespace collapsed into a single space.
    """
    PUNCTUATION = ";:,.\\-\"'/"
    SYMBOLS = "()[]¿?¡!{}~<>|"
    NUMBERS = "0123456789"
    SKIP_SYMBOLS = set(PUNCTUATION + SYMBOLS)

    # Decompose first so that accents become separate combining characters.
    nfkd_f = unicodedata.normalize('NFKD', input_str)
    n_str = []
    c_prev = ''
    cc_prev = 0
    for c in nfkd_f:
        if not num and c in NUMBERS:
            continue
        if not punct and c in SKIP_SYMBOLS:
            continue
        if not accents and unicodedata.combining(c):
            continue
        # Characters dropped above do not reset the repetition counter, so
        # repetitions are counted on the already filtered text.
        if c_prev == c:
            cc_prev += 1
            if cc_prev >= max_dup:
                continue
        else:
            cc_prev = 0
        n_str.append(c)
        c_prev = c
    # Recompose: with accents=True the caller gets "ó" back as one character
    # instead of "o" followed by a combining mark (the result of a second NFKD).
    texto = unicodedata.normalize('NFKC', "".join(n_str))
    texto = re.sub(r'\s+', ' ', texto.strip())
    return texto

def _plegar_token(token: str) -> str:
    """Return `token` lower-cased and without combining marks, for comparisons only."""
    descompuesto = unicodedata.normalize('NFKD', token.lower())
    return ''.join(c for c in descompuesto if not unicodedata.combining(c))


# Cache of the folded stop words; rebuilt whenever STOPWORDS is rebound to a new object.
_STOPWORDS_PLEGADAS = {}


def eliminar_stopwords(texto: str):
    """Remove the Spanish stop words from a whitespace-tokenised text.

    Tokens are compared ignoring case and accents, so "El" and "tambien"
    (accent already stripped) are dropped just like "el" and "también".
    A literal comparison against STOPWORDS would miss both.

    Args:
        texto: text whose tokens are separated by whitespace.

    Returns:
        The surviving tokens, unchanged, joined by single spaces.
    """
    if _STOPWORDS_PLEGADAS.get('origen') is not STOPWORDS:
        _STOPWORDS_PLEGADAS['origen'] = STOPWORDS
        _STOPWORDS_PLEGADAS['plegadas'] = frozenset(_plegar_token(s) for s in STOPWORDS)
    vetadas = _STOPWORDS_PLEGADAS['plegadas']

    tokens = [t for t in texto.split() if _plegar_token(t) not in vetadas]
    return ' '.join(tokens)
    
def aplicar_stemming(texto: str):
    """Replace each whitespace-separated token of `texto` by its Spanish Snowball stem.

    Returns the stems joined by single spaces.
    """
    obtener_raiz = SnowballStemmer("spanish").stem
    return ' '.join(map(obtener_raiz, texto.split()))

def preprocesar(texto: str):
    """Run the text pipeline: normalization, stop-word removal, then stemming.

    Returns the text after the three steps have been applied in that order.
    """
    for paso in (normaliza_texto, eliminar_stopwords, aplicar_stemming):
        texto = paso(texto)
    return texto

def leer_datos(filename, transform=False):
    """Load texts and labels from a JSON Lines file.

    Args:
        filename: path of a file with one JSON object per line, each with a
            'text' and a 'klass' field.
        transform: when True the texts are returned as a NumPy array;
            otherwise they are returned as the pandas column itself.

    Returns:
        Tuple (X, Y): the texts and the labels (labels always as a NumPy array).
    """
    tabla = pd.read_json(filename, lines=True)
    textos = tabla['text'].to_numpy() if transform else tabla['text']
    etiquetas = tabla['klass'].to_numpy()
    return textos, etiquetas

def vectorizar_TF_IDF(X, Y, test_size=0.2, val_size=0.1, ngram_range=(1,2)):
    """Split the data into train/test/validation and vectorize the texts with TF-IDF.

    Args:
        X: raw texts.
        Y: labels, also used to stratify both splits.
        test_size: share of the whole data set held out for test.
        val_size: share of the remaining (non-test) data held out for validation.
        ngram_range: word n-gram range handed to the vectorizer.

    Returns:
        Tuple (X_train, X_test, X_val, Y_train, Y_test, Y_val) where the X
        entries are the TF-IDF matrices produced by the vectorizer.
    """
    # First cut separates the test set; the second takes validation out of the rest.
    X_resto, X_test, Y_resto, Y_test = train_test_split(
        X, Y, test_size=test_size, stratify=Y, random_state=RANDOM_STATE)
    X_train, X_val, Y_train, Y_val = train_test_split(
        X_resto, Y_resto, test_size=val_size, stratify=Y_resto, random_state=RANDOM_STATE)

    vectorizador = TfidfVectorizer(analyzer='word',
                                   preprocessor=preprocesar,
                                   ngram_range=ngram_range)

    # The vocabulary is learnt from the training texts only and reused for the rest.
    entrenamiento_vec = vectorizador.fit_transform(X_train)
    prueba_vec, validacion_vec = (vectorizador.transform(parte) for parte in (X_test, X_val))

    return entrenamiento_vec, prueba_vec, validacion_vec, Y_train, Y_test, Y_val

def vectorizar_embeddings(X, Y, variante: str, test_size=0.2, val_size=0.1):
    """Turn each text into a fastText sentence vector and split the data.

    Args:
        X: texts; must offer `.map` and `.to_numpy` (e.g. a pandas Series).
        Y: labels, also used to stratify both splits.
        variante: which fastText model to load: 'MX', 'ES' or 'GEN'.
        test_size: share of the whole data set held out for test.
        val_size: share of the remaining (non-test) data held out for validation.

    Returns:
        Tuple (X_train, X_test, X_val, Y_train, Y_test, Y_val); the X entries
        are 2-D arrays with one sentence vector per row.

    Raises:
        ValueError: if `variante` is not one of the supported names.
    """
    rutas_modelos = {
        'MX': './fasttext/MX.bin',
        'ES': './fasttext/ES.bin',
        'GEN': './fasttext/cc.es.300.bin',
    }
    # Fail early with a clear message instead of hitting an unbound `ft` later on.
    if variante not in rutas_modelos:
        raise ValueError(
            f"Variante desconocida: {variante!r}; opciones válidas: {sorted(rutas_modelos)}"
        )
    ft = fasttext.load_model(rutas_modelos[variante])

    # get_sentence_vector works on a single line of text, so line breaks are
    # replaced by spaces before asking for the vector.
    X_ = X.map(lambda x: ft.get_sentence_vector(x.replace('\n', ' ')))
    X_ = np.vstack(X_.to_numpy())

    # First cut separates the test set; the second takes validation out of the rest.
    X_train, X_test, Y_train, Y_test = train_test_split(X_, Y, test_size=test_size, stratify=Y, random_state=RANDOM_STATE)
    X_train, X_val, Y_train, Y_val = train_test_split(X_train, Y_train, test_size=val_size, stratify=Y_train, random_state=RANDOM_STATE)

    return X_train, X_test, X_val, Y_train, Y_test, Y_val
    
def torchificar(X: list, Y: list, representacion: str, one_hot_encoding=False):
    """Convert feature matrices and label arrays into torch tensors.

    Tensors are moved to the GPU when `torch.cuda.is_available()`.

    Args:
        X: list of feature matrices (one per split).
        Y: list of label arrays (one per split, same order as X).
        representacion: 'TF-IDF' (each matrix is densified with `.toarray()`)
            or 'Embeddings' (each matrix is used as is).
        one_hot_encoding: when True, only the first label tensor (Y[0]) is
            replaced by its float one-hot encoding with NUM_CLASES columns.

    Returns:
        Tuple (X_torch, Y_torch): lists of float32 feature tensors and of
        integer label tensors.

    Raises:
        ValueError: if `representacion` is not one of the supported names.
    """
    usar_gpu = torch.cuda.is_available()
    X_torch = []
    Y_torch = []

    for x in X:
        if representacion == 'TF-IDF':
            x_np = x.toarray().astype(np.float32)
        elif representacion == 'Embeddings':
            x_np = x.astype(np.float32)
        else:
            raise ValueError(
                f"Representación desconocida: {representacion!r}; use 'TF-IDF' o 'Embeddings'"
            )
        x_torch = torch.from_numpy(x_np)
        if usar_gpu:
            x_torch = x_torch.cuda()
        X_torch.append(x_torch)

    if len(Y) > 0:
        # Fit the encoder once on the labels of every split so that a class
        # gets the same integer id everywhere, even if a split lacks a class.
        le = LabelEncoder()
        le.fit(np.concatenate([np.asarray(y) for y in Y]))
        for y in Y:
            y_torch = torch.from_numpy(le.transform(y))
            if usar_gpu:
                y_torch = y_torch.cuda()
            Y_torch.append(y_torch)

    if one_hot_encoding:
        Y_torch[0] = one_hot(Y_torch[0], num_classes=NUM_CLASES).float()

    return X_torch, Y_torch

def create_minibatches(X, Y, batch_size):
    """Return a shuffling DataLoader that yields (X, Y) batches of `batch_size` rows."""
    return DataLoader(TensorDataset(X, Y), batch_size=batch_size, shuffle=True)

def entrenar(Xs, Ys, modelo: FeedForwardNeuralNetwork, optimizador: optim.Optimizer, funcion_perdida, epocas=100, batch_size=32, verbose=False):
    """Train `modelo` with mini-batches and return the loss history.

    Args:
        Xs: feature tensors; Xs[0] is used for training and Xs[2] for the
            per-epoch evaluation printed when `verbose` is True.
        Ys: label tensors in the same order as `Xs`.
        modelo: network to train; moved to the GPU when CUDA is available.
        optimizador: optimizer already bound to the parameters of `modelo`.
        funcion_perdida: callable `(predictions, targets) -> loss`.
        epocas: number of passes over the training data.
        batch_size: number of rows per mini-batch.
        verbose: when True, print the loss and the F1-score on Xs[2]/Ys[2]
            after every epoch.

    Returns:
        List with the mean batch loss of each epoch.

    NOTE(review): the optimizer is received already built while the model is
    moved to the GPU here; PyTorch's optimizer docs recommend moving the model
    first - confirm this order is intended.
    """
    historial_perdida = []
    if torch.cuda.is_available():
        modelo.cuda()

    # The loader does not change between epochs; it reshuffles on every iteration.
    dataloader = create_minibatches(Xs[0], Ys[0], batch_size=batch_size) #Xs[0] = X_train

    for _ in trange(epocas, desc='Entrenando'):
        modelo.train()
        perdida_acumulada = 0.0

        for X_tr, Y_tr in dataloader:
            optimizador.zero_grad()
            loss = funcion_perdida(modelo(X_tr), Y_tr)
            loss.backward()
            optimizador.step()
            perdida_acumulada += loss.item()

        perdida = perdida_acumulada / len(dataloader)
        historial_perdida.append(perdida)
        if verbose:
            f1, _, _, _ = evaluar(Xs[2], Ys[2], modelo)
            print('Pérdida: ', perdida)
            print('F1-score: ', f1)

    return historial_perdida

def evaluar(X, Y, modelo: FeedForwardNeuralNetwork, verbose=False):
    """Score `modelo` on (X, Y) with macro-averaged classification metrics.

    The model is switched to eval mode and gradients are disabled while
    predicting. The predicted class is the arg-max of the softmax over dim 1.

    Args:
        X: feature tensor handed to the model.
        Y: tensor with the true class ids.
        modelo: network to evaluate.
        verbose: when True the four metrics are printed.

    Returns:
        Tuple (f1, accuracy, precision, recall); f1, precision and recall are
        macro averages.
    """
    modelo.eval()
    with torch.no_grad():
        probabilidades = torch.softmax(modelo(X), dim=1)
        # Tensor.cpu() hands back the same tensor when it already lives on the
        # CPU, so no CUDA check is needed before calling scikit-learn.
        Y_pred = probabilidades.argmax(dim=1).cpu()
        Y_ = Y.cpu()

        metricas = (
            f1_score(Y_, Y_pred, average='macro'),
            accuracy_score(Y_, Y_pred),
            precision_score(Y_, Y_pred, average='macro'),
            recall_score(Y_, Y_pred, average='macro'),
        )
        if verbose:
            rotulos = ("F1-score: ", "Accuracy: ", "Precision: ", "Recall: ")
            for rotulo, valor in zip(rotulos, metricas):
                print(rotulo, valor)
    return metricas

def guardar_modelo(modelo: FeedForwardNeuralNetwork, caracteristicas: dict):
    """Persist the weights of `modelo` and log its description.

    The state dict goes to './modelos/<ID>.pth' and the `caracteristicas`
    dictionary is appended, as one line, to 'DiccionarioModelos.txt'.

    Args:
        modelo: trained network whose `state_dict()` is saved.
        caracteristicas: description of the model; must contain the key 'ID'.
    """
    import os  # local import: the file only imports `putenv` from os

    # torch.save does not create missing folders.
    os.makedirs('./modelos', exist_ok=True)
    # Outer double quotes: reusing the same quote inside an f-string field is
    # a SyntaxError before Python 3.12.
    torch.save(modelo.state_dict(), f"./modelos/{caracteristicas['ID']}.pth")
    with open('DiccionarioModelos.txt', 'a') as file:
        file.write(f'{caracteristicas}\n')

def cargar_modelo(caracteristicas: dict, gpu=True):
    """Rebuild a network from its description and load its saved weights.

    Args:
        caracteristicas: description with the keys 'Entradas', 'Arquitectura',
            'Salidas' (used to rebuild the network) and 'ID' (used to locate
            './modelos/<ID>.pth').
        gpu: when True and CUDA is available the model is moved to the GPU.

    Returns:
        The network with the stored weights loaded.
    """
    modelo = FeedForwardNeuralNetwork(caracteristicas['Entradas'], caracteristicas['Arquitectura'], caracteristicas['Salidas'])
    # map_location='cpu' lets weights saved from a GPU be read on a machine
    # without CUDA; the model is moved to the GPU afterwards if requested.
    # Outer double quotes keep the f-string valid before Python 3.12.
    estado = torch.load(f"./modelos/{caracteristicas['ID']}.pth", map_location='cpu', weights_only=True)
    modelo.load_state_dict(estado)
    if torch.cuda.is_available() and gpu:
        modelo.cuda()
    return modelo

<h1>Experimentación</h1>

In [None]:
# Data set read by the experiment cells (passed to leer_datos).
filename = './dataset_polaridad_es.json'
# Training epochs per model; also passed to the loss plots further down.
epocas = 100
# Hyper-parameter grid: every architecture is trained with every
# learning rate and every batch size.
learning_rates = [0.1, 0.01, 0.001]
batch_sizes = [32, 16]
# Hidden-layer widths, from the input side to the output side.
arquitectura_5_capas = [1024, 512, 256, 128, 64]
arquitectura_4_capas = [512, 256, 128, 64]
arquitectura_3_capas = [256, 128, 64]
arquitecturas = [arquitectura_5_capas, arquitectura_4_capas, arquitectura_3_capas]
# fastText model variants accepted by vectorizar_embeddings.
variantes_esp = ['GEN', 'MX', 'ES']

<h2>TF-IDF</h2>

In [None]:
# Grid search over architectures, learning rates and batch sizes using the
# TF-IDF representation. Every trained model is scored on the test split
# (Xs[1], Ys[1]) and saved together with its description.
if EXPERIMENTAR:
    vec = 'TF-IDF'
    # Running model id; it names the saved weights file of each model.
    n_modelo = 1
    X, Y = leer_datos(filename, transform=True)
    X_train, X_test, X_val, Y_train, Y_test, Y_val = vectorizar_TF_IDF(X, Y)
    # Only the training labels (Ys[0]) are one-hot encoded by torchificar.
    Xs, Ys = torchificar([X_train, X_test, X_val], [Y_train, Y_test, Y_val], vec, one_hot_encoding=True) # type: ignore

    for arq in arquitecturas:
        for lr in learning_rates:
            for b in batch_sizes:
                # Description stored next to the weights; cargar_modelo reads
                # 'ID', 'Entradas', 'Arquitectura' and 'Salidas' from it.
                caracteristicas = {
                    'ID':n_modelo,
                    'Vectorizacion':vec,
                    'Variante':None,
                    'Entradas':Xs[0].shape[1],
                    'Arquitectura':arq,
                    'Salidas':NUM_CLASES,
                    'LearningRate':lr,
                    'BatchSize':b,
                    'CapasOcultas':len(arq)
                }
                modelo = FeedForwardNeuralNetwork(Xs[0].shape[1], arq, NUM_CLASES)
                funcion_perdida = nn.CrossEntropyLoss()
                optimizador = optim.Adam(modelo.parameters(), lr=lr)
                historial_perdida = entrenar(Xs, Ys, modelo, optimizador, funcion_perdida, epocas=epocas, batch_size=b)
                f1, a, p, r = evaluar(Xs[1], Ys[1], modelo, verbose=True)
                # Results are added after training so they are saved with the model.
                caracteristicas.update({
                    'Epocas':epocas,
                    'F1-score':f1,
                    'Accuracy':a,
                    'Precision':p,
                    'Recall':r,
                    'Historial':historial_perdida
                })
                guardar_modelo(modelo, caracteristicas)
                n_modelo += 1

<h2>Word Embeddings</h2>

In [None]:
# Same grid search as the TF-IDF cell, repeated for every fastText variant.
# NOTE(review): `n_modelo` is not initialised here; this cell relies on the
# value left by the TF-IDF cell, so it must run after it - confirm intended.
if EXPERIMENTAR:
    vec = 'Embeddings'
    # Texts are kept as a pandas column: vectorizar_embeddings calls X.map.
    X, Y = leer_datos(filename)
    for v in variantes_esp:
        X_train, X_test, X_val, Y_train, Y_test, Y_val = vectorizar_embeddings(X, Y, v)
        # Only the training labels (Ys[0]) are one-hot encoded by torchificar.
        Xs, Ys = torchificar([X_train, X_test, X_val], [Y_train, Y_test, Y_val], vec, one_hot_encoding=True) # type: ignore
        for arq in arquitecturas:
            for lr in learning_rates:
                for b in batch_sizes:
                    # Description stored next to the weights; cargar_modelo reads
                    # 'ID', 'Entradas', 'Arquitectura' and 'Salidas' from it.
                    caracteristicas = {
                        'ID':n_modelo,
                        'Vectorizacion':vec,
                        'Variante':v,
                        'Entradas':Xs[0].shape[1],
                        'Arquitectura':arq,
                        'Salidas':NUM_CLASES,
                        'LearningRate':lr,
                        'BatchSize':b,
                        'CapasOcultas':len(arq),
                    }
                    modelo = FeedForwardNeuralNetwork(Xs[0].shape[1], arq, NUM_CLASES)
                    funcion_perdida = nn.CrossEntropyLoss()
                    optimizador = optim.Adam(modelo.parameters(), lr=lr)
                    historial_perdida = entrenar(Xs, Ys, modelo, optimizador, funcion_perdida, epocas=epocas, batch_size=b)
                    f1, a, p, r = evaluar(Xs[1], Ys[1], modelo, verbose=True)
                    # Results are added after training so they are saved with the model.
                    caracteristicas.update({
                        'Epocas':epocas,
                        'F1-score':f1,
                        'Accuracy':a,
                        'Precision':p,
                        'Recall':r,
                        'Historial':historial_perdida
                    })
                    guardar_modelo(modelo, caracteristicas)
                    n_modelo += 1

<h2>Graficación y análisis</h2>

In [None]:
def leer_diccionarios(filename):
    """Read the model descriptions stored one per line in `filename`.

    Each non-blank line is expected to hold the text form of a dictionary.

    Args:
        filename: path of the text file to read.

    Returns:
        List with one dictionary per non-blank line, in file order.
    """
    diccionarios = []
    # `with` closes the file even if a line cannot be evaluated.
    with open(filename, 'r') as file:
        for line in file:
            if not line.strip():
                continue  # blank lines would make eval raise a SyntaxError
            # SECURITY: eval executes whatever the line contains; only read
            # files written by this notebook. It is kept (rather than
            # ast.literal_eval) because stored values may be reprs that need
            # names from this module, e.g. np.float64(...).
            diccionarios.append(eval(line))
    return diccionarios

def graficar_conjunto(listas_modelos, nombres_labels, titulo, filename, epocas, opacidad=0.2, error_max=2):
    """
    Plot the loss curves of several groups of neural networks in one figure.

    Args:
        listas_modelos: list of groups; each group is a list of model
            dictionaries with a 'Historial' entry (loss per epoch).
        nombres_labels: legend label of each group, in the same order.
        titulo: title of the figure.
        filename: path where the figure is saved.
        epocas: maximum number of epochs drawn per curve.
        opacidad: alpha value of the curves.
        error_max: models whose history ever exceeds this loss are left out
            of the plot (and counted in the footnote).
    """
    colores = ['black', 'darkred', 'forestgreen', 'darkviolet', 'darkorange', 'royalblue']
    marcadores = ['solid', (0, (3, 1, 1, 1)), 'dotted', (0, (5, 1)), 'dashed', (0, (5, 10)), (5, (10, 3)), (0, (3, 5, 1, 5))]
    # Groups take the last len(listas_modelos) colours of the palette.
    offset = len(colores) - len(listas_modelos)
    patches = []
    plt.figure(figsize=(16, 9))
    plt.title(titulo)
    n = 0  # number of models left out of the plot
    for i, lista in enumerate(listas_modelos):
        color = colores[i + offset]
        for diccionario in lista:
            historial = diccionario['Historial']
            # An empty history has no maximum; it is simply drawn as nothing.
            if len(historial) > 0 and np.max(historial) > error_max:
                n += 1
                continue
            tramo = historial[:epocas]
            # The x axis follows the actual length of the curve, so histories
            # shorter than `epocas` no longer cause a shape mismatch.
            plt.plot(np.arange(len(tramo)), tramo, color=color, linestyle=marcadores[i], linewidth=1, alpha=opacidad)
        patches.append(mpatches.Patch(color=color, label=nombres_labels[i]))
    plt.figtext(0.13, 0.03, f'*{n} modelos excluidos para mejorar la visualización')
    plt.legend(handles=patches)
    plt.xlabel('Época')
    plt.ylabel('Cross Entropy Loss')
    plt.grid(True)
    plt.savefig(filename)
    plt.close()

def graficar_metricas(datos, titulo, filename: str, nombres_grupos=('TF-IDF', 'Embeddings')):
    """Save a grouped bar chart with one bar per metric and group.

    Args:
        datos: sequence of rows, one per metric, in the order F1-score,
            Accuracy, Precision, Recall (at most four rows); each row holds
            one value per group.
        titulo: title of the chart.
        filename: path where the figure is saved.
        nombres_grupos: tick label of each group; needs one label per value
            in a row. Defaults to the two vectorizations compared here.
    """
    nombres_metricas = ['F1-score', 'Accuracy', 'Precision', 'Recall']
    _, ax = plt.subplots(figsize=(16, 9))
    width = 0.2
    grupos = np.arange(len(datos[0]))
    for i, metrica_p in enumerate(datos):
        # Each metric is shifted one bar width to the right of the previous one.
        rects = ax.bar(grupos + width * i, metrica_p, width, label=nombres_metricas[i])
        ax.bar_label(rects, padding=3)
    # Centre each tick under its bars, whatever the number of metrics drawn.
    ax.set_xticks(grupos + (len(datos) - 1) * width / 2, list(nombres_grupos))
    ax.set_ylabel('Puntuación')
    ax.set_title(titulo)
    ax.legend()
    plt.savefig(filename)
    plt.close()
    
# Load every saved model description and split them by vectorization.
diccionarios = leer_diccionarios('DiccionarioModelos.txt')
modelos_TF_IDF = [d for d in diccionarios if d['Vectorizacion'] == 'TF-IDF']
modelos_embeddings = [d for d in diccionarios if d['Vectorizacion'] == 'Embeddings']
# Loss curves: TF-IDF models against embedding models.
graficar_conjunto([modelos_TF_IDF, modelos_embeddings], 
                  ['TF-IDF', 'Word Embeddings'], 
                  'TF-IDF vs Word Embeddings', 
                  './comparacion.png', 
                  epocas, 
                  opacidad=0.6)

# Loss curves of the embedding models grouped by learning rate.
modelos_lr_1 = [d for d in diccionarios if d['Vectorizacion'] == 'Embeddings' and d['LearningRate'] == 0.1]
modelos_lr_01 = [d for d in diccionarios if d['Vectorizacion'] == 'Embeddings' and d['LearningRate'] == 0.01]
modelos_lr_001 = [d for d in diccionarios if d['Vectorizacion'] == 'Embeddings' and d['LearningRate'] == 0.001]
graficar_conjunto([modelos_lr_1, modelos_lr_01, modelos_lr_001], 
                  ['Learning Rate = 0.1', 'Learning Rate = 0.01', 'Learning Rate = 0.001'], 
                  'Learning Rates en word embeddings', 
                  './learningrates.png',
                  epocas,
                  opacidad=0.5)

# Loss curves grouped by number of hidden layers.
# NOTE(review): these lists are not filtered by 'Vectorizacion', so TF-IDF
# models are included even though the title mentions word embeddings - confirm.
modelos_arq_3 = [d for d in diccionarios if d['CapasOcultas'] == 3]
modelos_arq_4 = [d for d in diccionarios if d['CapasOcultas'] == 4]
modelos_arq_5 = [d for d in diccionarios if d['CapasOcultas'] == 5]
graficar_conjunto([modelos_arq_3, modelos_arq_4, modelos_arq_5], 
                  ['3 capas ocultas', '4 capas ocultas', '5 capas ocultas'],
                  'Arquitecturas word embeddings', 
                  './arquitecturas.png',
                  epocas,
                  opacidad=0.9)

# Metric averages

# One row per model with (F1, accuracy, precision, recall); the mean over
# axis 0 gives the average of each metric across the models of a vectorization.
metricas_TF_IDF = [(d['F1-score'], d['Accuracy'], d['Precision'], d['Recall']) for d in modelos_TF_IDF]
metricas_TF_IDF = np.array(list(metricas_TF_IDF))
avg_TF_IDF = metricas_TF_IDF.mean(axis=0)

metricas_embeddings = [(d['F1-score'], d['Accuracy'], d['Precision'], d['Recall']) for d in modelos_embeddings]
metricas_embeddings = np.array(list(metricas_embeddings))
avg_embeddings = metricas_embeddings.mean(axis=0)

# Stacked as (vectorization, metric) and transposed so that each row handed
# to graficar_metricas is one metric with a value per vectorization.
avg = np.vstack([avg_TF_IDF, avg_embeddings])
graficar_metricas(avg.T, 'Métricas promedio', './metricas.png')