In [None]:
import numpy as np
import pandas as pd
from sklearn import model_selection
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization
from tensorflow.keras.layers import Embedding
import seaborn as sns
sns.set_theme(style="darkgrid")

# Réseau de neurones récurrent LSTM bi-directionnel pour la classification de documents

 > ℹ️ Inspiré de :
 > - https://keras.io/examples/nlp/pretrained_word_embeddings/
 > - https://keras.io/examples/nlp/bidirectional_lstm_imdb/
 > - https://www.machinecurve.com/index.php/2020/02/18/how-to-use-k-fold-cross-validation-with-keras/

<div class="alert alert-block alert-info">

🥅 **Objectifs**

- Savoir utiliser `keras` pour faire de l'apprentissage supervisé à partir de documents avec des réseaux de neurones récurrents de type LSTM bi-directionnel
</div>

## 1. Chargement des données (corpus et plongements)

Nous allons ré-utiliser la procédure vue dans le TP précédent.

### 1.1. Corpus annoté

In [None]:
!mkdir data
!wget -P data https://git.unistra.fr/dbernhard/ftaa_data/-/raw/main/winemag-fr_train.csv

In [None]:
# Lecture du fichier CSV
wine_df = pd.read_csv("data/winemag-fr_train.csv", sep=",", dtype={'description': 'object',
                                           'price': 'float64',
                                           'province': 'category',
                                           'variety': 'object'})

In [None]:
# Liste des classes
class_names = sorted(wine_df.province.unique().categories.to_list())
print("Classes :", class_names)
print("Nombre d'exemplaires :", len(wine_df))

In [None]:
# On associe à chaque classe un identifiant unique
class_index = {class_names[i]:i for i in range(len(class_names))}
class_index

In [None]:
# On utilise uniquement la variété et les descriptions comme données d'entrée
X_train_variety = wine_df.variety.str.split('_')
X_train = X_train_variety.str.join(' ') + ' ' + wine_df.description
# Les noms des classes sont remplacées par leur identifiant (un entier positif)
y_train = wine_df.province.map(class_index)

In [None]:
X_train.head()

In [None]:
y_train.head()

In [None]:
def get_vectorizer(documents, max_voc_size=8000, max_seq_length=50, batch_size=128):
  vectorizer = TextVectorization(max_tokens=max_voc_size,
                                 output_sequence_length=max_seq_length)
  # Création du jeu de données à partir de X_train et constitution de lots de 128 instances
  text_ds = tf.data.Dataset.from_tensor_slices(documents).batch(batch_size)
  # Création du vocabulaire à partir des données d'entrée
  vectorizer.adapt(text_ds)
  return vectorizer

In [None]:
keras_vectorizer = get_vectorizer(X_train)

Vocabulaire obtenu :

In [None]:
voc = keras_vectorizer.get_vocabulary()
print(len(voc))

In [None]:
word_index = dict(zip(voc, range(len(voc))))

### 1.2. Plongements de mots pré-entraînés


In [None]:
!wget -P data https://git.unistra.fr/dbernhard/ftaa_data/-/raw/main/model_26.txt

In [None]:
def load_embeddings(embeddings_file):
  embeddings_index = {}
  with open(embeddings_file, 'r', encoding='utf8') as f:
      for line in f:
          word, coefs = line.split(maxsplit=1)
          coefs = np.fromstring(coefs, "f", sep=" ")
          embeddings_index[word] = coefs
  print(f'{len(embeddings_index)} vecteurs de mots ont été lus')
  return embeddings_index

In [None]:
# Chargement des plongements du fichier model_6.txt
m26_embeddings = load_embeddings('data/model_26.txt')

In [None]:
def get_embedding_matrix(vocabulary, embeddings_index, embedding_dim = 300):
  num_tokens = len(vocabulary)
  hits = 0
  misses = 0

  # Préparation de la matrice
  embedding_matrix = np.zeros((num_tokens, embedding_dim))
  for word, i in word_index.items():
      embedding_vector = embeddings_index.get(word)
      if embedding_vector is not None:
          embedding_matrix[i] = embedding_vector
          hits += 1
      else:
          misses += 1
  print(f'{hits} mots ont été trouvés dans les plongements pré-entraînés')
  print(f'{misses} sont absents')
  return embedding_matrix

In [None]:
# Construction de la matrice de plongements à partir du vocabulaire
m26_embedding_matrix = get_embedding_matrix(voc, m26_embeddings)

## 2. Construction et entraînement du modèle


Nous allons faire une validation croisée à 5 plis.

Le réseau de neurones comprendra deux couches de LSTM bi-directionnels

In [None]:
def get_biLSTM_model(voc_size, embedding_matrix, embedding_dim=300):
  # Création du modèle
  int_sequences_input = keras.Input(shape=(None,), dtype="int64")
  embedding_layer = Embedding(voc_size, embedding_dim, trainable=True,
      embeddings_initializer=keras.initializers.Constant(embedding_matrix),
  )

  embedded_sequences = embedding_layer(int_sequences_input)
  x = layers.Bidirectional(layers.LSTM(64, dropout=0.2, recurrent_dropout=0.2))(embedded_sequences)
  preds = layers.Dense(len(class_names), activation="softmax")(x)
  model = keras.Model(int_sequences_input, preds)
  return model

In [None]:
# Affichage de l'architecture du modèle
biLSTM_model = get_biLSTM_model(len(voc), m26_embedding_matrix)
biLSTM_model.summary()

In [None]:
# Fonction pour l'entraînement d'un modèle
def train_model(X, y, model_function, vectorizer,
                voc_size, embedding_matrix, embedding_dim=300, batch_size=128):
  # Listes utilisées pour sauvegarder les résultats obtenus à chaque pli
  acc_per_fold = []
  loss_per_fold = []
  histories = []
  folds = 5
  stratkfold = model_selection.StratifiedKFold(n_splits=folds, shuffle=True,
                                              random_state=12)
  fold_no = 1
  for train, test in stratkfold.split(X, y):
    m_function = globals()[model_function]
    model = m_function(voc_size, embedding_matrix, embedding_dim)

    print('------------------------------------------------------------------------')
    print(f'Entraînement pour le pli {fold_no} ...')
    fold_x_train = vectorizer(X.iloc[train].to_numpy()).numpy()
    fold_x_val = vectorizer(X.iloc[test].to_numpy()).numpy()
    fold_y_train = y.iloc[train].to_numpy()
    fold_y_val = y.iloc[test].to_numpy()

    # Compilation du modèle : permet de préciser la fonction de perte et l'optimiseur
    # loss=sparse_categorical_crossentropy : entropie croisée, dans le cas où les
    #  classes cibles sont indiquées sous forme d'entiers. Il s'agira de minimiser
    #  la perte pendant l'apprentissage
    # optimizer=rmsprop : l'optimiseur détermine la manière doit les poids seront
    #  mis à jour pendant l'apprentissage
    model.compile(
      loss="sparse_categorical_crossentropy", optimizer="rmsprop", metrics=["acc"]
    )
    # Entraînement
    history = model.fit(fold_x_train, fold_y_train, batch_size=batch_size,
                        epochs=10, validation_data=(fold_x_val, fold_y_val))
    histories.append(history)
    # Evaluation sur les données de validation
    scores = model.evaluate(fold_x_val, fold_y_val, verbose=0)
    print(f'Scores pour le pli {fold_no}: {model.metrics_names[0]} = {scores[0]:.2f};',
          f'{model.metrics_names[1]} = {scores[1]*100:.2f}%')
    acc_per_fold.append(scores[1] * 100)
    loss_per_fold.append(scores[0])
    fold_no = fold_no + 1

  # Affichage des scores moyens par pli
  print('---------------------------------------------------------------------')
  print('Scores par pli')
  for i in range(0, len(acc_per_fold)):
    print('---------------------------------------------------------------------')
    print(f'> Pli {i+1} - Loss: {loss_per_fold[i]:.2f}',
          f'- Accuracy: {acc_per_fold[i]:.2f}%')
  print('---------------------------------------------------------------------')
  print('Scores moyens pour tous les plis :')
  print(f'> Accuracy: {np.mean(acc_per_fold):.2f}',
        f'(+- {np.std(acc_per_fold):.2f})')
  print(f'> Loss: {np.mean(loss_per_fold):.2f}')
  print('---------------------------------------------------------------------')
  return histories

In [None]:
# Entraînement du modèle et récupération des résultats
biLSTM_histories = train_model(X_train, y_train, 'get_biLSTM_model',
                            keras_vectorizer, len(voc), m26_embedding_matrix)

Affichage des résultats sous forme graphique :

In [None]:
def plot_results(histories):
  accuracy_data = []
  loss_data = []
  for i, h in enumerate(histories):
    acc = h.history['acc']
    val_acc = h.history['val_acc']
    loss = h.history['loss']
    val_loss = h.history['val_loss']
    for j in range(len(acc)):
      accuracy_data.append([i+1, j+1, acc[j], 'Entraînement'])
      accuracy_data.append([i+1, j+1, val_acc[j], 'Validation'])
      loss_data.append([i+1, j+1, loss[j], 'Entraînement'])
      loss_data.append([i+1, j+1, val_loss[j], 'Validation'])

  acc_df = pd.DataFrame(accuracy_data,
                        columns=['Pli', 'Epoch', 'Accuracy', 'Données'])
  sns.relplot(data=acc_df, x='Epoch', y='Accuracy', hue='Pli', style='Données',
              kind='line')

  loss_df = pd.DataFrame(loss_data, columns=['Pli', 'Epoch', 'Perte', 'Données'])
  sns.relplot(data=loss_df, x='Epoch', y='Perte', hue='Pli', style='Données',
              kind='line')

In [None]:
plot_results(biLSTM_histories)

❓ [1] Que constatez-vous par rapport aux résultats obtenus précédemment pour ce jeu de données (tf-idf) ?

❓ [2] Remplacez la couche LSTM par une couche de type GRU. Que constatez-vous pour les résultats ?