In [116]:
import sys
import unicodedata
import string
from typing import List
from torch.utils.data import Dataset, DataLoader
import torch
import re
import torch.nn.functional as F

# Index reserved for the padding token (BLANK).
PAD_IX = 0
# Index reserved for the end-of-sequence token.
EOS_IX = 1

# Every character the model knows about; real characters are numbered from 2
# because indices 0 and 1 are taken by the two special tokens above.
LETTRES = string.ascii_letters + string.punctuation + string.digits + ' '
id2lettre = {ix: car for ix, car in enumerate(LETTRES, start=2)}
id2lettre.update({PAD_IX: '<PAD>', EOS_IX: '<EOS>'})  # '<PAD>' acts as the NULL character
# Reverse mapping: character (or special-token string) -> index.
lettre2id = {car: ix for ix, car in id2lettre.items()}


def normalize(s):
    """Strip accents and drop every character that is not part of LETTRES.

    The string is first NFD-decomposed, which splits an accented letter into
    its base letter followed by combining marks; only characters found in
    LETTRES are then kept, so the marks (and any other unknown symbol) vanish.
    """
    decomposed = unicodedata.normalize('NFD', s)
    return ''.join(filter(lambda car: car in LETTRES, decomposed))

def string2code(s):
    """Encode a string as a 1-D tensor of character indices.

    The string goes through `normalize` first, so accents are stripped and
    characters outside the vocabulary are silently dropped.

    :param s: text to encode
    :return: int64 tensor with one index per kept character (shape ``(0,)``
        when nothing survives normalization)
    """
    # dtype is forced: torch.tensor([]) would otherwise default to float32,
    # which cannot be used as an index tensor downstream.
    return torch.tensor([lettre2id[c] for c in normalize(s)], dtype=torch.long)

def code2string(t):
    """Decode a sequence of character indices back into a string.

    :param t: a 1-D tensor (anything exposing ``tolist()``) or a plain
        iterable of indices such as a list or a tuple
    :return: the concatenation of the symbols mapped by `id2lettre`; special
        tokens are rendered as '<PAD>' / '<EOS>'
    :raises KeyError: if an index is not in the vocabulary
    """
    # Duck-type on tolist() instead of comparing type(t) with list, so tuples
    # and list subclasses are accepted as-is and tensors/arrays are converted.
    codes = t.tolist() if hasattr(t, 'tolist') else t
    return ''.join(id2lettre[i] for i in codes)


class TextDataset(Dataset):
    """Dataset of sentences extracted from a raw text (e.g. Trump's tweets)."""

    def __init__(self, text: str, *, maxsent=None, maxlen=None):
        """Split `text` into sentences on '.' and store them.

        Each sentence is truncated to `maxlen` characters, has its runs of
        spaces collapsed, is stripped, and gets a trailing '.' appended.
        Sentences that are empty after cleaning are discarded.

        :param text: the raw text to split
        :param maxsent: maximum number of sentences to keep (None = all)
        :param maxlen: maximum number of characters kept per raw sentence
            before cleaning (None or 0 = no limit)
        """
        maxlen = maxlen or sys.maxsize
        phrases = []
        for raw in text.split("."):
            # Clean once and reuse the result for both the test and the value.
            cleaned = re.sub(' +', ' ', raw[:maxlen]).strip()
            if cleaned:
                phrases.append(cleaned + ".")
        if maxsent is not None:
            phrases = phrases[:maxsent]
        self.phrases = phrases
        # Length of the longest stored sentence; 0 when there is none
        # (a bare max() would raise ValueError on an empty dataset).
        self.maxlen = max((len(p) for p in self.phrases), default=0)

    def __len__(self):
        """Number of stored sentences."""
        return len(self.phrases)

    def __getitem__(self, i):
        """Return sentence `i` encoded by `string2code`."""
        return string2code(self.phrases[i])

def pad_collate_fn(samples: List[List[int]]):
    """Build a padded batch from a list of encoded sentences.

    An EOS token is appended to every sentence, then each one is right-padded
    with PAD so that all of them share the length of the longest sentence + 1.

    :param samples: encoded sentences, as 1-D index tensors or lists of ints
    :return: int64 tensor of shape (max_len + 1, batch), one sentence per column
    :raises ValueError: if `samples` is empty
    """
    # as_tensor accepts both plain lists and tensors, and forces index dtype.
    phrases = [torch.as_tensor(p, dtype=torch.long) for p in samples]
    maxlen = max(len(p) for p in phrases) + 1  # +1 for EOS
    # Start from an all-PAD integer batch: a float batch (torch.empty default)
    # cannot be fed to an Embedding layer or used as gather indices.
    data = torch.full((maxlen, len(phrases)), PAD_IX, dtype=torch.long)
    for col, phrase in enumerate(phrases):
        n = len(phrase)
        data[:n, col] = phrase
        data[n, col] = EOS_IX
    return data

In [51]:
if __name__ == "__main__":
    # Smoke test of the dataset + collate pipeline on a three-sentence text.
    test = "C'est. Un. Test."
    ds = TextDataset(test)
    loader = DataLoader(ds, collate_fn=pad_collate_fn, batch_size=3)
    data = next(iter(loader))
    print("Chaîne à code : ", test)

    # Longest sentence ("C'est.") has 6 characters + EOS -> 7 rows, 3 columns.
    assert data.shape == (7, 3)
    print("Shape ok")

    # The letter 'e' must get the same code in "C'est." and in "Test.".
    assert data[2, 0] == data[1, 2]
    print("encodage OK")

    # "Test." is 5 characters long, so its EOS sits on row 5.
    assert data[5, 2] == EOS_IX
    print("Token EOS ok")

    # "Un." + EOS fills rows 0-3; every row from 4 on must be padding.
    expected_blanks = data.shape[0] - 4
    assert (data[4:, 1] == PAD_IX).sum() == expected_blanks
    print("Token BLANK ok")

    # Decoding each column and removing the special tokens must give back the text.
    pad_symbol, eos_symbol = id2lettre[PAD_IX], id2lettre[EOS_IX]
    s_decode = " ".join(
        code2string(column).replace(pad_symbol, "").replace(eos_symbol, "")
        for column in data.t()
    )
    print("Chaîne décodée : ", s_decode)
    assert test == s_decode

Chaîne à code :  C'est. Un. Test.
Shape ok
encodage OK
Token EOS ok
Token BLANK ok
Chaîne décodée :  C'est. Un. Test.


In [92]:
def maskedCrossEntropy(output: torch.Tensor, target: torch.LongTensor, padcar: int):
    """Cross-entropy summed over every non-padding position.

    :param output: tensor length x batch x output_dim of raw logits (softmax
        is NOT applied yet; it is done here)
    :param target: tensor length x batch of class indices (cast to int64 if
        it arrives with another dtype)
    :param padcar: index of the padding character; positions whose target
        equals it contribute 0 to the loss
    :return: scalar tensor, the sum (not the mean) of the per-position
        negative log-likelihoods
    """
    target = target.long()  # gather only accepts int64 indices
    # log_softmax is computed in one stable pass; log(softmax(x)) underflows
    # to -inf as soon as one logit dominates the others.
    log_probs = torch.log_softmax(output, dim=-1)
    # Pick, for each (time, batch) position, the log-probability of the target
    # class: the lookup must run along the class axis (last), not along time.
    picked = torch.gather(log_probs, dim=-1, index=target.unsqueeze(-1)).squeeze(-1)
    picked = picked.masked_fill(target == padcar, 0.)
    return -picked.sum()

In [118]:
class LSTM(torch.nn.Module):
    """Hand-written LSTM layer followed by a linear decoder over the vocabulary.

    Each gate is a Linear layer applied to the concatenation [x_t, h_{t-1}]:
        f_t = sigmoid(W_f [x_t, h_{t-1}])      forget gate
        i_t = sigmoid(W_i [x_t, h_{t-1}])      input gate
        g_t = tanh(W_g [x_t, h_{t-1}])         candidate memory
        o_t = sigmoid(W_o [x_t, h_{t-1}])      output gate
        c_t = f_t * c_{t-1} + i_t * g_t        long-term memory
        h_t = o_t * tanh(c_t)                  short-term memory
    """

    def __init__(self, input_size, hidden_size, vocab_size):
        """
        :param input_size: dimension of the inputs (embedding dimension)
        :param hidden_size: dimension of the memories c and h
        :param vocab_size: number of classes produced by the decoder
        """
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        gate_in = self.input_size + self.hidden_size
        # forget gate
        self.f = torch.nn.Sequential(
            torch.nn.Linear(in_features=gate_in, out_features=self.hidden_size),
            torch.nn.Sigmoid()
        )
        # input gate
        self.i = torch.nn.Sequential(
            torch.nn.Linear(in_features=gate_in, out_features=self.hidden_size),
            torch.nn.Sigmoid()
        )
        # candidate memory
        self.g = torch.nn.Sequential(
            torch.nn.Linear(in_features=gate_in, out_features=self.hidden_size),
            torch.nn.Tanh()
        )
        # output gate
        self.o = torch.nn.Sequential(
            torch.nn.Linear(in_features=gate_in, out_features=self.hidden_size),
            torch.nn.Sigmoid()
        )
        # Projection hidden state -> vocabulary logits. It must NOT be named
        # `decode`: the class defines a `decode` method, and normal attribute
        # lookup finds the method before the registered submodule, so
        # `self.decode(h)` inside the method would recurse forever.
        self.decoder = torch.nn.Linear(in_features=self.hidden_size, out_features=self.vocab_size)

    def one_step(self, x, c, h):
        """Advance the cell by one time step.

        Input:
            x: input at time t, (batch, input_size)
            c: long-term memory at time t-1, (batch, hidden_size)
            h: short-term memory at time t-1, (batch, hidden_size)
        Output:
            (c_t, h_t), the two memories at time t
        """
        # All four gates read the same concatenated input: build it once.
        xh = torch.cat([x, h], dim=-1)
        f_t = self.f(xh)
        i_t = self.i(xh)
        g_t = self.g(xh)
        o_t = self.o(xh)
        c_t = f_t * c + i_t * g_t
        h_t = o_t * torch.tanh(c_t)
        return c_t, h_t

    def forward(self, x):
        """Run the cell over a whole sequence, starting from zero memories.

        Input:
            x: 3-D tensor (length, batch, input_size)
        Output:
            tensor (length, batch, hidden_size) holding h_t for every step
        """
        len_seq, batch_size = x.shape[0], x.shape[1]
        # Create the initial state next to the input so the module also works
        # when the data lives on another device than the default one.
        c, h = self.init_long_short_term(batch_size, device=x.device, dtype=x.dtype)
        if len_seq == 0:
            return x.new_empty((0, batch_size, self.hidden_size))
        hidden_states = []
        for x_t in x.unbind(0):
            c, h = self.one_step(x_t, c, h)
            hidden_states.append(h)
        return torch.stack(hidden_states, dim=0)

    def decode(self, h, training = True):
        """Project hidden states onto the vocabulary.

        :param h: tensor (..., hidden_size)
        :param training: when True return the raw logits (the loss applies the
            softmax itself); when False return probabilities
        :return: tensor (..., vocab_size)
        """
        logits = self.decoder(h)
        if training:
            return logits
        return torch.softmax(logits, dim=-1)

    def init_long_short_term(self, batch_size, device=None, dtype=None):
        """Return zero-filled initial memories (c_0, h_0), each (batch_size, hidden_size).

        `device` and `dtype` are optional; when omitted the torch defaults apply.
        """
        shape = (batch_size, self.hidden_size)
        return (torch.zeros(size=shape, device=device, dtype=dtype),
                torch.zeros(size=shape, device=device, dtype=dtype))

In [103]:
# Toy embedding table used for experimentation: 10 indices, 5-dimensional vectors.
em = torch.nn.Embedding(10, 5)

In [104]:
# Look up the embeddings of the indices stored in `a` and display them.
# NOTE(review): `a` is not defined in any cell visible here, so this cell
# relies on leftover kernel state and fails on Restart & Run All.
# Presumably `a` is a tensor of 4 indices below 10 (the output has 4 rows) — confirm.
b = em(a)
b

tensor([[ 1.3361,  0.0434, -0.8069, -0.2445,  0.2521],
        [ 0.6464,  1.1616,  1.0572,  1.3739, -0.5292],
        [ 0.2144,  1.5302, -0.3469, -0.1844,  1.0415],
        [-0.3753,  0.8793,  1.8172,  1.0014,  1.0258]],
       grad_fn=<EmbeddingBackward0>)

In [112]:
# Scratch check: evaluate torch.tanh on a one-element tensor holding -1.
torch.tanh(torch.tensor([-1]))

tensor([-0.7616])