**TP n°6**
Notions abordées:

En partie I:
- Prévision par RNN: un premier apprentissage sur une sinusoïde
- Apprentissage d'un LSTM sur une sinusoïde

En partie II:
- Word embedding 
- Apprentissage d'un LSTM plus complet

En partie III:
- Reconnaissance auto. de la parole : utilisation d'un RNN-T

En partie IV:
- Reconnaissance auto. de la parole : utilisation d'un Wav2Vec2


Durée : 3 h

**Partie I**

Cette partie introduit les RNN. Un RNN travaille sur une série temporelle de tenseurs. Il peut avoir pour objectif de prédire une classe pour la série, une classe pour chaque élément de la série, ou encore de prévoir le(s) prochain(s) éléments. 

La [page suivante](https://stanford.edu/~shervine/teaching/cs-230/cheatsheet-recurrent-neural-networks) permet de visualiser différentes configurations pour l'apprentissage.

**Exercice 1** Dans un premier temps, nous allons entraîner un RNN en "Many-to-One" pour reconstruire une sinusoïde. 
Instancier un RNN simple à l'aide du code qui suit.

In [None]:
import numpy as np
import torch
from torch import nn
from torch.utils.data import Dataset
import torch
import matplotlib.pyplot as plt
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
from torch.utils.data import DataLoader

class SimpleRNN(nn.Module):

    def __init__(self, rnn_type, input_size, hidden_size, num_layers):
        super(SimpleRNN, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        
        if rnn_type == 'RNN':
            self.rnn = nn.RNN(input_size=input_size, hidden_size=hidden_size, dropout=(0 if num_layers == 1 else 0.05), num_layers=num_layers, batch_first=True)
        elif rnn_type == 'GRU':
            self.rnn = nn.GRU(input_size=input_size, hidden_size=hidden_size, dropout=(0 if num_layers == 1 else 0.05), num_layers=num_layers, batch_first=True)

        self.out = nn.Linear(hidden_size, 1)  

    def forward(self, x, h_state):
        
        r_out, h_state = self.rnn(x, h_state)

        final_y = self.out(r_out[:, -1, :])  

        return final_y, h_state
    
RNN_TYPE = 'GRU'  
rnn = SimpleRNN(RNN_TYPE, input_size=1, hidden_size=4, num_layers=1).to(device)

**Q1** Que représentent les deux tenseurs *x* et *h_state* en entrée du RNN ?
Comment le mode "Many-to-One" se traduit-il ?

**Q2** Que définissent les paramètres *hidden size* et *num_layers* ?

Donnons-nous maintenant des séquences sur lesquelles apprendre. Pour entraîner à reproduire une sinusoïde, nous allons extraire des sous-suites d'une série temporelle de base définie par $u_{t_i} = sin(t_i) $. Ces sous-suites sont mises à disposition grâce au Dataset suivant:

In [None]:
class RNNDataset(Dataset):

    def __init__(self, x, y=None):
        self.data = x
        self.labels = y

    def __len__(self):
        return self.data.shape[0]

    def __getitem__(self, idx):
        if self.labels is not None:
            return self.data[idx], self.labels[idx]
        else:
            return self.data[idx]


def create_dataset(sequence_length, train_percent=0.8):

    # Create sin wave at discrete time steps.
    num_time_steps = 1500
    time_steps = np.linspace(start=0, stop=1000, num=num_time_steps, dtype=np.float32)
    discrete_sin_wave = (np.sin(time_steps * 0.5)).reshape(-1, 1)

    # Take (sequence_length + 1) elements & put as a row in sequence_data, extra element is value we want to predict.
    # Move one time step and keep grabbing till we reach the end of our sampled sin wave.
    sequence_data = []
    for i in range(num_time_steps - sequence_length):
        sequence_data.append(discrete_sin_wave[i: i + sequence_length + 1, 0])
    sequence_data = np.array(sequence_data)

    # Split for train/val.
    num_total_samples = sequence_data.shape[0]
    num_train_samples = int(train_percent * num_total_samples)

    train_set = sequence_data[:num_train_samples, :]
    test_set = sequence_data[num_train_samples:, :]

    print('{} total sequence samples, {} used for training'.format(num_total_samples, num_train_samples))

    # Take off the last element of each row and this will be our target value to predict.
    x_train = train_set[:, :-1][:, :, np.newaxis]
    y_train = train_set[:, -1][:, np.newaxis]
    x_test = test_set[:, :-1][:, :, np.newaxis]
    y_test = test_set[:, -1][:, np.newaxis]

    return x_train, y_train, x_test, y_test



**Q3** Créer et tracer la série temporelle de base. 

**Q4** Instancier le Dataset, créer un Loader et le tester.

**Q5** Que représente la cible $y$ par rapport à l'entrée $x$ ?

**Q6** Compléter la boucle d'apprentissage suivante:

In [None]:
def train_model(model, dataloader, loss_function, optimizer, epochs):
    model.train()
    loss_all = []


    for epoch in range(epochs):
        for x_batch, y_batch in dataloader:
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            h_state = torch.zeros([model.num_layers, x_batch.size()[0], model.hidden_size]).to(device)

            [...]

        loss_all.append(loss.cpu().data.numpy())
        print('train loss epoch{}: '.format(epoch), loss.cpu().data.numpy())






**Q7** Lancer l'apprentissage avec des paramètres standards.

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

LEARNING_RATE = 0.01
BATCH_SIZE = 100
NUM_EPOCHS = 100
SEQUENCE_LENGTH = 50


# Define the model, optimizer and loss function.

optimizer = torch.optim.Adam ...
loss_function = ...

...


**Q8** Le code suivant doit permettre de générer des prédictions avec le RNN.
Que fait-il ? Le tester.


In [None]:
def generate_predictions(model, dataloader, init_sequence_length):
    """From a trained model predict """
    model.eval()

    h_state = torch.zeros([model.num_layers, 1, model.hidden_size]).to(device)  
    initial_input = next(iter(dataloader))[1].to(device) 
    initial_input.unsqueeze_(0).data  

    final_outputs = []
    for _ in range(len(dataloader.dataset.labels)-init_sequence_length):

        output, _ = model(initial_input, h_state)
        final_outputs.append(output.cpu().squeeze_().data)

        
        x = torch.clone(initial_input)
        initial_input.data[:, 0:init_sequence_length-1, :] = x.data[:, 1:init_sequence_length, :]
        initial_input.data[:, init_sequence_length-1, :] = output.data


    plt.plot(final_outputs, label='predicted')
    plt.plot(dataloader.dataset.labels[init_sequence_length:], label='actual')
    plt.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
    plt.show()



**Q9** Tester avec un RNN plus complexe (GRU) et interpréter.

In [None]:
generate_predictions(rnn, val_dataloader, init_sequence_length=SEQUENCE_LENGTH)

**Exercice 2** Un LSTM sur une tâche de reconstruction de sinus.

Le LSTM est un RNN prenant en entrée trois tenseurs: l'entrée courante, la sortie associée à l'entrée précédente et une "mémoire à long terme". Ce [post](https://colah.github.io/posts/2015-08-Understanding-LSTMs/) résume clairement sont fonctionnement.

Le code suivant doit permettre de reconstruire un sinus avec un LSTM. Préciser le mode sur lequel le LSTM est appris, compléter et commenter.

In [None]:
# -*- coding: utf-8 -*-

import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import matplotlib.pyplot as plt
#%matplotlib inline

x = torch.linspace(0,799,800)
y = torch.sin(x*2*3.1416/40)

plt.figure(figsize=(12,4))
plt.xlim(-10,801)
plt.grid(True)
plt.xlabel("x")
plt.ylabel("sin")
plt.title("Sin plot")
plt.plot(y.numpy(),color='#8000ff')
plt.show()

In [None]:
test_size = 40
train_set = y[:-test_size]
test_set = y[-test_size:]

# voir l'ensemble de test
plt.figure(figsize=(12,4))
plt.xlim(-10,801)
plt.grid(True)
plt.xlabel("x")
plt.ylabel("sin")
plt.title("Sin plot")
plt.plot(train_set.numpy(),color='#8000ff')
plt.plot(range(760,800),test_set.numpy(),color="#ff8000")
plt.show()

In [None]:
# batches:
def input_data(seq,ws):
    out = []
    L = len(seq)
    
    for i in range(L-ws):
        window = seq[i:i+ws]
        label = seq[i+ws:i+ws+1]
        out.append((window,label))
    
    return out

window_size = 40
train_data = input_data(train_set, window_size)
len(train_data)

train_data[0]

In [None]:
# Le modèle :
class LSTM(nn.Module):
    
    def __init__(self,input_size = 1, hidden_size = 50, out_size = 1):
        super().__init__()
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size)
        self.linear = nn.Linear(hidden_size,out_size)
        self.hidden = (torch.zeros(1,1,hidden_size),torch.zeros(1,1,hidden_size))
    
    def forward(self,seq):
        lstm_out, self.hidden = self.lstm(seq.view(len(seq),1,-1), self.hidden)
        pred = self.linear(lstm_out.view(len(seq),-1))
        return ...



In [None]:
# manual seed
torch.manual_seed(42)
model = LSTM()
criterion = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

epochs = 20
future = 40

#%%
for i in range(epochs):
    
    for seq, y_train in train_data:
        ...
        
    print(f"Epoch {i} Loss: {loss.item()}")
    
    preds = train_set[-window_size:].tolist()

    # Test de la reconstruction:
    for f in range(future):
        seq = torch.FloatTensor(preds[-window_size:])
        with torch.no_grad():
            model.hidden = (torch.zeros(1,1,model.hidden_size),
                           torch.zeros(1,1,model.hidden_size))
            preds.append(model(seq).item())
        
    loss = criterion(torch.tensor(preds[-window_size:]), y[760:])
    print(f"Performance on test range: {loss}")
    
    plt.figure(figsize=(12,4))
    plt.xlim(700,801)
    plt.grid(True)
    plt.plot(y.numpy(),color='#8000ff')
    plt.plot(range(760,800),preds[window_size:],color='#ff8000')
    plt.show()
