In [36]:
hparams = {
    "d_model": 128,
    "num_layers": 3,
    "num_heads": 8,
    "d_ff": 512,
    "max_rel_dist": 1024,
    "max_abs_position": 0,
    "vocab_size": 471,
    "bias": True,
    "dropout": 0.1,
    "layernorm_eps": 1e-6
}

import torch
import torch.nn.functional as F
from torch import nn
from math import sqrt

def abs_positional_encoding(max_position, d_model, n=3):
    """
    Since the transformer does not use recurrence or convolution, we have to deliberately give it positional
    information. Though learned relative position embeddings will be added to the model, it is possible that absolute
    position encoding will aid it in predicting next tokens.

    Args:
        max_position (int): maximum position for which to calculate positional encoding
        d_model (int): Transformer hidden dimension size
        n (int): number of dimensions to which to broadcast output

    Returns:
        sinusoidal absolute positional encoding of shape d_model for max_position positions
    """
    # set of all positions to consider
    positions = torch.arange(max_position).float().to(device)

    # get angles to input to sinusoid functions
    k = torch.arange(d_model).float().to(device)
    coeffs = 1 / torch.pow(10000, 2 * (k // 2) / d_model)
    angles = positions.view(-1, 1) @ coeffs.view(1, -1)

    # apply sin to the even indices of angles along the last axis
    angles[:, 0::2] = torch.sin(angles[:, 0::2])

    # apply cos to the odd indices of angles along the last axis
    angles[:, 1::2] = torch.cos(angles[:, 1::2])

    return angles.view(*[1 for _ in range(n-2)], max_position, d_model)


def skew(t):
    """
    Implements Huang et. al, 2018's skewing algorithm to correctly reorder the dot(Q, RelativePositionEmbeddings)
    matrix. This function generalizes to any shape and any number of dimensions. However, attention calculation
    requires shape (..., L, L).

    Algorithm:
        1. Pad T
        2. Reshape
        3. Slice

    Args:
        t (torch.Tensor): tensor to skew

    Returns:
        Srel: skewed t: nth column from the right is skewed into the nth diagonal under the main; same shape as t
    """
    # pad T
    padded = F.pad(t, [1, 0])

    # reshape to diagonalize the columns in the last 2 dimensions
    Srel = padded.reshape(-1, t.shape[-1] + 1, t.shape[-2])

    # final touches
    Srel = Srel[:, 1:]              # slice last L values
    Srel = Srel.reshape(*t.shape)   # reshape to shape of t
    return Srel


def rel_scaled_dot_prod_attention(q, k, v, e=None, mask=None):
    """
    A modification given by Shaw et. al, 2018, improved by Huang et. al, 2018, to the Scaled Dot-Product Attention
    mechanism given in Vaswani et. al, 2017, which allows the Transformer model to attend to all relevant elements of
    the input sequences as well as the relative distances between them.

    RelAttention = softmax( mask( QKT + skew(QET) ) / sqrt(d_k) ) V

    Args:
        q: Queries tensor of shape (..., seq_len_q, d_model)
        k: Keys tensor of shape (..., seq_len_k, d_model)
        v: Values tensor of shape (..., seq_len_k, d_model)
        e (optional): Relative Position Embeddings tensor of shape (seq_len_k, d_model)
        mask (optional): mask for input batch with ones indicating the positions to mask

    Returns:
        output attention of shape (..., seq_len_q, d_model)
    """
    QKt = torch.matmul(q, k.transpose(-1, -2))  # (..., seq_len_q, seq_len_k)

    if e is None:
        # assumes q.shape[:-2] == k.shape[:-2]
        Srel = torch.zeros(*q.shape[:-2], q.shape[-2], k.shape[-2], device=q.device)
    else:
        Srel = skew(torch.matmul(q, e.transpose(-1, -2)))  # (..., seq_len_q, seq_len_k)

    # find and scale attention logits
    dk = sqrt(k.shape[-1])
    scaled_attention_logits = (QKt + Srel) / dk  # (..., seq_len_q, seq_len_k)

    # add scaled mask to 0 out positions to mask in softmax
    if mask is not None:
        scaled_attention_logits += (mask * -1e9)

    # calculate attention by calculating attention weights by softmaxing on the last dimension
    # and then multiplying by v
    return torch.matmul(F.softmax(scaled_attention_logits, dim=-1), v)


class MultiHeadAttention(nn.Module):
    """
    MultiHead Relative Attention Block. Computes attention for input batch along num_heads "heads".
    In the process, attention weights are calculated num_heads times, which allows the network to
    extract information from the input batch through several different representations simultaneously
    """
    def __init__(self, d_model, num_heads, max_rel_dist, bias=True,  batch_first=False, tgt_is_causal=False):
        """
        Args:
            d_model (int): Transformer hidden dimension size
            num_heads (int): number of heads along which to calculate attention
            max_rel_dist (int): maximum relative distance between positions to consider in creating
                                relative position embeddings; set to 0 to compute normal attention
            bias (bool, optional): if set to False, all Linear layers in the MHA block will not learn
                                   an additive bias. Default: True

        """
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        self.max_rel_dist = max_rel_dist
        self.batch_first = False
       

        if d_model % num_heads != 0:
            raise ValueError("d_model must be divisible into num_heads heads")

        self.depth = self.d_model // self.num_heads

        self.wq = nn.Linear(self.d_model, self.d_model, bias=bias)  # parameter matrix to generate Q from input
        self.wk = nn.Linear(self.d_model, self.d_model, bias=bias)  # parameter matrix to generate K from input
        self.wv = nn.Linear(self.d_model, self.d_model, bias=bias)  # parameter matrix to generate V from input

        self.E = nn.Embedding(self.max_rel_dist, self.d_model)      # relative position embeddings

        self.wo = nn.Linear(self.d_model, self.d_model, bias=True)  # final output layer

    @staticmethod
    def split_heads(x, num_heads, depth=None):
        """
        Helper function to split input x along num_heads heads

        Args:
            x: input tensor to split into heads; shape: (..., L, d_model); d_model = num_heads * depth
            num_heads (int): number of heads along which to calculate attention
            depth (int, optional): desired dimensionality at each head

        Returns:
            input tensor correctly reshaped and transposed to shape (..., num_heads, L, depth)
        """
        # get depth if None
        if depth is None:
            if x.shape[-1] % num_heads != 0:
                raise ValueError("d_model must be divisible into num_heads")
            depth = x.shape[-1] // num_heads

        # reshape and transpose x
        x = x.view(*x.shape[:-1], num_heads, depth)     # (..., L, num_heads, depth)
        return x.transpose(-2, -3)                      # (..., num_heads, L, depth)

    def get_required_embeddings(self, seq_len, max_len=None):
        """
        Helper function to get required non-positive relative position embeddings to calculate attention on
        input of length seq_len. Required relative position embeddings are:
            [last embedding from the right] * max(seq_len - max_len, 0) + Embeddings(max(max_len - seq_len, 0), max_len)

        Requires self.E (nn.Embedding): relative position embeddings ordered from E_{-max_len + 1} to E_0

        Args:
            seq_len (int): length of input sequence
            max_len (int, optional): maximum relative distance considered in relative attention calculation
                                     Default: E.num_embeddings

        Returns:
            required relative position embeddings from E
        """
        if max_len is None:
            max_len = self.E.num_embeddings

        # required relative position embeddings
        E_dev = self.E.weight.device
        first_emb = self.E(torch.arange(0, 1, device=E_dev)).clone()
        return torch.cat(
            [*[first_emb.clone() for _ in range(max(seq_len - max_len, 0))],
             self.E(torch.arange(max(max_len - seq_len, 0), max_len, device=E_dev))],
            dim=0
        )

    def forward(self, q, k, v, mask=None):
        """
        Computes Multi-Head Attention on input tensors Q, K, V

        Args:
            q: Queries tensor of shape (..., seq_len_q, d_model)
            k: Keys tensor of shape (..., seq_len_k, d_model)
            v: Values tensor of shape (..., seq_len_k, d_model)
            mask (optional): mask for input batch with ones indicating positions to mask. Default: None

        Returns:
            multi-head attention of shape (..., seq_len_q, d_model) for input batch
        """
        # get Q, K, V
        q = self.wq(q)  # (batch_size, seq_len, d_model)
        k = self.wk(k)  # (batch_size, seq_len, d_model)
        v = self.wv(v)  # (batch_size, seq_len, d_model)

        # get required embeddings from E
        seq_len_k = k.shape[-2]
        e = self.get_required_embeddings(seq_len_k, self.max_rel_dist)  # (seq_len_k, d_model)

        # split into heads
        q = self.split_heads(q, self.num_heads, self.depth)  # (batch_size, h, seq_len_q, depth)
        k = self.split_heads(k, self.num_heads, self.depth)  # (batch_size, h, seq_len_k, depth)
        v = self.split_heads(v, self.num_heads, self.depth)  # (batch_size, h, seq_len_k, depth)
        e = self.split_heads(e, self.num_heads, self.depth)  # (h, seq_len_k, depth)

        # compute MHA
        # attention shape: (batch_size, h, seq_len_q, depth); weights shape: (batch_size, h, seq_len_q, seq_len_k)
        rel_scaled_attention = rel_scaled_dot_prod_attention(q, k, v, e, mask=mask)

        # concatenate heads and pass through final layer
        rel_scaled_attention = rel_scaled_attention.transpose(-2, -3)  # (batch_size, seq_len_q, h, depth)
        sh = rel_scaled_attention.shape
        return self.wo(rel_scaled_attention.reshape(*sh[:-2], self.d_model))  # (batch_size, seq_len_q, d_model)


class PointwiseFFN(nn.Module):
    """
    Fully-connected Feedforward layer that follows the MHA block in each Transformer layer, which is simply a 2 layer
    Dense network with a ReLU in between
    """
    def __init__(self, d_model, d_ff, bias=True):
        """
        Args:
            d_model (int): Transformer hidden dimension size
            d_ff (int): intermediate dimension of FFN blocks
            bias (bool, optional): if set to False, all Linear layers in the FFN block will not learn
                                   an additive bias. Default: True
        """
        super(PointwiseFFN, self).__init__()
        self.d_model = d_model
        self.d_ff = d_ff

        self.main = nn.Sequential(
            nn.Linear(d_model, d_ff, bias=bias),
            nn.ReLU(),
            nn.Linear(d_ff, d_model, bias=bias)
        )

    def forward(self, x):
        return self.main(x)


class DecoderLayer(nn.Module):
    """
    Every TransformerDecoder layer consists of 2 sublayers:
        1. Masked Multi-Head Attention
        2. Pointwise Feedforward Network
    In the original Transformer, each sublayer further employs a residual connection followed by a LayerNorm on the last
    dimension. However, here the LayerNormalization will be placed before the residual connnection, as this Pre-LN
    architecture does not generally require an explicitly designed learning rate schedule.
    """
    def __init__(self, d_model, num_heads, d_ff, max_rel_dist, bias=True, dropout=0.1, layernorm_eps=1e-6):
        """
        Args:
            d_model (int): Transformer hidden dimension size
            num_heads (int): number of heads along which to calculate attention
            d_ff (int): intermediate dimension of FFN blocks
            max_rel_dist (int): maximum relative distance between positions to consider in creating
                                relative position embeddings; set to 0 to compute normal attention
            bias (bool, optional): if set to False, all Linear layers in the Decoder will not learn
                                   an additive bias. Default: True
            dropout (float in [0, 1], optional): dropout rate for training the model
            layernorm_eps (very small positive float, optional): epsilon for LayerNormalization
        """
        super(DecoderLayer, self).__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.max_rel_idst = max_rel_dist
        self.self_attn = MultiHeadAttention(d_model, num_heads, max_rel_dist, bias)
        

        self.mha = MultiHeadAttention(d_model, num_heads, max_rel_dist, bias)
        self.ffn = PointwiseFFN(d_model, d_ff, bias)

        self.layernorm1 = nn.LayerNorm(normalized_shape=d_model, eps=layernorm_eps)
        self.layernorm2 = nn.LayerNorm(normalized_shape=d_model, eps=layernorm_eps)

        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, tgt, memory=None, tgt_mask=None,
                memory_mask=None, tgt_key_padding_mask=None, memory_key_padding_mask=None, tgt_is_causal=False, memory_is_causal = False):
        """
        Forward pass through decoder layer. Designed to be able to use torch's nn.TransformerDecoder as the final model,
        which is why memory and all parameters after tgt_mask are present but are unused.

        Args:
            tgt: input queries tensor from previous layer, named this way to use nn.TransformerDecoder
            tgt_mask (optional, must be explicitly specified as a kwarg): tensor of with 1's indicating positions to
                                                                          mask. Default: None

        Returns:
            output after passing through MHA and FFN blocks, along with intermediate layernorms and residual connections
        """
        # multi-head attention block
        attn_out = self.layernorm1(tgt)
        attn_out = self.mha(attn_out, attn_out, attn_out, mask=tgt_mask)
        attn_out = self.dropout1(attn_out)
        attn_out = tgt + attn_out

        # pointwise ffn block
        ffn_out = self.layernorm2(attn_out)
        ffn_out = self.ffn(ffn_out)
        ffn_out = self.dropout2(ffn_out)
        ffn_out = ffn_out + attn_out

        return ffn_out

In [37]:
import torch
import torch.nn as nn
from torch.nn import functional as F
from math import sqrt

class MusicTransformer(nn.Module):
    """
    Transformer Decoder with Relative Attention. Consists of:
        1. Input Embedding
        2. Absolute Positional Encoding
        3. Stack of N DecoderLayers
        4. Final Linear Layer
    """
    def __init__(self,
                 d_model=hparams["d_model"],
                 num_layers=hparams["num_layers"],
                 num_heads=hparams["num_heads"],
                 d_ff=hparams["d_ff"],
                 max_rel_dist=hparams["max_rel_dist"],
                 max_abs_position=hparams["max_abs_position"],
                 vocab_size=hparams["vocab_size"],
                 bias=hparams["bias"],
                 dropout=hparams["dropout"],
                 layernorm_eps=hparams["layernorm_eps"]):
        """
        Args:
            d_model (int): Transformer hidden dimension size
            num_heads (int): number of heads along which to calculate attention
            d_ff (int): intermediate dimension of FFN blocks
            max_rel_dist (int): maximum relative distance between positions to consider in creating
                                relative position embeddings. Set to 0 to compute normal attention
            max_abs_position (int): maximum absolute position for which to create sinusoidal absolute
                                    positional encodings. Set to 0 to compute pure relative attention
                                    make it greater than the maximum sequence length in the dataset if nonzero
            bias (bool, optional): if set to False, all Linear layers in the MusicTransformer will not learn
                                   an additive bias. Default: True
            dropout (float in [0, 1], optional): dropout rate for training the model. Default: 0.1
            layernorm_eps (very small float, optional): epsilon for LayerNormalization. Default: 1e-6
        """
        super(MusicTransformer, self).__init__()
        self.d_model = d_model
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.d_ff = d_ff
        self.max_rel_dist = max_rel_dist,
        self.max_position = max_abs_position
        self.vocab_size = vocab_size

        self.input_embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = abs_positional_encoding(max_abs_position, d_model)
        self.input_dropout = nn.Dropout(dropout)

        self.decoder = nn.TransformerDecoder(
            DecoderLayer(d_model=d_model, num_heads=num_heads, d_ff=d_ff, max_rel_dist=max_rel_dist,
                         bias=bias, dropout=dropout, layernorm_eps=layernorm_eps),
            num_layers=num_layers,
            norm=nn.LayerNorm(normalized_shape=d_model, eps=layernorm_eps)
        )

        self.final = nn.Linear(d_model, vocab_size)

    def forward(self, x, mask=None):
        """
        Forward pass through the Music Transformer. Embeds x according to Vaswani et. al, 2017, adds absolute
        positional encoding if present, performs dropout, passes through the stack of decoder layers, and
        projects into the vocabulary space. DOES NOT SOFTMAX OR SAMPLE OUTPUT; OUTPUTS LOGITS.

        Args:
            x (torch.Tensor): input batch of sequences of shape (batch_size, seq_len)
            mask (optional): mask for input batch indicating positions in x to mask with 1's. Default: None

        Returns:
            input batch after above steps of forward pass through MusicTransformer
        """
        # embed x according to Vaswani et. al, 2017
        x = self.input_embedding(x)
        x *= sqrt(self.d_model)

        # add absolute positional encoding if max_position > 0, and assuming max_position >> seq_len_x
        if self.max_position > 0:
            x += self.positional_encoding[:, :x.shape[-2], :]

        # input dropout
        x = self.input_dropout(x)

        # pass through decoder
        x = self.decoder(x, memory=None, tgt_mask=mask)

        # final projection to vocabulary space
        return self.final(x)

In [38]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from music21 import converter, instrument, note, chord, stream
from pathlib import Path
from keras.utils import to_categorical

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
def get_notes(emotion):
    """ Get all the notes and chords from the midi files 
    emotion = "Q1", "Q2", "Q3", 'Q4' """
    
    notes = []

    for file in Path("Dataset/midis_emopia/train").glob(emotion + "*"):
        midi = converter.parse(file)
        print("Parsing %s" % file)
        notes_to_parse = midi.flat.notes

        for element in notes_to_parse:
            if isinstance(element, note.Note):
                notes.append(str(element.pitch))
            elif isinstance(element, chord.Chord):
                notes.append('.'.join(str(n) for n in element.normalOrder))
    return notes
    
def prepare_sequences(notes, n_vocab):
    """ Prepare the sequences used by the Neural Network """
    sequence_length = 100

    # Get all pitch names
    pitchnames = sorted(set(item for item in notes))

    # Create a dictionary to map pitches to integers
    note_to_int = dict((note, number) for number, note in enumerate(pitchnames))

    network_input = []
    network_output = []

    # create input sequences and the corresponding outputs
    for i in range(0, len(notes) - sequence_length - 1, 1):  # Subtract 1 here
        sequence_in = notes[i:i + sequence_length]
        sequence_out = notes[i + sequence_length]
        network_input.append([note_to_int[char] for char in sequence_in])
        network_output.append(note_to_int[sequence_out])

    n_patterns = len(network_input)

    # Reshape the input into a format compatible with LSTM layers
    network_input = np.reshape(network_input, (n_patterns, sequence_length, 1))

    # Normalize input between -1 and 1
    network_input = (network_input - float(n_vocab)/2) / (float(n_vocab)/2)

    # to torch tensor
    network_input = torch.from_numpy(network_input).float().to(device)
    network_output = torch.from_numpy(np.array(network_output)).long().to(device)

    return (network_input, network_output)

  
def create_midi(prediction_output, filename):
    """ convert the output from the prediction to notes and create a midi file
        from the notes """
    offset = 0
    output_notes = []

    # create note and chord objects based on the values generated by the model
    for item in prediction_output:
        pattern = item[0]
        # pattern is a chord
        if ('.' in pattern) or pattern.isdigit():
            notes_in_chord = pattern.split('.')
            notes = []
            for current_note in notes_in_chord:
                new_note = note.Note(int(current_note))
                new_note.storedInstrument = instrument.Piano()
                notes.append(new_note)
            new_chord = chord.Chord(notes)
            new_chord.offset = offset
            output_notes.append(new_chord)
        # pattern is a note
        else:
            new_note = note.Note(pattern)
            new_note.offset = offset
            new_note.storedInstrument = instrument.Piano()
            output_notes.append(new_note)

        # increase offset each iteration so that notes do not stack
        offset += 0.5

    midi_stream = stream.Stream(output_notes)
    midi_stream.write('midi', fp='{}.mid'.format(filename))

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=50000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)].detach()


# Function to train the model
def train_model(model, data, optimizer, criterion, epochs=200):
    losses = []
    model.train()
    for epoch in range(epochs):
        optimizer.zero_grad()
        output = model(data)
        # squeeze the data
        output = output.squeeze(0)
        data = data.squeeze(0)

        loss = criterion(output, data)
        loss.backward()
        optimizer.step()
        losses.append(loss.item())
        if epoch % 10 == 0:
            print("Epoch: {}/{}.............".format(epoch, epochs), end=' ')
            print("Loss: {:.4f}".format(loss.item()))
    return model

def generate(model, input_sequence, seq_len=100):
    model.eval()
    generated = []
    input_sequence = torch.from_numpy(input_sequence).float().to(device)
    for i in range(seq_len):
        output = model(input_sequence)
        output = output.cpu().detach().numpy()
        generated.append(output)
        input_sequence = np.concatenate((input_sequence[0][1:], output[-1]))
        input_sequence = np.reshape(input_sequence, (1, 100, 1))
        input_sequence = torch.from_numpy(input_sequence).float().to(device)
    return generated



def main():
    # Get all the notes and chords from the midi files
    notes = get_notes("Q1")
    n_vocab = len(set(notes))
    print(n_vocab)
    network_input, network_output = prepare_sequences(notes, n_vocab)
    print(network_input.shape)
    print(network_output.shape)
    # Define the model
    model = MusicTransformer(
        d_model=128,
        num_layers=3,
        num_heads=8,
        d_ff=512,
        max_rel_dist=1024,
        max_abs_position=0,
        vocab_size=n_vocab,
        bias=True,
        dropout=0.1,
        layernorm_eps=1e-6
    ).to(device)

    # Define the optimizer and the loss function
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()
    # get Long
    network_output = network_output.long()
    network_input = network_input.long()
    network_output = network_output.squeeze(1)
    # Train the model
    model = train_model(model, network_input[0], optimizer, criterion, epochs=100)

    # Generate a new midi file
    prediction_output = generate(model, network_input[0])
    create_midi(prediction_output, "test_output")

if __name__ == '__main__':
    main()


Parsing Dataset/midis_emopia/train/Q1_1Qc15G0ZHIg_3.mid
Parsing Dataset/midis_emopia/train/Q1_Jn9r0avp0fY_3.mid
Parsing Dataset/midis_emopia/train/Q1_POaIGvLsp5M_1.mid
Parsing Dataset/midis_emopia/train/Q1_eVMSeElk81Q_2.mid
Parsing Dataset/midis_emopia/train/Q1_1vjy9oMFa8c_2.mid
Parsing Dataset/midis_emopia/train/Q1_HY9vPoHbgaI_1.mid
Parsing Dataset/midis_emopia/train/Q1_ANZf1QXsNrY_3.mid
Parsing Dataset/midis_emopia/train/Q1_fey-8bOR95E_0.mid
Parsing Dataset/midis_emopia/train/Q1_vv6nrZ2myXw_4.mid
Parsing Dataset/midis_emopia/train/Q1_6Uf9XBUD3wE_0.mid
Parsing Dataset/midis_emopia/train/Q1_9v2WSpn4FCw_2.mid
Parsing Dataset/midis_emopia/train/Q1_SQDuF0qxGQw_1.mid
Parsing Dataset/midis_emopia/train/Q1_Cg2u_Ldjv8g_0.mid
Parsing Dataset/midis_emopia/train/Q1_im4Qxn3GQvo_1.mid
Parsing Dataset/midis_emopia/train/Q1_9Yb9OEVwups_0.mid
Parsing Dataset/midis_emopia/train/Q1_vv6nrZ2myXw_3.mid
Parsing Dataset/midis_emopia/train/Q1_im4Qxn3GQvo_0.mid
Parsing Dataset/midis_emopia/train/Q1_aYe-2Glruu

IndexError: Dimension out of range (expected to be in range of [-1, 0], but got 1)