<center>
<p><img src="https://mcd.unison.mx/wp-content/themes/awaken/img/logo_mcd.png" width="150">
</p>



<h1>Curso Procesamiento de Lenguaje Natural</h1>

<h3>Redes siamesas con modelos preentrenados</h3>


<p> Julio Waissman Vilanova </p>
<p>
<img src="https://identidadbuho.unison.mx/wp-content/uploads/2019/06/letragrama-cmyk-72.jpg" width="150">
</p>


<a target="_blank" href="https://colab.research.google.com/github/mcd-unison/pln/blob/main/labs/RNN/siamesas.ipynb"><img src="https://i.ibb.co/2P3SLwK/colab.png"  style="padding-bottom:5px;"  width="30" /> Ejecuta en Colab</a>

Tomado parcialmente y adaptado de [esta entrada de medium](https://medium.com/swlh/few-shot-learning-in-nlp-use-siamese-networks-189de22459d0) 

</center>


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

np.random.seed(0)
plt.style.use("ggplot")

import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers as layers

import tensorflow_hub as hub

print('Tensorflow version:', tf.__version__)
print('GPU detected:', tf.config.list_physical_devices('GPU'))

## Datos utilizados para el aprendizaje

Vamos a utilizar un conjunto de textos que pertenecesn a 5 clases deferentes provenientes de blogs para ilustrar el uso de redes siamesas y las medidas de similaridad.

Vamos a asumir que tenemos un conjunto pequeño de datos (100) pertenecientes a 5 clases diferentes, y con estos datos aprender una medida de similaridad que nos permita clasificar correctamente un grupo mucho más amplio de textos, utilizando el concepto de redes siamesas y de entrenamiento por tripletas.

Veamos los datos:


In [None]:
train_url = "https://raw.githubusercontent.com/koushikkonwar/Few-Shot-/master/Dataset/final_fewshot_train.csv"
test_url = "https://raw.githubusercontent.com/koushikkonwar/Few-Shot-/master/Dataset/final_fewshot_test.csv"

train_df = pd.read_csv(train_url) 
test_df = pd.read_csv(test_url)

print("Info del conjunto de entrenamiento:")
print(train_df.info())

print("\nInfo del conjunto de prueba:")
print(test_df.info())


print("\nClases del conjunto de entrenamiento:")
print(train_df.value_counts("class"))

for i in range(5):
  print(f"\nTextos de la clase {i + 1}:\n")
  print(train_df[train_df["class"] == i + 1].head(5))

## Procesamiento de la información

Vamos a convertir una secuencia de tokens en inglés en una secuencia de *embeddings* utilizando un modelo preentrenado conocido como [Universal Sentence Encoder]() y el cual lo podemos consultar en su [enlace en TensorFlow Hub](https://tfhub.dev/google/universal-sentence-encoder/4). Este es un modelo ligero y previo a *ELMo*, *BERT* y demás fauna.

In [None]:
module_url = 'https://tfhub.dev/google/universal-sentence-encoder-large/4'
embed = hub.load(module_url)

In [None]:
print("Ejemplo de uso del USE")
print(f"Sentencia: {train_df['text'].values[5]}")
print(f"Embedding: {embed(train_df['text'].values[5:6])}")

## Modelo de vector de similaridad

Vamos a utilizar un modelo muy simple, es simplemente para agregar capas entrenables no recurrentes, pero si se decide se podrían modificar inclusive las capas del modelo original haciendo *fine-tunning*.

In [None]:
input_text = keras.Input(shape=(512,))

x = layers.Dense(256, activation='relu', name='d_1')(input_text)
x = layers.Dropout(0.4)(x)
x = layers.BatchNormalization(name='norm_1')(x)
x = layers.Dense(
        64, activation='relu', name='d_2',
        kernel_regularizer=keras.regularizers.l2(0.001)
    )(x)
x = layers.Dropout(0.4)(x)

y = layers.Dense(128, name='d_3')(x)


norm_layer = layers.Lambda(
        lambda  x: keras.backend.l2_normalize(x, axis=1), 
        name='norm_2'
    )(y)

model=keras.Model(inputs=[input_text], outputs=norm_layer)

model.summary()

## Modelo para el aprendizaje y función de pérdida

El modelo de aprendizaje con tripletas utiliza el modelo original 3 veces y cuando modifica sus pesos lo hace en forma global. Igualmente, la función de pérdida utilizada es particular, como lo vimos en clase, por lo que vamos a utilizar un modelo específico para aprender, pero derivado del modelo original. Cuando aprendamos en el modelo de aprendizaje, estamos ajustando automáticamente los pesos en nuestro modelo original.

Iniciemos por establecer el modelo de aprendizaje:

In [None]:
# Definimos una capa de tripletas para la función de pérdida
class TripletLossLayer(layers.Layer):
    def __init__(self, alpha, **kwargs):
        self.alpha = alpha
        super(TripletLossLayer, self).__init__(**kwargs)
    
    def triplet_loss(self, inputs):
        a, p, n = inputs
        p_dist =  keras.losses.cosine_similarity(a, p, axis=-1)
        n_dist = keras.losses.cosine_similarity(a, n, axis=-1)
        return keras.backend.sum(
            keras.backend.maximum(p_dist - n_dist + self.alpha, 0), 
            axis=0
        )
    
    def call(self, inputs):
        loss = self.triplet_loss(inputs)
        self.add_loss(loss)
        return loss


# Entradas para anchor, pos y neg
in_a = keras.Input(shape=(512,), name="in_a")
in_p = keras.Input(shape=(512,), name="in_p")
in_n = keras.Input(shape=(512,), name="in_n")

# Se aplica el modelo a las 3 entradas (red siamesa)
emb_a = model(in_a)
emb_p = model(in_p)
emb_n = model(in_n)

# Se agrega la capa de pérdida por tripletas
triplet_loss_layer = TripletLossLayer(
    alpha=0.4, 
    name='triplet_loss_layer'
)([emb_a, emb_p, emb_n])


model_train = keras.Model([in_a, in_p, in_n], triplet_loss_layer)
     
model_train.summary()

## Haciendo un generador de secuencias para el aprendizaje

Para acelerar el aprendizaje, vamos a realizar un generador de secuencias, aunque vamos a guardar las secuencias en memoria al no ser demasiadas, para que podamos trabajar con GPUs, paralelizando el aprendizaje. Así que le vamos a cargar toda la mano en el constructor de la clase, que al tener dos ciclos anidados muy feos, va a tardar un buen rato.

In [None]:
class TripletsBatches(keras.utils.Sequence):
    def __init__(self, batch_size, batch_num, df, embed):
        self.batch_size = batch_size
        self.batch_num = batch_num

        labels = df['class'].unique()

        y = df['class'].values
        idxs_clase = {
              label: np.flatnonzero(y == label) 
              for label in labels
        }

        self.a = []
        self.p = []
        self.n = []
        
        for _ in range(batch_num):
          id_a, id_p, id_n = [], [], []
          for _ in range(batch_size):
            e_pos, e_neg = np.random.choice(labels, 2, replace=False)
            ai, pi = np.random.choice(idxs_clase[e_pos], 2, replace=False)
            ni = np.random.choice(idxs_clase[e_neg])
            id_a.append(ai)
            id_p.append(pi)
            id_n.append(ni)
        
          self.a.append(embed(df.text.values[id_a]))
          self.p.append(embed(df.text.values[id_p]))
          self.n.append(embed(df.text.values[id_n]))


    def __len__(self):
        return self.batch_num

    def __getitem__(self, idx):
      return [self.a[idx], self.p[idx], self.n[idx]], []

In [None]:
batch_size = 128
batch_num = 10

secuencias_aprendizaje = TripletsBatches(batch_size, batch_num, train_df, embed)

## Entrenamiento

Esto ya es de rutina...

In [None]:
model_train.compile(
    loss=None, 
    optimizer='adam'
)

history = model_train.fit(
    secuencias_aprendizaje, 
    epochs=50
)

In [None]:
import matplotlib.pyplot as plt
plt.style.use('ggplot')

plt.plot(history.history['loss'], '-')
plt.title('Pérdida durante el aprendizaje')
plt.ylabel('pérdida')
plt.xlabel('epoch')
plt.show()

## Probando el modelo para encontrar similaridades en un conjunto grande de datos

Esta parte puede tardar un poco en ejecución, así que hay que tener calma. 

Vamos a hacer lo siguiente:

1. Los textos de entrenamiento y prueba los vamos a convertir a embeddings, y a partir de ahí, a un vector de 128 dimensiones con nuestro modelo entrenado con la red siamesa.
2. Vamos a usar los datos de entrenamiento como base en un clasificador por vecinos próximos
3. Vamos a probar si encuentra una clasificación decente en un conjunto mucho más grande de datos de prueba (comparando las distancias coseno con los datos originales)

Bueno, por partes, como decía Jack *el destripador*

In [None]:
# Obteniendo los vectores y clases correspondientes 
# de los 100 datitos de entrenamiento

X_train = model.predict(embed(train_df['text'].values)['outputs'])
y_train = train_df['class'].values

In [None]:
# Obteniendo los vectores y clases correspondientes 
# del conjunto de prueba

X_test = model.predict(embed(test_df['text'].values)['outputs'])
y_test = test_df['class'].values

In [None]:
# Ahora vamos a ver si el vecino más próximo
# le corresponde la misma clase

from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import classification_report

knn = KNeighborsClassifier(n_neighbors=1, metric='cosine')
knn.fit(X_train, y_train)

y_pred = knn.predict(X_test)

print(classification_report(y_test, y_pred))

¿Será? Vamos a tratar de ver que pasa y quienes son los vecinos de un texto:

In [None]:
ind = np.random.randint(0, X_test.shape[0])

x = X_test[ind:ind+1,:]

print(f"Texto: \n{test_df.text.values[ind]}\n")

dist, indices = knn.kneighbors(X=x, n_neighbors=5, return_distance=True)

vecinos = pd.DataFrame({
    'Texto': test_df.text.values[indices][0].tolist(),
    'Distancia': dist[0].tolist()
})

vecinos