<a href="https://colab.research.google.com/github/mwauquier/LYSL005_machine_creativity/blob/main/LYSL005_2022_transformers.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Générer du texte avec des transformers

Dans un premier temps, nous allons voir comment générer du texte avec des transformers prêts à l'emploi, sans réel intervention relative à l'entraînement des modèles.

## GPT-2

La famille des GPT (qui compte désormais plusieurs descendants) est une famille de transformers génératifs pré-entraînés.

In [None]:
!pip install transformers

In [None]:
# On importe les ressources nécessaires
from transformers import pipeline, set_seed 

In [None]:
# On définit les paramètres liés au modèle (modèle, tâche, reproductibilité)
generator = pipeline('text-generation', model='gpt2') 
set_seed(42) 

In [None]:
# On génère n séquences de j caractères maximum à partir d'un prompt donné
n = 5
j = 30
prompt = "Hello, I'm a language model" # I'm a prompt and
generator(prompt, max_length=j, num_return_sequences=n)

## BERT



Nous utiliserons un code adapté à partir du travail de Wang et Cho (2019), et qui exploite des modèles en anglais.

In [None]:
!pip3 install pytorch_pretrained_bert

In [None]:
# Charger les librairies
import numpy as np
import torch
from pytorch_pretrained_bert import BertTokenizer, BertModel, BertForMaskedLM

In [None]:
# Load English pre-trained model (weights)
model_version = 'bert-base-uncased'
model = BertForMaskedLM.from_pretrained(model_version)
model.eval()
cuda = torch.cuda.is_available()
if cuda:
    model = model.cuda()

# Load pre-trained model tokenizer (vocabulary)
tokenizer = BertTokenizer.from_pretrained(model_version, do_lower_case=model_version.endswith("uncased"))

def tokenize_batch(batch):
    return [tokenizer.convert_tokens_to_ids(sent) for sent in batch]

def untokenize_batch(batch):
    return [tokenizer.convert_ids_to_tokens(sent) for sent in batch]

def detokenize(sent):
    """ Roughly detokenizes (mainly undoes wordpiece) """
    new_sent = []
    for i, tok in enumerate(sent):
        if tok.startswith("##"):
            new_sent[len(new_sent) - 1] = new_sent[len(new_sent) - 1] + tok[2:]
        else:
            new_sent.append(tok)
    return new_sent

CLS = '[CLS]'
SEP = '[SEP]'
MASK = '[MASK]'
mask_id = tokenizer.convert_tokens_to_ids([MASK])[0]
sep_id = tokenizer.convert_tokens_to_ids([SEP])[0]
cls_id = tokenizer.convert_tokens_to_ids([CLS])[0]

Le système commence par masquer l'intégralité des mots de la phrase, et regénère arbitrairement un mot à la fois.

Plus précisément :
- Tous les mots sont masqués
- On retire aléatoirement un des masques et on génère un output à partir de la distribution des probabilités du modèle
- On répère cette opération
- Le système s'arrête quand il a convergé, ou quand le procédé dure trop longtemps.

In [None]:
def generate_step(out, gen_idx, temperature=None, top_k=0, sample=False, return_list=True):
    """ Generate a word from from out[gen_idx]
    
    args:
        - out (torch.Tensor): tensor of logits of size batch_size x seq_len x vocab_size
        - gen_idx (int): location for which to generate for
        - top_k (int): if >0, only sample from the top k most probable words
        - sample (Bool): if True, sample from full distribution. Overridden by top_k 
    """
    logits = out[:, gen_idx]
    if temperature is not None:
        logits = logits / temperature
    if top_k > 0:
        kth_vals, kth_idx = logits.topk(top_k, dim=-1)
        dist = torch.distributions.categorical.Categorical(logits=kth_vals)
        idx = kth_idx.gather(dim=1, index=dist.sample().unsqueeze(-1)).squeeze(-1)
    elif sample:
        dist = torch.distributions.categorical.Categorical(logits=logits)
        idx = dist.sample().squeeze(-1)
    else:
        idx = torch.argmax(logits, dim=-1)
    return idx.tolist() if return_list else idx
  
def get_init_text(seed_text, max_len, batch_size = 1, rand_init=False):
    """ Get initial sentence by padding seed_text with either masks or random words to max_len """
    batch = [seed_text + [MASK] * max_len + [SEP] for _ in range(batch_size)]
    #if rand_init:
    #    for ii in range(max_len):
    #        init_idx[seed_len+ii] = np.random.randint(0, len(tokenizer.vocab))
    
    return tokenize_batch(batch)

def printer(sent, should_detokenize=True):
    if should_detokenize:
        sent = detokenize(sent)[1:-1]
    print(" ".join(sent))

Ce code propose trois méthodes de génération : We consider three "modes" of generating:
- Génère un mot à la fois à une position aléatoire (*parallel sequential generation*)
- Génère tous les mots à la fois (*parallel generation*)
- Génère un mot à la fois, de gauche à droite (*sequential generation*)

Ces trois méthodes sont définies dans la cellule ci-dessous. Le choix de la méthode de génération se fait dans une fonction ultérieure.

In [None]:
# Generation modes as functions
import math
import time

def parallel_sequential_generation(seed_text, batch_size=10, max_len=15, top_k=0, temperature=None, max_iter=300, burnin=200,
                                   cuda=False, print_every=10, verbose=True):
    """ Generate for one random position at a timestep
    
    args:
        - burnin: during burn-in period, sample from full distribution; afterwards take argmax
    """
    seed_len = len(seed_text)
    batch = get_init_text(seed_text, max_len, batch_size)
    
    for ii in range(max_iter):
        kk = np.random.randint(0, max_len)
        for jj in range(batch_size):
            batch[jj][seed_len+kk] = mask_id
        inp = torch.tensor(batch).cuda() if cuda else torch.tensor(batch)
        out = model(inp)
        topk = top_k if (ii >= burnin) else 0
        idxs = generate_step(out, gen_idx=seed_len+kk, top_k=topk, temperature=temperature, sample=(ii < burnin))
        for jj in range(batch_size):
            batch[jj][seed_len+kk] = idxs[jj]
            
        if verbose and np.mod(ii+1, print_every) == 0:
            for_print = tokenizer.convert_ids_to_tokens(batch[0])
            for_print = for_print[:seed_len+kk+1] + ['(*)'] + for_print[seed_len+kk+1:]
            print("iter", ii+1, " ".join(for_print))
            
    return untokenize_batch(batch)

def parallel_generation(seed_text, batch_size=10, max_len=15, top_k=0, temperature=None, max_iter=300, sample=True, 
                        cuda=False, print_every=10, verbose=True):
    """ Generate for all positions at each time step """
    seed_len = len(seed_text)
    batch = get_init_text(seed_text, max_len, batch_size)
    
    for ii in range(max_iter):
        inp = torch.tensor(batch).cuda() if cuda else torch.tensor(batch)
        out = model(inp)
        for kk in range(max_len):
            idxs = generate_step(out, gen_idx=seed_len+kk, top_k=top_k, temperature=temperature, sample=sample)
            for jj in range(batch_size):
                batch[jj][seed_len+kk] = idxs[jj]
            
        if verbose and np.mod(ii, print_every) == 0:
            print("iter", ii+1, " ".join(tokenizer.convert_ids_to_tokens(batch[0])))
    
    return untokenize_batch(batch)
            
def sequential_generation(seed_text, batch_size=10, max_len=15, leed_out_len=15, 
                          top_k=0, temperature=None, sample=True, cuda=False):
    """ Generate one word at a time, in L->R order """
    seed_len = len(seed_text)
    batch = get_init_text(seed_text, max_len, batch_size)
    
    for ii in range(max_len):
        inp = [sent[:seed_len+ii+leed_out_len]+[sep_id] for sent in batch]
        inp = torch.tensor(batch).cuda() if cuda else torch.tensor(batch)
        out = model(inp)
        idxs = generate_step(out, gen_idx=seed_len+ii, top_k=top_k, temperature=temperature, sample=sample)
        for jj in range(batch_size):
            batch[jj][seed_len+ii] = idxs[jj]
        
    return untokenize_batch(batch)


def generate(n_samples, seed_text="[CLS]", batch_size=10, max_len=25, 
             generation_mode="parallel-sequential",
             sample=True, top_k=100, temperature=1.0, burnin=200, max_iter=500,
             cuda=False, print_every=1):
    # main generation function to call
    sentences = []
    n_batches = math.ceil(n_samples / batch_size)
    start_time = time.time()
    for batch_n in range(n_batches):
        if generation_mode == "parallel-sequential":
            batch = parallel_sequential_generation(seed_text, batch_size=batch_size, max_len=max_len, top_k=top_k,
                                                   temperature=temperature, burnin=burnin, max_iter=max_iter, 
                                                   cuda=cuda, verbose=False)
        elif generation_mode == "sequential":
            batch = sequential_generation(seed_text, batch_size=batch_size, max_len=max_len, top_k=top_k, 
                                          temperature=temperature, leed_out_len=leed_out_len, sample=sample,
                                          cuda=cuda)
        elif generation_mode == "parallel":
            batch = parallel_generation(seed_text, batch_size=batch_size,
                                        max_len=max_len, top_k=top_k, temperature=temperature, 
                                        sample=sample, max_iter=max_iter, 
                                        cuda=cuda, verbose=False)
        
        if (batch_n + 1) % print_every == 0:
            print("Finished batch %d in %.3fs" % (batch_n + 1, time.time() - start_time))
            start_time = time.time()
        
        sentences += batch
    return sentences

Avant de générer notre texte, il nous faut fixer divers paramètres :

- n_sample : le nombre de "phrases" à générer.
- max_len : la longueur de la séquence à générer
- top_k : la taille de l'échantillon des mots les plus probables dans lequel le système va piocher
- temperature : un paramètre de lissage de la distribution des mots suivants. Plus la valeur sera élevée, plus cette distribution sera uniforme (lissée). Plus elle sera faible, moins elle sera lissée.
- burnin : ce paramètre ne vaut que pour les méthodes de génération non séquentielle. Elle implique la neutralisation du paramètre top_k, en faisant piocher le système dans l'ensemble de la distribution, et non plus parmi l'échantillon des k mots les plus probables.
- max_iter : nombre d'itérations
- seed_text : Premier élément à partir duquel générer du texte. Il s'agit du prompt. Il contient le préfix [CLS], auquel vous pouvez ajouter du texte.

In [None]:
n_samples = 5
batch_size = 5
max_len = 40
top_k = 100
temperature = 1.0
generation_mode = "parallel-sequential"
leed_out_len = 5 # max_len
burnin = 250
sample = True
max_iter = 500
prompt = "[CLS]"

Il vous reste alors à mobiliser la fonction. En l'occurrence, les énoncés générés sont enregistrés dans la liste `bert_sents`, que l'on peut ensuite aller parcourir.

In [None]:
# Choose the prefix context
seed_text = prompt.split()
bert_sents = generate(n_samples, seed_text=seed_text, batch_size=batch_size, max_len=max_len,
                      generation_mode=generation_mode,
                      sample=sample, top_k=top_k, temperature=temperature, burnin=burnin, max_iter=max_iter,
                      cuda=cuda)

In [None]:
for sent in bert_sents:
    printer(sent, should_detokenize=True)

Vous pouvez si vous le souhaitez imprimer les énoncés générés directement dans un fichier et non dans le notebook.

In [None]:
def write_sents(out_file, sents, should_detokenize=False):
    with open(out_file, "w") as out_fh:
        for sent in sents:
            sent = detokenize(sent[1:-1]) if should_detokenize else sent
            out_fh.write("%s\n" % " ".join(sent))

for temp in [1.0]:
    bert_sents = generate(n_samples, seed_text=seed_text, batch_size=batch_size, max_len=max_len, generation_mode=generation_mode,
                          sample=sample, top_k=top_k, temperature=temp, burnin=burnin, max_iter=max_iter)
                          #, cuda=True)
    out_file = "/path/to/outfile/%s-len%d-burnin%d-topk%d-temp%.3f.txt" % (model_version, max_len, burnin, top_k, temp)
    write_sents(out_file, bert_sents, should_detokenize=True)

Si les modèles GPT2 sont entraînés sur des données issues du web sans contrainte de pays ou de langue (et donc contenant majoritairement de l'anglais, mais pas que), il est spécialisé sur l'anglais.

Vous pouvez utiliser des modèles GPT2 spécialement développés pour le français en cherchant dans la liste des modèles accessible à la page https://huggingface.co/models?sort=downloads . Il vous suffit de chercher dans la barre de recherche "GPT2 french". Attention, le nom à intégrer en argument doit intégrer le nom du répertoire s'il est indiqué dans le "nom" du modèle.

Voici par exemple des noms de modèles GPT2 pour le français : "dbddv01/gpt2-french-small", "ClassCat/gpt2-base-french".

Ils s'utilisent sinon comme n'importe quel autre modèle

In [None]:
# On importe les ressources nécessaires
from transformers import pipeline, set_seed 

In [None]:
# On définit les paramètres liés au modèle (modèle, tâche, reproductibilité)
generator = pipeline('text-generation', model='ClassCat/gpt2-base-french')
set_seed(42)  

In [None]:
# On génère n séquences de j caractères maximum à partir d'un prompt donné
n = 5
j = 30
prompt = "Bonjour, je suis un modèle de langue et" # I'm a prompt and
generator(prompt, max_length=j, num_return_sequences=n)

# Fine tuning pour la génération de texte

Notez que les modèles français précédemment présentés n'ont pas été entraînés à partir de rien (*from scratch*). Il s'agit en réalité de modèles qui ont été finetunés. Plus précisément, ce sont des modèles qui ont été adaptés de modèles initiaux (majoritairement anglais) pour prendre en compte des données en français. L'idée est de réentraîner le modèle (ou du moins de poursuivre son entraînement) en lui soumettant des données spécialisées (ici en français).

Nous allons voir comment finetuner des modèles en utilisant le code ci-dessous (adapté du tutoriel https://www.kaggle.com/code/changyeop/how-to-fine-tune-gpt-2-for-beginners )

In [None]:
# À lancer si vous n'avez pas préalablement initialisé l'environnement

!pip install transformers

In [None]:
# On importe les librairies nécessaires

from transformers import TextDataset, DataCollatorForLanguageModeling
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from transformers import Trainer, TrainingArguments

In [None]:
# On définit les fonctions nécessaires (chargement des données, )

def load_dataset(file_path, tokenizer, block_size = 128):
    dataset = TextDataset(
        tokenizer = tokenizer,
        file_path = file_path,
        block_size = block_size,
    )
    return dataset


def load_data_collator(tokenizer, mlm = False):
    data_collator = DataCollatorForLanguageModeling(
        tokenizer=tokenizer, 
        mlm=mlm,
    )
    return data_collator


def train(train_file_path,model_name,
          output_dir,
          overwrite_output_dir,
          per_device_train_batch_size,
          num_train_epochs,
          save_steps):
  tokenizer = GPT2Tokenizer.from_pretrained(model_name)
  train_dataset = load_dataset(train_file_path, tokenizer)
  data_collator = load_data_collator(tokenizer)

  tokenizer.save_pretrained(output_dir)
      
  model = GPT2LMHeadModel.from_pretrained(model_name)

  model.save_pretrained(output_dir)

  training_args = TrainingArguments(
          output_dir=output_dir,
          overwrite_output_dir=overwrite_output_dir,
          per_device_train_batch_size=per_device_train_batch_size,
          num_train_epochs=num_train_epochs,
      )

  trainer = Trainer(
          model=model,
          args=training_args,
          data_collator=data_collator,
          train_dataset=train_dataset,
  )
      
  trainer.train()
  trainer.save_model()

In [None]:
# Nécessaire pour permettre au Notebook Colab d'accèder à votre Drive et donc à vos données

from google.colab import drive
drive.mount('/content/drive')

In [None]:
# On définit les paramètres (les données de réentraînement, l'endroit où seront stockés les fichiers relatifs à l'entraînement, les paramètres)
train_file_path = "/content/drive/MyDrive/machine_creativity/test_corpus_wiki_espaceSchegen_20230102.txt"
model_name = 'ClassCat/gpt2-base-french'
output_dir = '/content/drive/MyDrive/machine_creativity/result_finetuning_test_20230102'
overwrite_output_dir = False
per_device_train_batch_size = 10
num_train_epochs = 1.0
save_steps = 1000

In [None]:
# Ré-entraînement du modèle sur vos données
train(
    train_file_path=train_file_path,
    model_name=model_name,
    output_dir=output_dir,
    overwrite_output_dir=overwrite_output_dir,
    per_device_train_batch_size=per_device_train_batch_size,
    num_train_epochs=num_train_epochs,
    save_steps=save_steps
)

L'entraînement peut prendre du temps. À guise d'exemple, le finetuning du modèle "ClassCat/gpt2-base-french" à partir d'un article (Siward Barn), d'un poids de 20kB, avec les paramètres per_device_train_batch_size = 10,
num_train_epochs = 1.0, et save_steps = 1000, prend 4min. Ce même finetuning mais sur les 13 articles de la catégorie "Espace Schengen" de Wikipedia (Catégorie:Espace Schengen), pour un poids de 306kB, prendre 26min.

L'étape suivante est celle de la génération de texte à proprement parler. Cependant, on ne peut pas ici reprendre la fonction `generator()` précédemment utilisée puisqu'il nous faut ici redéfinir les modèles et tokeniser à utiliser. Nous devons donc redéfinir la fonction de génération de texte à l'aide du code ci-dessous.

In [None]:
from transformers import PreTrainedTokenizerFast, GPT2TokenizerFast

def load_model(model_path):
    model = GPT2LMHeadModel.from_pretrained(model_path)
    return model


def load_tokenizer(tokenizer_path):
    tokenizer = GPT2Tokenizer.from_pretrained(tokenizer_path)
    return tokenizer


def generate_text(sequence, max_length):
    model_path = "/content/drive/MyDrive/machine_creativity/result_finetuning_test_20230102"
    model = load_model(model_path)
    tokenizer = load_tokenizer(model_path)
    ids = tokenizer.encode(f'{sequence}', return_tensors='pt')
    final_outputs = model.generate(
        ids,
        do_sample=True,
        max_length=max_length,
        pad_token_id=model.config.eos_token_id,
        top_k=50,
        top_p=0.95,
    )
    print(tokenizer.decode(final_outputs[0], skip_special_tokens=True))

Il suffit alors d'utiliser la fonction pour générer du texte avec notre modèle finetuné.

In [None]:
prompt=""
max_len = 100
generate_text(prompt, max_len)