In [1]:
import tensorflow as tf
import numpy as np
import os

In [2]:
MAX_LENGTH = 300
NUM_WORDS = 1000

In [3]:
import glob
import pickle
import numpy
from music21 import converter, instrument, note, chord

In [4]:
def get_notes():
    """ Get all the notes and chords from the midi files in the ./midi_songs directory """
    notes = []
    songs = []

    for file in glob.glob("midi_songs/*.mid"):
        song = []
        midi = converter.parse(file)

        print("Parsing %s" % file)

        notes_to_parse = None

        try: # file has instrument parts
            s2 = instrument.partitionByInstrument(midi)
            notes_to_parse = s2.parts[0].recurse() 
        except: # file has notes in a flat structure
            notes_to_parse = midi.flat.notes

        for element in notes_to_parse:
            if isinstance(element, note.Note):
                song.append(str(element.pitch))
            elif isinstance(element, chord.Chord):
                song.append('.'.join(str(n) for n in element.normalOrder))
        songs.append(song)
        notes += song

    with open('data/notes', 'wb') as filepath:
        pickle.dump(notes, filepath)

    return notes, songs


In [5]:
def prepare_sequences(notes, n_vocab):
    """ Prepare the sequences used by the Neural Network """
    sequence_length = 100

    # get all pitch names
    pitchnames = sorted(set(item for item in notes))

     # create a dictionary to map pitches to integers
    note_to_int = dict((note, number) for number, note in enumerate(pitchnames))

    network_input = []
    network_output = []

    # create input sequences and the corresponding outputs
    for i in range(0, len(notes) - sequence_length, 1):
        sequence_in = notes[i:i + sequence_length]
        sequence_out = notes[i + sequence_length]
        network_input.append([note_to_int[char] for char in sequence_in])
        network_output.append(note_to_int[sequence_out])

    n_patterns = len(network_input)

    # reshape the input into a format compatible with LSTM layers
    network_input = numpy.reshape(network_input, (n_patterns, sequence_length, 1))
    # normalize input
    # network_input = network_input / float(n_vocab)

    # network_input = np_utils.to_categorical(network_input)
    network_output = np_utils.to_categorical(network_output)

    return (network_input, network_output)

In [6]:
notes, songs = get_notes()
num_notes = len(notes)

Parsing midi_songs/bwv782.mid
Parsing midi_songs/bwv783.mid
Parsing midi_songs/bwv781.mid
Parsing midi_songs/bwv780.mid
Parsing midi_songs/bwv784.mid
Parsing midi_songs/bwv785.mid
Parsing midi_songs/bwv778.mid
Parsing midi_songs/bwv786.mid
Parsing midi_songs/bwv779.mid
Parsing midi_songs/bwv774.mid
Parsing midi_songs/bwv775.mid
Parsing midi_songs/bwv777.mid
Parsing midi_songs/bwv776.mid
Parsing midi_songs/bwv772.mid
Parsing midi_songs/bwv773.mid


In [7]:
songs_text = [' '.join(song) for song in songs]

tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=NUM_WORDS,
                                                  filters='',
                                                  lower=False)
tokenizer.fit_on_texts(songs_text)
note2code = tokenizer.word_index

songs_codes = tokenizer.texts_to_sequences(songs_text)
padded_songs = tf.keras.preprocessing.sequence.pad_sequences(songs_codes,
                                                             maxlen=MAX_LENGTH)

In [8]:
temp = np.zeros((padded_songs.shape[0], MAX_LENGTH, NUM_WORDS))
temp[np.expand_dims(np.arange(padded_songs.shape[0]), axis=0).reshape(padded_songs.shape[0], 1),
     np.repeat(np.array([np.arange(MAX_LENGTH)]), padded_songs.shape[0], axis=0),
     padded_songs] = 1

songs_one_hot = temp

In [9]:
train_dataset = tf.data.Dataset.from_tensor_slices(padded_songs).batch(4)

In [10]:
class Sampling(tf.keras.layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

    def __init__(self, epsilon_std=1.0):
        super(Sampling, self).__init__()
        self.epsilon_std = epsilon_std

    def call(self, inputs):
        z_mean, z_log_var = inputs
        epsilon = tf.random.normal(shape=tf.shape(z_mean), mean=0.0, stddev=self.epsilon_std)
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

In [11]:
class Encoder(tf.keras.layers.Layer):
    """Maps MNIST digits to a triplet (z_mean, z_log_var, z)."""

    def __init__(self,
                 vocab_size=100,
                 embed_dim=32,
                 latent_rep_size=32,
                 max_length=300,
                 epsilon_std=0.01,
                 name='encoder',
                 **kwargs):
        super(Encoder, self).__init__(name=name, **kwargs)
        self.embedding = tf.keras.layers.Embedding(vocab_size, embed_dim, input_length=max_length)
        self.lstm_1 = tf.keras.layers.LSTM(500, return_sequences=True, name='lstm_1')
        self.dense_mean = tf.keras.layers.Dense(latent_rep_size)
        self.dense_log_var = tf.keras.layers.Dense(latent_rep_size)
        self.sampling = Sampling(epsilon_std)

    def call(self, inputs):
        x0 = self.embedding(inputs)
        x1 = self.lstm_1(x0)
        z_mean = self.dense_mean(x1)
        z_log_var = self.dense_log_var(x1)
        z = self.sampling((z_mean, z_log_var))
        return z_mean, z_log_var, z


class Decoder(tf.keras.layers.Layer):
    """Converts z, the encoded digit vector, back into a readable digit."""

    def __init__(self,
                 vocab_size,
                 max_length,
                 name='decoder',
                 **kwargs):
        super(Decoder, self).__init__(name=name, **kwargs)
        self.dec_lstm_1 = tf.keras.layers.LSTM(500, return_sequences=True, name='dec_lstm_1')
        self.decoded_mean = tf.keras.layers.TimeDistributed(
                                tf.keras.layers.Dense(vocab_size, activation='softmax'),
                                name='decoded_mean')

    def call(self, inputs):
        x1 = self.dec_lstm_1(inputs)
        x2 = self.decoded_mean(x1)
        return x2


class VariationalAutoEncoder(tf.keras.Model):
    """Combines the encoder and decoder into an end-to-end model for training."""

    def __init__(self,
                 vocab_size=500,
                 embed_dim=32,
                 max_length=300,
                 latent_rep_size=200,
                 name='autoencoder',
                 **kwargs):
        super(VariationalAutoEncoder, self).__init__(name=name, **kwargs)
        self.encoder = Encoder(vocab_size, embed_dim, latent_rep_size, max_length)
        self.decoder = Decoder(vocab_size, max_length)

    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        # Add KL divergence regularization loss.
        kl_loss = - 0.5 * tf.reduce_mean(
            z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
        self.add_loss(lambda: kl_loss)
        return reconstructed

In [12]:
vae = VariationalAutoEncoder()

In [13]:
def create_model_checkpoint(dir, model_name):
    filepath = dir + '/' + \
               model_name + "-{epoch:02d}.h5"
    directory = os.path.dirname(filepath)

    try:
        os.stat(directory)
    except:
        os.mkdir(directory)

    checkpointer = tf.keras.callbacks.ModelCheckpoint(filepath=filepath,
                                                      verbose=1,
                                                      save_best_only=False)

    return checkpointer

In [14]:
def vae_loss(x, x_decoded_mean):
    xent_loss = tf.keras.backend.sparse_categorical_crossentropy(tf.cast(x, tf.float32), x_decoded_mean)
    return xent_loss


optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

loss_metric = tf.keras.metrics.Mean()

# Iterate over epochs.
for epoch in range(10):
    print('Start of epoch %d' % (epoch,))

    # Iterate over the batches of the dataset.
    for step, x_batch_train in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            reconstructed = vae(x_batch_train)
            # Compute reconstruction loss
            loss = vae_loss(x_batch_train, reconstructed)
            loss += sum(vae.losses)  # Add KLD regularization loss

        grads = tape.gradient(loss, vae.trainable_weights)
        optimizer.apply_gradients(zip(grads, vae.trainable_weights))

        loss_metric(loss)

        if step % 100 == 0:
            print('step %s: mean loss = %s' % (step, loss_metric.result()))

Start of epoch 0
step 0: mean loss = tf.Tensor(6.214658, shape=(), dtype=float32)
Start of epoch 1
step 0: mean loss = tf.Tensor(5.950363, shape=(), dtype=float32)
Start of epoch 2
step 0: mean loss = tf.Tensor(5.6667404, shape=(), dtype=float32)
Start of epoch 3
step 0: mean loss = tf.Tensor(5.480558, shape=(), dtype=float32)
Start of epoch 4
step 0: mean loss = tf.Tensor(5.372519, shape=(), dtype=float32)
Start of epoch 5
step 0: mean loss = tf.Tensor(5.3008075, shape=(), dtype=float32)
Start of epoch 6
step 0: mean loss = tf.Tensor(5.2496033, shape=(), dtype=float32)
Start of epoch 7
step 0: mean loss = tf.Tensor(5.211098, shape=(), dtype=float32)
Start of epoch 8
step 0: mean loss = tf.Tensor(5.1814137, shape=(), dtype=float32)
Start of epoch 9
step 0: mean loss = tf.Tensor(5.157892, shape=(), dtype=float32)


In [15]:
prediction_output = vae.predict(padded_songs[np.newaxis, 0])

In [16]:
prediction_indices = np.argmax(prediction_output, axis=2)
code2note = dict([[code, note] for note, code in note2code.items()])

prediction_song = [code2note[index] for index in prediction_indices[0]]
print(prediction_song)

['A4', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5', 'D5

In [17]:
from music21 import instrument, note, stream, chord

def create_midi(prediction_output, file_path):
    """ convert the output from the prediction to notes and create a midi file
        from the notes """
    offset = 0
    output_notes = []

    # create note and chord objects based on the values generated by the model
    for pattern in prediction_output:
        # pattern is a chord
        if ('.' in pattern) or pattern.isdigit():
            notes_in_chord = pattern.split('.')
            notes = []
            for current_note in notes_in_chord:
                new_note = note.Note(int(current_note))
                new_note.storedInstrument = instrument.Piano()
                notes.append(new_note)
            new_chord = chord.Chord(notes)
            new_chord.offset = offset
            output_notes.append(new_chord)
        # pattern is a note
        else:
            new_note = note.Note(pattern)
            new_note.offset = offset
            new_note.storedInstrument = instrument.Piano()
            output_notes.append(new_note)

        # increase offset each iteration so that notes do not stack
        offset += 0.5

    midi_stream = stream.Stream(output_notes)

    midi_stream.write('midi', fp=file_path)

In [18]:
create_midi(prediction_song, 'test_vae_out.midi')