In [1]:
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import logging
import csv
from torch.utils.data import Dataset, DataLoader
from collections import Counter

# Run on the GPU when CUDA reports one, otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Emit log records of level INFO and above.
logging.basicConfig(level=logging.INFO)


def fill_na(mat):
    """Fill the NaN entries of a 2-D array in place from their vertical neighbours.

    Each NaN at ``(i, j)`` is replaced by the mean of the entries directly
    above and below it in column ``j``.  When only one of the two neighbours
    is usable (the other one is NaN or lies outside the array) that neighbour
    is copied instead.  Entries are visited in row-major order, so a run of
    consecutive NaNs in a column is filled top-down from already-filled values.

    Args:
        mat: 2-D floating-point ``numpy.ndarray``; it is modified in place.

    Returns:
        The same array object ``mat``.  A NaN with no usable neighbour (for
        example a column made only of NaNs) is left as NaN.
    """
    if mat.size == 0:
        return mat
    n_rows = mat.shape[0]
    ix, iy = np.where(np.isnan(mat))
    for i, j in zip(ix, iy):
        # Never index outside the array: row -1 would wrap around to the last
        # row and row n_rows would raise IndexError.
        above = mat[i - 1, j] if i > 0 else np.nan
        below = mat[i + 1, j] if i + 1 < n_rows else np.nan
        if np.isnan(below):
            mat[i, j] = above
        elif np.isnan(above):
            mat[i, j] = below
        else:
            mat[i, j] = (above + below) / 2.
    return mat


def read_temps(path):
    """Read the temperature file at ``path`` into a float tensor.

    The file is parsed as comma-separated values.  Its first line is skipped
    and the first column of every row is dropped.  A row is kept only when its
    second field is made of digits once the dots are removed; empty fields of
    a kept row are read as NaN, and the resulting array goes through
    ``fill_na`` before being converted to a ``torch.float`` tensor.
    """
    def parse_field(field):
        return float('nan') if field == "" else float(field)

    rows = []
    with open(path, "rt") as handle:
        records = csv.reader(handle, delimiter=',')
        next(records)  # skip the first line
        for record in records:
            if record[1].replace(".", "").isdigit():
                rows.append([parse_field(field) for field in record[1:]])
    return torch.tensor(fill_na(np.array(rows)), dtype=torch.float)

In [2]:
class RNN(nn.Module):
    """Simple recurrent network with a linear decoder.

    The hidden state is updated as ``h_t = tanh(W_x x_t + b + W_h h_{t-1})``
    and decoded as ``tanh(W_d h_t + b_d)``.

    Args:
        latent_dim: size of the hidden state.
        input_dim: size of one input vector ``x_t``.
        output_dim: size of one decoded vector.
    """

    def __init__(self, latent_dim, input_dim, output_dim):
        super().__init__()
        self.latent_size = latent_dim
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.act_encode = torch.tanh
        self.act_decode = torch.tanh

        # Network parameters
        self.linearX = nn.Linear(input_dim, latent_dim, bias=True)
        # No bias here: linearX already contributes one to the same sum.
        self.linearH = nn.Linear(latent_dim, latent_dim, bias=False)

        self.linearD = nn.Linear(latent_dim, output_dim, bias=True)

    def one_step(self, x, h):
        """Compute the hidden state for one time step.

        Args:
            x: input of shape (batch, input_dim).
            h: previous hidden state of shape (batch, latent_size).

        Returns:
            The new hidden state, shape (batch, latent_size).
        """
        return self.act_encode(self.linearX(x) + self.linearH(h))

    def forward(self, x, h=None):
        """Run the network over a batch of sequences.

        Args:
            x: batch of sequences, shape (length, batch, input_dim).
            h: optional initial hidden state, shape (batch, latent_size).
                Defaults to zeros.

        Returns:
            The hidden state after every time step, stacked into a tensor of
            shape (length, batch, latent_size).
        """
        length, batch, _ = x.shape
        if h is None:
            # Allocate next to the input so the model also runs on GPU and
            # with non-default floating-point dtypes.
            h = x.new_zeros((batch, self.latent_size))
        if length == 0:
            # torch.stack refuses an empty list.
            return x.new_zeros((0, batch, self.latent_size))

        states = []
        for x_t in x:
            h = self.one_step(x_t, h)
            states.append(h)
        return torch.stack(states)

    def decode(self, h):
        """Decode hidden states of shape (..., latent_size) to (..., output_dim)."""
        return self.act_decode(self.linearD(h))
    

In [3]:
import string
import unicodedata
import re

In [23]:
# Reserved codes: PAD_IX decodes to the empty string, EOS_IX to '|'.
PAD_IX = 0
EOS_IX = 1

# Alphabet: the 26 lowercase ASCII letters, the full stop and the space.
# Alternative alphabets (disabled):
#   string.ascii_letters + string.punctuation + string.digits + ' '
#   string.ascii_letters + ' '
LETTRES = string.ascii_lowercase + "." + ' '

# Character codes start at 2, right after the two reserved codes.
id2lettre = {code: lettre for code, lettre in enumerate(LETTRES, start=2)}
id2lettre[PAD_IX] = ''  # NULL character
id2lettre[EOS_IX] = '|'
# Reverse mapping, character -> code.
lettre2id = {lettre: code for code, lettre in id2lettre.items()}

def normalize(s):
    """Decompose ``s`` with Unicode NFD and keep only the characters found in ``LETTRES``."""
    decomposed = unicodedata.normalize('NFD', s)
    kept = [char for char in decomposed if char in LETTRES]
    return ''.join(kept)

def string2code(s):
    """Encode ``s`` as a 1-D integer tensor of character codes.

    ``s`` is first passed through ``normalize``, so characters outside the
    alphabet are dropped rather than raising.

    Returns:
        A ``torch.long`` tensor with one code per kept character.  The dtype
        is set explicitly: without it an input with no kept character would
        produce an empty float32 tensor.
    """
    return torch.tensor([lettre2id[c] for c in normalize(s)], dtype=torch.long)

def code2string(t):
    """Decode a sequence of character codes back into a string.

    Args:
        t: a list or tuple of integer codes, or any object exposing
            ``tolist()`` (for example a tensor) that yields such codes.

    Returns:
        The concatenation of ``id2lettre[code]`` for every code in ``t``.

    Raises:
        KeyError: if a code is missing from ``id2lettre``.
    """
    if not isinstance(t, (list, tuple)):
        t = t.tolist()
        if not isinstance(t, list):
            # tolist() on a 0-dim tensor returns a bare Python scalar.
            t = [t]
    return ''.join(id2lettre[i] for i in t)

def str2code(s):
    """Return the list of ``lettre2id`` codes of the characters of ``s``.

    Raises ``KeyError`` on a character that is not in ``lettre2id``.
    """
    codes = []
    for char in s:
        codes.append(lettre2id[char])
    return codes

def strs2code(ss):
    """Encode every item of ``ss`` with ``str2code`` and pack the results in a LongTensor."""
    encoded = list(map(str2code, ss))
    return torch.LongTensor(encoded)

def cleanTrumpData(s):
    """Strip a speaker-labelled transcript down to the passages labelled "Trump".

    The text is expected to contain speaker labels of the form ``Name:``.
    Bracketed annotations such as ``[applause]`` are removed, passages
    introduced by a label other than ``Trump`` are dropped, and the remaining
    ``Trump:`` labels are deleted.

    NOTE(review): ``%`` and ``@`` are used as temporary markers, so this
    presumably relies on the input not containing them - confirm on the data.

    Args:
        s: raw transcript text.

    Returns:
        The cleaned text.
    """
    # Patterns are raw strings: "\[" or "\s" in a plain literal is an invalid
    # escape sequence (a SyntaxWarning on recent Python versions).
    # Drop bracketed annotations such as [applause].
    tmp = re.sub(r"\[[^]]+\]", "", s)
    # On the reversed text, ". Trump:" reads ":pmurT ."; mark it as ".%:".
    tmp = re.sub(r":\s*pmurT\s*\.", ":%.", tmp[::-1])
    # Still reversed: every other label sitting between a full stop and a
    # colon is marked as ".@:".
    tmp = re.sub(r":[^.%]+?\.", ":@.", tmp)
    # Back in reading order, mark a "Trump" at the very start of the text too.
    tmp = re.sub(r"^\s*Trump", "%", tmp[::-1])
    # Drop everything from an "@:" label up to the next "%" marker.
    tmp = re.sub(r"@\s*:[^%]+?%", "%", tmp)
    # The "%:" markers only identified the speaker; remove them.
    return re.sub(r"%:", "", tmp)

In [24]:
# Read the whole speech file into memory as a single string.
with open("data/trump_full_speech.txt", mode='r') as speech_file:
    data = speech_file.read()

In [25]:
# Keep the cleaned text, lower-case it, turn "!" and "?" into full stops,
# then restrict it to the alphabet with normalize().
cleanedNormalizedData = normalize(
    cleanTrumpData(data).lower().translate(str.maketrans({"!": ".", "?": "."}))
)

In [26]:
class TextDataset(Dataset):
    """Dataset of encoded sentences obtained by splitting a text on full stops.

    Args:
        text: text to split; every character of the kept sentences must be a
            key of ``lettre2id`` (``str2code`` raises ``KeyError`` otherwise).
        maxsent: optional maximum number of sentences to keep; the first
            ``maxsent`` sentences that pass the length filter are retained.
            ``None`` keeps them all.
        maxlen: optional exclusive upper bound on the sentence length, in
            characters.  ``None`` means no upper bound.

    Sentences are stripped of surrounding whitespace and kept only when they
    are longer than 5 characters and shorter than ``maxlen``.
    """

    def __init__(self, text: str, *, maxsent=None, maxlen=None):
        maxlen = np.inf if maxlen is None else maxlen
        phrases = [phrase.strip() for phrase in text.split(".")]
        phrases = [phrase for phrase in phrases if 5 < len(phrase) < maxlen]
        if maxsent is not None:
            # Truncate before encoding so that dropped sentences cost nothing.
            phrases = phrases[:maxsent]
        # strs2code iterates a string character by character and returns a
        # (len, 1) tensor; squeeze(1) turns it into a 1-D tensor of codes.
        self.phrases = [strs2code(phrase).squeeze(1) for phrase in phrases]

    def __len__(self):
        """Number of sentences kept."""
        return len(self.phrases)

    def __getitem__(self, i):
        """Return the 1-D tensor of character codes of sentence ``i``."""
        return self.phrases[i]

In [57]:
def collate_fn(samples):
    """Collate variable-length code sequences into one padded batch.

    Every sequence is terminated with ``EOS_IX`` and then right-padded with
    ``PAD_IX`` so that all rows have the same length.  The end-of-sequence
    code comes immediately after the real content, before any padding.

    Args:
        samples: non-empty list of 1-D integer tensors.

    Returns:
        A tensor of shape (len(samples), longest + 1), where ``longest`` is
        the length of the longest sample, with the dtype of the samples.

    Raises:
        ValueError: if ``samples`` is empty.
    """
    longest = max(len(sample) for sample in samples)
    rows = []
    for sample in samples:
        # new_full keeps the dtype and device of the sample, so nothing
        # depends on implicit type promotion inside torch.cat.
        eos = sample.new_full((1,), EOS_IX)
        pads = sample.new_full((longest - len(sample),), PAD_IX)
        rows.append(torch.cat((sample, eos, pads), 0))

    return torch.stack(rows)

In [63]:
# Number of sentences per batch.
BATCH_SIZE = 3
# Batches of padded sentences built by collate_fn from the cleaned text.
loader = DataLoader(
    dataset=TextDataset(cleanedNormalizedData),
    batch_size=BATCH_SIZE,
    collate_fn=collate_fn,
)

In [70]:
class GRU(nn.Module):
    """Gated recurrent unit with a linear decoder.

    For every time step, with ``c = [x_t, h_{t-1}]`` (concatenation)::

        z_t = sigmoid(W_z c)
        r_t = sigmoid(W_r c)
        h_t = (1 - z_t) * h_{t-1} + z_t * tanh(W_h (r_t * c))

    NOTE(review): the reset gate ``r_t`` has input_dim + latent_dim units and
    scales the whole concatenation, input included; the usual GRU formulation
    only gates the previous hidden state - confirm this is intended.

    Args:
        latent_dim: size of the hidden state.
        input_dim: size of one input vector ``x_t``.
        output_dim: size of one decoded vector.
    """

    def __init__(self, latent_dim, input_dim, output_dim):
        super().__init__()
        self.latent_size = latent_dim
        self.input_dim = input_dim
        self.output_dim = output_dim

        self.sigmoid = torch.sigmoid
        self.tanh = torch.tanh
        self.act_decode = torch.tanh

        # Network parameters
        self.linearZ = nn.Linear(input_dim+latent_dim, latent_dim, bias=False)
        self.linearR = nn.Linear(input_dim+latent_dim, input_dim+latent_dim, bias=False)
        self.linearH = nn.Linear(input_dim+latent_dim, latent_dim, bias=False)

        # Decoder used by decode(); same layout as in the RNN class.
        self.linearD = nn.Linear(latent_dim, output_dim, bias=True)

    def one_step(self, x, h):
        """Compute the hidden state for one time step.

        Args:
            x: input of shape (batch, input_dim).
            h: previous hidden state of shape (batch, latent_size).

        Returns:
            The new hidden state, shape (batch, latent_size).
        """
        concatHX = torch.cat((x, h), 1)
        zt = self.sigmoid(self.linearZ(concatHX))
        rt = self.sigmoid(self.linearR(concatHX))
        ht = (1-zt)*h + zt* self.tanh(self.linearH(rt*concatHX))
        return ht

    def forward(self, x, h=None):
        """Run the network over a batch of sequences.

        Args:
            x: batch of sequences, shape (length, batch, input_dim).
            h: optional initial hidden state, shape (batch, latent_size).
                Defaults to zeros.

        Returns:
            The hidden state after every time step, stacked into a tensor of
            shape (length, batch, latent_size).
        """
        length, batch, _ = x.shape
        if h is None:
            # Allocate next to the input so the model also runs on GPU and
            # with non-default floating-point dtypes.
            h = x.new_zeros((batch, self.latent_size))
        if length == 0:
            # torch.stack refuses an empty list.
            return x.new_zeros((0, batch, self.latent_size))

        states = []
        for x_t in x:
            h = self.one_step(x_t, h)
            states.append(h)
        return torch.stack(states)

    def decode(self, h):
        """Decode hidden states of shape (..., latent_size) to (..., output_dim)."""
        return self.act_decode(self.linearD(h))
    

In [71]:
class LSTM(nn.Module):
    """Long short-term memory network with a linear decoder.

    For every time step, with ``c = [x_t, h_{t-1}]`` (concatenation)::

        f_t = sigmoid(W_f c + b_f)
        i_t = sigmoid(W_i c + b_i)
        C_t = f_t * C_{t-1} + i_t * tanh(W_c c + b_c)
        o_t = sigmoid(W_o c + b_o)
        h_t = o_t * tanh(C_t)

    The cell state ``C_t`` is kept in ``self.ct`` between calls to
    ``one_step`` and is reset to zeros at the start of every ``forward``.

    Args:
        latent_dim: size of the hidden and cell states.
        input_dim: size of one input vector ``x_t``.
        output_dim: size of one decoded vector.
    """

    def __init__(self, latent_dim, input_dim, output_dim):
        super().__init__()
        self.latent_size = latent_dim
        self.input_dim = input_dim
        self.output_dim = output_dim

        self.sigmoid = torch.sigmoid
        self.tanh = torch.tanh
        self.act_decode = torch.tanh

        # Cell state; allocated lazily so that it follows the batch size,
        # device and dtype of the data instead of a module-level constant.
        self.ct = None

        # Network parameters
        self.linearF = nn.Linear(input_dim+latent_dim, latent_dim, bias=True)
        self.linearI = nn.Linear(input_dim+latent_dim, latent_dim, bias=True)
        self.linearC = nn.Linear(input_dim+latent_dim, latent_dim, bias=True)
        self.linearO = nn.Linear(input_dim+latent_dim, latent_dim, bias=True)

        # Decoder used by decode(); same layout as in the RNN class.
        self.linearD = nn.Linear(latent_dim, output_dim, bias=True)

    def one_step(self, x, h):
        """Compute the hidden state for one time step and update ``self.ct``.

        Args:
            x: input of shape (batch, input_dim).
            h: previous hidden state of shape (batch, latent_size).

        Returns:
            The new hidden state, shape (batch, latent_size).
        """
        stale = (
            self.ct is None
            or self.ct.shape != h.shape
            or self.ct.device != h.device
            or self.ct.dtype != h.dtype
        )
        if stale:
            # First step, or the stored state does not match this batch.
            self.ct = torch.zeros_like(h)

        concatHX = torch.cat((x, h), 1)
        ft = self.sigmoid(self.linearF(concatHX))
        it = self.sigmoid(self.linearI(concatHX))
        self.ct = ft*self.ct + it*self.tanh(self.linearC(concatHX))
        ot = self.sigmoid(self.linearO(concatHX))
        return ot*self.tanh(self.ct)

    def forward(self, x, h=None):
        """Run the network over a batch of sequences.

        Args:
            x: batch of sequences, shape (length, batch, input_dim).
            h: optional initial hidden state, shape (batch, latent_size).
                Defaults to zeros.

        Returns:
            The hidden state after every time step, stacked into a tensor of
            shape (length, batch, latent_size).
        """
        length, batch, _ = x.shape
        if h is None:
            h = x.new_zeros((batch, self.latent_size))
        # Start every batch from an empty cell state; carrying it over would
        # leak information (and the autograd graph) from the previous batch.
        self.ct = torch.zeros_like(h)
        if length == 0:
            # torch.stack refuses an empty list.
            return x.new_zeros((0, batch, self.latent_size))

        states = []
        for x_t in x:
            h = self.one_step(x_t, h)
            states.append(h)
        return torch.stack(states)

    def decode(self, h):
        """Decode hidden states of shape (..., latent_size) to (..., output_dim)."""
        return self.act_decode(self.linearD(h))
    

# Classification de séquence

# Génération de séquence