## Installs, setup, etc.

In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

In [2]:
#assumes we already have tensorflow installed (e.g. on colab)
%%capture
!sudo apt install -y fluidsynth
!pip install --upgrade pyfluidsynth
!pip install muspy #important to have this last, 
                   #otherwise it interferes with fluidsynth version

In [3]:
import tensorflow as tf
import glob
import numpy as np
import torchtext
import muspy
import fluidsynth
import os
import music21
import pandas as pd
import random
import keras.utils
import time
import csv
import shutil
from os import path
from IPython import display
from more_itertools import sliced
from keras.utils.vis_utils import plot_model
from matplotlib import pyplot as plt
from cgi import test

In [None]:
# Fetch the MuseScore soundfont through muspy.
# NOTE(review): presumably required by muspy.synthesize / write_audio further
# down for rendering audio - confirm.
muspy.download_musescore_soundfont()

## Constants/hyperparameters

In [5]:
# ---- options ----
dataset_choice = 'MAESTRO'  # which corpus to use: 'JSB' or 'MAESTRO'

# ---- constants ----
NO_FILES = 7
SAMPLING_RATE = 16000
VOCAB_SIZE = 355  # 356 for event no velocity, 388 for event w/ velocity

# ---- hyperparameters ----
# Every hyperparameter follows the same pattern: a list of candidate values,
# the baseline ("default") value, and the UPPER_CASE value used by this run.
nn_size_options = ['small', 'medium', 'large']
NN_SIZE = nn_size_default = nn_size_options[1]

lr_options = [0.01, 0.001, 0.0001]
LEARNING_RATE = lr_default = lr_options[1]

dropout_options = [0.0, 0.3, 0.4, 0.5]
dropout_default, DROPOUT = dropout_options[0], dropout_options[2]

batch_options = [32, 64, 128]
batch_default, BATCH_SIZE = batch_options[1], batch_options[2]

seq_len_options = [25, 50, 100]
SEQ_LENGTH = seq_len_default = seq_len_options[1]

recurrent_dropout_options = [0.0, 0.3, 0.4, 0.5]  # 0.0 disables recurrent dropout
RECURRENT_DROPOUT = recurrent_dropout_default = recurrent_dropout_options[0]

using_batch_norm = [False, True]
batch_norm_default, BATCH_NORM = using_batch_norm[0], using_batch_norm[1]


## Get datasets

In [6]:
if dataset_choice == 'JSB':
  # Fetch and unpack the JSB Chorales archive unless it is already on disk,
  # then list the MIDI files of each of its predefined splits.
  files = []

  if path.exists('/content/JSB%20Chorales.zip'):
    print('Already downloaded the JSB Chorales dataset.')
  else:
    url = 'http://www-ens.iro.umontreal.ca/~boulanni/JSB%20Chorales.zip'
    from_path = './JSB%20Chorales.zip'
    to_path = './'

    torchtext.utils.download_from_url(url, from_path)
    torchtext.utils.extract_archive(from_path, to_path)

  # one list of .mid/.midi paths per split
  train_files, valid_files, test_files = (
      glob.glob(pattern)
      for pattern in (
          'JSB Chorales/train/*.mid*',
          'JSB Chorales/valid/*.mid*',
          'JSB Chorales/test/*.mid*',
      )
  )

In [None]:
if dataset_choice == 'MAESTRO':
  # Download and extract the MAESTRO v3 MIDI archive (unless the extracted
  # folder already exists) and build the train/validation/test file lists
  # from the split column of the dataset's metadata CSV.
  files = []

  if not path.exists('/content/maestro-v3.0.0'):
    url = 'https://storage.googleapis.com/magentadata/datasets/maestro/v3.0.0/maestro-v3.0.0-midi.zip'
    from_path = './maestro-v3.0.0-midi.zip'
    to_path = './'

    torchtext.utils.download_from_url(url, from_path)
    torchtext.utils.extract_archive(from_path, to_path)
  else:
    print('Already downloaded the MAESTRO V3 dataset.')

  # Named `csv_path` rather than `csv` so the imported `csv` module is not
  # overwritten by a string.
  csv_path = '/content/maestro-v3.0.0/maestro-v3.0.0.csv'
  col_list = ["split", "midi_filename"]
  df = pd.read_csv(csv_path, usecols=col_list)

  num_files = len(df.index)

  # Full path of every MIDI file, in CSV row order.
  midi_paths = '/content/maestro-v3.0.0/' + df['midi_filename']

  # Boolean masks keep the CSV row order, so each list has the same content
  # and order as appending row by row.
  test_files = midi_paths[df['split'] == 'test'].tolist()
  valid_files = midi_paths[df['split'] == 'validation'].tolist()
  train_files = midi_paths[df['split'] == 'train'].tolist()


## MIDI processing

In [8]:
# Read the first training file only to pick up its timing resolution, which
# tensor_to_muspy later passes to muspy.from_event_representation.
# NOTE(review): assumes every file in the dataset shares this resolution - confirm.
midi_file = train_files[0]
music = muspy.read_midi(midi_file)
RESOLUTION = music.resolution #get resolution for MIDI creation

In [9]:
def tensor_to_muspy(music_tensor) -> muspy.Music:
  """Turn a sequence of normalised event values back into a muspy Music object.

  The values are scaled back with `denormalise`, rounded to the nearest
  integer event code, extended with `append_note_off`, and decoded with
  `muspy.from_event_representation` using the notebook-wide RESOLUTION.
  """
  event_codes = np.around(denormalise(music_tensor)).astype(int)
  events_with_note_offs = append_note_off(event_codes)
  return muspy.from_event_representation(events_with_note_offs, resolution=RESOLUTION)

In [10]:
def append_note_off(event_array):
  """Return the events as a column array followed by the codes 128..255.

  The appended codes are what this notebook calls "note offs": one per value
  in range(128, 256), added once at the very end of the sequence.

  Args:
    event_array: sequence of integer event codes. A flat sequence
      ([1, 2, 3]) or an existing (n, 1) column is accepted; an empty
      sequence is fine and yields just the 128 appended codes.

  Returns:
    numpy array of shape (len(event_array) + 128, 1).
  """
  # convert from [1, 2, 3] to [[1], [2], [3]] for muspy compatibility
  events = np.asarray(event_array).reshape(-1, 1)
  note_offs = np.arange(128, 256).reshape(-1, 1)
  return np.concatenate((events, note_offs))

In [11]:
def muspy_to_audio(music: muspy.Music, max_seconds=30):
  """Synthesise `music` and return an inline audio player for its opening excerpt.

  Args:
    music: the muspy Music object to render.
    max_seconds: maximum length of the excerpt in seconds (default 30, the
      value that used to be hard-coded).

  Returns:
    An IPython `display.Audio` object at SAMPLING_RATE.
  """
  raw_audio = muspy.synthesize(music, rate=SAMPLING_RATE)
  # Keep only the first max_seconds worth of samples. The local used to be
  # called `time`, which hid the imported `time` module inside this function.
  excerpt = raw_audio[:max_seconds * SAMPLING_RATE]
  return display.Audio(excerpt.T, rate=SAMPLING_RATE)

In [12]:
def normalise(music: np.ndarray) -> np.ndarray:
    """Scale event codes by dividing them by VOCAB_SIZE.

    Although annotated as ndarray, this notebook also calls it on tf tensors
    (inside `create_target`) and on single predicted event values (in the
    generation loop), so it must stay a plain `/` expression.
    """
    return music/VOCAB_SIZE
    
def denormalise(music: np.ndarray)-> np.ndarray:
    """Inverse of `normalise`: multiply scaled values by VOCAB_SIZE.

    The result is not rounded here; `tensor_to_muspy` rounds it to integer
    event codes afterwards.
    """
    return music*VOCAB_SIZE

## Datasets

Create the actual datasets of event arrays from the MIDI files.

In [34]:
train_dataset = []
validate_dataset = []
test_dataset = []

metrics_set = [] #used to get average metrics for comparison later

total_notes = 0

random.shuffle(train_files)
random.shuffle(valid_files)
random.shuffle(test_files)

splits = [0.8, 0.1, 0.1] #values here should add to 1


if dataset_choice == 'MAESTRO':
  # MAESTRO is large: keep only about NO_FILES files in total, shared out
  # between the three splits according to `splits`.
  train_files = train_files[:round(NO_FILES*splits[0])]
  valid_files = valid_files[:round(NO_FILES*splits[1])]
  test_files = test_files[:round(NO_FILES*splits[2])]


def _load_event_arrays(midi_paths, event_arrays):
  """Read each MIDI file and append its event representation to `event_arrays`.

  Every parsed Music object is also appended to the global `metrics_set`.
  Each file is converted to the event representation exactly once.

  Returns:
    The total number of events (sum of the arrays' `.size`) that were added.
  """
  n_events = 0
  for midi_path in midi_paths:
    parsed = muspy.read_midi(midi_path)
    metrics_set.append(parsed)
    events = muspy.to_event_representation(parsed, use_single_note_off_event=False)
    event_arrays.append(events)
    n_events += events.size
  return n_events


# Same processing order as before (train, validation, test), so metrics_set
# keeps the same ordering.
total_notes += _load_event_arrays(train_files, train_dataset)
total_notes += _load_event_arrays(valid_files, validate_dataset)
total_notes += _load_event_arrays(test_files, test_dataset)

In [35]:
def convert_array(array_ds):
  """Join a sequence of numpy arrays and flatten the result to one 1-D array."""
  joined = np.concatenate(array_ds)
  return joined.ravel()

def create_features(array_ds):
  """Slice a flat event array into overlapping windows of SEQ_LENGTH+1 events.

  The array becomes a float32 tf.data.Dataset whose elements are batched
  singly, then a sliding window (shift 1, stride 1) of SEQ_LENGTH+1 elements
  is taken and each window is collapsed into one tensor. Incomplete trailing
  windows are dropped.
  """
  window_size = SEQ_LENGTH+1

  events = tf.convert_to_tensor(array_ds, dtype=tf.float32)
  singles = tf.data.Dataset.from_tensor_slices(events).batch(1)

  windows = singles.window(window_size, shift=1, stride=1, drop_remainder=True)
  return windows.flat_map(
      lambda window: window.batch(window_size, drop_remainder=True)
  )

def create_target(array_ds):
  """Split one window into (normalised input sequence, one-hot next event).

  Everything but the last element of the window is the model input, scaled
  with `normalise`; the last element is cast to int32 and one-hot encoded
  with depth VOCAB_SIZE+1, then squeezed.
  """
  inputs = normalise(array_ds[:-1])

  next_event = tf.cast(array_ds[-1], tf.int32)
  one_hot_target = tf.squeeze(tf.one_hot(next_event, VOCAB_SIZE+1))

  return inputs, one_hot_target

In [36]:
# Collapse each split's list of per-file event arrays into one flat array.
train_dataset, validate_dataset, test_dataset = (
    convert_array(per_file_arrays)
    for per_file_arrays in (train_dataset, validate_dataset, test_dataset)
)

In [37]:
# Sliding windows of SEQ_LENGTH events, each paired with its one-hot
# next-event target, then shuffled and grouped into full batches only.
train_dataset = (
    create_features(train_dataset)
    .map(create_target)
    .shuffle(total_notes+1)
    .batch(BATCH_SIZE, drop_remainder=True)
)

In [38]:
# Same windowing / target / shuffle / batch pipeline, for the validation split.
validate_dataset = (
    create_features(validate_dataset)
    .map(create_target)
    .shuffle(total_notes+1)
    .batch(BATCH_SIZE, drop_remainder=True)
)

In [39]:
# Same windowing / target / shuffle / batch pipeline, for the test split.
test_dataset = (
    create_features(test_dataset)
    .map(create_target)
    .shuffle(total_notes+1)
    .batch(BATCH_SIZE, drop_remainder=True)
)

## Create LSTM
This section defines the neural network.

In [19]:
def build_model():
  """Build and compile the LSTM next-event classifier selected by NN_SIZE.

  Layer layout (identical to the previous per-size branches):
    * 'small':  LSTM(128)                      -> Dense(128)
    * 'medium': LSTM(256) x2                   -> Dense(128)
    * 'large':  LSTM(512) x3                   -> Dense(256) x2
  followed in every case by Dense(VOCAB_SIZE+1) + Softmax. After the LSTM
  stack and after every hidden Dense+ReLU there is an optional
  BatchNormalization (when BATCH_NORM is set) and a Dropout(DROPOUT).

  RECURRENT_DROPOUT is applied to the first LSTM layer for every size.
  Previously the 'small' model silently ignored it, although the value is
  printed and encoded in the experiment folder name; with the default of 0.0
  the resulting model is unchanged.

  Returns:
    A compiled tf.keras.Sequential model (categorical cross-entropy loss,
    Adam optimiser with LEARNING_RATE).

  Raises:
    ValueError: if NN_SIZE is not 'small', 'medium' or 'large' (this used to
      surface as an UnboundLocalError at the return statement).
  """
  # NN_SIZE -> (units of each stacked LSTM layer, units of each hidden Dense layer)
  architectures = {
      'small': ((128,), (128,)),
      'medium': ((256, 256), (128,)),
      'large': ((512, 512, 512), (256, 256)),
  }
  if NN_SIZE not in architectures:
    raise ValueError(
        'Unknown NN_SIZE ' + repr(NN_SIZE)
        + '; expected one of ' + str(sorted(architectures))
    )
  lstm_units, dense_units = architectures[NN_SIZE]

  model = tf.keras.Sequential()

  def add_regularisation():
    # optional batch norm followed by dropout, used after every hidden stage
    if BATCH_NORM:
      model.add(tf.keras.layers.BatchNormalization(momentum=0.9))
    model.add(tf.keras.layers.Dropout(DROPOUT))

  last_lstm_index = len(lstm_units) - 1
  for index, units in enumerate(lstm_units):
    # only the final LSTM collapses the sequence into a single vector
    lstm_kwargs = {'return_sequences': index < last_lstm_index}
    if index == 0:
      # the first layer declares the input shape and carries recurrent dropout
      lstm_kwargs['input_shape'] = (SEQ_LENGTH, 1)
      lstm_kwargs['recurrent_dropout'] = RECURRENT_DROPOUT
    model.add(tf.keras.layers.LSTM(units, **lstm_kwargs))
  add_regularisation()

  for units in dense_units:
    model.add(tf.keras.layers.Dense(units))
    model.add(tf.keras.layers.Activation('relu'))
    add_regularisation()

  # one output probability per event code (0..VOCAB_SIZE)
  model.add(tf.keras.layers.Dense(VOCAB_SIZE+1))
  model.add(tf.keras.layers.Softmax())

  loss = tf.keras.losses.CategoricalCrossentropy()
  optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)
  model.compile(loss=loss, optimizer=optimizer)

  return model

## Print configuration

In [None]:
# Echo the configuration of this run (label / value pairs, in the same order
# as the experiment folder name built at the end of the notebook).
print(
    'model size: ', NN_SIZE,
    ' dropout: ', DROPOUT,
    ' learning rate: ', LEARNING_RATE,
    ' sequence length: ', SEQ_LENGTH,
    ' batch size: ', BATCH_SIZE,
    ' recurrent dropout: ', RECURRENT_DROPOUT,
    ' batch norm: ', BATCH_NORM,
)

## Training

In [21]:
class TimeHistory(keras.callbacks.Callback):
    """Keras callback that records how long each epoch takes.

    After training, `self.times` holds one wall-clock duration in seconds per
    completed epoch. The list is reset at the start of every training run, so
    it only ever describes the most recent `fit()` call.

    Adapted from https://stackoverflow.com/questions/43178668/record-the-computation-time-for-each-epoch-in-keras-during-model-fit
    """

    def on_train_begin(self, logs=None):
        # `logs` defaults to None instead of a shared mutable {}.
        self.times = []

    def on_epoch_begin(self, batch, logs=None):
        # NOTE: Keras passes the epoch index here; the parameter keeps its
        # original name `batch` for compatibility.
        self.epoch_time_start = time.time()

    def on_epoch_end(self, batch, logs=None):
        self.times.append(time.time() - self.epoch_time_start)

In [22]:
# Single timing callback instance; it is added to `callbacks` below and read
# back after training to compute the average time per epoch.
time_callback = TimeHistory()

In [None]:
# Checkpoint file name pattern; {epoch:02d} is filled in with the epoch number.
# NOTE(review): assumes Google Drive is already mounted at /content/drive -
# no cell in this notebook mounts it.
filepath = '/content/drive/MyDrive/Checkpoints/ckpt{epoch:02d}.hdf5'

# Weights-only checkpoints.
# NOTE(review): `period=50` is the legacy Keras argument for saving only every
# 50th epoch - confirm the installed Keras version still honours it.
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=filepath,
    #save_best_only=True,
    save_freq='epoch',
    save_weights_only=True,
    period=50,
)

# Stop on stalled training loss (patience 5), restoring the best weights seen.
train_loss_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='loss',
    patience=5,
    restore_best_weights=True,
)

# Stop on stalled validation loss (patience 8), leaving the weights as they are.
val_loss_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=8,
    restore_best_weights=False,
)

callbacks = [
    checkpoint_callback,
    train_loss_stopping,
    val_loss_stopping,
    time_callback,
]

In [None]:
# Train `num_runs` freshly built models and average, over all runs, the best
# validation loss, the epoch at which it occurred, and the time per epoch.
val_losses = []
epoch_recorded = []
times = []  # per-epoch durations of every run

num_runs = 1

for _ in range(num_runs):
  model = build_model()
  history = model.fit(
      train_dataset,
      epochs=100,
      callbacks=callbacks,
      validation_data=validate_dataset
  )
  # TimeHistory clears its list whenever a new fit() starts, so the timings
  # have to be copied out after each run; reading them once after the loop
  # would only cover the last run.
  times.extend(time_callback.times)

  run_val_losses = history.history['val_loss']
  lowest = min(run_val_losses)
  val_losses.append(lowest)

  # epochs are reported 1-based
  epoch_idx = run_val_losses.index(lowest) + 1
  epoch_recorded.append(epoch_idx)

avg_val_loss = round(sum(val_losses)/num_runs, 4)
avg_times = round(sum(times)/len(times))
avg_epoch_recorded = round(sum(epoch_recorded)/len(epoch_recorded))

print('average val_loss: ', avg_val_loss)
print('average time per epoch: ', avg_times)
print('average best epoch_recorded: ', avg_epoch_recorded)

In [None]:
# Training statistics as a one-row table. The keys (including their trailing
# ': ') become the column names of the CSV written at the end of the notebook.
dic1 = {'average val_loss: ': [avg_val_loss], 'average time per epoch: ': [avg_times], 'average best epoch_recorded: ': [avg_epoch_recorded]}
# NOTE: `df` is rebuilt from the merged statistics further down.
df = pd.DataFrame(dic1)

In [None]:
#model.summary()
#plot_model(model, to_file='model.png')

In [None]:
#model.load_weights('/content/drive/MyDrive/Checkpoints/jsb300/ckpt300.hdf5')

In [None]:
#model.save(filepath)

## Generate music

In [28]:
def get_random_sequence():
  """Return a random, normalised run of SEQ_LENGTH events from a random test file.

  A file is drawn from `test_files`, converted to muspy's event
  representation, and a start offset is drawn so that the slice of
  SEQ_LENGTH events fits inside the piece.
  """
  file_index = random.randrange(len(test_files))
  events = muspy.to_event_representation(muspy.read_midi(test_files[file_index]))

  start = random.randrange((len(events) - SEQ_LENGTH) - 1)
  seed_events = events[start:start + SEQ_LENGTH]
  #seed_events = events[0:SEQ_LENGTH] #enable to test from beginning of piece

  return normalise(seed_events)

In [31]:
def predict(music, model):
    """Return the most probable next event code for one input sequence.

    `music` gets a leading batch axis, is run through `model.predict`, and
    the index of the largest softmax output is returned.
    """
    batch = tf.expand_dims(music, 0)
    probabilities = np.squeeze(model.predict(batch))
    return np.argmax(probabilities)

In [None]:
songs = 10 #how many pieces we want to generate
muspy_songs = []

for _song in range(songs):

  # seed window taken from a random test piece
  music = get_random_sequence()

  no_preds = 400 #how many generated notes we want (400 is about 25 seconds on JSB chorales)
  generated = []

  for _step in range(no_preds):
    note = predict(music, model)
    scaled_note = normalise(note)
    generated.append(scaled_note)
    # slide the window: push the new event in, drop the oldest one
    music = np.append(music, scaled_note)[1:]

  #print(generated)
  generated = np.asarray(generated)
  music = tensor_to_muspy(generated)
  muspy_songs.append(music)

In [None]:
#muspy_to_audio(muspy_songs[0])

In [None]:
#muspy.write_audio('/content/drive/MyDrive/Generated/300epochs.wav', muspy_songs[0], audio_format='wav', rate=SAMPLING_RATE)

In [None]:
# Plot the piano roll of the first generated piece as a visual sanity check.
muspy.show_pianoroll(muspy_songs[0])

## Metrics
This section defines metrics that can be used to evaluate the model.

In [None]:
def get_pitch_metrics(muspy_songs): 
  """Count the distinct pitches used in each song.

  Args:
    muspy_songs: iterable of muspy Music objects.

  Returns:
    A list with `muspy.n_pitches_used(song)` for every song, in input order,
    followed by one extra final element: the mean of those values.

  Raises:
    ZeroDivisionError: if `muspy_songs` is empty (behaviour unchanged).
  """
  # evaluate the metric once per song (it used to be computed twice)
  songs = [muspy.n_pitches_used(song) for song in muspy_songs]

  mean = sum(songs)/len(songs)
  songs.append(mean)

  return songs

In [None]:
def get_polyphony_rates(muspy_songs): 
  """Compute the polyphony (how many notes are played at once) of each song.

  Args:
    muspy_songs: iterable of muspy Music objects.

  Returns:
    A list with `muspy.polyphony(song)` for every song, in input order,
    followed by one extra final element: the mean of those values.

  Raises:
    ZeroDivisionError: if `muspy_songs` is empty (behaviour unchanged).
  """
  # evaluate the metric once per song (it used to be computed twice)
  songs = [muspy.polyphony(song) for song in muspy_songs]

  mean = sum(songs)/len(songs)
  songs.append(mean)

  return songs

In [None]:
# Metrics of the source dataset (the pieces collected in metrics_set).
# Each helper returns the per-song values followed by their mean, so the
# starred target takes the per-song list and the last name takes the mean.
*dataset_generated_song_pitches, dataset_generated_mean_song_pitches = (
    get_pitch_metrics(metrics_set)
)

*dataset_generated_polyphony_rates, dataset_generated_mean_polyphony_rates = (
    get_polyphony_rates(metrics_set)
)

In [None]:
# Same metrics for the generated pieces: per-song values first, mean last.
*generated_song_pitches, generated_mean_song_pitches = (
    get_pitch_metrics(muspy_songs)
)

*generated_polyphony_rates, generated_mean_polyphony_rates = (
    get_polyphony_rates(muspy_songs)
)

In [None]:
# Mean metrics of the generated pieces, as single-row columns for the results table.
dic2 = {'average pitches': [generated_mean_song_pitches], 'average polyphony': [generated_mean_polyphony_rates]}

In [None]:
# Training statistics followed by generation metrics, merged into one mapping.
dicall = {**dic1, **dic2}

## Final results

The final results of an experiment are saved to a folder, along with a loss graph from one of the models as a sample.

In [None]:
# One-row results table combining training statistics and generation metrics.
df = pd.DataFrame(dicall)

In [None]:
# Name the experiment folder after the hyperparameters of this run.
filename = (
    NN_SIZE
    + '_d' + str(DROPOUT)
    + '_lr' + str(LEARNING_RATE)
    + '_sl' + str(SEQ_LENGTH)
    + '_bs' + str(BATCH_SIZE)
    + '_rd' + str(RECURRENT_DROPOUT)
    + '_bn' + str(BATCH_NORM)
)

# the all-defaults configuration gets a short, recognisable folder name
if filename == 'medium_d0.0_lr0.001_sl50_bs64_rd0.0_bnFalse':
  filename = 'defaults'
folder = '/content/drive/MyDrive/Experiments/' + filename
# makedirs(exist_ok=True) also creates a missing Experiments/ parent folder
# (os.mkdir would raise there) and is safe to run again.
os.makedirs(folder, exist_ok=True)
dffile = folder + '/' + 'statistics.csv'
df.to_csv(dffile) 

# Loss curves of the most recent training run (`history` from the training cell).
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper right')
plt.draw()

# save before show(): the figure may be cleared once it has been displayed
imagefile = folder + '/' + 'graph.png'
plt.savefig(imagefile)
plt.show()