# Emotion Conditioned Music Generation
This notebook provides the code for implementing a Transformer-GAN for the dissertation. The objective of the model is to produce sentimental music given an input emotion

## Importing libraries

In [36]:
# !pip install music21 miditoolkit miditok

In [37]:
# %pip install --user torch==1.7.0 torchvision==0.8.1 -f https://download.pytorch.org/whl/cu102/torch_stable.html

In [38]:
import numpy as np 
import pandas as pd 
from io import open
import tensorflow as tf
import glob
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import time
from miditok import get_midi_programs, REMI
from miditoolkit import MidiFile
from torch.nn.utils.rnn import pad_sequence
import random

In [39]:
torch.__version__

'1.7.0'

In [40]:
device = 'cuda'

In [41]:
torch.cuda.empty_cache()

In [42]:
torch.cuda.is_available()

True

## Loading the Dataset

In [43]:
# how a midi file looks like
midi = MidiFile('archive/EMOPIA_1.0 (1)/EMOPIA_1.0/midis/Q1__8v0MFBZoco_0.mid')
midi

ticks per beat: 384
max tick: 46051
tempo changes: 1
time sig: 1
key sig: 0
markers: 0
lyrics: False
instruments: 1

In [44]:
# for now, we will only be using for piano right since it determines the melody
midi.instruments

[Instrument(program=0, is_drum=False, name="")]

In [45]:
# file path to the MIDI files
files_paths = list(glob.glob('archive/EMOPIA_1.0 (1)/EMOPIA_1.0/midis/*.mid'))
# reading labels
labels_df = pd.read_csv('archive/EMOPIA_1.0 (1)/EMOPIA_1.0/label.csv')
labels_df = list(labels_df['4Q'])

In [46]:
import muspy

In [47]:
def return_range(music):
    highest = 0
    lowest = 127
    for track in music.tracks:
        for note in track.notes:
            if note.pitch > highest:
                highest = note.pitch
            if note.pitch < lowest:
                lowest = note.pitch
    return [highest, lowest]

In [48]:
tempos = []
pitches = []

for file in files_paths:
    music = muspy.read_midi(file)
    tempos.append(music.tempos[0].qpm)
    pitches.extend(return_range(music))

In [49]:
print('minimum pitch found', min(pitches))
print('maximum pitch found', max(pitches))

minimum pitch found 22
maximum pitch found 105


In [50]:
pitch_range = range(22, 105)
additional_tokens = {'Chord': True, 'Rest': True, 'Tempo': True, 'Program': False,
                     'rest_range': (2, 4),  # (half, 8 beats)
                     'nb_tempos': 10,  # nb of tempo bins
                     'tempo_range': (100, 140),
                     'TimeSignature':None}  # (min, max)

In [51]:
# create a list of notes
# this stores the REMI encoded tokens of the midi files

def load_files(files_paths, encoder = REMI()):
    assert len(files_paths) > 0
    notes = []

    for file in files_paths:
        # file_name = os.path.basename(file)

        # read the MIDI file
        midi = MidiFile(file)

        # Converts MIDI to tokens
        tokens = encoder.midi_to_tokens(midi)
        
        # The EMOPIA dataset has midi files with only one instrument, i.e. the piano 
        # hence we just add those tokens
        notes.append(tokens[0])

    return notes, encoder

In [52]:
notes, remi_enc = load_files(files_paths, REMI(pitch_range, additional_tokens= additional_tokens))

In [53]:
print("There are",len(remi_enc.vocab),"unique tokens in the files")

There are 245 unique tokens in the files


In [54]:
# Create a dataset corpus from the notes and labels
from torch.utils.data import DataLoader, Dataset, RandomSampler, SequentialSampler
class REMICorpus(Dataset):
    def __init__(self, notes, labels, encoder, seq_length, split_size = 0.2):
        self.encoder = encoder
        self.seq_len = seq_length

        # ntrain, ntest, ltrain, ltest = train_test_split(notes, labels, test_size=split_size, random_state=42, shuffle=True, stratify=labels)
 
        self.xtrain, self.ytrain = self.tokenize(notes, labels)
        # self.xvalid = self.tokenize(ntest, ltest)
    
    def __len__(self):
        return len(self.encoder.vocab)

    def __getitem__(self, index):
        return self.xtrain[index], self.ytrain[index]
    
    
    def tokenize(self, notes, labels):
        assert len(notes) > 0
        assert len(labels) > 0

        # create a set of notes
        # they should all be padded to have sequence of len seq_len
        songss = []
        labelss = []
        for song, label in zip(notes, labels):
            song = torch.tensor(song).type(torch.int64)
            songs = list(song.split(self.seq_len))

            for i in range(len(songs)):
                # removing sequences that have < seq len/4 tokens
                if len(songs[i]) < self.seq_len/4:
                    del songs[i]
                    continue
                labelss.append(label-1)
                
                
            songss.extend(songs)
        
        # padding songs to be of same length
        songs = pad_sequence(songss)

        corpus = []
        print(songs.shape)
        print(len(labelss))
        # adding emotion values to the sequences
        for song, label in zip(songs.T, labelss):
            l = torch.full((self.seq_len,), label)
            inp = torch.stack([song, l], dim=-1)
            corpus.append(inp)

    
        corpus = torch.stack(corpus)

        data = corpus[:,:self.seq_len - 1, :]
        target = corpus[:,1:self.seq_len, :]
            

        # converting all the tokens in each type to new values:
        return data, target
        

In [55]:
corpus = REMICorpus(notes, labels_df, remi_enc, 21, split_size=0.01)

torch.Size([21, 57245])
57245


In [56]:
train_target = corpus.ytrain
train_data = corpus.xtrain


print("train data shape:", train_data.shape)
print("train target shape:", train_target.shape)

train data shape: torch.Size([57245, 20, 2])
train target shape: torch.Size([57245, 20, 2])


In [57]:
batch_size = 32
# creating a dataloader
train_dataloader = DataLoader(
    corpus,
    sampler=SequentialSampler(train_data),
    batch_size=batch_size,
)

In [58]:
print("There are total",len(notes), "songs and a total of", train_data.shape[0], "sequences extracted")

There are total 1078 songs and a total of 57245 sequences extracted


## Model Building

### Constants

In [59]:
# size of the model
emsize = 256

# parameters for the transformers
ntokens = [len(corpus), 4]
nhead = 4
nhid = 512
nlayer = 7

# dropout
dropout = 0.4

# learning rates for each
lr_g = 0.0001
lr_d = 0.0001


In [60]:
len(train_dataloader)

1789

### Position Encoding

In [61]:
# adapted from the pytorch positional encoding class
class PositionalEncoding(nn.Module):

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        # PE is the Positional Encoding matrix 
        # THIS STORES THE POSITIONS OF THE SEQUENCE
        pe = torch.zeros(max_len, d_model)

        # Arange - RETURNS A RANGE BETWEEN VALUES, HERE IT IS 0 - max_len
        # unsqueeze - adds a dimension, 1 means that each element in the first list is now in a list
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # division term, here it is (10000 ** ((2 * i)/d_model))
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        # calculating the position encoding for the even and odd terms        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        # Unsqueeze 0 will put PE in one list
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # make embeddings relatively larger
        # This is so we do not lose the importance of the embedding
        # we add the embedding to the PE 
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

In [62]:
class Generator(nn.Module):
    """Container module with an encoder, a recurrent or transformer module, and a decoder."""

    def __init__(self, ntoken, d_model, nhead, nlayers, dropout=0.5, max_length = 2048, device = device):
        super(Generator, self).__init__()
        try:
            from torch.nn import TransformerEncoder, TransformerEncoderLayer
        except:
            raise ImportError('TransformerEncoder module does not exist in PyTorch 1.1 or lower.')

        # original mask
        self.src_mask = None
        self.max_length = max_length
        self.d_model = d_model
        self.nlayers = nlayers
        self.ntokens = ntoken

        self.device = device

        # NEW criterion and embedding size
        self.criterion = nn.CrossEntropyLoss(reduction='none')
        # CHANGED: using embedding size and reshaping vector
        self.embed_siz = [128, 128]

        # embedding encoding
        self.embedding_notes  = nn.Embedding(self.ntokens[0], self.embed_siz[0])
        self.embedding_emotion   = nn.Embedding(self.ntokens[1], self.embed_siz[1])
        
        # positional encoding
        self.pos_encoder = PositionalEncoding(d_model, dropout)

        # in linear layer
        # CHANGED: using this to convert one hot encoding of emotions batch * 5 -> linear transformation of emotions batch * 
        # TODO
        self.linear = nn.Linear(np.sum(self.embed_siz), self.d_model)
        
        # encoder
        encoder_layer = TransformerEncoderLayer(d_model = d_model, nhead = nhead, dropout = dropout)
        self.encoder = TransformerEncoder(encoder_layer, nlayers)

        # output layers
        self.project_notes = nn.Linear(d_model, ntoken[0])
        self.project_emo = nn.Linear(d_model, ntoken[1])
        
        
        self.init_weights()
    
    def compute_loss(self, predict, target):
        loss = self.criterion(predict, target)
        return torch.sum(loss)
            

    def _generate_square_subsequent_mask(self, sz):
        mask = torch.triu(torch.ones(sz, sz) * float('-inf'), diagonal=1)
        return mask

    def init_weights(self):
        initrange = 0.1
        nn.init.uniform_(self.embedding_notes.weight, -initrange, initrange)
        nn.init.uniform_(self.embedding_emotion.weight, -initrange, initrange)
     
        self.linear.bias.data.zero_()
        self.linear.weight.data.uniform_(-initrange, initrange)
        self.project_notes.bias.data.zero_()
        self.project_notes.weight.data.uniform_(-initrange, initrange)
        self.project_emo.bias.data.zero_()
        self.project_emo.weight.data.uniform_(-initrange, initrange)

    def forward(self, x_note, x_emo, src_mask):

        # if x_emo.dtype != torch.LongTensor:
        #     x_emo.type(torch.LongTensor)
        # if x_note.dtype != torch.LongTensor:
        #     x_note.type(torch.LongTensor)
        # creating embedding for the notes and emotions
        x_note = self.embedding_notes(x_note.long().to(device))
        x_emo = self.embedding_emotion(x_emo.long().to(device))

        # normalising the input for the position encoding
        x_note = x_note * math.sqrt(self.d_model)
        x_emo = x_emo * math.sqrt(self.d_model)

        # concatenating as one input
        x = torch.cat([x_note, x_emo], dim=-1)

        # sending through linear layer
        # x = self.linear(x)

        x = self.pos_encoder(x)

        # print(x.shape)
        # print(x.view(x.size(1), x.size(0), x.size(2)).shape)
        # print(x)
        # print()

        if src_mask == None:
            src_mask = self._generate_square_subsequent_mask(x.size(1)).to(self.device)
            
        self.src_mask = src_mask

        output = self.encoder(x.view(x.size(1), x.size(0), x.size(2)), self.src_mask)

        y_notes = self.project_notes(output)
        y_emo = self.project_emo(output)

        # also return loss of y notes

        # y_notes = F.log_softmax(y_notes, dim=-1)
        # y_emo = F.log_softmax(y_emo, dim=-1)

        return F.log_softmax(y_notes, dim=-1), F.log_softmax(y_emo, dim=-1)

In [63]:
class Discriminator(nn.Module):
    """
    Discriminator based on a pytorch TransformerEncoder.
    """
    
    def __init__(self, ntoken, d_model, nhead, nhid, nlayers, dropout=0.5, max_length = 2048):
        super(Discriminator, self).__init__()
        
        try:
            from torch.nn import TransformerEncoder, TransformerEncoderLayer, TransformerDecoderLayer, TransformerDecoder
        except:
            raise ImportError('TransformerEncoder module does not exist in PyTorch 1.1 or lower.')

        self.d_model = d_model

        # default embedding sizes:
        self.embedding_size = [128, 128]
        
        # seperate embedding layers for notes and emotions
        self.embedding_notes = nn.Embedding(ntoken[0], self.embedding_size[0])
        self.embedding_emotion = nn.Embedding(ntoken[1], self.embedding_size[1])
        
        # linear layer for converting the extra dimension to a linear vector
        self.linear = nn.Linear(np.sum(self.embedding_size), self.d_model)
        
        # encoding positional information using position encoder
        # with default drop out of 0.2
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        
        # encoding layers
        encoder_layers = TransformerEncoderLayer(d_model, nhead, nhid, dropout)
        self.encoder = TransformerEncoder(encoder_layers, nlayers)
        
        # final classification layer
        self.classifier = nn.Linear(d_model, 2)

        self.init_weights()

    def init_weights(self):
        initrange = 0.1
        nn.init.uniform_(self.embedding_notes.weight, -initrange, initrange)
        nn.init.uniform_(self.embedding_emotion.weight, -initrange, initrange)
     
        self.linear.bias.data.zero_()
        self.linear.weight.data.uniform_(-initrange, initrange)
        self.classifier.bias.data.zero_()
        self.classifier.weight.data.uniform_(-initrange, initrange)
        # self.project_emo.bias.data.zero_()
        # self.project_emo.weight.data.uniform_(-initrange, initrange)
    def forward(self, x_note = None, x_emo = None, emb_note = None, emb_emo = None):

        if emb_emo is not None:
            x_emo = emb_emo * math.sqrt(self.d_model)
        else:
            x_emo = self.embedding_emotion(x_emo.long().to(device))
            # print(x_emo.shape)
            # normalising the input for the position encoding
            x_emo = x_emo * math.sqrt(self.d_model)
            
            
        if emb_note is not None:
            x_note = emb_note * math.sqrt(self.d_model)
        else:
            # creating embedding for the notes and emotions
            x_note = self.embedding_notes(x_note.long().to(device))
            # normalising the input for the position encoding
            x_note = x_note * math.sqrt(self.d_model)

        # concatenating as one input
        x = torch.cat([x_note, x_emo], dim=-1)
        # print(x.shape, "disc shape for linear")

        # sending through linear layer
        # x = self.linear(x)

        # encoding positions
        x = self.pos_encoder(x)

        # sending through transformer encoder
        x = self.encoder(x)
        
        # classification
        x = x.mean(dim=1)
        x = self.classifier(x)
        # print(x.shape)
        return x

In [73]:
class MidiTransGAN(nn.Module):
    def __init__(self, generator, discriminator, noise_fn,
                 batch_size=32, device='cuda', lr_d=0.004, lr_g=0.004):
        """A GAN class for holding and training a generator and discriminator
        Args:
            generator: a Ganerator network
            discriminator: A Discriminator network
            noise_fn: function f(num: int) -> pytorch tensor, (latent vectors)
            data_fn: function f(num: int) -> pytorch tensor, (real samples)
            batch_size: training batch size
            device: cpu or CUDA
            lr_d: learning rate for the discriminator
            lr_g: learning rate for the generator
        """
        super(MidiTransGAN, self).__init__()
        self.generator = generator.to(device)
        # self.generator = self.generator.to(device)
        self.discriminator = discriminator.to(device)
        # self.discriminator = self.discriminator.to(device)
        self.noise_fn = noise_fn
        self.batch_size = batch_size
        self.device = device
        self.criterion = nn.BCEWithLogitsLoss()
        self.optim_d = torch.optim.Adam(discriminator.parameters(),
                                  lr=lr_d,)
        self.optim_g = torch.optim.Adam(generator.parameters(),
                                  lr=lr_g)

        self.seq_len = 100
        self.add_noise = 16
        # self.src_mask = self.src_mask = torch.triu(torch.ones(511, 511) * float('-inf'), diagonal=1).to(device)

    def compute_accuracy(self, predicted_weights, target):
        predicted = predicted_weights.argmax(dim=1)
        return torch.sum(predicted == target) / len(target)

    def calc_gradient_penalty(self, real, fake, LAMBDA=0.02):
        temp_notes = torch.rand([real.shape[0], 1]).to(device)
        # expand into the shape
        temp_notes = temp_notes.expand(real[:,:,0].size())

        # interpolation
        mid = temp_notes * real[:,:,0] + ((1 - temp_notes) * fake[:,:,0])

        mid = mid.type(torch.LongTensor)
        mid = mid.type(torch.FloatTensor).to(device)

        mid = torch.autograd.Variable(mid, requires_grad=True)
        # print(mid.shape)
    
        mid = torch.einsum(
            "ve,bn -> bne",
            self.discriminator.embedding_notes.weight,
            mid,
        )

        # print(mid.type(torch.LongTensor))
        classification = self.discriminator(emb_note = mid, x_emo = real[:,:,1].to(device))
        

        gradients = torch.autograd.grad(outputs=classification, inputs=mid,
                                        grad_outputs=torch.ones(classification.size(), device=device),
                                        create_graph=True, retain_graph=True, allow_unused = True)[0]
        # print(gradients)
        gradients = gradients.view(real.shape[0], -1)

        # https://github.com/igul222/improved_wgan_training/blob/master/gan_language.py
        slopes = torch.sqrt(torch.sum(gradients ** 2, dim=1) + 1e-12)
        gradient_penalty = ((slopes - 1.) ** 2).mean() * LAMBDA

        return gradient_penalty

    # Cross Entropy loss with label smoothing
    # https://arxiv.org/pdf/1606.03498.pdf
    # https://github.com/NVIDIA/DeepLearningExamples
    def label_smoothing_loss(self, x, is_real, smoothing = 0.5):

        if is_real:
            # real labels are smoothened from 1 to a range between (0.8, 1.2)
            # One Sided Label Smoothing (Real Label [0.8,1.2]
            target = torch.tensor(random.randrange(8, 12) / 10)
        else:
            target = torch.tensor(0.0)
        
        target =  target.expand_as(x).to(device)

        return self.criterion(x, target)
        
    def generate_samples(self, latent_vec=None, emotion=None, num=None, src_mask = None, temperature = 1):
        """Sample from the generator.
        Args:
            latent_vec: A pytorch latent vector or None
            num: The number of samples to generate if latent_vec is None
        If latent_vec and num are None then use self.batch_size random latent
        vectors.
        """
        num = self.batch_size if num is None else num
        latent_vec = self.noise_fn(self.seq_len,1, emotion) if latent_vec is None else latent_vec

        if emotion == None:
            emotion = latent_vec[:,:,1][0][0]
        
        if src_mask == None:
            src_mask = generate_square_subsequent_mask((latent_vec.size(0))).to(device)
        # print(src_mask.shape)

        # since we are not training
        # we fix no gradients
        with torch.no_grad():
            # learning for 3 sequences at a time
            # this is purely due to resource constraints
            for i in range(num):

                # generating fake samples
                fake_samples, _ = generator(latent_vec[:,:,0], latent_vec[:,:,1], src_mask = None)
                
                # For Notes:
                # getting the weights and converting them to notes
                word_weights = fake_samples[-1].squeeze().exp().div(temperature).cpu()
                # getting the values from the distribution from 218 (num of possible notes)
                word = torch.multinomial(word_weights, 1)
                # batch size * 1 -> 1 * batch_size
                # word_notes = word.view(1, word.size(0))
                # print(word_notes.shape)
                word_notes = word.view(word.size(0), 1)

                emotions = torch.full((word_notes.size(0),1), emotion)

                # stack the emotions to the final shape: seq_len * batch_size * 2 (1 for emotion and 1 for notes)
                # here seq_len = 1
                word_tensor = torch.stack([word_notes, emotions], dim=-1)
                
                # concatenate vector to a fix length of seq len (here it is set as 4)
                # shape -> seq_len * batch_size * 2
                latent_vec = torch.cat([latent_vec, word_tensor.to(device)], dim=1)
            
        # with torch.no_grad():
        #     samples = self.generator(latent_vec, emotion, src_mask = None)
        return latent_vec

    def train_step_generator(self, real_samples, real_target, i):
        """Train the generator one step and return the loss."""
        self.generator.zero_grad()

        # latent_vec = self.noise_fn(10,self.batch_size)
        # latent_vec = latent_vec.to(device)

        # emotion = self.emotions[:,:self.batch_size].to(device)
        
        # real_samples, real_target = self.data_fn(train_data, self.batch_size)
        emotions = real_samples[:,:,1].T[0]

        # generated samples
        # starting with a sequence of length 4
        latent_vec = self.noise_fn(real_samples.size(1),real_samples.size(0), emotions)
        target = latent_vec[:,:,1].T[0]
        emotion = latent_vec[:,:,1][0][0]
        loss_emotions = 0
        acc_emotions = 0

        # since we are not traning generator
        # we fix no gradients

        # learning for 10 length sequences at a time
        # this is purely due to resource constraints
        for i in range(20):

            # generating fake samples
            fake_samples, out_emo = generator(latent_vec[:,:,0], latent_vec[:,:,1], src_mask = None)
            
            # For Notes:
            # getting the weights and converting them to notes
            word_weights = fake_samples[-1].squeeze().exp().cpu()
            # getting the values from the distribution from 218 (num of possible notes)
            word = torch.multinomial(word_weights, 1)
            # batch size * 1 -> 1 * batch_size
            word_notes = word

            if i == 0:
                nll_loss = nn.CrossEntropyLoss()(fake_samples.view(fake_samples.size(1), fake_samples.size(2), fake_samples.size(0)).cpu(), real_target[:,:,0].cpu())
                nll_loss_emotion =  nn.CrossEntropyLoss()(out_emo.view(out_emo.size(1), out_emo.size(2), out_emo.size(0)).cpu(), real_target[:,:,1].cpu())
                nll_loss += nll_loss_emotion
            # for Emotions:
            # getting the weights and converting them to notes
            emo_weights = out_emo.mean(dim=0)
            loss_emotion = nn.CrossEntropyLoss()(emo_weights, target)
            acc_emotion = self.compute_accuracy(emo_weights, target)
            loss_emotions += loss_emotion
            acc_emotions += acc_emotion
            # # getting the values from the distribution from 218 (num of possible notes)
            # word = torch.multinomial(word_weights, 1)
            # # batch size * 1 -> 1 * batch_size
            # word_emotions = word.T
            emotions = torch.full((word_notes.size(0), word_notes.size(1)), emotion.item())

            # stack the emotions to the final shape: seq_len * batch_size * 2 (1 for emotion and 1 for notes)
            # here seq_len = 1
            word_tensor = torch.stack([word_notes, emotions], dim=-1)
            
            # concatenate vector to a fix length of seq len (here it is set as 4)
            # shape -> seq_len * batch_size * 2
            latent_vec = torch.cat([latent_vec[:,1:,:], word_tensor.to(device)], dim=1)
                
        
        classifications = self.discriminator(latent_vec[:,:,0].to(device), latent_vec[:,:,1].to(device))
 
        # loss for generator
        loss_gen = self.criterion(classifications, torch.zeros_like(classifications).to(device))
        # print(out_emo)
        # loss for emotions
        loss_emotions = loss_emotions / 20
        acc_emotions = acc_emotions / 20
        # loss_emo = self.criterion(out_emo, torch.full((out_emo.size(0), out_emo.size(1)), emotion.item()))
        # loss = (loss_gen + loss_emotions) / 2
        loss = loss_gen
        nll_loss = nll_loss / 2
        # print(loss_gen)
        # loss_gen.retain_grad()
        loss.backward()
        nll_loss.backward()
        nn.utils.clip_grad_norm_(generator.parameters(), 3)
        self.optim_g.step()
        return loss.item(), acc_emotions, nll_loss.item(), nll_loss_emotion.item()

    def train_step_discriminator(self, real_samples, real_target, i):
        """Train the discriminator one step and return the losses."""
        self.discriminator.zero_grad()

        # getting real samples
        # this is using the data_fn or the get batch function
        # here, the data is the sequence with shape batch_size * seq_len * num of tokens
        # in general that is 32 * 100 * 2
        # this batch is randomly sampled from the corpus
        # the target sequence is the same shape, and is the next step in the sequence
        # real_samples, real_target = self.data_fn(train_data, self.batch_size)
        emotions = real_samples[:,:,1].T[0]
        real_samples = real_samples.to(device)
        # real_target = real_target.to(device)


        # the discrimiator
        # [:,:,0] -> notes
        # [:,:,1] -> emotion
        pred_real = self.discriminator(real_samples[:,:,0], real_samples[:,:,1])
        
        # Adding Noise to fake labels every few iterations
        if i % self.add_noise == 0:
            loss_real = self.label_smoothing_loss(pred_real, is_real = False)
        else:
            loss_real = self.label_smoothing_loss(pred_real, is_real = True)
        
        # loss_real.backward()
        # loss_real = self.criterion(pred_real, torch.ones(pred_real.size(0), dtype=torch.int64).to(device))

        # generated samples
        # starting with a sequence of length 4
        latent_vec = self.noise_fn(real_samples.size(1),real_samples.size(0), emotions)
        target = latent_vec[:,:,1].T[0]
        emotion = latent_vec[:,:,1][0][0]
        loss_emotions = 0
        acc_emotions = 0
        nll_loss = 0

        # since we are not traning generator
        # we fix no gradients
        with torch.no_grad():
            # learning for 3 sequences at a time
            # this is purely due to resource constraints
            for i in range(20):

                # generating fake samples
                fake_samples, out_emo = generator(latent_vec[:,:,0], latent_vec[:,:,1], src_mask = None)
                
                
                # For Notes:
                # getting the weights and converting them to notes
                word_weights = fake_samples[-1].squeeze().exp().cpu()
                # getting the values from the distribution from 218 (num of possible notes)
                word = torch.multinomial(word_weights, 1)
                # batch size * 1 -> 1 * batch_size
                word_notes = word

                # # for Emotions:
                # # getting the weights and converting them to notes
                emo_weights = out_emo.mean(dim=0)
                loss_emotion = nn.CrossEntropyLoss()(emo_weights, target)
                acc_emotion = self.compute_accuracy(emo_weights, target)
                loss_emotions += loss_emotion
                acc_emotions += acc_emotion
                # word_weights = out_emo[-1].squeeze().exp().cpu()
                # # getting the values from the distribution from 218 (num of possible notes)
                # word = torch.multinomial(word_weights, 1)
                # # batch size * 1 -> 1 * batch_size
                # word_emotions = word.T
                emotions = torch.full((word_notes.size(0), word_notes.size(1)), emotion.item())

                # stack the emotions to the final shape: seq_len * batch_size * 2 (1 for emotion and 1 for notes)
                # here seq_len = 1
                word_tensor = torch.stack([word_notes, emotions], dim=-1)
                
                # concatenate vector to a fix length of seq len (here it is set as 4)
                # shape -> seq_len * batch_size * 2
                latent_vec = torch.cat([latent_vec[:,1:,:], word_tensor.to(device)], dim=1)

        # predict on the fake samples
        pred_fake = self.discriminator(latent_vec[:,:,0].to(device), latent_vec[:,:,1].to(device))

        if i % self.add_noise == 0:
            loss_fake = self.label_smoothing_loss(pred_fake, is_real = True)
        else:
            loss_fake = self.label_smoothing_loss(pred_fake, is_real = False)

        # loss_fake.backward()
        # loss on fake
        # loss_fake = self.criterion(pred_fake, torch.zeros((pred_fake.size(0)), dtype=torch.int64).to(device))
        # loss on emotions
        loss_emotions = loss_emotions / 20
        acc_emotions = acc_emotions / 20
        # loss_emo = criterion(out_emo.cpu(), emotion.T[:,:5].cpu())

        # gp = self.calc_gradient_penalty(real_samples.to(device), latent_vec.detach().to(device))
        # loss_fake = torch.mean(pred_fake)
        # loss_real = -torch.mean(pred_real)
        # combine
        loss = 0.5 * (loss_fake + loss_real)
        loss.backward()
        # print(out_emo)
        # print(emotion[:out_emo.size(0)])
        # loss_real.backward()
        # loss_fake.backward()
        # loss_emo.backward()
        self.optim_d.step()
        return loss_real.item(), loss_fake.item(), acc_emotions, loss.item()

    def train_step(self, real_samples, real_target, i):
        """Train both networks and return the losses."""
        loss_d = self.train_step_discriminator(real_samples, real_target, i)
        loss_g = self.train_step_generator(real_samples, real_target, i)
        return loss_g, loss_d

In [74]:
# get_batch subdivides the source data into chunks of length args.bptt.
# If source is equal to the example output of the batchify function, with
# a bptt-limit of 2, we'd get the following two Variables for i = 0:
# ┌ a g m s ┐ ┌ b h n t ┐
# └ b h n t ┘ └ c i o u ┘
# Note that despite the name of the function, the subdivison of data is not
# done along the batch dimension (i.e. dimension 1), since that was handled
# by the batchify function. The chunks are along dimension 0, corresponding
# to the seq_len dimension in the LSTM.
def get_batch(source, batch_size):
    rand_columns = torch.randperm(source.size(0))[:batch_size]
    # batch_size = min(batch_size, len(source) - 1 - i)
    data = source[rand_columns,:source.size(1)-1, :]
    target = source[rand_columns,1:source.size(1), :]
    return data, target

In [75]:
def generate_square_subsequent_mask(sz):
    """Generates an upper-triangular matrix of -inf, with zeros on diag."""
    return torch.triu(torch.ones(sz, sz) * float('-inf'), diagonal=1)

In [76]:
def noise_fn(seq_len, batch_size, emotions=None):
    notes = torch.randint(len(corpus), (batch_size, seq_len), dtype=torch.long).to(device)
    if emotions != None:
        emotions = emotions.repeat(seq_len, 1).T.to(device)
    else:
        emotion = torch.randint(0,4, (1,), dtype=torch.long)
        emotions = torch.full((batch_size, seq_len), emotion.item()).to(device)
    return torch.stack([notes, emotions], dim=-1)
    

In [77]:
generator = Generator(ntokens, emsize, nhead, nlayer, dropout)
discriminator = Discriminator(ntokens, emsize, nhead, nhid, nlayer, dropout)

gan = MidiTransGAN(generator, discriminator, noise_fn, batch_size=batch_size, device=device)

In [78]:
def network_paras(model):
    # compute only trainable params
    param = filter(lambda p: p.requires_grad, model.parameters())
    params = sum([np.prod(p.size()) for p in param])
    return params
print("There are",network_paras(generator),"parameters in generator")
print("There are",network_paras(discriminator),"parameters in discriminator")

There are 9367161 parameters in generator
There are 3787906 parameters in discriminator


## Training

In [79]:
import torch
from torch.utils.tensorboard import SummaryWriter
writer = SummaryWriter()

In [80]:
len(train_dataloader)

1789

In [None]:
from time import time

gan.train()
# best_nll = 100000
# best_dict = None
def train():
    epochs = 200
    batches = len(train_dataloader)
    
    loss_gs, acc_gs, loss_d_reals, loss_d_fakes, acc_ds, nll_losses, emo_losses, loss_ds = [], [], [], [], [], [], [], []
    start = time()
    
    for epoch in range(epochs):
        total_loss_g, total_acc_g, total_loss_d_real, total_loss_d_fake, total_acc_d, total_nll, total_nll_emo, total_loss_d = 0, 0, 0, 0, 0, 0, 0, 0

        for bidx, (xtrain, ytrain)  in enumerate(train_dataloader): 
            xtrain.to(device)
            ytrain.to(device)

            # print(xtrain.shape)

            (loss_g, accuracy_g, nll_loss, nll_loss_emo), (loss_d_real, loss_d_fake, accuracy_d, loss_d) = gan.train_step(xtrain, ytrain, bidx)
            
            total_loss_g += loss_g
            total_loss_d_real += loss_d_real
            total_loss_d_fake += loss_d_fake
            total_acc_g += accuracy_g
            total_acc_d += accuracy_d
            total_nll += nll_loss
            total_nll_emo += nll_loss_emo
            total_loss_d += loss_d

        loss_gs.append(total_loss_g / batches)
        loss_d_reals.append(total_loss_d_real / batches)
        loss_d_fakes.append(total_loss_d_fake / batches)
        acc_gs.append(total_acc_g / batches)
        acc_ds.append(total_acc_d / batches)
        nll_losses.append(total_nll / batches)
        emo_losses.append(total_nll_emo / batches)
        loss_ds.append(total_loss_d / batches)

        writer.add_scalar("Generator Loss", loss_gs[-1], epoch)
        writer.add_scalar("Discriminator Loss (Real)", loss_d_reals[-1], epoch)
        writer.add_scalar("Discriminator Loss (Fake)", loss_d_fakes[-1], epoch)
        writer.add_scalar("Generator Accuracy", acc_gs[-1], epoch)
        writer.add_scalar("Discriminator Accuracy", acc_ds[-1], epoch)
        writer.add_scalar("NLL", nll_losses[-1], epoch)
        writer.add_scalar("NLL (Emo)", emo_losses[-1], epoch)
        writer.add_scalar("Discriminator Loss", loss_ds[-1], epoch)
        
        print(f"Epoch {epoch+1}/{epochs} ({int(time() - start)}s):"
              f" Gen Loss: {loss_gs[-1]:.3f},"
              f" Dis Loss (Real): {loss_d_reals[-1]:.3f},"
              f" Dis Loss (Fake): {loss_d_fakes[-1]:.3f}",
              f" Gen Accuracy: {acc_gs[-1]:.3f}",
              f" Dis Accuracy: {acc_ds[-1]:.3f}",
              f" NLL: {nll_losses[-1]:.3f}",
              f" NLL (Emo): {emo_losses[-1]:.3f}",
              f" Dis Loss: {loss_ds[-1]:.3f}")
        
train()

Epoch 1/200 (835s): Gen Loss: 0.592, Dis Loss (Real): 0.798, Dis Loss (Fake): 0.592  Gen Accuracy: 0.219  Dis Accuracy: 0.219  NLL: 3.415  NLL (Emo): 1.387  Dis Loss: 0.695

Epoch 2/200 (1670s): Gen Loss: 0.592, Dis Loss (Real): 0.784, Dis Loss (Fake): 0.592  Gen Accuracy: 0.214  Dis Accuracy: 0.214  NLL: 3.407  NLL (Emo): 1.386  Dis Loss: 0.688

Epoch 3/200 (2509s): Gen Loss: 0.595, Dis Loss (Real): 0.781, Dis Loss (Fake): 0.595  Gen Accuracy: 0.214  Dis Accuracy: 0.214  NLL: 3.406  NLL (Emo): 1.386  Dis Loss: 0.688

Epoch 4/200 (3348s): Gen Loss: 0.588, Dis Loss (Real): 0.787, Dis Loss (Fake): 0.588  Gen Accuracy: 0.023  Dis Accuracy: 0.023  NLL: 3.406  NLL (Emo): 1.386  Dis Loss: 0.688

Epoch 5/200 (4186s): Gen Loss: 0.590, Dis Loss (Real): 0.785, Dis Loss (Fake): 0.590  Gen Accuracy: 0.274  Dis Accuracy: 0.274  NLL: 3.407  NLL (Emo): 1.386  Dis Loss: 0.688

Epoch 6/200 (5017s): Gen Loss: 0.592, Dis Loss (Real): 0.784, Dis Loss (Fake): 0.592  Gen Accuracy: 0.214  Dis Accuracy: 0.214  NLL: 3.406  NLL (Emo): 1.386  Dis Loss: 0.688

Epoch 7/200 (5855s): Gen Loss: 0.588, Dis Loss (Real): 0.787, Dis Loss (Fake): 0.588  Gen Accuracy: 0.214  Dis Accuracy: 0.214  NLL: 3.406  NLL (Emo): 1.386  Dis Loss: 0.687

Epoch 8/200 (6694s): Gen Loss: 0.587, Dis Loss (Real): 0.788, Dis Loss (Fake): 0.587  Gen Accuracy: 0.214  Dis Accuracy: 0.214  NLL: 3.406  NLL (Emo): 1.386  Dis Loss: 0.687

Epoch 9/200 (7530s): Gen Loss: 0.588, Dis Loss (Real): 0.787, Dis Loss (Fake): 0.588  Gen Accuracy: 0.134  Dis Accuracy: 0.134  NLL: 3.406  NLL (Emo): 1.386  Dis Loss: 0.687

Epoch 10/200 (8367s): Gen Loss: 0.589, Dis Loss (Real): 0.786, Dis Loss (Fake): 0.589  Gen Accuracy: 0.224  Dis Accuracy: 0.224  NLL: 3.406  NLL (Emo): 1.386  Dis Loss: 0.688

In [82]:
# Print model's state_dict
print("Model's state_dict:")
for param_tensor in gan.state_dict():
    print(param_tensor, "\t", gan.state_dict()[param_tensor].size())

Model's state_dict:
generator.embedding_notes.weight 	 torch.Size([245, 128])
generator.embedding_emotion.weight 	 torch.Size([4, 128])
generator.pos_encoder.pe 	 torch.Size([5000, 1, 256])
generator.linear.weight 	 torch.Size([256, 256])
generator.linear.bias 	 torch.Size([256])
generator.encoder.layers.0.self_attn.in_proj_weight 	 torch.Size([768, 256])
generator.encoder.layers.0.self_attn.in_proj_bias 	 torch.Size([768])
generator.encoder.layers.0.self_attn.out_proj.weight 	 torch.Size([256, 256])
generator.encoder.layers.0.self_attn.out_proj.bias 	 torch.Size([256])
generator.encoder.layers.0.linear1.weight 	 torch.Size([2048, 256])
generator.encoder.layers.0.linear1.bias 	 torch.Size([2048])
generator.encoder.layers.0.linear2.weight 	 torch.Size([256, 2048])
generator.encoder.layers.0.linear2.bias 	 torch.Size([256])
generator.encoder.layers.0.norm1.weight 	 torch.Size([256])
generator.encoder.layers.0.norm1.bias 	 torch.Size([256])
generator.encoder.layers.0.norm2.weight 	 torch.

In [83]:
torch.save(gan.state_dict(), './models/remi_transgan_label_noise_final.pt')

## Generate

In [84]:
gan = MidiTransGAN(generator, discriminator, noise_fn, batch_size=batch_size, device=device)
gan.load_state_dict(torch.load('./models/remi_transgan_label_noise_final.pt'))
gan.eval()

MidiTransGAN(
  (generator): Generator(
    (criterion): CrossEntropyLoss()
    (embedding_notes): Embedding(245, 128)
    (embedding_emotion): Embedding(4, 128)
    (pos_encoder): PositionalEncoding(
      (dropout): Dropout(p=0.4, inplace=False)
    )
    (linear): Linear(in_features=256, out_features=256, bias=True)
    (encoder): TransformerEncoder(
      (layers): ModuleList(
        (0): TransformerEncoderLayer(
          (self_attn): MultiheadAttention(
            (out_proj): _LinearWithBias(in_features=256, out_features=256, bias=True)
          )
          (linear1): Linear(in_features=256, out_features=2048, bias=True)
          (dropout): Dropout(p=0.4, inplace=False)
          (linear2): Linear(in_features=2048, out_features=256, bias=True)
          (norm1): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
          (norm2): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
          (dropout1): Dropout(p=0.4, inplace=False)
          (dropout2): Dropout(p=0.4, i

In [85]:
# !pip install muspy
import muspy

In [89]:
ntokens

[245, 4]

In [92]:
# TODO: fix the generate sample function to handle batch size = 1
sequences = []

for k in range(3):
    for emo in range(0,4):
        n_generate = 4000
        temperature = 1
        log_interval = 4000 # interval between logs

        notes = []
        for token in ntokens[:-1]:
            # print(token)
            notes.append(torch.randint(token, (1, 2), dtype=torch.long).to(device))
        

        emotions = torch.full((1, 2), emo).to(device)
        
        notes.append(emotions)

        # stacked input
        inputs = torch.stack(notes, dim=-1)
        print(len(inputs))
            
        src_mask = generate_square_subsequent_mask(len(inputs)).to(device)

        output = gan.generate_samples(latent_vec=inputs, emotion=emo, num=n_generate, src_mask=None)

        print('| Generated {} notes'.format(n_generate))
        sequences.append([output[:,2:,0].squeeze().cpu().tolist()])

1
| Generated 4000 notes
1
| Generated 4000 notes
1
| Generated 4000 notes
1
| Generated 4000 notes
1
| Generated 4000 notes
1
| Generated 4000 notes
1
| Generated 4000 notes
1
| Generated 4000 notes
1
| Generated 4000 notes
1
| Generated 4000 notes
1
| Generated 4000 notes
1
| Generated 4000 notes


In [94]:
q1 = [sequences[0], sequences[4], sequences[8]]
q2 = [sequences[1], sequences[5], sequences[9]]
q3 = [sequences[2], sequences[6], sequences[10]]
q4 = [sequences[3], sequences[7], sequences[11]]
collected = [q1, q2, q3, q4]

In [96]:
date = '18_04_'
pitch_ranges = []
n_pitches = []
polyphonies = []
empty_beat_rates = []

for k, sequence in enumerate(collected):
    
    i = 0
    for seq in (sequence):
        i = i + 1
        # TODO: remove this
        # seq = seq[0]

        converted_back_midi = remi_enc.tokens_to_midi(seq, get_midi_programs(midi))
        file_name = 'remi_transgan_label_noise_' + date  + str(k) + '_' + str(i) + '.mid'
        converted_back_midi.dump(file_name)
        music = muspy.read_midi(file_name)

        # music = muspy.read_midi(file_name)
        pitch_ranges.append(muspy.pitch_range(music))
        n_pitches.append(muspy.n_pitch_classes_used(music))
        polyphonies.append(muspy.polyphony(music)) # average number of pitches being played concurrently.
        empty_beat_rates.append(muspy.empty_beat_rate(music))

remi_transgan_label_noise_18_04_0_1.mid
remi_transgan_label_noise_18_04_0_2.mid
remi_transgan_label_noise_18_04_0_3.mid
remi_transgan_label_noise_18_04_1_1.mid
remi_transgan_label_noise_18_04_1_2.mid
remi_transgan_label_noise_18_04_1_3.mid
remi_transgan_label_noise_18_04_2_1.mid
remi_transgan_label_noise_18_04_2_2.mid
remi_transgan_label_noise_18_04_2_3.mid
remi_transgan_label_noise_18_04_3_1.mid
remi_transgan_label_noise_18_04_3_2.mid
remi_transgan_label_noise_18_04_3_3.mid


In [None]:
sequences = []
date = '29_03_'
pitch_ranges = []
n_pitches = []
polyphonies = []
empty_beat_rates = []

for k in range(25):
    print(k)
    for emo in range(1,5):
        n_generate = 4000
        temperature = 1
        sequence = []
        log_interval = 4000 # interval between logs
        input = torch.randint(218, (1, 2), dtype=torch.long).to(device)
        emotion = torch.zeros((1, 2), dtype=int).to(device)
        emotion[:,0] = emo


        src_mask = generate_square_subsequent_mask(len(input)).to(device)
        with open('./output', 'w') as outf:
            with torch.no_grad():  # no tracking history
                for i in range(n_generate):

                    output, _ = gan.generate_samples(latent_vec=input, emotion=emotion)

                    word_weights = output[-1].squeeze().div(temperature).exp().cpu()
                    word = torch.multinomial(word_weights, 1)[0].tolist()
                    word_tensor = torch.Tensor([word]).long().to(device)
                    
                    input = torch.cat([input, word_tensor], 1)
                    emotion = torch.cat([emotion, torch.zeros((1,1), dtype=int).to(device)], -1)

                    outf.write(str(word) + ('\n' if i % 20 == 19 else ' '))
                    
                    sequence.extend(word)

                    if i % log_interval == 0:
                        print('| Generated {}/{} notes'.format(i, n_generate))
        sequences.append([sequence])
        converted_back_midi = remi_enc.tokens_to_midi([sequence], get_midi_programs(midi))
        file_name = 'transgan_' + date + str(k) + '_' + str(emo) + '.mid'
        converted_back_midi.dump(file_name)

        music = muspy.read_midi(file_name)
        pitch_ranges.append(muspy.pitch_range(music))
        n_pitches.append(muspy.n_pitches_used(music))
        polyphonies.append(muspy.polyphony(music)) # average number of pitches being played concurrently.
        empty_beat_rates.append(muspy.empty_beat_rate(music))

In [None]:
results_transgan = {'Pitch_range': pitch_ranges, 'Num_pitches': n_pitches, 'Polyphony': polyphonies, 'Empty_beat_rates': empty_beat_rates}
results_df = pd.DataFrame(results_transgan)
results_df.to_csv('remi_ransgan_results_v2_emo_20_seq.csv')

In [None]:
converted_back_midi

ticks per beat: 384
max tick: 0
tempo changes: 1
time sig: 0
key sig: 0
markers: 0
lyrics: False
instruments: 1

## Metrics

### BLEU Score

In [None]:
train_check = train_data[:,:,0]
train_check.shape

torch.Size([48791, 21])

In [None]:
gen_check = []
for sequence in sequences:
    # print(sequence[0])
    for i in range(0, len(sequence[0])-21, 21):
        gen_check.append(sequence[0][i:i+21])

In [None]:
torch.Tensor(gen_check).shape

torch.Size([760, 21])

In [None]:
from nltk.translate.bleu_score import corpus_bleu

score = corpus_bleu([train_check], [torch.Tensor(gen_check)])


In [None]:
score

0

### MusPy metrics

In [None]:
results_df.describe()

Unnamed: 0,Pitch_range,Num_pitches,Polyphony,Empty_beat_rates
count,4.0,4.0,4.0,4.0
mean,86.5,37.75,6.349855,0.011236
std,1.0,3.593976,2.436483,0.022472
min,85.0,33.0,3.451274,0.0
25%,86.5,36.0,4.985313,0.0
50%,87.0,38.5,6.408194,0.0
75%,87.0,40.25,7.772735,0.011236
max,87.0,41.0,9.131757,0.044944


In [None]:
music = muspy.read_midi('conditioned_17_03_4.mid')
pitch_range = muspy.pitch_range(music)
n_pitches_used = muspy.n_pitches_used(music)
polyphony = muspy.polyphony(music) # average number of pitches being played concurrently.
empty_beat_rate = muspy.empty_beat_rate(music)

print("The pitch range is", pitch_range)
print("The number of unique pitches used is", n_pitches_used)
print("The polyphony is", polyphony)
print("The empty beat rate is", empty_beat_rate)