# Architectures génératives à base de token

Ce TP vise à utiliser directement des LSTM et à travailler sur des token pour traiter des données textuelles avec des approches de l'état de l'art.

Une partie des codes nécessaires sont disponibles dans les fichiers annexes `textloader.py` et `generate.py`

In [1]:
# import standard + 
# 
import torch
import torch.nn as nn
from torch.nn import CrossEntropyLoss
from torch.utils.data import DataLoader
from textloader import *
from generate import *


import logging
import time
from itertools import chain
from pathlib import Path
logging.basicConfig(level=logging.INFO)
from tqdm import tqdm
import os

In [2]:

# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
# device = "cpu"
print(device)

mps


In [3]:
from pathlib import Path
from IPython.display import display, HTML
from torch.utils.tensorboard import SummaryWriter

# Chemin vers TensorBoard
TB_PATH = "/tmp/logs/module-RNN"

# TENSORBOARD 
# usage externe de tensorboard: (1) lancer la commande dans une console; (2) copier-coller l'URL dans un navigateur
display(HTML("<h2>Informations</h2><div>Pour visualiser les logs, tapez la commande : </div>"))
print(f"tensorboard --logdir {Path(TB_PATH).absolute()}")
print("Une fois la commande lancer dans la console, copier-coller l'URL dans votre navigateur")

tensorboard --logdir /tmp/logs/module-RNN
Une fois la commande lancer dans la console, copier-coller l'URL dans votre navigateur


## A. Chargement des données

Tout le code est fourni


In [4]:
# paramétrisation

DATA_PATH = "./data/trump_full_speech.txt"

# OPERATION préliminaire: ouvrir le fichier pour comprendre la nature brute des données
alltxts = Path(DATA_PATH).read_text()
print(alltxts[:500]) # 100 premiers chars

Trump: Wow. Whoa. That is some group of people. Thousands. So nice, thank you very much. That's really nice. Thank you. It's great to be at Trump Tower. It's great to be in a wonderful city, New York. And it's an honor to have everybody here. This is beyond anybody's expectations. There's been no crowd like this. And, I can tell, some of the candidates, they went in. They didn't know the air-conditioner didn't work. They sweated like dogs. [laughter] They didn't know the room was too big, becaus


## Construction du vocabulaire

On introduit un *tokenizer* pour découper le texte en groupe de lettre. Ce découpage est non trivial (cf cours), on utilise un composant d'analyse venant des libs huggingface

In [5]:
from tokenizers import Tokenizer
from tokenizers.models import WordPiece
from tokenizers.trainers import WordPieceTrainer
from tokenizers.pre_tokenizers import Whitespace

# Initialiser un tokenizer WordPiece
tokenizer = Tokenizer(WordPiece(unk_token="[UNK]"))

# Définir les pré-traitements
tokenizer.pre_tokenizer = Whitespace()

# Créer un entraîneur avec un vocabulaire cible
trainer = WordPieceTrainer(
    vocab_size=10000,  # Limite du vocabulaire
    special_tokens=["[UNK]", "[CLS]", "[SEP]", "[PAD]", "[MASK]"]
)

In [6]:
tokenizer.train([DATA_PATH], trainer)






In [7]:
# si on veut sauver/charger ces objets qui seront complètement liés aux modèles

# # Sauvegarder le tokenizer entraîné
# tokenizer.save("custom_tokenizer.json")

# # Charger et utiliser le tokenizer
# from tokenizers import Tokenizer
# tokenizer = Tokenizer.from_file("custom_tokenizer.json")


Exemple important pour comprendre le principe de la tokenization et les inégalités entre langues, les potentiels problèmes avec du vocabulaire technique...

In [8]:
# Exemple de tokenisation
output = tokenizer.encode("This is an example.")
print("Tokens:", output.tokens)
print("Token IDs:", output.ids)

# Autre exemple de tokenisation avec des mots plus rares
output = tokenizer.encode("Exemple en francais, loin de Trump")
print("Tokens:", output.tokens)
print("Token IDs:", output.ids)


Tokens: ['This', 'is', 'an', 'example', '.']
Token IDs: [504, 216, 279, 2015, 15]
Tokens: ['Ex', '##emp', '##le', 'en', 'fr', '##anc', '##ais', ',', 'lo', '##in', 'de', 'Trump']
Token IDs: [3064, 1573, 192, 357, 307, 1743, 8495, 13, 311, 167, 257, 457]


In [9]:
tokenizer.get_vocab_size()

10000

Récupération des indices de tokens spéciaux

In [10]:
PAD = tokenizer.encode("[PAD]").ids[0]
print("PAD",PAD)

EOS = tokenizer.encode(".").ids[0]
print("EOS", EOS)

PAD 3
EOS 15


### Chaine de traitement

1. Récupération des données avec un Dataset
2. Construction du dataloader

On se place dans une logique générative: on ne fait pas d'ensemble de test (c'est discutable)


In [11]:
from torch.utils.data import Dataset, DataLoader
import sys
import re

class TextDataset(Dataset):
    def __init__(self, text: str, tokenizer, maxsent=None, maxlen=None):
        """  Dataset pour les tweets de Trump
            * fname : nom du fichier
            * tokenizer : tokenizer (text => [int])
            * maxsent : nombre maximum de phrases.
            * maxlen : longueur maximale des phrases.
        """
        self.tokenizer = tokenizer
        maxlen = maxlen or sys.maxsize
        self.phrases = [re.sub(' +',' ',p[:maxlen]).strip() +"." for p in text.split(".") if len(re.sub(' +',' ',p[:maxlen]).strip())>0]
        if maxsent is not None:
            self.phrases=self.phrases[:maxsent]
        self.maxlen = max([len(p) for p in self.phrases])
        self.phrasesnum = [torch.tensor(tokenizer.encode(p).ids) for p in self.phrases]

    def __len__(self):
        return len(self.phrases)

    def __getitem__(self, i):
        # return torch.tensor(tokenizer.encode(self.phrases[i])) # si on veut faire la transf à la volée
        return self.phrasesnum[i]



In [12]:
ds = TextDataset(alltxts, tokenizer)

In [13]:

print("longueur:", len(ds))
print("doc 0: ", ds[0])
print("doc 1: ", ds[1])
print("doc 2: ", ds[2])

# savez-vous retrouver les token EOS/.?

longueur: 17051
doc 0:  tensor([ 457,   27, 3584,   15])
doc 1:  tensor([9290,   15])
doc 2:  tensor([ 508,  216,  627, 1450,  185,  270,   15])


In [14]:
# decoder une chaine
tokenizer.decode(ds[0].tolist())

# la fonction est jolie et enlève les éventuels ##


'Trump : Wow .'

In [15]:
from torch.nn.utils.rnn import pad_sequence

batch_size = 64

def collate_fn(batch):
    #sequences, labels = zip(*batch)
    lengths = [len(seq) for seq in batch]
    padded_sequences = pad_sequence(batch, batch_first=False, padding_value=PAD)
    return padded_sequences, torch.tensor(lengths)

data = DataLoader(ds, collate_fn=collate_fn, batch_size=batch_size, shuffle=True)

In [16]:
# recuperationn du premier batch et test sur la forme des sorties

x,lengths = next(iter(data))
print("x:", x, x.size())
print("lengths:", lengths, lengths.size())
#print("y:", y, y.size())



x: tensor([[  38, 1725,  296,  ...,  371,   38, 5626],
        [  10,  534,  289,  ...,   10,   10,  383],
        [  71,  313, 2375,  ...,  195,  417, 2497],
        ...,
        [   3,    3,    3,  ...,    3,    3,    3],
        [   3,    3,    3,  ...,    3,    3,    3],
        [   3,    3,    3,  ...,    3,    3,    3]]) torch.Size([47, 64])
lengths: tensor([38, 17, 41, 10,  9, 28, 33, 25, 24, 18,  3, 11, 11,  9, 19, 25,  8,  8,
        36, 12, 11, 17, 25, 16,  3,  9, 13,  7, 10, 24, 16, 31, 11,  6, 15,  9,
         8, 25, 20, 24, 39, 16, 28,  9, 44, 15, 40, 29, 13, 26, 22, 20, 15, 37,
        47, 22, 31,  6,  8,  5,  6,  8, 14,  4]) torch.Size([64])


## B. Creating the Network


Implementing a RNN with a decoder


In [17]:
import torch.nn as nn
import torch.nn.functional as F

class RNN(nn.Module):
    def __init__(self, input_size, hidden_size,  n_voc):
        super(RNN, self).__init__()

        self.hidden_size = hidden_size
        self.input_size  = input_size

        self.emb = nn.Embedding( n_voc, input_size)
        

        # 1. Définition du module récurrent (self.rec)
        # 2. Définition du décodeur (self.decoder)
        # pas de piège, juste une réflexion sur les dimensions
        ##  TODO 
        

    def forward(self, input, hid=None):
        # 1. translation of the input from int to one-hot
        # 2. Passage dans le rec
        # 3. retour de la prediction sur la dernière couche

        ##  TODO 
        return output, last



In [18]:

# choose hidden size
input_size = 128
hidden_size = 64
# build network
rnn = RNN( input_size, hidden_size,  tokenizer.get_vocab_size())
rnn.name = "baseRNN-"+time.asctime()

In [19]:
# sanity check : est ce que les données passent dans le réseau? 
x,lengths = next(iter(data))
output, hidden = rnn(x)

print("output:",output.size())
# print("hidden:", hidden.size())

output: torch.Size([62, 64, 10000])


L'étape difficile maintenant:

1. Calcul des yhat
2. Mise en face des vérités terrains = le mot d'après

... Mais il ne faut pas appliquer la loss sur le padding

... Et on veut conserver un traitement sur les batchs $\rightarrow$ introduire un masque sur la loss, il faut réfléchir en terme de graphe de calcul.

In [20]:
def maskedCrossEntropy(output: torch.Tensor, target: torch.LongTensor, padcar: int):
    """
    :param output: Tenseur length x batch x output_dim,  
    :param target: Tenseur length x batch
    :param padcar: index du caractere de padding
    """
    mask = target != padcar
    # print(mask)
    f_mask = mask.flatten()
    f_target = target.flatten()
    f_output = output.view(-1, output.shape[2])
    # print(f_mask.size(), f_target.size(), f_output.size())
    return (f_mask * nn.CrossEntropyLoss()(f_output, f_target)).sum() / mask.sum()

In [21]:
# batch
x,lengths = next(iter(data))
y = x[1:,:] 
x = x[:-1,:]
print("x:", x.size())
print("y:", y.size())
# modif 3 
# yhat, hidden = rnn(x, lengths)
yhat, hidden = rnn(x)
print(yhat.size())
l = maskedCrossEntropy(yhat,y, PAD)

x: torch.Size([132, 64])
y: torch.Size([132, 64])
torch.Size([132, 64, 10000])


## D. Training

1. put the data into a DataLoader
2. choose a loss function 
3. run a standard training loop

In [22]:
# définition de la métrique d'évaluation
def accuracy(yhat,y):
    # y encode les indexes, s'assurer de la bonne taille de tenseur
    assert len(y.shape)==1 or y.size(1)==1
    return (torch.argmax(yhat,1).view(y.size(0),-1)== y.view(-1,1)).float().mean()

In [23]:

    
def train(model,epochs,train_loader):
    writer = SummaryWriter(f"{TB_PATH}/{model.name}")
    optim = torch.optim.Adam(model.parameters(),lr=1e-3)    # choix optimizer
    model = model.to(device)
    print(f"running {model.name}")
    # loss = nn.CrossEntropyLoss(weight=cl_weight.to(device))      # choix loss (test en modification 4, independant de la pondération )
    for epoch in tqdm(range(epochs)):
        cumloss, cumacc, count = 0, 0, 0
        model.train()
        # for x, lengths, y in train_loader:   # boucle sur les batchs
        for x, lengths in train_loader:              # boucle batch de 1
            y = x[1:,:] 
            x = x[:-1,:]
            optim.zero_grad()
            x,y = x.to(device), y.to(device)               # y doit être un tensor (pas un int)

            yhat, hidden = model(x) # batch = 1
            l = maskedCrossEntropy(yhat,y, PAD)
            l.backward()
            optim.step()
            cumloss += l*len(x)                             # attention, il peut y avoir un batch + petit (le dernier)
            # cumacc += accuracy(yhat,y)*len(x)
            count += len(x)
        writer.add_scalar('loss/train',cumloss/count,epoch)
        


In [None]:
# ~10 minutes sur CPU
n_epoch = 40 # pour testet, ensuite, on peut prendre 100
train(rnn, n_epoch, data)


running baseRNN-Tue Jan  7 13:14:36 2025


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
 90%|█████████ | 36/40 [24:20<02:32, 38.16s/it]

In [None]:
import os

def save_model(model,fichier): # pas de sauvegarde de l'optimiseur ici
      """ sauvegarde du modèle dans fichier """
      state = {'model_state': model.state_dict()}
      torch.save(state,fichier) # pas besoin de passer par pickle
 
def load_model(fichier,model):
      """ Si le fichier existe, on charge le modèle  """
      if os.path.isfile(fichier):
          state = torch.load(fichier)
          model.load_state_dict(state['model_state'])
      else:
           print("Erreur de chargement du fichier")

In [None]:
# sauvegarde du modèle

path = "./"
fichier = path+f"model/{rnn.name}"
save_model(rnn.to("cpu"),fichier)

In [None]:
# chargement
path = "./"
fichier = path+f"model/LSTM-Trump"

load_model(fichier,rnn)

## D. Construction de phrases et analyse des résultats

Il est important de mettre en place un beam search pour faire fonctionner ces algorithmes génératifs

1. On commence par la version naïve
2. On passe aux approches à base de faisceaux (beam-search)

In [None]:
sent = "immigration"

with torch.no_grad():
    x = torch.tensor(tokenizer.encode(sent).ids).unsqueeze(1)
    # print(x)
    for i in range(30): # longeur arbitraire
        # 1. Faire passer x dans le rnn
        # 2. Récupérer l'argmax du y
        # 3. Concatener l'indice  à x
        # TODO 

    # 4. Décoder la chaine générée
    print(tokenizer.decode(x.squeeze().tolist()))

### Mise en place d'un beam search

1. Tester une fonction de génération stochastique
2. Utiliser le beam search
3. Ajouter une fonction de bonification des phrases longues (lien)[https://opennmt.net/OpenNMT/translation/beam_search/#length-normalization]

In [None]:
def generate(rnn,tokenizer, eos, start="", maxlen=200):
    """  Fonction de génération (l'embedding et le decodeur être des fonctions du rnn).
         Initialise le réseau avec start (ou à 0 si start est vide) 
         et génère une séquence de longueur maximale 200 ou qui s'arrête quand eos est généré. 
    """
    with torch.no_grad():
        x = torch.tensor(tokenizer.encode(start).ids).unsqueeze(1)
        # procédure identique à la boite précédente... A l'exception de:
        # c = torch.multinomial(torch.softmax(y, dim=1), 1)[0] => tirage multinomial
        # TODO 

    return tokenizer.decode(x.squeeze().tolist())

In [None]:
print(generate(rnn,tokenizer, EOS, start="Trump is", maxlen=30))

In [None]:
import math
class LengthPower():
    def __init__(self, base=5, alpha=0.7):
        self.base = base
        self.alpha = alpha

    def lengthpower(self, length):
        # From https://opennmt.net/OpenNMT/translation/beam_search/#length-normalization
        return math.exp(self.alpha * (math.log(self.base + length) - math.log(self.base + 1)))
    



In [None]:
def generate_beam(rnn, tokenizer, eos, k, start="", maxlen=200):
    """ 
        Génere une séquence en beam-search : à chaque itération, 
        on explore pour chaque candidat les k symboles les plus probables; 
        puis seuls les k meilleurs candidats de l'ensemble des séquences générées sont conservés 
        (au sens de la vraisemblance) pour l'itération suivante.
    """
    x = torch.tensor(tokenizer.encode(start).ids).unsqueeze(1)
    # print(x.size())
    with torch.no_grad():
        y,h = rnn(x)
        # print(h.size())
        if x.size()[0]>1:
            y = y[-1,:,:]

        candidates = [ (h, x, y,  0, len(x), False) ]

        for i in range(maxlen):
            new_candidates = []
            for h, x, y, logp, length, is_eos in candidates:
                if is_eos:
                    new_candidates.append((None, x, y, logp, length, None))
                else:
                    values, indices = torch.log_softmax(y, dim=1).topk(k)
                    for logp_c, c in zip(values[0], indices[0]):
                        # print(LengthPower().lengthpower(length+1))
                        new_candidates.append((h, x, y, (logp + logp_c)/LengthPower().lengthpower(length+1), length+1, c))

            candidates = []
            # print([[h[0].size(), x.size(), y.size(), logp, length, c] for h, x, y, logp, length, c in new_candidates])
            for h, x, y, logp, length, c in sorted(new_candidates, key=lambda x: -x[3])[:k]:
                if c is None:
                    candidates.append((None, x, y, logp, length, True))
                else:
                    c = c.unsqueeze(0).unsqueeze(0)
                    # print(x.size(), c.size())
                    x = torch.cat((x, c),0)
                    y,h = rnn(c, h)
                    # y = y[-1,:,:]
                    # state = rnn()
                    candidates.append((h, x, y, logp, length, c[0][0] == eos))


    return tokenizer.decode(candidates[0][1].squeeze().tolist())

print(generate_beam(rnn,tokenizer, EOS, 5, start="Immigration", maxlen=30))


# Construction du sujet à partir de la correction

In [None]:
###  TODO )"," TODO ",\
    txt, flags=re.DOTALL))
f2.close()

### </CORRECTION> ###