In [0]:
# Mount Google Drive inside the Colab runtime at /content/drive.
# force_remount=True is passed so that an already existing mount is redone.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [0]:
# https://graviraja.github.io/seqtoseqimp/#
import os

import numpy as np
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from torch import nn
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from tqdm import trange
import random


In [0]:
# Move into the project folder on the mounted Drive.
# The path is absolute (Drive is mounted under /content/drive) so this cell can
# be re-run safely: a relative path only resolves while the working directory
# is still the one the runtime started in.
os.chdir(os.path.join("/content", "drive", "My Drive", "INF8225", "Projet"))

In [0]:
# Prefix used to build the data, model and figure paths in this notebook.
path_base = '/content/drive/My Drive/INF8225/Projet/' # adjust to where the data is stored

In [0]:
# Display the current working directory (sanity check after the chdir cell).
os.getcwd()

In [0]:
def data(batch_size):
    """
    Build the train / validation / test dataloaders used by the autoencoder.

    The four ``*_1measures.npy`` files under ``path_base + 'data/'`` are read,
    the per-song entries of each file are concatenated along the first axis,
    and a start token is added to the inputs with ``addStartToken``.
    10% of the training samples are held out for validation (fixed seed 2018).

    Because the model is an autoencoder, every dataset yields ``(input, input)``
    pairs: the reconstruction target is the input itself. The chord targets are
    still loaded, split and converted, but they are not part of the returned
    dataloaders.

    :param batch_size: batch size shared by the three dataloaders
    :return: (train_dataloader, validation_dataloader, test_dataloader)
    """

    def _load_measures(relative_file):
        # The file holds one entry per song; stack every entry along axis 0.
        per_song = np.load(path_base + relative_file, allow_pickle=True)
        return np.concatenate([np.array(per_song[k]) for k in range(per_song.shape[0])], axis=0)

    def _as_float_tensor(array):
        return torch.tensor(array, dtype=torch.float)

    def _autoencoder_loader(features, sampler_cls):
        # The same tensor is used as input and as target.
        dataset = TensorDataset(features, features)
        return DataLoader(dataset, sampler=sampler_cls(dataset), batch_size=batch_size)

    train_inputs = _load_measures('data/train_input_1measures.npy')
    train_targets = _load_measures('data/train_target_1measures.npy')
    test_inputs = _load_measures('data/test_input_1measures.npy')
    test_targets = _load_measures('data/test_target_1measures.npy')

    train_inputs = addStartToken(train_inputs)
    test_inputs = addStartToken(test_inputs)

    split = train_test_split(train_inputs, train_targets,
                             random_state=2018,
                             test_size=0.1,
                             shuffle=True)
    train_inputs, validation_inputs, train_targets, validation_targets = split

    # Everything is converted to float tensors, as in the rest of the notebook.
    train_inputs = _as_float_tensor(train_inputs)
    validation_inputs = _as_float_tensor(validation_inputs)
    train_targets = _as_float_tensor(train_targets)  # converted but not used below
    validation_targets = _as_float_tensor(validation_targets)  # converted but not used below
    test_inputs = _as_float_tensor(test_inputs)
    test_targets = _as_float_tensor(test_targets)  # converted but not used below

    # Training batches are drawn at random; validation and test keep their order.
    train_dataloader = _autoencoder_loader(train_inputs, RandomSampler)
    validation_dataloader = _autoencoder_loader(validation_inputs, SequentialSampler)
    test_dataloader = _autoencoder_loader(test_inputs, SequentialSampler)
    return train_dataloader, validation_dataloader, test_dataloader

In [0]:
def addStartToken(targets, initial_dim=12):
    """
    Prepend a start token to every sequence and add the channel that encodes it.

    Each vector of size ``initial_dim`` gets one extra trailing component: it is
    0 for the original time steps and 1 for the start token, whose other
    components are all 0. The start token is inserted as the first time step of
    every sequence.

    The arrays are built with vectorised NumPy calls, so an empty batch or
    sequences with zero time steps are handled as well.

    :param targets: array-like of shape (n_sequences, n_steps, initial_dim)
    :param initial_dim: size of the vectors before the extra channel is added
    :return: array of shape (n_sequences, n_steps + 1, initial_dim + 1)
    :raises ValueError: if ``targets`` is not 3-dimensional, or if its last
        dimension does not match ``initial_dim`` (raised by ``np.concatenate``)
    """
    targets = np.asarray(targets)
    n_sequences, n_steps, _ = targets.shape

    # Extra channel, all zeros for the original time steps. dtype=int keeps the
    # same dtype promotion as concatenating arrays built from Python ints.
    start_flag = np.zeros((n_sequences, n_steps, 1), dtype=int)
    targets_dim_supp = np.concatenate((targets, start_flag), axis=2)

    # One start token per sequence: zeros everywhere except the last component.
    start_tokens = np.zeros((n_sequences, 1, initial_dim + 1), dtype=int)
    start_tokens[:, :, -1] = 1

    return np.concatenate((start_tokens, targets_dim_supp), axis=1)

In [0]:
def accuracySansZeros(outputs, targets, targets_maxvalue):
    """
    Accuracy of one batch, ignoring the padded (all-zero) target vectors.

    A position counts as padding when the maximum of its target vector is 0.
    Only the remaining positions are taken into account: a padded position is
    never counted as a hit nor as a miss, whatever the model predicted for it.

    :param outputs: array of shape (n, n_classes) with the scores
        (log-probabilities) of every class for each position
    :param targets: array of shape (n,) with the index of the expected class
    :param targets_maxvalue: array or torch tensor of shape (n,), maximum value
        of each target vector before taking its argmax (0 means padding)
    :return: fraction of non-padded positions predicted correctly, as a Python
        float; 0.0 when every position of the batch is padding
    """
    # Accept a torch tensor (possibly on GPU) for the padding indicator.
    if hasattr(targets_maxvalue, "detach"):
        targets_maxvalue = targets_maxvalue.detach().cpu().numpy()

    keep = np.asarray(targets_maxvalue) != 0
    n_kept = int(keep.sum())
    if n_kept == 0:
        return 0.0

    # log is increasing, so the largest log-probability is the predicted class
    idx_predicted = np.argmax(np.asarray(outputs), 1)
    n_correct = int(((idx_predicted == np.asarray(targets)) & keep).sum())
    return n_correct / n_kept

In [0]:
class Encodeur(nn.Module):
    """ Encoder half of the autoencoder.

    A measure (matrix of notes) is flattened into one vector and fed to an LSTM
    as a sequence of length 1. The LSTM output is projected by a linear layer
    followed by a log-softmax, and the final hidden / cell states are returned
    so that they can initialise a decoder.

    Args:
        input_dim: A integer indicating the size of the flattened input vector.
        hidden_dim: A integer indicating the hidden dimension of RNN layers.
        output_dim: A integer indicating the size of the encoded vector.
        n_layers: A integer indicating the number of layers.
        dropout: A float indicating dropout.
        bidirectional: whether the LSTM is bidirectional.

    If bidirectional, the final states of the forward and backward directions
    are summed for each layer, so the returned states always have ``n_layers``
    entries whatever the number of directions.
    """

    def __init__(self, input_dim=32*13, hidden_dim=256, output_dim = 50, n_layers=2, dropout=0.3, bidirectional=True):
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.n_layers = n_layers
        self.dropout = dropout
        self.bidirectional = bidirectional
        self.num_directions = 2 if bidirectional else 1

        self.lstm = nn.LSTM(input_size=input_dim,
                            hidden_size=hidden_dim,
                            num_layers=n_layers,
                            dropout=dropout,
                            bidirectional=bidirectional,
                            # open question: always initialise the decoder with the same
                            # last cell state and hidden state?
                            batch_first=True
                            )
        # A bidirectional LSTM outputs the concatenation of both directions, so
        # the projection must accept hidden_dim * num_directions features.
        self.linear = nn.Linear(hidden_dim * self.num_directions, output_dim)

    def _sum_directions(self, state):
        """Sum the forward and backward final states of each layer.

        ``state`` is laid out as [forward_1, backward_1, forward_2, backward_2, ...]
        along its first dimension; the result has one entry per layer.
        """
        batch_size = state.shape[1]
        per_layer = state.reshape(self.n_layers, self.num_directions, batch_size, self.hidden_dim)
        return per_layer.sum(dim=1)

    def forward(self, x):
        """
        Encode a batch of measures.

        :param x: tensor of shape [batch_size, sequence_length, note_size]; the
            last two dimensions are flattened into a single vector
        :return: tuple (outputs, hidden, cell) where
            - outputs is [batch_size, output_dim], log-softmax of the projection
            - hidden and cell are [n_layers, batch_size, hidden_dim], the final
              LSTM states (directions summed when bidirectional)
        """
        # we flatten the measure matrix in order to have a big vector to encode
        x = x.reshape(x.shape[0], 1, x.shape[1] * x.shape[2])

        outputs, (hidden, cell) = self.lstm(x)  # last hidden / cell for each layer and direction

        outputs = self.linear(outputs.squeeze(1))
        outputs = F.log_softmax(outputs, dim=-1)

        if self.bidirectional:
            # the decoder needs a single context vector per layer
            hidden = self._sum_directions(hidden)
            cell = self._sum_directions(cell)

        return outputs, hidden, cell

In [0]:
class Decoder(nn.Module):
    """ This class contains the implementation of Decoder Module.

    One call decodes a single time step: the input vector goes through the
    LSTM (starting from the given hidden / cell states), then through a linear
    layer and a log-softmax.

    Args:
        input_dim: input dimension for the decoder (size of one input vector)
        output_dim: A integer indicating the size of output dimension.
        hidden_dim: A integer indicating the hidden size of rnn.
        n_layers: A integer indicating the number of layers in rnn.
        dropout: A float indicating the dropout.
    """

    def __init__(self, input_dim=13, output_dim=13, hidden_dim=256, n_layers=2, dropout=0.3):
        super().__init__()

        self.output_dim = output_dim
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers

        self.lstm = nn.LSTM(input_size=input_dim,
                           hidden_size=hidden_dim,
                           num_layers=n_layers,
                           dropout=dropout,
                           batch_first=True,
                           )
        self.linear = nn.Linear(hidden_dim, output_dim)

    def forward(self, input, hidden, cell):
        """
        Decode one time step.

        :param input: [batch_size, input_dim], the vector fed at this step
            (previous target or previous decoder output) for each sample
        :param hidden: [n_layers, batch_size, hidden_dim] previous hidden state
            (batch_first only applies to the input/output, not to the states)
        :param cell: [n_layers, batch_size, hidden_dim] previous cell state
        :return: (predicted, hidden, cell) where predicted is
            [batch_size, output_dim] log-probabilities and hidden / cell are
            the updated states
        """
        # The LSTM expects a rank-3 input: add a sequence dimension of length 1,
        # giving [batch_size, 1, input_dim].
        input = input.unsqueeze(1)

        output, (hidden, cell) = self.lstm(input, (hidden, cell))

        # linear expects a rank-2 tensor: drop the sequence dimension again
        predicted = self.linear(output.squeeze(1))
        # explicit dim: the implicit choice is deprecated and warns at every step
        predicted = F.log_softmax(predicted, dim=-1)

        return predicted, hidden, cell

In [0]:
class Seq2Seq(nn.Module):  # TODO add attention
    """ This class contains the implementation of complete autoencoder network.
    It uses the encoder to produce the context vectors (final hidden and cell
    states) and the decoder to produce the predicted sequence step by step.

    Args:
        encoder: A Encoder class instance.
        decoder: A Decoder class instance.

    Decoder : input should begin with <start> and target ends with <end> (otherwise learns identity)
    """

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, inputs, targets, teacher_forcing_ratio=0.3):
        """
        Run the encoder once, then the decoder for every step after the first.

        :param inputs: [batch_size, seq_len, note_len]; the first time step is
            the start token and is not given to the encoder
        :param targets: [batch_size, seq_len, note_len]; its first time step is
            used as first decoder input
        :param teacher_forcing_ratio: probability, at each step, of feeding the
            ground-truth vector instead of the previous decoder output
            (0.3 means ground truth about 30% of the time)
        :return: [batch_size, seq_len, note_len] decoder outputs; index 0 along
            the sequence dimension is never written and stays at zero
        """
        batch_size = targets.shape[0]
        max_len = targets.shape[1]  # length of the sequence produced by the decoder
        target_size = targets.shape[2]  # = self.decoder.output_dim

        # Buffer for the decoder outputs, allocated on the device of the data so
        # that no host/device copy is needed at each decoding step.
        outputs = torch.zeros(batch_size, max_len, target_size, device=targets.device)

        # context vector: last hidden and cell state of the encoder (for each layer)
        _, hidden, cell = self.encoder(inputs[:, 1:, :])  # without the start token

        # first input to the decoder is the first step of the targets (start token)
        input = targets[:, 0]  # size [batch_size, note_len]
        for t in range(1, max_len):
            # one decoder step: prediction plus next hidden and cell states
            output, hidden, cell = self.decoder(input, hidden, cell)
            # output is [batch_size, note_len]
            outputs[:, t, :] = output
            # decide if we are going to "teacher force" or not
            use_teacher_force = random.random() < teacher_forcing_ratio
            # NOTE(review): without teacher forcing the raw decoder output is fed
            # back as it is, whereas ground-truth inputs come from `targets`.
            input = (targets[:, t, :] if use_teacher_force else output)

        return outputs

In [0]:
def _flatten_step(outputs, b_targets):
    """Drop the start-token step and flatten a batch for NLLLoss and accuracy.

    :param outputs: [batch_size, seq_len, n_classes] model outputs
    :param b_targets: [batch_size, seq_len, n_classes] target vectors
    :return: (flat_outputs, flat_targets, flat_maxvalues) where flat_outputs is
        [batch_size * (seq_len - 1), n_classes] on the device of the targets,
        flat_targets holds the argmax of every target vector and flat_maxvalues
        its maximum (0 for a padded, all-zero vector)
    """
    outputs = outputs[:, 1:, :]
    b_targets = b_targets[:, 1:, :]  # ignore start token
    # NLLLoss needs class indices, not one-hot vectors: max returns (values, indices)
    max_values, class_idx = b_targets.max(2)
    flat_outputs = outputs.reshape(outputs.shape[0] * outputs.shape[1], outputs.shape[2]).to(b_targets.device)
    return flat_outputs, class_idx.reshape(-1), max_values.reshape(-1)


def train(model, batch_size=1024, learning_rate=0.001, eps=1e-8, epochs=10, patience=15, model_name="Normal"):
    """
    Train the autoencoder with early stopping on the validation loss.

    Each time the validation loss improves, the weights of the whole model and
    of its encoder are saved under ``path_base + 'model_torch/'`` (files
    ``'Autoencoder' + model_name`` and ``'Encoder' + model_name``). When training
    ends, the best weights are loaded back into ``model`` so that the returned
    model really is the best one and not the one of the last epoch.

    The loss is the negative log-likelihood, so the model must output
    log-probabilities and the targets are converted from one-hot vectors to
    class indices.

    :param model: the Seq2Seq autoencoder; it is moved to the GPU when available
    :param batch_size: batch size of the dataloaders
    :param learning_rate: learning rate of Adam
    :param eps: epsilon of Adam
    :param epochs: maximum number of epochs
    :param patience: stop when the validation loss has not improved for this
        number of consecutive epochs
    :param model_name: suffix of the saved weight files
    :return: (best_model, train_losses, train_accuracies, val_accuracies,
        val_losses); best_model is None if the validation loss never improved
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    best_loss = float("inf")
    best_model = None
    best_state = None  # CPU copy of the best weights seen so far

    train_dataloader, validation_dataloader, _ = data(batch_size)

    # Move the model before creating the optimizer; works on CPU-only runtimes too.
    model.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, eps=eps)
    # Reduce the learning rate by `factor` after 5 epochs without improvement of
    # the validation loss.
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5, threshold=0.0001,
                                  threshold_mode='rel', cooldown=0, min_lr=0, eps=1e-08)
    criterion = nn.NLLLoss()  # the model ends with a log-softmax

    # Gradient threshold: gradient norms that exceed it are scaled down to match it.
    max_grad_norm = 2.0

    # Tracking values
    val_accuracies = list()
    val_losses = list()
    train_accuracies = list()
    train_losses = list()

    p = 0  # epochs since the last improvement of the validation loss

    for epoch in trange(epochs, desc="Epoch"):
        if p >= patience:
            break

        # 1. Training step
        model.train()
        tr_loss, tr_acc, nb_tr_steps = 0, 0, 0

        for step, batch in enumerate(train_dataloader):
            b_inputs, b_targets = tuple(t.to(device) for t in batch)

            optimizer.zero_grad()
            outputs = model(b_inputs, b_targets)
            flat_outputs, flat_targets, flat_maxvalues = _flatten_step(outputs, b_targets)

            loss = criterion(flat_outputs, flat_targets)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
            optimizer.step()

            tr_loss += loss.item()
            # The accuracy is computed on CPU numpy arrays, padding mask included.
            tr_acc += accuracySansZeros(flat_outputs.detach().cpu().numpy(),
                                        flat_targets.cpu().numpy(),
                                        flat_maxvalues.cpu().numpy())
            nb_tr_steps += 1

        train_accuracy = tr_acc / nb_tr_steps
        train_loss = tr_loss / nb_tr_steps
        train_accuracies.append(train_accuracy)
        train_losses.append(train_loss)

        # 2. Validation step (no gradient, teacher forcing turned off)
        model.eval()
        nb_eval_steps, v_acc, v_loss = 0, 0, 0

        with torch.no_grad():
            for batch in validation_dataloader:
                b_inputs, b_targets = tuple(t.to(device) for t in batch)
                outputs = model(b_inputs, b_targets, 0)
                flat_outputs, flat_targets, flat_maxvalues = _flatten_step(outputs, b_targets)

                v_loss += criterion(flat_outputs, flat_targets).item()
                v_acc += accuracySansZeros(flat_outputs.cpu().numpy(),
                                           flat_targets.cpu().numpy(),
                                           flat_maxvalues.cpu().numpy())
                nb_eval_steps += 1

        val_accuracy = v_acc / nb_eval_steps
        val_loss = v_loss / nb_eval_steps
        val_accuracies.append(val_accuracy)
        val_losses.append(val_loss)

        # Early stopping bookkeeping: keep and save the best weights.
        if val_loss < best_loss:
            p = 0
            best_loss = val_loss
            best_model = model
            # `model` keeps training, so a real copy of the weights is needed.
            best_state = {name: tensor.detach().cpu().clone()
                          for name, tensor in model.state_dict().items()}
            torch.save(model.state_dict(), path_base+ 'model_torch/Autoencoder' + model_name)
            torch.save(model.encoder.state_dict(), path_base+ 'model_torch/Encoder' + model_name)
        else:
            p += 1

        print("Train loss: {} - Train accuracy: {} -  Val loss: {} - Val accuracy: {}".format(train_loss, train_accuracy
                                                                                              , val_loss, val_accuracy))

        scheduler.step(val_loss)

    # Restore the best weights so that the returned model matches the saved files.
    if best_state is not None:
        model.load_state_dict(best_state)

    return best_model, train_losses, train_accuracies, val_accuracies, val_losses


In [0]:
def test(model, batch_size=256):  # TODO return the output for one song, given the song index
    """
    Evaluate the model on the test dataloader and print its accuracy.

    Teacher forcing is turned off; the first target step (start token) is still
    used as first decoder input. Padded target vectors are ignored by
    ``accuracySansZeros``. The reported value is the mean of the per-batch
    accuracies.

    :param model: the trained Seq2Seq model; it is moved to the GPU when available
    :param batch_size: batch size of the test dataloader
    :return: the test accuracy
    """
    _, _, test_dataloader = data(batch_size)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()
    test_acc = 0
    nb_steps = 0
    with torch.no_grad():
        for step, batch in enumerate(test_dataloader):
            b_inputs, b_targets = tuple(t.to(device) for t in batch)

            # no teacher forcing
            outputs = model(b_inputs, b_targets, 0)

            outputs = outputs[:, 1:, :]
            b_targets = b_targets[:, 1:, :]  # ignore start token

            # one-hot targets -> class indices; max returns (values, indices)
            b_targets_maxvalues, b_targets = b_targets.max(2)
            outputs = outputs.reshape(outputs.shape[0]*outputs.shape[1], outputs.shape[2])

            # The accuracy is computed on CPU numpy arrays, padding mask included.
            test_acc += accuracySansZeros(outputs.cpu().numpy(),
                                          b_targets.reshape(-1).cpu().numpy(),
                                          b_targets_maxvalues.reshape(-1).cpu().numpy())
            nb_steps += 1
    test_accuracy = test_acc / nb_steps
    print("Test accuracy: {}".format(test_accuracy))
    return test_accuracy

In [0]:
def show_result(train_losses, train_accuracies, val_accuracies, val_losses):
    """
    Plot, save and show the loss curves, then the accuracy curves.

    Both figures use one point per entry of ``val_accuracies`` on the x axis and
    are written to ``path_base`` as 'loss1.png' and 'accuracy1.png'.

    :param train_losses: training loss of each epoch
    :param train_accuracies: training accuracy of each epoch
    :param val_accuracies: validation accuracy of each epoch
    :param val_losses: validation loss of each epoch
    :return: None
    """
    epoch_axis = np.arange(len(val_accuracies))

    def _curves(series_with_labels, legend_options, title, y_label, file_name):
        # Draw every curve on the current figure, then decorate, save and show it.
        for series, label in series_with_labels:
            plt.plot(epoch_axis, series, label=label)
        plt.legend(**legend_options)
        plt.title(title)
        plt.xlabel("Epochs")
        plt.ylabel(y_label)
        plt.savefig(path_base+ file_name)
        plt.show()

    _curves([(train_losses, 'train loss'), (val_losses, 'val loss')],
            {"loc": "upper right"},
            "Negative log-likelihood val-train",
            "Loss",
            'loss1.png')

    _curves([(train_accuracies, 'train accuracy'), (val_accuracies, 'val accuracy')],
            {},
            "Accuracy over validation/train-set",
            "Accuracy",
            'accuracy1.png')
    return None

In [0]:
  # Train: build a unidirectional encoder, a decoder with default settings, and
  # train the resulting autoencoder. With model_name=".pth" the weights are saved
  # by train() as 'Autoencoder.pth' and 'Encoder.pth'.
  encoder = Encodeur(input_dim=32*13, bidirectional=False)
  decoder = Decoder()
  model = Seq2Seq(encoder, decoder)
  best_model, train_losses, train_accuracies, val_accuracies, val_losses = train(model, model_name=".pth", batch_size=256, learning_rate=0.001, epochs = 100)
  print("DONE")

In [0]:
  # Test: rebuild the same architecture as for training, restore the saved
  # autoencoder weights and evaluate on the test set.
  encoder = Encodeur(input_dim=32*13, bidirectional=False)
  decoder = Decoder()
  model_trained = Seq2Seq(encoder, decoder)
  # map_location="cpu" lets a checkpoint saved from a GPU model be loaded on a
  # CPU-only runtime as well; test() moves the model to the right device.
  model_trained.load_state_dict(
      torch.load("/content/drive/My Drive/INF8225/Projet/model_torch/Autoencoder.pth", map_location="cpu"))
  test(model_trained, batch_size=128)
  

In [0]:
# Plot the loss and accuracy curves returned by train() in the training cell.
show_result(train_losses, train_accuracies, val_accuracies, val_losses)

In [0]:
def encode_dataset(encoder, inputs_by_songs, targets_by_songs, random_sampler=False, loader_batch_size=1024, nb_measure=4):
    """
    Encode every measure of every song and group them in sliding windows.

    Each song is encoded in one pass with the encoder in evaluation mode (no
    dropout, no gradient). All the windows of ``nb_measure`` consecutive
    measures of a song are then collected, together with the matching windows
    of targets. Windows never span two songs, and songs shorter than
    ``nb_measure`` measures give no window.

    :param encoder: the trained encoder which will encode our inputs; it is
        moved to the GPU when available and its training mode is restored
        before returning
    :param inputs_by_songs: inputs grouped by songs, in order not to group
        measures from two different songs
    :param targets_by_songs: targets grouped by songs; each song is viewed as
        (n_measures, target_size) — assumes a middle dimension of size 1,
        TODO confirm against the data files
    :param random_sampler: if True the returned dataloader uses a random
        sampler (training set), otherwise a sequential one (validation / test)
    :param loader_batch_size: batch size of the returned dataloader
    :param nb_measure: number of consecutive measures in one window
    :return: a dataloader over (encoded window, target window) pairs
    :raises ValueError: if no song is long enough to give a window
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    encoder.to(device)
    was_training = encoder.training
    encoder.eval()  # dropout must be off, otherwise the encodings are random

    grouped_measures = list()
    grouped_chords = list()
    try:
        with torch.no_grad():
            for i in range(inputs_by_songs.shape[0]):
                song_inputs = torch.tensor(addStartToken(inputs_by_songs[i]), dtype=torch.float)
                song_targets = torch.tensor(targets_by_songs[i], dtype=torch.float)
                song_targets = song_targets.view(song_targets.shape[0], song_targets.shape[2])

                # Number of full windows in this song (the last one ends on the
                # last measure).
                n_windows = song_inputs.shape[0] - nb_measure + 1
                if n_windows <= 0:
                    continue

                song_inputs = song_inputs.to(device)
                song_targets = song_targets.to(device)

                # The start token only adds the extra channel: the encoder
                # receives the measures without the start step.
                encoded_song, _, _ = encoder(song_inputs[:, 1:, :])

                grouped_measures.extend(encoded_song[x:x + nb_measure] for x in range(n_windows))
                grouped_chords.extend(song_targets[x:x + nb_measure] for x in range(n_windows))
    finally:
        encoder.train(was_training)

    if not grouped_measures:
        raise ValueError("no song has at least nb_measure={} measures".format(nb_measure))

    encoded_inputs = torch.stack(grouped_measures)
    targets = torch.stack(grouped_chords)
    dataset = TensorDataset(encoded_inputs, targets)
    if random_sampler:
        sampler = RandomSampler(dataset)
    else:
        sampler = SequentialSampler(dataset)
    dataloader = DataLoader(dataset, sampler=sampler, batch_size=loader_batch_size)
    return dataloader

In [0]:
# Encode the inputs with the trained encoder and save the resulting dataloaders,
# so that they can be used by our Seq2Seq architecture. Unfortunately, we weren't
# able to make our model work with these encoded inputs.

# Rebuild the encoder with the same arguments as in the training cell and
# restore the weights saved by train().
encoder = Encodeur(input_dim=32*13, bidirectional=False)
encoder.load_state_dict(torch.
  load(path_base+ 'model_torch/Encoder.pth'))


# Data comes from make_npy_groups_measure, with n_measures=1. Measures are grouped by songs.

train_inputs_by_songs = np.load(path_base+ 'data/train_input_1measures.npy', allow_pickle=True)
train_targets_by_songs = np.load(path_base+ 'data/train_target_1measures.npy', allow_pickle=True)
test_inputs_by_songs = np.load(path_base+ 'data/test_input_1measures.npy', allow_pickle=True)
test_targets_by_songs = np.load(path_base+ 'data/test_target_1measures.npy', allow_pickle=True)

# Hold out 10% of the training songs for validation (the split is done on whole
# songs, with a fixed seed).
train_inputs_by_songs, validation_inputs_by_songs, train_targets_by_songs, validation_targets_by_songs = train_test_split(train_inputs_by_songs, train_targets_by_songs,
                                                                                                                          random_state=2018,
                                                                                                                          test_size=0.1,
                                                                                                                          shuffle=True)

# Only the training dataloader is shuffled; the test one uses a smaller batch size.
encoded_train_dataloader = encode_dataset(encoder,train_inputs_by_songs, train_targets_by_songs, random_sampler=True)
encoded_validation_dataloader = encode_dataset(encoder,validation_inputs_by_songs, validation_targets_by_songs)
encoded_test_dataloader = encode_dataset(encoder,test_inputs_by_songs, test_targets_by_songs, loader_batch_size=256)

# The DataLoader objects themselves are serialised with torch.save.
torch.save(encoded_train_dataloader, path_base+ 'data/encoded_train_dataloader.pth')
torch.save(encoded_validation_dataloader, path_base+ 'data/encoded_validation_dataloader.pth')
torch.save(encoded_test_dataloader, path_base+ 'data/encoded_test_dataloader.pth')