In [1]:
!pip install -q transformers

In [None]:
import tensorflow_datasets as tfds
from transformers import TFBertForSequenceClassification
import tensorflow as tf
from transformers import BertTokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased', do_lower_case=True)
import numpy as np

import os
os.environ['TF_USE_LEGACY_KERAS'] = '1'

In [3]:
## Hyperparamètres

# Longueur maximal en entrée (512 au maximal pour Bert)
max_length = 512
#Taille d'échantillon
batch_size = 6
# Taux d'apprentissages possibles pour Adam 2e-5, 3e-5 et 5e-5
learning_rate = 2e-5
# nombre d'epoch, maximum 2 pour des raisons de temps d'execution et de calcul
number_of_epochs = 1
# Nombre de couches gelé (maximum 12, le nombre de couches dans Bert)
freeze_layers = 10
# Nombre couches supplémentaires
add_layers = 1

In [4]:
def convert_example_to_feature(review):
  return tokenizer.encode_plus(review,
                add_special_tokens = True, # ajout des tokens [CLS], [SEP]
                max_length = max_length, # Longueur maximal du texte fournit à Bert
                pad_to_max_length = True, # ajout du token [PAD]
                return_attention_mask = True, # Ajout du mask pour eviter la concentration sur le token [PAD]
              )

In [5]:
def map_example_to_dict(input_ids, attention_masks, token_type_ids, label):
  return {
      "input_ids": input_ids,
      "token_type_ids": token_type_ids,
      "attention_mask": attention_masks,
  }, label

def encode_examples(ds, limit=-1):
  """Prends en entrée un ensemble de données et l'encode de sorte à pouvoir être utilisé par Bert"""
  input_ids_list = []
  token_type_ids_list = []
  attention_mask_list = []
  label_list = []
  if (limit > 0):
      ds = ds.take(limit)
  for review, label in tfds.as_numpy(ds):
    bert_input = convert_example_to_feature(review.decode())
    input_ids_list.append(bert_input['input_ids'])
    token_type_ids_list.append(bert_input['token_type_ids'])
    attention_mask_list.append(bert_input['attention_mask'])
    label_list.append([label])
  return tf.data.Dataset.from_tensor_slices((input_ids_list, attention_mask_list, token_type_ids_list, label_list)).map(map_example_to_dict)

In [None]:
# Recuperation des données "imdb review" pour l'ensemble de test et de validation
(ds_train, ds_validation), ds_info = tfds.load('imdb_reviews',
          split = (tfds.Split.TRAIN, tfds.Split.TEST),
          as_supervised=True,
          with_info=True)

In [None]:
# Encodage de l'ensemble d'entrainement
ds_train_encoded = encode_examples(ds_train).shuffle(10000).batch(batch_size)

# Encodage de l'ensemble de validation
ds_validation_encoded = encode_examples(ds_validation).batch(batch_size)

In [None]:
# Recuperation des données pour l'ensemble de test 1 basé sur "yelp polarity review"
(ds_test_sentiment), ds_info = tfds.load('yelp_polarity_reviews',
          as_supervised=True,
          split = (tfds.Split.TEST),
          with_info=True)

# Encodage de l'ensemble de test 1
ds_test_sentiment_encoded = encode_examples(ds_test_sentiment).batch(batch_size)

In [None]:
# Recuperation des données pour l'ensemble de test 2 basé sur "Glue", attention, le split de validation est utilisé ici, celui de test de fonctionne pas et ne contient que des -1
ds_test_other = tfds.load(name="glue", split=(tfds.Split.VALIDATION))

def extract_sentence_label(example):
  """Prend en entrée un ensemble de donnée et en extrait les textes et les étiquettes"""
  return example['sentence'], example['label']

# Application de la fonction sur l'ensemble de test 2 pour le mettre au même format que les autres ensembles de données (suppression des indices des données)
ds_test_other = ds_test_other.map(extract_sentence_label)

# Encodage de l'ensemble de test 2
ds_test_other_encoded = encode_examples(ds_test_other).batch(batch_size)

In [None]:
# Importation du modèle préentrainé
model = TFBertForSequenceClassification.from_pretrained('bert-base-uncased')

# Gèle d'un nombre spécifique de couches
for i in range(freeze_layers):
  model.bert.encoder.layer[i].trainable = False

In [12]:
## Ajout de nouvelles couches à la suite du modèle
if add_layers != 0:

# Définition des entrées et du mask d'attention
  input_ids = tf.keras.layers.Input(shape=(max_length,), dtype=tf.int32, name="input_ids")
  attention_mask = tf.keras.layers.Input(shape=(max_length,), dtype=tf.int32, name="attention_mask")

# Récupération de la sortie de la dernière couche du modèle préentrainé de Bert
  outputs = model([input_ids, attention_mask])[0]

# Définition de la couche dense et du dropout de la première couche additionnelle
  dense_layer1 = tf.keras.layers.Dense(256, activation='relu')
  dropout_layer = tf.keras.layers.Dropout(0.1)
# Application de ces couches à la suite du modèle préentrainé
  x = dense_layer1(outputs)
  x = dropout_layer(x)



# Définition et ajout de même d'autant de couches supplémentaires que demandé
  for i in range(add_layers - 1) :
    dense_layer = tf.keras.layers.Dense(256, activation='relu')
    dropout_layer = tf.keras.layers.Dropout(0.1)
    x = dense_layer(x)
    x = dropout_layer(x)
# Ajout d'une couche de sortie à deux classes
  dense_layer_final = tf.keras.layers.Dense(2, activation='softmax')
  x = dense_layer_final(x)

# Création du modèle complet à partir des différentes couches
  model = tf.keras.Model(inputs=[input_ids, attention_mask], outputs=x)

In [13]:
# Initialisation d'un optimiseur Adam
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate, epsilon=1e-08)

# Initialisation de la perte et de la prédiction, ici, nous n'avons pas de one-hot-vector
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
metric = tf.keras.metrics.SparseCategoricalAccuracy('accuracy')

#Compilation du modèle pour utilisation
model.compile(optimizer=optimizer, loss=loss, metrics=[metric])

In [None]:
# Entrainement du modèle et test de celui-ci sur l'ensemble d'évaluation
history_and_validation = model.fit(ds_train_encoded, epochs=number_of_epochs, validation_data=ds_validation_encoded)

In [None]:
# Test du modèle sur l'ensemble de test 1
results_1 = model.evaluate(ds_test_sentiment_encoded)

In [None]:
# Test du modèle sur l'ensemble de test 2
results_2 = model.evaluate(ds_test_other_encoded)