# Traduction

In [1]:
import logging
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
from torch.nn.functional import pad
import torch
import unicodedata
import string
from tqdm import tqdm
from pathlib import Path
from typing import List
import matplotlib.pyplot as plt 
import numpy as np
import time
import re
import datetime

from torch.utils.tensorboard import SummaryWriter


2023-11-18 18:20:18.296548: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2023-11-18 18:20:18.296653: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2023-11-18 18:20:18.296714: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2023-11-18 18:20:18.313402: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
logging.basicConfig(level=logging.INFO)

FILE = "../data/en-fra.txt"

writer = SummaryWriter("/tmp/runs/tag-"+time.asctime())

def normalize(s):
    return re.sub(' +',' ', "".join(c if c in string.ascii_letters else " "
         for c in unicodedata.normalize('NFD', s.lower().strip())
         if  c in string.ascii_letters+" "+string.punctuation)).strip()


class Vocabulary:
    """Permet de gérer un vocabulaire.

    En test, il est possible qu'un mot ne soit pas dans le
    vocabulaire : dans ce cas le token "__OOV__" est utilisé.
    Attention : il faut tenir compte de cela lors de l'apprentissage !

    Utilisation:

    - en train, utiliser v.get("blah", adding=True) pour que le mot soit ajouté
      automatiquement
    - en test, utiliser v["blah"] pour récupérer l'ID du mot (ou l'ID de OOV)
    """
    PAD = 0
    EOS = 1
    SOS = 2
    OOVID = 3

    def __init__(self, oov: bool):
        self.oov = oov
        self.id2word = ["PAD", "EOS", "SOS"]
        self.word2id = {"PAD": Vocabulary.PAD, "EOS": Vocabulary.EOS, "SOS": Vocabulary.SOS}
        if oov:
            self.word2id["__OOV__"] = Vocabulary.OOVID
            self.id2word.append("__OOV__")

    def __getitem__(self, word: str):
        if self.oov:
            return self.word2id.get(word, Vocabulary.OOVID)
        return self.word2id[word]

    def get(self, word: str, adding=True):
        try:
            return self.word2id[word]
        except KeyError:
            if adding:
                wordid = len(self.id2word)
                self.word2id[word] = wordid
                self.id2word.append(word)
                return wordid
            if self.oov:
                return Vocabulary.OOVID
            raise

    def __len__(self):
        return len(self.id2word)

    def getword(self, idx: int):
        if idx < len(self):
            return self.id2word[idx]
        return None

    def getwords(self, idx: List[int]):
        return [self.getword(i) for i in idx]



class TradDataset():
    def __init__(self,data,vocOrig,vocDest,adding=True,max_len=10):
        self.sentences =[]
        for s in tqdm(data.split("\n")):
            if len(s)<1:continue
            orig,dest=map(normalize,s.split("\t")[:2])
            if len(orig)>max_len: continue
            self.sentences.append((torch.tensor([vocOrig.get(o) for o in orig.split(" ")]+[Vocabulary.EOS]),torch.tensor([vocDest.get(o) for o in dest.split(" ")]+[Vocabulary.EOS])))
    def __len__(self):return len(self.sentences)
    def __getitem__(self,i): return self.sentences[i]



def collate_fn(batch):
    orig,dest = zip(*batch)
    o_len = torch.tensor([len(o) for o in orig])
    d_len = torch.tensor([len(d) for d in dest])
    return pad_sequence(orig),o_len,pad_sequence(dest),d_len




In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


with open(FILE) as f:
    lines = f.readlines()

lines = [lines[x] for x in torch.randperm(len(lines))]
idxTrain = int(0.8*len(lines))

vocEng = Vocabulary(True)
vocFra = Vocabulary(True)
MAX_LEN=100
BATCH_SIZE=100

datatrain = TradDataset("".join(lines[:idxTrain]),vocEng,vocFra,max_len=MAX_LEN)
datatest = TradDataset("".join(lines[idxTrain:]),vocEng,vocFra,max_len=MAX_LEN)

train_loader = DataLoader(datatrain, collate_fn=collate_fn, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(datatest, collate_fn=collate_fn, batch_size=BATCH_SIZE, shuffle=True)

#  TODO:  Implémenter l'encodeur, le décodeur et la boucle d'apprentissage


100%|█████████████████████████████████████████████████████████████████████████████| 136521/136521 [00:29<00:00, 4678.69it/s]
100%|███████████████████████████████████████████████████████████████████████████████| 34132/34132 [00:06<00:00, 5278.09it/s]


In [4]:
class Encoder(nn.Module):
    def __init__(self, vocab_enc, dim_latent_enc, dim_hidden_enc, pad_index):
        super(Encoder, self).__init__()
        self.enc_emb =  nn.Embedding(vocab_enc, dim_latent_enc, padding_idx=pad_index) 
        self.enc_gru = nn.GRU(dim_latent_enc, dim_hidden_enc)
    
    def forward(self, x):
        x_emb = self.enc_emb(x)
        _, h_n = self.enc_gru(x_emb)
        return h_n

class Decoder(nn.Module):
    def __init__(self, vocab_dec, dim_latent_dec, dim_hidden_dec, pad_index) :
        super(Decoder, self).__init__()
        self.vocab_dec = vocab_dec
        self.dec_emb =  nn.Embedding(vocab_dec, dim_latent_dec, padding_idx = pad_index) 
        self.dec_gru = nn.GRU(dim_latent_dec, dim_hidden_dec)
        self.decode = nn.Linear(dim_hidden_dec, vocab_dec) 
    
    def forward(self, x, hidden):
        emb = self.dec_emb(x)
        _, h_n = self.dec_gru(emb, hidden)
        dec = self.decode(h_n)   
        return h_n, dec 
    
    def generate(self, hidden, lenseq=None, use_teacher_forcing=False, target=None):
        sos = Vocabulary.SOS
        eos = Vocabulary.EOS

        batch_size = hidden.shape[1] 
                
        trad = torch.full((1, batch_size), sos, dtype=torch.long, device=hidden.device)
        trad = torch.nn.functional.one_hot(trad, num_classes=self.vocab_dec)
        x = torch.full((1, batch_size), sos, dtype=torch.long, device=hidden.device)

        ht = hidden 
        i = 0
        cpt_eos = 0
        
        while lenseq==None or i<lenseq :
            ht, dec = self.forward(x, ht)
            output = nn.functional.softmax(dec, dim=1)

            x = torch.argmax(output, axis = 2).reshape(1,-1)
            
            if use_teacher_forcing : 
                trad = torch.cat((trad, output), dim = 0)
                x = target[i,:].reshape(1,-1)
            else : 
                trad = torch.cat((trad, output), dim = 0)

            cpt_eos += torch.sum(x==eos).item()
            if cpt_eos ==  batch_size: 
                break
            i+=1

        return trad[1:]

In [17]:

def train(encoder, decoder, criterion, train_loader, test_loader, teacher_forcing_prob = 0.5 , lr=0.3, epoch = 10 ):

    writer = SummaryWriter("traduction/"+datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))


    parameters = list(encoder.parameters()) + list(decoder.parameters())
    optimizer = torch.optim.Adam(params = parameters, lr = lr)


    liste_loss_train = []
    liste_loss_val = []
    for epoch in tqdm(range(epoch)):
        
        liste_loss_batch = []

        for input_seq, idx_pad_input, target_seq, idx_pad_target in tqdm(train_loader):
            input_seq , idx_pad_input, target_seq, idx_pad_target = input_seq.to(device) , idx_pad_input.to(device), target_seq.to(device), idx_pad_target.to(device)
            optimizer.zero_grad()
            use_teacher_forcing = True if torch.rand(1).item() < teacher_forcing_prob else False
            hidden = encoder(input_seq).to(device)
            yhat = decoder.generate(hidden, lenseq=torch.max(idx_pad_target), use_teacher_forcing=use_teacher_forcing, target=target_seq)
            yhat = torch.nn.functional.pad(yhat,  (0, target_seq.size(1) - yhat.size(1)), value=Vocabulary.PAD).to(dtype=torch.float32)
            yhat = torch.transpose(yhat,1,2)

            loss = criterion(yhat, target_seq)

            writer.add_scalar("Loss/train", loss, epoch)
            
            loss.backward()
            
            optimizer.step()
            
            with torch.no_grad():
                liste_loss_batch.append(loss.item())
            
        liste_loss_train.append(np.mean(liste_loss_batch))

        print("it")

        with torch.no_grad():
            
            liste_loss_batch = []

            for input_seq, idx_pad_input, target_seq, idx_pad_target in test_loader:
                input_seq , idx_pad_input, target_seq, idx_pad_target = input_seq.to(device) , idx_pad_input.to(device), target_seq.to(device), idx_pad_target.to(device)
                optimizer.zero_grad()
                use_teacher_forcing = True if torch.rand(1).item() < teacher_forcing_prob else False
                hidden = encoder(input_seq).to(device)
                yhat = decoder.generate(hidden, lenseq=torch.max(idx_pad_target), use_teacher_forcing=use_teacher_forcing, target=target_seq)
                yhat = torch.nn.functional.pad(yhat,  (0, target_seq.size(1) - yhat.size(1)), value=Vocabulary.PAD).to(dtype=torch.float32)
                yhat = torch.transpose(yhat,1,2)

                loss = criterion(yhat, target_seq)

                writer.add_scalar("Loss/test", loss, epoch)
                
                loss.backward()
                
                optimizer.step()
                
                with torch.no_grad():
                    liste_loss_batch.append(loss.item())
                    
            liste_loss_val.append(np.mean(liste_loss_batch))
 

        print("val")

    plt.figure()
    plt.plot(np.arange(len(liste_loss_train)), liste_loss_train, label='Loss train', color='tab:orange')
    plt.plot(np.arange(len(liste_loss_val)), liste_loss_val, label='Loss val', color='tab:blue')
    plt.xlabel("Epochs")
    plt.title("Loss en train et en validation")
    plt.legend(loc='upper left')
    plt.show()

In [19]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
vocab_enc = vocEng.__len__()  
dim_latent_enc = 10
dim_hidden = 5

vocab_dec = vocFra.__len__() 
dim_latent_dec = 10

pad_index = Vocabulary.PAD
lr = 0.3


encoder = Encoder(vocab_enc, dim_latent_enc, dim_hidden, pad_index).to(device)
decoder = Decoder(vocab_dec, dim_latent_dec, dim_hidden, pad_index).to(device)
criterion = torch.nn.CrossEntropyLoss(ignore_index=Vocabulary.PAD)
train(encoder, decoder, criterion, train_loader, test_loader)

cuda


  0%|                                                                                                | 0/10 [00:00<?, ?it/s]
  0%|                                                                                              | 0/1365 [00:00<?, ?it/s][A
  0%|                                                                                    | 1/1365 [00:06<2:37:34,  6.93s/it][A
  0%|                                                                                    | 2/1365 [00:14<2:46:57,  7.35s/it][A
  0%|▏                                                                                   | 3/1365 [00:21<2:46:41,  7.34s/it][A
  0%|▏                                                                                   | 4/1365 [00:47<4:29:40, 11.89s/it][A
  0%|                                                                                                | 0/10 [00:47<?, ?it/s]


RuntimeError: CUDA error: out of memory
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [462]:
def traduction(sentence, encoder, decoder):
    x = torch.tensor([vocEng.__getitem__(w) for w in sentence.split()]).reshape(-1,1)
    hidden = encoder(x)
    trad = decoder.generate(hidden, lenseq=20)
    trad = torch.argmax(trad, axis = 2).reshape(-1)
    return " ".join(vocFra.getwords(trad))

In [464]:
sentence = "hello i love cats and also dogs"
trad = traduction(sentence, encoder, decoder)
print(trad)


PAD PAD PAD PAD PAD PAD PAD PAD PAD PAD PAD PAD PAD PAD PAD PAD PAD PAD PAD PAD
