# Salvatore Michele Rago, Mattia Maffongelli: MusicGen_NN

In this Google Colab notebook, we continue our study of the paper "***MusicGen, a simple and controllable model for music generation***", already discussed in the README file.

As previously explained, MusicGen is an **auto-regressive transformer-based decoder** designed to create a framework capable of generating music conditioned on both specific **textual input** and **chromagrams** extracted from a set of pre-existing melodies.

The model primarily relies on its transformer decoder structure, which we will delve into later. However, this decoder cannot produce meaningful output without correctly prepared inputs. We therefore first had to work on **other aspects**: **tokenizing input audio** files, **tokenizing input text** through a pre-trained encoder, and **extracting/processing the chromagram** of a specific melody. Once the code for these components was designed and implemented, we proceeded to the actual development of the decoder, which involved constructing its structure and incorporating the **"delay pattern"** applied to the **codebooks** and tokens passed as input.
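
To make the "delay pattern" more concrete, here is a minimal sketch. It assumes K parallel codebook streams in which codebook k is shifted right by k steps, following the delay interleaving described in the MusicGen paper. The function names and the `PAD` placeholder are hypothetical and are not taken from the project code.

```python
PAD = -1  # hypothetical placeholder for positions that hold no token


def apply_delay_pattern(codes, pad=PAD):
    """Shift codebook k to the right by k steps.

    `codes` is a list of K lists, each holding T token ids.
    The result has K rows of length T + K - 1.
    """
    num_codebooks = len(codes)
    num_steps = len(codes[0])
    total = num_steps + num_codebooks - 1
    delayed = []
    for k, row in enumerate(codes):
        # k pads before the tokens and (K - 1 - k) pads after them
        delayed.append([pad] * k + list(row) + [pad] * (total - num_steps - k))
    return delayed


def undo_delay_pattern(delayed):
    """Invert apply_delay_pattern, recovering the aligned K x T grid."""
    num_codebooks = len(delayed)
    num_steps = len(delayed[0]) - num_codebooks + 1
    return [row[k:k + num_steps] for k, row in enumerate(delayed)]


codes = [[11, 12, 13], [21, 22, 23], [31, 32, 33], [41, 42, 43]]
delayed = apply_delay_pattern(codes)
for row in delayed:
    print(row)
```

With this layout, the tokens found in one column of the delayed grid belong to different original time steps, so at each decoding step the first codebook of a frame is predicted before the remaining codebooks of the same frame.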

# Audio tokenization - EnCodec

Let's start with the tokenization of audio files.
The code for this part is implemented in the "*Encodec.py*" file, which defines a new class: **AudioTokenizer**.

First, let's import the necessary libraries for the class:

In [None]:
from datasets import load_dataset, Audio
from transformers import EncodecModel, AutoProcessor
import soundfile as sf
import librosa
import torch  # needed by AudioTokenizer.perform_quantization

As mentioned earlier, this class relies on a pre-trained model, namely **EnCodec**. To use it, we call the `from_pretrained` method of two different classes: **EncodecModel**, which loads the model, and **AutoProcessor**, which loads the matching pre-processor. Both are needed to generate audio tokens. As can be seen in the code below, we opted for the 24 kHz version rather than the 32 kHz one. Although it is somewhat more **limited** in capturing audio details, we found it to be the **better choice** given the computational resources at our disposal.
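
As a rough guide to what this choice implies, the arithmetic below relates the `bandwidth=3.0` argument passed to the encoder in the class below to the number of codebooks. It assumes the figures reported for the 24 kHz EnCodec model (a total encoder stride of 320 samples, i.e. 75 frames per second, and 1024-entry codebooks, i.e. 10 bits per code). The function names are hypothetical, and the frame count is only an estimate.

```python
import math

SAMPLE_RATE = 24_000   # Hz, sampling rate of the chosen EnCodec checkpoint
HOP_LENGTH = 320       # assumed total encoder stride (2 * 4 * 5 * 8)
CODEBOOK_SIZE = 1024   # assumed number of entries per codebook


def codebooks_for_bandwidth(bandwidth_kbps):
    """Number of codebooks that fit in the given bandwidth."""
    frame_rate = SAMPLE_RATE / HOP_LENGTH                       # frames per second
    bits_per_codebook = math.log2(CODEBOOK_SIZE) * frame_rate   # bits per second
    return max(1, math.floor(bandwidth_kbps * 1000 / bits_per_codebook))


def frames_for_duration(seconds):
    """Approximate number of token frames for a clip of the given length."""
    return math.ceil(seconds * SAMPLE_RATE / HOP_LENGTH)


print(codebooks_for_bandwidth(3.0))
print(frames_for_duration(10.0))
```

Under these assumptions a bandwidth of 3 kbps corresponds to 4 codebooks, which is consistent with the token size of 4 used in the rest of the notebook.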

We then included two main functions within the class. The first one, ***get_tokens_from_file***, takes a file path and uses the read method of the soundfile library to obtain both the **audio vector** and its **sampling rate**. Before feeding the audio to the processor, we apply the necessary transformations to the **number of channels** (converting from multi-channel to **mono-channel** by averaging all channels) and to the sampling rate (resampling to the rate expected by the processor). We then pass the audio to the processor and to the model's encoder, which generates the tokens, stored in the **"audio_codes"** attribute of the encoder output.
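
The preprocessing steps above can be sketched on synthetic data without loading any model. This is only an illustration: the helper names are hypothetical, and the resampling uses plain linear interpolation with `np.interp` as a simple stand-in for `librosa.resample`, which uses a higher-quality method.

```python
import numpy as np


def to_mono(audio):
    """Average the channels of a (num_samples, num_channels) array."""
    if audio.ndim > 1:
        return audio.mean(axis=1)
    return audio


def resample_linear(audio, orig_sr, target_sr):
    """Linear-interpolation resampling (a simple stand-in, not librosa's method)."""
    if orig_sr == target_sr:
        return audio
    duration = len(audio) / orig_sr
    num_out = int(round(duration * target_sr))
    t_in = np.arange(len(audio)) / orig_sr
    t_out = np.arange(num_out) / target_sr
    return np.interp(t_out, t_in, audio)


# One second of a stereo 440 Hz tone at 44.1 kHz
sr = 44_100
t = np.arange(sr) / sr
tone = np.sin(2 * np.pi * 440 * t)
stereo = np.stack([tone, 0.5 * tone], axis=1)

mono = to_mono(stereo)
resampled = resample_linear(mono, sr, 24_000)
print(stereo.shape, mono.shape, resampled.shape)
```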

Finally, the second function, ***save_tokens_to_audio_file***, simply takes the input tokens, processes them in the model's **decoder**, and generates the **reconstructed audio** file, which is then saved.
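
Before decoding, the class below maps the incoming values to integer codebook indices with `perform_quantization`, which rescales them to the range 0..1023. The NumPy sketch below illustrates the same min-max idea on a tiny array. The guard against a zero range is an addition made for this example and is not part of the original method, and the function name is hypothetical.

```python
import numpy as np

NUM_BINS = 1024  # number of quantization levels, as in perform_quantization


def min_max_quantize(values, num_bins=NUM_BINS):
    """Rescale each column to [0, 1] and map it to integer bins 0..num_bins-1."""
    col_min = values.min(axis=0, keepdims=True)
    col_max = values.max(axis=0, keepdims=True)
    span = col_max - col_min
    # Assumption: constant columns map to bin 0 instead of dividing by zero
    safe_span = np.where(span == 0, 1.0, span)
    normalized = (values - col_min) / safe_span
    return (normalized * (num_bins - 1)).astype(np.int32)


values = np.array([[0.0, 5.0],
                   [0.5, 5.0],
                   [1.0, 5.0]])
quantized = min_max_quantize(values)
print(quantized)
```

The conversion to integers truncates toward zero, so a normalized value of 0.5 lands in bin 511 rather than 512.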

In [None]:
class AudioTokenizer:
    def __init__(self):
        self.model = EncodecModel.from_pretrained("facebook/encodec_24khz")
        self.processor = AutoProcessor.from_pretrained("facebook/encodec_24khz")

    def get_tokens_from_file(self, file_path):
        # Load the audio:
        audio_sample, sample_rate = sf.read(file_path)

        # Convert from multi-channel to mono-channel with the mean:
        if len(audio_sample.shape) > 1:
            # If the audio is stereo or multi-channel, average the channels to obtain a mono signal
            audio_sample = audio_sample.mean(axis=1)

        # Resampling:
        if sample_rate != self.processor.sampling_rate:
            audio_sample = librosa.resample(audio_sample, orig_sr=sample_rate, target_sr=self.processor.sampling_rate)

        inputs = self.processor(raw_audio=audio_sample, sampling_rate=self.processor.sampling_rate, return_tensors="pt")

        encoder_outputs = self.model.encode(input_values=inputs["input_values"],
                                            padding_mask=inputs.get("attention_mask", None),
                                            bandwidth=3.0)

        # Take the tokens with the attribute audio_codes
        tokens = encoder_outputs.audio_codes
        return tokens[0][0]

    @staticmethod
    def perform_quantization(tokens):
        outmap_min, _ = torch.min(tokens, dim=0, keepdim=True)
        outmap_max, _ = torch.max(tokens, dim=0, keepdim=True)
        normalized_audio_tokens = (tokens - outmap_min) / (outmap_max - outmap_min)  # Broadcasting rules apply

        # Define the number of quantization levels (bins)
        num_bins = 1024
        return (normalized_audio_tokens * (num_bins - 1)).to(torch.int32)

    def save_tokens_to_audio_file(self, tokens, output_file_path):
        tokens = AudioTokenizer.perform_quantization(tokens)
        tokens = tokens.unsqueeze(dim=0).unsqueeze(dim=0)
        audio_values = self.model.decode(tokens, [None], None)[0]

        # Convert the tensor to a 1-D NumPy array. soundfile expects float samples in [-1, 1],
        # so the decoder output is written as-is, without rescaling to the 16-bit integer range
        reconstructed_audio = audio_values.detach().numpy().flatten()

        # Save the audio
        sf.write(output_file_path, reconstructed_audio, self.processor.sampling_rate)


What has just been presented is the main "ingredient" for constructing the transformer decoder.
Now let's turn to the **conditioning tensors**.

# First conditioning - Text

The first conditioning phase involves **textual inputs**, i.e. a textual description of the input audio. Given one or more texts as input, we generate a **conditioning tensor** C of dimensions **T_C × D**, where T_C is the length of the input sequence and D is the **embedding size**, which in our case is 4 (the token size).
All of this is implemented by the class **TextToTokenConverter**.

In the studied paper, three general approaches were mentioned for this part. The one we chose involves using the pre-trained **T5 model**, with the specific function of generating tokens from a string or text. In this case, the necessary libraries were:

In [None]:
import torch
from transformers import T5Tokenizer, T5ForConditionalGeneration

As in the previous part, here too we used both the model itself and a kind of processor, namely the **tokenizer**. Among the available options, we chose "***t5-base***", which has about 220 million parameters, as we deemed it sufficient for our purposes. Two of the other available versions are "*small*", with about 60 million parameters, and "*large*", with about 770 million parameters, which is too many for our resources.

At this point, we pass our textual sequence to the tokenizer, which returns the sequence of token identifiers corresponding to the provided text in the form of a **torch tensor**. This tensor is then given as input to the model's encoder inside a **no_grad context**: since the encoder is pre-trained and is not fine-tuned here, no gradients need to be computed or stored for the **backward pass**, which saves memory and speeds up execution. From the result, we access the ***last_hidden_state***, which contains the output of the last layer of the model's encoder.

The final step of this function adjusts the **dimensions** of the obtained conditioning tensor to those required as input by the transformer. To do this, we use a **linear layer** that projects each 768-dimensional T5 embedding down to the token size, accepting the potential loss of information. The layer produces a tensor with dimensions **(number of sequences, number of tokens, token size)**, of which the function returns the first sequence.
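
The shapes involved in this projection can be checked with a small NumPy sketch. Random arrays stand in for the T5 encoder output and for the weights of the linear layer, so the values are meaningless and only the dimensions matter. The variable names are hypothetical.

```python
import numpy as np

rng = np.random.default_rng(0)

T5_HIDDEN = 768   # hidden size of the t5-base encoder output
TOKEN_SIZE = 4    # D, the embedding size used by the transformer

# Stand-in for last_hidden_state with 1 sequence and T_C = 6 text tokens
last_hidden_state = rng.normal(size=(1, 6, T5_HIDDEN))

# Stand-in for the weights and bias of the Linear(768, 4) layer
weight = rng.normal(size=(T5_HIDDEN, TOKEN_SIZE)) / np.sqrt(T5_HIDDEN)
bias = np.zeros(TOKEN_SIZE)

conditioning = last_hidden_state @ weight + bias   # (1, T_C, D)
conditioning_tensor = conditioning[0]              # (T_C, D)
print(conditioning.shape, conditioning_tensor.shape)
```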

In [None]:
class TextToTokenConverter:
    def __init__(self):
        # Load the pre-trained tokenizer and model once, instead of on every call
        self.tokenizer = T5Tokenizer.from_pretrained("t5-base")
        self.model = T5ForConditionalGeneration.from_pretrained("t5-base")

        # Since we want the token dimension to be 4, we use a linear layer that reduces
        # the T5 hidden size (768 for t5-base) to 4.
        self.linear_layer = torch.nn.Linear(768, 4)
        self.linear_layer.requires_grad_(False)

    def convert_text_to_tokens(self, text):
        # ENCODER PART WITH THE T5 MODEL
        input_ids = self.tokenizer(text, return_tensors="pt").input_ids

        with torch.no_grad():
            encoder_output = self.model.encoder(input_ids).last_hidden_state

        # Generate the conditioning tensor, having as dimension T_C * D
        # where T_C = length of the sequence and D = embedding size.
        conditioning_tensor = self.linear_layer(encoder_output)

        # Return the conditioning tensor of the first (and only) sequence
        return conditioning_tensor[0]

Having defined the method to obtain tokens from an input text, let's focus on melody conditioning.

# Second conditioning - Melody

While conditioning from textual input is more common, the foundation of a song or music lies in its melody. Therefore, it should be possible to generate new music conditioned on an existing **melodic structure**. To achieve this, the paper proposes **joint conditioning** between text and melody, taking both the conditioning tensor related to the text and the chromagram of a melody as input. The chromagram is filtered through an **information bottleneck** concerning the dominant bin.

This was one of the more **challenging** parts for various reasons: on one hand, the paper didn't fully elaborate on how to precisely implement such conditioning, and on the other hand, we faced a **lack of data** to perform training. For these reasons, we decided to implement this part and to incorporate it into the final transformer, but without training the corresponding model. As a result, the trained transformer cannot yet be conditioned on a melody.

Nevertheless, it is useful and important to understand how we decided to implement the extraction of the chromagram from an audio file. First, the necessary libraries were:

In [None]:
import torch
import torchaudio
import numpy as np
import soundfile as sf
from librosa import filters

In this case as well, we created a class that encapsulates all the necessary methods: **MelodyToTokenConverter**. It has two main methods. The first, ***convert_text_melody_to_tokens***, opens the audio file at the given path with the soundfile library, as in the audio tokenization, obtaining both the audio vector and its sampling rate. It converts the audio to mono-channel if necessary and then calls the second method, ***convert***. This method takes the audio vector, its sampling rate, and the embedding size as input. It first computes the **spectrogram** with the torchaudio library, setting the size of the Fourier transform (**n_fft**) to 128 and the **normalized** attribute to true, so that the result of the transform is normalized. A **chroma filter matrix** is then created and applied to the spectrogram tensor using an **Einstein summation**, which produces the **"raw" chromagram**. Finally, the raw chromagram is normalized as indicated in the paper, yielding the actual chromagram of the melody.
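
The last two steps, the normalization and the optional reduction to the dominant bin (the `argmax` option of `convert`), can be illustrated on a toy chromagram. This NumPy sketch is only an illustration with hypothetical function names. It stores the chromagram as (chroma bins, time frames), whereas `convert` returns it as (time frames, chroma bins).

```python
import numpy as np


def normalize_chroma(raw_chroma, eps=1e-6):
    """L2-normalize each time frame of a (num_chroma, num_frames) array."""
    norms = np.linalg.norm(raw_chroma, axis=0, keepdims=True)
    return raw_chroma / np.maximum(norms, eps)


def dominant_bin_one_hot(chroma):
    """Keep only the strongest chroma bin of each frame (the argmax bottleneck)."""
    one_hot = np.zeros_like(chroma)
    one_hot[chroma.argmax(axis=0), np.arange(chroma.shape[1])] = 1.0
    return one_hot


# Toy "raw" chromagram: 4 chroma bins, 3 time frames
raw = np.array([[3.0, 0.0, 1.0],
                [4.0, 0.0, 1.0],
                [0.0, 2.0, 1.0],
                [0.0, 0.0, 5.0]])

chroma = normalize_chroma(raw)
bottleneck = dominant_bin_one_hot(chroma)
print(chroma[:, 0])
print(bottleneck.T)
```

After the bottleneck, each frame carries only the index of its dominant chroma bin, which removes most of the information about the original recording while keeping the melodic contour.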

In [None]:
class MelodyToTokenConverter:

    def convert(self, audio_time_series, sr, embedding_size, argmax=False):
        audio_time_series = audio_time_series.to(torch.float)
        nfft = 128
        spec = torchaudio.transforms.Spectrogram(n_fft=nfft, power=2, center=True,
                                                 pad=0, normalized=True)
        from librosa import filters
        fbanks = torch.from_numpy(filters.chroma(sr=sr, n_fft=nfft, n_chroma=embedding_size))

        spec = spec(torch.transpose(audio_time_series, 0, 1)).squeeze(1)
        raw_chroma = torch.einsum('cf,...ft->...ct', fbanks, spec)
        norm_chroma = torch.nn.functional.normalize(raw_chroma, dim=-2, eps=1e-6)
        from einops import rearrange
        norm_chroma = rearrange(norm_chroma, 'b d t -> b t d')

        if argmax:
            idx = norm_chroma.argmax(-1, keepdim=True)
            norm_chroma[:] = 0
            norm_chroma.scatter_(dim=-1, index=idx, value=1)

        return norm_chroma

    def convert_text_melody_to_tokens(self, audio_file_path, embedding_size):
        y, sr = sf.read(audio_file_path)
        if len(y.shape) > 1:  # convert to mono from stereo
            y = y.mean(axis=1)
        # convert() transposes its input, so it needs a (num_samples, 1) tensor
        # even when the file is already mono
        y = y.reshape(-1, 1)
        res = self.convert(torch.from_numpy(y), sr, embedding_size)
        return res[0]


Now that we have everything, we can finally talk about the transformer.

# Transformer

## The architecture

In [None]:
import sys
from math import sqrt

import torch
import torch.nn as nn

from T5_encoder import TextToTokenConverter


class MaskedSelfAttention(nn.Module):
    def __init__(self,
                 q_param,
                 v_param):
        """
        Builds a block that computes masked self-attention.
        :param q_param: The not-fixed dimension of the Query, Key matrices
        :param v_param: The not-fixed dimension of the Value matrix
        """
        super(MaskedSelfAttention, self).__init__()

        self.q_param = q_param
        self.v_param = v_param

        # The first dim of query, key and value Linear should be the embedding dimension.
        # Assuming for simplicity that it is equal to q and v

        # considering a X of dimension (n, d):
        self.query = nn.Linear(q_param, q_param)  # output dim (n ,q)
        self.key = nn.Linear(q_param, q_param)  # output dim (n ,q)
        self.value = nn.Linear(q_param, v_param)  # output dim (n ,v)

    def forward(self, query, key, value, mask=None):
        """
        Compute masked self-attention for the given tuple of Query, Key and Value
        :param query
        :param key
        :param value
        :param mask: The mask used in the masked-attention formula. If not given, a matrix of ones will be used.
        :return: The computed masked self-attention.
        """

        # Compute the values for query, key and value using learned params
        query = self.query(query)
        key = self.key(key)
        value = self.value(value)

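        # Compute the scaled scores Q * K^T / sqrt(q)
        scores = torch.matmul(query, key.t()) / sqrt(self.q_param)

        # Apply the mask: positions where the mask is (close to) zero must not be attended to,
        # so their scores are set to -inf before the softmax. Without a mask, nothing is hidden.
        if mask is not None:
            scores = scores.masked_fill(mask < 0.5, float("-inf"))

        # Compute the attention (with softmax along rows)
        h = torch.softmax(scores, dim=1)
        return torch.matmul(h, value)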


class CrossAttention(MaskedSelfAttention):
    """
    A block that computes cross-attention.
    """

    def forward(self, s1, s2):
        """
        Compute the cross-attention for the given input sequences
        :param s1: The sequence to use as query.
        :param s2: The sequence to use as key and value.
        :return: The cross-attention for the given input sequences.
        """
        return super().forward(s1, s2, s2)


class MaskedMultiHeadAttention(nn.Module):
    def __init__(self, h_param, q_param, v_param, *args, **kwargs):
        """
        Builds a block that computes masked self-attention a number of times equal to h_param.
        :param h_param: The number of times that masked self-attention should be performed.
        :param q_param: The not-fixed dimension of the Query, Key matrices
        :param v_param: The not-fixed dimension of the Value matrix
        """
        super(MaskedMultiHeadAttention, self).__init__()
        self.h_param = h_param
        self.q_param = q_param
        self.v_param = v_param

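        # Use nn.ModuleList so that the parameters of every head are registered and trained
        self.attention_layers = nn.ModuleList(
            [MaskedSelfAttention(q_param, v_param) for _ in range(h_param)])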

        # Output linear layer (as indicated in Fig. 2 of "Attention is all you need")
        self.output_lin = nn.Linear(h_param * v_param, v_param)

    def forward(self, query, key, value, mask=None):
        # Compute the self-attention h times
        attentions = [self_att_layer(query, key, value, mask)
                      for self_att_layer in self.attention_layers]
        # Concatenate the result of each self-attention and then project them
        concatenation = torch.cat(attentions, dim=1)
        return self.output_lin(concatenation)


class MultiHeadCrossAttention(nn.Module):
    def __init__(self, h_param, q_param, v_param, *args, **kwargs):
        """
        Builds a block that computes cross-attention a number of times equal to h_param.
        :param h_param: The number of times that cross-attention should be performed.
        :param q_param: The not-fixed dimension of the Query, Key matrices
        :param v_param: The not-fixed dimension of the Value matrix
        """
        super(MultiHeadCrossAttention, self).__init__()
        self.h_param = h_param
        self.q_param = q_param
        self.v_param = v_param

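        # Use nn.ModuleList so that the parameters of every head are registered and trained
        self.attention_layers = nn.ModuleList(
            [CrossAttention(q_param, v_param) for _ in range(h_param)])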

        # Output linear layer (as indicated in Fig. 2 of "Attention is all you need")
        self.output_lin = nn.Linear(h_param * v_param, v_param)

    def forward(self, s1, s2):
        # Compute the cross-attention h times
        attentions = [cross_att_layer(s1, s2)
                      for cross_att_layer in self.attention_layers]
        # Concatenate the result of each cross-attention and then project them
        concatenation = torch.cat(attentions, dim=1)
        return self.output_lin(concatenation)


class TransformerBlock(nn.Module):
    def __init__(self, q_val: int, v_val: int, h_val: int, dropout):
        """
        Builds a block of the encoder part of the transformer
        :param q_val: The q parameter of the Multi-Head-Attention block.
        :param v_val: The v parameter of the Multi-Head-Attention block.
        :param h_val: The h parameter of the Multi-Head-Attention block.
        :param dropout: The percentage of dropout to place after the normalization
        """
        super(TransformerBlock, self).__init__()

        self.attention = MaskedMultiHeadAttention(h_val, q_val, v_val)
        self.normalization_1 = nn.LayerNorm(v_val)

        # Define the FeedForward network as stated in section 3.3 of "Attention is all you need"
        self.feed_forward = nn.Sequential(
            nn.Linear(v_val, 4 * v_val),
            nn.ReLU(),
            nn.Linear(4 * v_val, v_val),
        )

        self.normalization_2 = nn.LayerNorm(v_val)
        self.dropout = nn.Dropout(dropout)

    def forward(self, query, key, value):
        # Compute the masked self-attention
        attention = self.attention(query, key, value)
        # Make layer normalization (with residual connection)
        first_normalization = self.dropout(self.normalization_1(attention + query))

        # Compute the feed forward
        feed_forward_out = self.feed_forward(first_normalization)
        # Make the second layer normalization (with residual connection)
        return self.dropout(self.normalization_2(feed_forward_out + first_normalization))


class TransformerEncoder(nn.Module):
    def __init__(self, num_layers, q_val: int, v_val: int, h_val: int, dropout):
        """
        Builds the encoder part of a Transformer (a concatenation of TransformerBlocks)
        :param num_layers: The number of TransformerBlocks to include in the encoder.
        :param q_val: The q parameter of the Multi-Head-Attention block.
        :param v_val: The v parameter of the Multi-Head-Attention block.
        :param h_val: The h parameter of the Multi-Head-Attention block.
        :param dropout: The percentage of dropout to place after the normalizations.
        """
        super(TransformerEncoder, self).__init__()

        self.layers = nn.ModuleList([])
        for _ in range(num_layers):
            self.layers.append(TransformerBlock(q_val, v_val, h_val, dropout))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """

        :param x: Should have the following shape: (num_codebooks, f_r)
        :return:
        """
        # Note:
        # The input tokens are retrieved from the compression model, that have already imprinted the positional
        # encoding. For this reason they can be passed directly to the transformer.

        last_output = x
        for layer in self.layers:
            last_output = layer(last_output, last_output, last_output)

        return last_output


class DecoderBlock(nn.Module):
    def __init__(self,
                 q_val: int,
                 v_val: int,
                 h_val: int,
                 dropout: float, ):
        """
        :param q_val: The q parameter of the Multi-Head-Attention blocks.
        :param v_val: The v parameter of the Multi-Head-Attention blocks.
        :param h_val: The h parameter of the Multi-Head-Attention blocks.
        :param dropout: The percentage of dropout to place after the normalizations.
        """
        super(DecoderBlock, self).__init__()
        self.attention = MaskedSelfAttention(q_val, v_val)
        self.normalization = nn.LayerNorm(v_val)
        self.transformer_block = TransformerBlock(q_val, v_val, h_val, dropout)
        self.dropout = nn.Dropout(dropout)
        self.cross_attention = MultiHeadCrossAttention(h_val, q_val, v_val)

    def forward(self, x, value, key, src_mask, trg_mask, text_tokens=None):
        """
        :param x: The input of the block.
        :param value: The value matrix of the Self-Attention block, retrieved from the Transformer Encoder,
        to pass to a Self-Attention block in the decoder.
        :param key: The key matrix of the Self-Attention block, retrieved from the Transformer Encoder,
        to pass to a Self-Attention block in the decoder.
        :param src_mask: The mask passed to the encoder.
        :param trg_mask: The mask to use in the decoder.
        :param text_tokens: The embedded text to use for text-conditioning. Cross-attention will be computed
         between text_tokens and the output of the block. If not provided, text-conditioning will not be performed.
        :return:
        """
        attention = self.attention(x, x, x, trg_mask)
        normalization = self.normalization(attention + x)
        transformer_block_output = self.transformer_block(normalization, key, value)
        if text_tokens is None:
            return transformer_block_output
        else:
            return self.cross_attention(transformer_block_output, text_tokens)


class TransformerDecoder(nn.Module):
    def __init__(self, num_layers, q_val: int, v_val: int, h_val: int, dropout: float, embed_size: int,
                 trg_vocab_size: int):
        """
        Builds the decoder part of a Transformer (a concatenation of DecoderBlocks)
        :param num_layers: The number of DecoderBlocks to include in the decoder.
        :param q_val: The q parameter of the Multi-Head-Attention block.
        :param v_val: The v parameter of the Multi-Head-Attention block.
        :param h_val: The h parameter of the Multi-Head-Attention block.
        :param dropout: The percentage of dropout to place after the normalizations.
        :param embed_size: The size of the tokens passed to the Encoder.
        :param trg_vocab_size: The number of outputs of the decoder. This can also be intended as the number of
         output units of the decoder.
        """
        super(TransformerDecoder, self).__init__()
        self.layers = nn.ModuleList([DecoderBlock(q_val, v_val, h_val, dropout) for _ in range(num_layers)])

        self.full_conn_out = nn.Sequential(
            nn.Linear(embed_size, trg_vocab_size),
            nn.Softmax(dim=1),
        )

    def forward(self, x, encoder_output, src_mask, trg_mask, text_tokens=None):
        """

        :param x:
        :param encoder_output: The output of the Encoder Block
        :param src_mask: The src_mask argument of the DecoderBlock
        :param trg_mask: The trg_mask argument of the DecoderBlock
        :param text_tokens: The embedded text to use for text-conditioning. If not provided, text-conditioning
         will not be performed.
        """
        curr_output = x
        for dec_block in self.layers:
            # Feed each block with the output of the previous one (not with the original input x)
            curr_output = dec_block(curr_output, encoder_output, encoder_output, src_mask, trg_mask, text_tokens)

        return self.full_conn_out(curr_output)


class Transformer(nn.Module):
    def __init__(self, num_layers, q_val: int, v_val: int, h_val: int, dropout: float, embed_size: int,
                 trg_vocab_size: int, src_pad_idx: int):
        """
        Builds a complete Transformer with an Encoder and a Decoder part
        :param num_layers: The number of layers to include in the encoder and in the decoder.
        :param q_val: The q parameter of the Multi-Head-Attention block.
        :param v_val: The v parameter of the Multi-Head-Attention block.
        :param h_val: The h parameter of the Multi-Head-Attention block.
        :param dropout: The percentage of dropout to place after the normalizations.
        :param embed_size: The size of the tokens passed to the Encoder.
        :param trg_vocab_size: The number of outputs of the decoder. This can also be intended as the number of
         output units of the decoder.


        """
        super(Transformer, self).__init__()

        self.src_pad_idx = src_pad_idx
        self.trg_vocab_size = trg_vocab_size

        self.encoder = TransformerEncoder(num_layers, q_val, v_val, h_val, dropout)
        self.decoder = TransformerDecoder(num_layers, q_val, v_val, h_val, dropout, embed_size, trg_vocab_size)

    @staticmethod
    def make_mask(dim):
        mask_ind = torch.tril(torch.ones((dim, dim), dtype=torch.bool), diagonal=-1).t()
        mask = torch.tril(torch.ones(dim, dim))
        mask[mask_ind] = sys.float_info.min

        return mask

    def forward(self, enc_input, dec_input):
        # Build masks for the encoder and for the decoder
        enc_mask = self.make_mask(enc_input.shape[0])
        dec_mask = self.make_mask(dec_input.shape[0])

        encoder_output = self.encoder(enc_input)
        return self.decoder(dec_input, encoder_output, enc_mask, dec_mask)


class TransformerWithText(Transformer):
    """
    A Transformer that accepts text conditioning
    """

    def forward(self, enc_input, dec_input, text_tokens):
        # Build masks for the encoder and for the decoder
        enc_mask = self.make_mask(enc_input.shape[0])
        dec_mask = self.make_mask(dec_input.shape[0])

        encoder_output = self.encoder(enc_input)
        return self.decoder(dec_input, encoder_output, enc_mask, dec_mask, text_tokens)


class TransformerWithTextAndMelody(TransformerWithText):
    """
    Builds a Transformer that accepts both text and melody conditioning
    """

    def forward(self, enc_input, dec_input, text_tokens, melody_chromagram):
        # Compute the conditioned input of the decoder
        dec_input_conditioned = torch.concat((melody_chromagram, dec_input), dim=0)

        # Build masks for the encoder and for the decoder
        enc_mask = self.make_mask(enc_input.shape[0])
        dec_mask = self.make_mask(dec_input_conditioned.shape[0])

        encoder_output = self.encoder(enc_input)
        return self.decoder(dec_input_conditioned, encoder_output, enc_mask, dec_mask, text_tokens)


The transformer used in this project follows the architecture defined in "Attention is all you need" by Vaswani et al., apart from slight modifications.
It is composed of an encoder and a decoder with the same number of layers. Each layer of the encoder is a `TransformerBlock`, which consists of a Multi-Head Attention layer fed with the output of the embedding and positional encoding processes. The output of the attention layer is added to its input (residual connection), normalized, and passed to an MLP whose hidden layer is 4 times the size of the input layer and which projects back to the initial dimension. The output of the MLP is again added to its input (residual connection) and normalized, and the result becomes the output of the `TransformerBlock`. Dropout is applied after each normalization.
Each layer of the decoder is made up of two parts. In the first part, the input (after embedding and positional encoding) is passed to a Masked Self-Attention (in order to make the model causal), added to the input, and normalized. The second part is a `TransformerBlock` as defined above, whose attention layer computes the Cross-Attention between the output of that normalization (used as query) and the output of the encoder (used as key and value). The output of the decoder is then projected to the desired dimension and passed to a Softmax function, whose result is the output of the decoder.
The encoder and decoder are combined in the `Transformer` class.
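
A common way to make self-attention causal is to set the scores of future positions to negative infinity before the softmax, so that they receive zero weight. The NumPy sketch below illustrates this idea on random data. It omits the learned query, key and value projections and is not the project's implementation; the function names are hypothetical.

```python
import numpy as np


def softmax(x, axis=-1):
    x = x - x.max(axis=axis, keepdims=True)   # subtract the row maximum for stability
    e = np.exp(x)
    return e / e.sum(axis=axis, keepdims=True)


def causal_self_attention(q, k, v):
    """Scaled dot-product attention where position i only sees positions <= i."""
    n, d = q.shape
    scores = q @ k.T / np.sqrt(d)
    future = np.triu(np.ones((n, n), dtype=bool), k=1)   # True above the diagonal
    scores = np.where(future, -np.inf, scores)           # hide future positions
    weights = softmax(scores, axis=-1)
    return weights @ v, weights


rng = np.random.default_rng(0)
x = rng.normal(size=(5, 4))    # 5 positions, embedding size 4
out, weights = causal_self_attention(x, x, x)
print(np.round(weights, 2))
```

Each row of `weights` sums to 1 and is zero above the diagonal, so the first position can only attend to itself.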
<br><br>
This base structure is extended to support text and melody conditioning, as described in the reference paper.
`TransformerWithText` takes the input tokens together with a text conditioning. The given text tokens are used at the end of each decoder layer to compute Cross-Attention with the output of the `TransformerBlock`; the result becomes the new output of the decoder layer.
`TransformerWithTextAndMelody` takes audio tokens together with a conditioning text and a conditioning melody. The text tokens are used as in `TransformerWithText`, while the melody tokens (the chromagram) are prepended to the audio tokens, and this concatenation becomes the new input of the decoder.
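
The shapes involved in the two conditioning mechanisms can be sketched as follows. This NumPy example uses random arrays and an unprojected cross-attention, so it only shows how the sequence lengths combine; the sizes and the function name are hypothetical.

```python
import numpy as np


def cross_attention(queries, context):
    """Unprojected cross-attention: each query row attends over the context rows."""
    scores = queries @ context.T / np.sqrt(queries.shape[-1])
    scores = scores - scores.max(axis=-1, keepdims=True)
    weights = np.exp(scores)
    weights = weights / weights.sum(axis=-1, keepdims=True)
    return weights @ context


rng = np.random.default_rng(0)
D = 4
melody = rng.normal(size=(3, D))         # chromagram frames used as a prefix
audio_tokens = rng.normal(size=(5, D))   # audio token sequence
text_tokens = rng.normal(size=(6, D))    # text conditioning tensor (T_C, D)

# Melody conditioning: prepend the chromagram to the audio tokens
decoder_input = np.concatenate([melody, audio_tokens], axis=0)

# Text conditioning: the decoder sequence attends over the text tokens
conditioned = cross_attention(decoder_input, text_tokens)
print(decoder_input.shape, conditioned.shape)
```

The concatenation only works when the chromagram has as many chroma bins as the token size, which is why the chroma extraction receives the embedding size as a parameter. The length of the text sequence does not affect the output length.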

## The training process

In [None]:
import time

import torch
import torch.nn as nn
import torch.nn.functional as F

from data_preparation import load_dataset
from transformer import Transformer, TransformerWithText


class TransformerTrainer:
    def __init__(self, model: Transformer, model_save_dir):
        """

        :param model: The model to train
        :param model_save_dir: The directory where to save the trained model at the end of training.
        """
        self.model = model
        self.model_save_dir = model_save_dir

    def train_on_sample(self, encoder_input, decoder_input, num_epochs: int, learning_rate: float,
                        text_conditioning=None):
        """
        :param text_conditioning: The sequence of tokens to use for text conditioning. It is
        strictly related to the model passed to this class' constructor.
        :return:
        """

        # Define the loss function and the optimizer to use to train
        loss_fn = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(self.model.parameters(), lr=learning_rate)

        # Set the model in training mode
        self.model.train()

        for epoch in range(num_epochs):
            # Reset the gradients
            optimizer.zero_grad()
            # Calculate the output of the model
            output = self.model(encoder_input, decoder_input, text_conditioning)
            # Compute the current loss
            loss = loss_fn(output, encoder_input)
            # Compute the gradients
            loss.backward()
            # Perform an optimization step
            optimizer.step()
            print(f"Epoch: {epoch + 1}, Loss: {loss.item()}")

    def train_on_sample_and_melody(self, encoder_input, decoder_input, num_epochs: int, learning_rate: float,
                                   text_conditioning=None, melody_conditioning=None):
        """
        :param text_conditioning: The sequence of tokens to use for text conditioning. It is
        strictly related to the model passed to this class' constructor.
        :param melody_conditioning: The sequence of tokens to use for melody conditioning. It is
        strictly related to the model passed to this class' constructor.
        :return:
        """

        # Define the loss function and the optimizer to use to train
        loss_fn = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(self.model.parameters(), lr=learning_rate)

        # Set the model in training mode
        self.model.train()

        for epoch in range(num_epochs):
            # Reset the gradients
            optimizer.zero_grad()
            # Calculate the output of the model
            output = self.model(encoder_input, decoder_input, text_conditioning, melody_conditioning)
            # Compute the current loss
            loss = loss_fn(output, torch.cat((melody_conditioning, encoder_input), dim=0))
            # Compute the gradients
            loss.backward()
            # Perform an optimization step
            optimizer.step()
            print(f"Epoch: {epoch + 1}, Loss: {loss.item()}")

    def train_on_dataset(self, dataset, num_epochs: int, learning_rate: float):
        """
        Trains the model on the given dataset.
        :param dataset: As returned from data_preparation.load_dataset().
        """
        # Define the loss function and the optimizer to use to train
        loss_fn = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(self.model.parameters(), lr=learning_rate)

        # Set the model in training mode
        self.model.train()

        N = len(dataset)
        for epoch in range(num_epochs):
            # Reset the gradients
            optimizer.zero_grad()

            losses = torch.zeros(N)
            for i in range(N):
                # Get a sequence of tokens representing an audio file, and the associated text conditioning
                audio = dataset[i]['audio']  # n1 x d
                text = dataset[i]['text']  # n2 x d

                enc_input = audio
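                # Teacher forcing: the decoder sees the target shifted right by one step
                # (drop the last token, prepend an all-zero "start" token)
                dec_input = F.pad(input=enc_input[:-1], pad=(0, 0, 1, 0), mode='constant', value=0)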
                # Calculate the output of the model
                output = self.model(enc_input, dec_input, text)

                # Compute and store the loss for the current sequence
                curr_loss = loss_fn(output, enc_input).reshape(1)
                losses[i] = curr_loss

            # Compute the sum of the losses for all the samples
            loss = torch.sum(losses)
            # Compute the gradients
            loss.backward()
            # Perform an optimization step
            optimizer.step()
            print(f"Epoch: {epoch + 1}, Loss: {loss.item()}")

        # Save the model
        model_name = round(time.time() * 1000)  # millisecond timestamp used as the file name
        torch.save(self.model.state_dict(), f'{self.model_save_dir}/{model_name}')
        torch.save(self.model, f'{self.model_save_dir}/{model_name}-2')

The `TransformerTrainer` class trains the model passed to its constructor, provided that it is a subclass of the `Transformer` defined in the previous section. It contains two main methods: `train_on_sample` (with a variant, `train_on_sample_and_melody`, that also allows melody conditioning) and `train_on_dataset`. As the names suggest, the former trains the model for a given number of epochs on a single sample, and is only meant to show during the exam that the model works correctly, while the latter trains the model on the whole dataset and performs the actual optimization of the weights.
The loss function is cross-entropy and the optimizer is Adam. After setting the model in training mode, at each epoch we reset the gradients and pass all the sample sequences to the model, one at a time. The whole sequence is passed to the encoder, while a shifted version, padded with a token of zeros, is passed to the decoder. Then, the loss between the reconstruction and the original input is computed for the current sequence. The loss of the epoch is the sum of the losses of all the samples; it is used to compute the gradients, which in turn are used to update the weights (with a step size governed by the learning rate).
At the end of training we save the model in the given directory (just for evaluation purposes).
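
The shift of the decoder input can be illustrated on a toy sequence. The following is a minimal sketch, assuming that the decoder input should be the target sequence delayed by one position (standard teacher forcing); the 4 x 2 tensor is a made-up stand-in for real audio tokens.

```python
import torch
import torch.nn.functional as F

# Toy "audio" sequence: 4 tokens with d = 2 values each (stand-in for real EnCodec tokens)
enc_input = torch.tensor([[1., 1.],
                          [2., 2.],
                          [3., 3.],
                          [4., 4.]])

# Drop the last token and prepend one all-zero row.
# pad=(0, 0, 1, 0): no padding on the last dimension,
# one row before and zero rows after on the first dimension.
dec_input = F.pad(enc_input[:-1], pad=(0, 0, 1, 0), mode='constant', value=0)

print(dec_input)
```

The result has the same length as `enc_input`: its first row is the zero token and row `i` contains token `i - 1` of the original sequence, so at each position the decoder only sees tokens that precede the one it has to predict.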

## The generation process

In [None]:
import torch

from Encodec import AudioTokenizer
from data_preparation import load_dataset
from transformer import TransformerWithText


class AudioGenerator:
    def __init__(self, model_path: str):
        self.model = TransformerWithText(num_layers=5,
                                         q_val=4,
                                         v_val=4,
                                         h_val=3,
                                         dropout=0.1,
                                         embed_size=4,  # 4,
                                         trg_vocab_size=4,
                                         src_pad_idx=0, )
        self.model.load_state_dict(torch.load(model_path))
        self.model.eval()  # set layers to evaluation mode

        self.audio_tokenizer = AudioTokenizer()

    def generate_audio(self, text, num_tokens: int = 1000, file_path=None):
        """
        Generates audio by feeding the decoder with random noise and the given text.
        The generated tokens are also saved as an audio file if file_path is not None.
        :param text: The description of the audio to generate.
        :param num_tokens: The number of tokens to generate.
        :param file_path: The .wav file where to save the generated audio (nothing is saved if None).
        :return: The generated tokens.
        """
        rnd_noise1 = torch.rand((1, 4))
        rnd_noise2 = torch.rand((1, 4))
        mask = self.model.make_mask(1)

        output = torch.empty((num_tokens, 4))
        # Inference only: disable gradient tracking to save memory
        with torch.no_grad():
            for i in range(num_tokens):
                last_output = self.model.decoder(
                    x=output[i - 1:i] if i >= 1 else rnd_noise1,  # last generated token or random noise
                    encoder_output=output[i - 1:i] if i >= 1 else rnd_noise2,
                    src_mask=mask,
                    trg_mask=mask,
                    text_tokens=text,
                )
                output[i] = last_output

        if file_path is not None:
            self.audio_tokenizer.save_tokens_to_audio_file(output.t(), file_path)

        # Return the generated tokens, as stated in the docstring
        return output


The `AudioGenerator` class generates audio from a text conditioning, using the pre-trained model whose path is provided to the constructor.
The generation is performed using just the decoder, in an auto-regressive way: to begin the generation process, we pass to the decoder two different tokens of random noise along with the text conditioning (the first as input, the second as if it were the encoder output). Then, we keep passing the same text tokens, but we use the previously generated audio token as input, so that each new token depends both on the description of the desired result and on the current partial generation. We store all the generated tokens and, depending on whether a file path is given, we either keep them as produced by the decoder or also convert them to a .wav audio file.
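
The auto-regressive loop can be sketched in isolation as follows. This is only an illustration under stated assumptions: `toy_decoder` is a hypothetical untrained linear layer standing in for the real decoder, and `text_summary` is a random placeholder for the text conditioning.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

token_dim = 4    # size of one audio token, as in the model of this notebook
num_tokens = 6   # short sequence, just for illustration

# Hypothetical stand-in for the trained decoder: (previous token, text) -> next token
toy_decoder = nn.Linear(2 * token_dim, token_dim)
text_summary = torch.rand(1, token_dim)  # placeholder for the text conditioning

output = torch.empty(num_tokens, token_dim)
prev = torch.rand(1, token_dim)  # random noise starts the generation
with torch.no_grad():
    for i in range(num_tokens):
        # The next token depends on the previous one and on the (fixed) text
        nxt = toy_decoder(torch.cat((prev, text_summary), dim=1))
        output[i] = nxt.squeeze(0)
        prev = nxt  # feed the new token back in at the next step
```

Each row of `output` is computed from the row before it, which is the property that makes the generation auto-regressive.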
To convert the generated tokens to audio, we first have to turn the vectors of float values into vectors of integers (as required by the EnCodec decoder). To this end, we first scale the values between 0 and 1, and then quantize them to a predefined number of bins. The tokens are then ready to be converted to a .wav file. In this way, the pre-trained model is used to generate audio related to the given textual description.
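
The scaling and quantization step could look like the sketch below. The function name `floats_to_codes` and the default of 1024 bins are assumptions made for illustration (1024 is a common EnCodec codebook size); the actual conversion is the one implemented in the `AudioTokenizer` class.

```python
import torch


def floats_to_codes(tokens: torch.Tensor, num_bins: int = 1024) -> torch.Tensor:
    """Min-max scale `tokens` to [0, 1], then quantize to integers in [0, num_bins - 1]."""
    t_min, t_max = tokens.min(), tokens.max()
    # clamp_min avoids a division by zero when all the values are equal
    scaled = (tokens - t_min) / (t_max - t_min).clamp_min(1e-8)
    return torch.round(scaled * (num_bins - 1)).long()


# Toy decoder output, quantized to 5 bins so that the result is easy to check by hand
decoder_output = torch.tensor([[-1.0, 0.0],
                               [0.5, 1.0]])
codes = floats_to_codes(decoder_output, num_bins=5)
print(codes)  # tensor([[0, 2], [3, 4]])
```

The smallest value maps to code 0 and the largest to code `num_bins - 1`, so every generated vector becomes a valid integer index.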

# Example of usage

NOTE: In the following example we refer to the `load_dataset()` function, contained in the `data_preparation.py` file. It is not included in this notebook, as it relies on the presence of the dataset on the local disk, but it can be found in the repository of our project.

In the following code we train the model described in the previous section. We use the MusicCaps dataset but, since training a transformer on that amount of data would be unfeasible with our resources, we use just a small slice composed of 32 samples.
Using the `load_dataset()` function we load in memory the audio and the caption associated with each sample, already converted to tokens using the `TextToTokenConverter` and the `AudioTokenizer` classes.
Then, we initialize the Transformer with the choice of hyperparameters that seems to lead to the best results. Next, we initialize the trainer with the newly created model and the path of the directory where we want to save the model after training.
Using the `train_on_dataset()` method of the trainer we train the model for the desired number of epochs. To achieve good results we would need to train the model on a larger dataset and for a very long time (that is, a high number of epochs) but, for the resource constraints discussed above, this is not possible for us. So we just train for a small number of epochs.
The learning rate provided to the optimizer is $10^{-1}$, which is very large compared with commonly used learning rates. This choice is motivated by the fact that the starting loss of ~96k suggests that we are very far from the optimum of the cost function, so we choose a high learning rate to move towards it faster; the Adam optimizer then adapts the step of each weight. Such a learning rate would cause problems close to the optimum, as we would not be able to settle on it and would start oscillating in an interval centered on that point. However, as we can see from the training output, with a low number of epochs we stay very far from the optimum, and our choice of learning rate allows us to move quickly in the right direction without ever overshooting the desired point and moving away from it.
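
To give an idea of what a learning rate of $10^{-1}$ means with Adam, the toy sketch below (not part of the project code) applies a single Adam step to two weights whose gradients differ by eight orders of magnitude. On the first step Adam divides each gradient by its own magnitude, so both weights move by roughly the learning rate.

```python
import torch

lr = 0.1
w = torch.nn.Parameter(torch.tensor([1.0, 1.0]))
optimizer = torch.optim.Adam([w], lr=lr)

# Toy loss whose gradient is 1e4 for the first weight and 1e-4 for the second
scales = torch.tensor([1e4, 1e-4])
loss = (scales * w).sum()
loss.backward()

before = w.detach().clone()
optimizer.step()
first_step = w.detach() - before

print(first_step)  # both entries are close to -0.1
```

This is why, with such a learning rate, each weight can change by about 0.1 per update regardless of the gradient scale: useful far from the optimum, but too coarse to settle close to it.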

In [None]:
dataset = load_dataset()
model = TransformerWithText(
    num_layers=5,
    q_val=4,
    v_val=4,
    h_val=3,
    dropout=0.1,
    embed_size=4,  # 4,
    trg_vocab_size=4,
    src_pad_idx=0,
)
trainer = TransformerTrainer(model, 'path-to-the-trained-model')
trainer.train_on_dataset(dataset, num_epochs=2000, learning_rate=1e-1)

# Evaluation

The original MusicGen model was trained on a dataset containing **26,000 hours** of music, and **96 GPUs** made it possible to find the optimal values for **3.3 billion weights** in a time on the order of **days** (as suggested by the "Attention is all you need" paper). As a result, that model is able to generate audio related to the provided description.
However, as anticipated in the **Example of usage** section, with a low number of epochs and a small dataset we are not able to train the model well enough for it to generate meaningful audio.
Therefore, even though the paper evaluates the model with the CLAP score, the KL divergence and the Fréchet Audio Distance, along with subjective methods, we believe that computing these metrics would not tell us much more than listening to the generated audio, which is enough to understand that it contains approximately random noise.