## Clasificación de Textos

<img src="figs/fig-diagrama-clasificador.png" width="900">

# Entrenar al clasificador

### Clasificador: Red Neuronal Multicapa

<center>
<img src="figs/fig-MLP_XOR.png" width="600" style="background-color:white;">
</center>


### 1. Cargar los datos

In [None]:
import pandas as pd
dataset = pd.read_json("./data/data_aggressiveness_es.json", lines=True)
#conteo de clases
print("Total de ejemplos de entrenamiento")
print(dataset.klass.value_counts())
# Extracción de los textos en arreglos de numpy
X = dataset['text'].to_numpy()
# Extracción de las etiquetas o clases de entrenamiento
Y = dataset['klass'].to_numpy()

### 2. Codificar las categorías (clases)

In [None]:
from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()
# Normalizar las etiquetas a una codificación ordinal para entrada del clasificador
Y_encoded= le.fit_transform(Y)
print("Clases:")
print(le.classes_)
print("Clases codificadas:")
print(le.transform(le.classes_))

### 3. Preparar los conjuntos de datos  (datasets) para entrenamiento y para probar el rendimiento del clasificador

In [13]:
# Dividir el conjunto de datos en conjunto de entrenamiento (80%) y conjunto de pruebas (20%)
from sklearn.model_selection import train_test_split

X_train, X_test, Y_train, Y_test =  train_test_split(X, Y_encoded, test_size=0.2, stratify=Y_encoded, random_state=42)


### 4. Crear Matriz Documento-Término

In [None]:
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import TfidfVectorizer
from nltk import word_tokenize

_STOPWORDS = stopwords.words("spanish")  # agregar más palabras a esta lista si es necesario

# Normalización del texto

import unicodedata
import re
PUNCTUACTION = ";:,.\\-\"'/"
SYMBOLS = "()[]¿?¡!{}~<>|"
NUMBERS= "0123456789"
SKIP_SYMBOLS = set(PUNCTUACTION + SYMBOLS)
SKIP_SYMBOLS_AND_SPACES = set(PUNCTUACTION + SYMBOLS + '\t\n\r ')

def normaliza_texto(input_str,
                    punct=False,
                    accents=False,
                    num=False,
                    max_dup=2):
    """
        punct=False (elimina la puntuación, True deja intacta la puntuación)
        accents=False (elimina los acentos, True deja intactos los acentos)
        num= False (elimina los números, True deja intactos los acentos)
        max_dup=2 (número máximo de símbolos duplicados de forma consecutiva, rrrrr => rr)
    """
    
    nfkd_f = unicodedata.normalize('NFKD', input_str)
    n_str = []
    c_prev = ''
    cc_prev = 0
    for c in nfkd_f:
        if not num:
            if c in NUMBERS:
                continue
        if not punct:
            if c in SKIP_SYMBOLS:
                continue
        if not accents and unicodedata.combining(c):
            continue
        if c_prev == c:
            cc_prev += 1
            if cc_prev >= max_dup:
                continue
        else:
            cc_prev = 0
        n_str.append(c)
        c_prev = c
    texto = unicodedata.normalize('NFKD', "".join(n_str))
    texto = re.sub(r'(\s)+', r' ', texto.strip(), flags=re.IGNORECASE)
    return texto


# Preprocesamiento personalizado 
def mi_preprocesamiento(texto):
    #convierte a minúsculas el texto antes de normalizar
    tokens = word_tokenize(texto.lower())
    texto = " ".join(tokens)
    texto = normaliza_texto(texto)
    return texto
    
# Tokenizador personalizado 
def mi_tokenizador(texto):
    # Elimina stopwords: palabras que no se consideran de contenido y que no agregan valor semántico al texto
    #print("antes: ", texto)
    texto = [t for t in texto.split() if t not in _STOPWORDS]
    #print("después:",texto)
    return texto


vec_tfidf = TfidfVectorizer(analyzer="word", preprocessor=mi_preprocesamiento, tokenizer=mi_tokenizador,  ngram_range=(1,1))
X_tfidf = vec_tfidf.fit_transform(X_train)
print("vocabulario: ", len(vec_tfidf.get_feature_names_out()))

### 5. Crear el clasificador: Crear la clase MLP_TODO 

In [15]:
import numpy as np

# Función de activación sigmoide
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

# Derivada de la sigmoide
def sigmoid_derivative(x):
    # return sigmoid(x) * (1 - sigmoid(x))
    return x * (1 - x)

# Establece la semilla para la generación de números aleatorios
def seed(random_state=33):
    np.random.seed(random_state)

def xavier_initialization(input_size, output_size):
    # Calcular el límite de la distribución uniforme
    limit = np.sqrt(6 / (input_size + output_size))
    # Generar la matriz de pesos con distribución uniforme en el rango [-limit, limit]
    W = np.random.uniform(-limit, limit, (input_size, output_size))
    return W


def create_minibatches(X, y, batch_size):
    n_samples = X.shape[0]
    indices = np.random.permutation(n_samples)  # Mezcla los índices aleatoriamente
    X_shuffled, y_shuffled = X[indices], y[indices]  # Reordena X e y según los índices aleatorios
    
    # Divide los datos en minibatches
    for X_batch, y_batch in zip(np.array_split(X_shuffled, np.ceil(n_samples / batch_size)), 
                                np.array_split(y_shuffled, np.ceil(n_samples / batch_size))):
        yield X_batch, y_batch

    
class MLP_TODO:
    def __init__(self, num_entradas, num_neuronas_ocultas, num_salidas, epochs, batch_size=32, learning_rate=0.5, random_state=42):

        seed(random_state)
        # Definir la tasa de aprendizaje
        self.learning_rate = learning_rate
        # Definir el número de épocas
        self.epochs = epochs
        # Definir el tamaño del batch de procesamiento
        self.batch_size = batch_size
        
        # definir las capas
        self.W1 = xavier_initialization(num_neuronas_ocultas, num_entradas)  # Pesos entre capa de entrada y capa oculta
        self.b1 = np.zeros((1, num_neuronas_ocultas))   # Bias de la capa oculta
        self.W2 = xavier_initialization(num_salidas, num_neuronas_ocultas)  # Pesos entre capa oculta y capa de salida
        self.b2 = np.zeros((1, num_salidas)) # Bias de la capa de salida

    def forward(self, X):
        # TODO: implementar el forward pass
        #----------------------------------------------
        # 1. Propagación hacia adelante (Forward pass)
        #----------------------------------------------
        # TODO: Calcular la suma ponderada Z (z_c1) para la capa oculta 
        self.X = X
        self.z_c1 = 0
        # TODO: Calcular la activación de la capa oculta usando la función sigmoide
        self.a_c1 = 0
        # TODO: Calcular la suma ponderada Z (z_c2)  para la capa de salida 
        self.z_c2 = 0
        # TODO: Calcular la activación de la capa de salida usando la función sigmoide
        y_pred = 0  # Activación capa de salida
        return y_pred
    

    def loss_function_MSE(self, y_pred, y):
        #----------------------------------------------
        # 2. Cálculo del error con MSE
        #----------------------------------------------
        # TODO: Calcular el error cuadrático medio (MSE)
        self.y_pred = y_pred
        self.y = y
        error = 0.5 * np.mean((y_pred - y) ** 2)
        return error
    

    def backward(self):
        # TODO: implementar el backward pass
        # calcular los gradientes para la arquitectura de la figura anterior
        #----------------------------------------------
        # 3. Propagación hacia atrás (Backward pass)
        #----------------------------------------------
        
        #----------------------------------------------
        # Gradiente de la salida
        #----------------------------------------------
        # TODO: Calcular la derivada del error con respecto a la salida y
        dE_dy_pred = 0 # Derivada del error respecto a la predicción con  N ejemplos
        # TODO: Calcular la derivada de la activación de la salida con respecto a z_c2 
        d_y_pred_d_zc2 = 0
        # TODO: Calcular delta de la capa de salida
        delta_c2 = 0

        #----------------------------------------------
        # Gradiente en la capa oculta
        #----------------------------------------------
        # calcular la derivada de las suma ponderada respecto a las activaciones de la capa 1
        d_zc2_d_a_c1 = 0
        # TODO: Propagar el error hacia la capa oculta, calcular deltas de la capa 1
        delta_c1 = 0

        #calcula el gradiente de la función de error respecto a los pesos de la capa 2
        self.dE_dW2 = 0
        self.dE_db2 =0
        self.dE_dW1 =  0
        self.dE_db1 =  0


    def update(self):  # Ejecución de la actualización de paramámetros
        # TODO: implementar la actualización de los pesos y el bias
        #----------------------------------------------
        # Actualización de pesos de la capa de salida
        #---------------------------------------------- 
        
        # TODO: Actualizar los pesos y bias de la capa de salida
        self.W2 = 0
        self.b2 = 0

        #----------------------------------------------
        # Actuailzación de pesos de la capa oculta
        #----------------------------------------------
        #calcula el gradiente de la función de error respecto a los pesos de la capa 1
        self.W1 = 0
        self.b1 = 0

    def predict(self, X):  # Predecir la categoría para datos nuevos
        # TODO: implementar la predicción 
        y_pred = self.forward(X)
        # Obtener la clase para el clasificador binario
        y_pred = np.where(y_pred >= 0.5, 1, 0)
        return y_pred

    def train(self, X, Y):
        #implementar el entrenamiento de la red
        for epoch in range(self.epochs):
            for X_batch, y_batch in create_minibatches(X, Y, self.batch_size):
                y_pred = self.forward(X_batch)
                error = self.loss_function_MSE(y_pred, y_batch)
                self.backward() # cálculo de los gradientes
                self.update() # actualización de los pesos y bias

                # Imprimir el error cada N épocas
                if epoch % 100 == 0:
                    print(f"Época {epoch}, Error: {error}")

### 6. Entrenar la red neuronal

In [None]:
import numpy as np

X_tr = X_tfidf.toarray()
Y_tr = Y_train[:, np.newaxis] # Agregar una dimensión adicional para representar 1 ejemplo de entrenamiento por fila

num_entradas= X_tr.shape[1] # tamaño de la matriz Documento-Término
num_neuronas_ocultas = 128
num_salidas = 1
epochs = 110 
batch_size = 32
learning_rate = 0.5
random_state = 33

clasificador_mlp = MLP_TODO(num_entradas, num_neuronas_ocultas, num_salidas, epochs, batch_size, learning_rate, random_state)

# Entrenamos al clasificador
clasificador_mlp.train(X_tr, Y_tr)



### Predicción de datos nuevos

In [None]:
ejemplos_nuevos = ["Que se puede esperar de este perro ladrón"]
# Suponer que se cuenta con el objeto vec_tfidf entrenado con el vocabulario del conjunto de entrenamiento
X_ejemplos_tfidf = vec_tfidf.transform(ejemplos_nuevos)
X_ejemplos_tfidf = X_ejemplos_tfidf.toarray()
print(X_ejemplos_tfidf)

y_pred_nuevo = clasificador_mlp.predict(X_ejemplos_tfidf)
y_pred_nuevo = y_pred_nuevo.flatten()
print(le.inverse_transform(y_pred_nuevo))


### 7. Predecir los datos del conjuntos de prueba con el modelo entrenado

In [None]:
import numpy as np


X_test_tfidf = vec_tfidf.transform(X_test)
X_t = X_test_tfidf.toarray()
Y_t = Y_test[:, np.newaxis] # Agregar una dimensión adicional para representar 1 ejemplo de entrenamiento por fila

y_pred_test = clasificador_mlp.predict(X_t)
print(y_pred_test)

### Inspección de los resultados de los primeros N ejemplos de prueba

In [None]:
print("textos: ", X_test[:5])
print("clase esperada: ",Y_test[:5])
print("clase predicha: ", y_pred_test[:5])

### Mostrar la predicción de la clase original 

In [None]:
print(Y_test[:10])
print(y_pred_test[:10])

In [None]:

# llevar a la misma forma la salida de las predicciones
y_pred_test = y_pred_test.flatten()

print(Y_test[:10])
print(y_pred_test[:10])

In [None]:
# Obten las primeras N predicciones
pred =  y_pred_test[:5] 
pred_ori = le.inverse_transform(pred)
pred, pred_ori

# 8. Evaluando el desempeño

## Métricas de Evaluación
 - #### Las métricas precisión, recall y F1 son fundamentales para evaluar el rendimiento de un clasificador


<img src="figs/fig_precision-recall.png" width="300">

##### Fuente: https://en.wikipedia.org/wiki/Precision_and_recall


<img src="figs/fig_matriz-confusion.png" width="500">


TP=True Positive

TN=True Negative

FP=False Positive (Error tipo I: ejemplo, se considera que el paciente está enfermo, pero en realidad está sano)

FN=False Negative ( Error tipo II: ejemplo, se considera que el paciente está sano, pero en realidad está enfermo)


$$ Accuracy = \frac{total~ TP + total~TN}{total~muestras} $$

$$ Precision_c = \frac{ TP_c}{TP_c + FP_c} $$

$$ Recall_c = \frac{ TP_c}{TP_c + FN_c} $$

$$ F1-score_c= 2 \times \frac{ Precision_c \times Recall_c}{Precision_c + Recall_c} $$

$$ macro-F1-score= \frac{ 1 }{|Clases|} \sum{F1-score_c} $$

## Matriz de confusión

In [None]:
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay
ConfusionMatrixDisplay.from_predictions(Y_test, y_pred_test)

In [None]:
# para la clase 0, la precisión es la siguiente
tp= 82
fp = 31+11+33
tp/(tp + fp)

## Métricas

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
print("P=", precision_score(Y_test, y_pred_test, average='macro'))
print("R=", recall_score(Y_test, y_pred_test, average='macro'))
print("F1=", f1_score(Y_test, y_pred_test, average='macro'))
print("Acc=", accuracy_score(Y_test, y_pred_test))


## Inspección del desempeño por clase

In [None]:
from sklearn.metrics import confusion_matrix
print(confusion_matrix(Y_test, y_pred_test))

In [None]:
from sklearn.metrics import classification_report
print(classification_report(Y_test, y_pred_test, digits=4, zero_division='warn'))