In [126]:
import numpy as np
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras import layers, Model
from music21 import *
import os
import pickle
import IPython.display as display

In [11]:
def get_midi(path='../../dataset/rag/', output_dir='./data'):
    """Parse every file in ``path`` with music21 and extract note/duration tokens.

    Each ``Note`` becomes its pitch string, each ``Chord`` becomes its normal
    order joined with ``'.'`` and each ``Rest`` becomes its ``name``. The
    matching duration token is ``str(element.duration.quarterLength)``. Any
    other element type is skipped.

    Both token lists are pickled to ``<output_dir>/notes`` and
    ``<output_dir>/durations``.

    Args:
        path: Directory containing the files to parse.
        output_dir: Directory that receives the two pickle files; created if
            it does not exist.

    Returns:
        A ``(notes, durations)`` tuple of equally long lists of strings.
    """
    notes = []
    durations = []

    # Sorted so the token order does not depend on the order in which the
    # filesystem happens to list the directory.
    for file in sorted(os.listdir(path)):
        print(f"Parsing {file}...")
        mf = converter.parse(os.path.join(path, file))

        try:
            # The partitioned score itself is not used; the call is kept
            # because a failure here is what selects the flattened fallback.
            # NOTE(review): confirm whether this call is still wanted at all.
            instrument.partitionByInstrument(mf)
            notes_to_parse = mf.recurse()
        except Exception:
            notes_to_parse = mf.flat.notesAndRests

        for element in notes_to_parse:
            if isinstance(element, note.Note):
                token = str(element.pitch)
            elif isinstance(element, chord.Chord):
                token = '.'.join(str(n) for n in element.normalOrder)
            elif isinstance(element, note.Rest):
                token = str(element.name)
            else:
                continue

            notes.append(token)
            durations.append(str(element.duration.quarterLength))

    os.makedirs(output_dir, exist_ok=True)

    with open(os.path.join(output_dir, 'notes'), 'wb') as filepath:
        pickle.dump(notes, filepath)

    with open(os.path.join(output_dir, 'durations'), 'wb') as filepath:
        pickle.dump(durations, filepath)

    return notes, durations

In [79]:
def preprocess_sequence(notes, durations, n_vocab_notes, n_vocab_durations, sequence_length=32):
    """Turn the token streams into sliding-window training arrays.

    Every window of ``sequence_length`` consecutive tokens is an input sample
    and the token that follows the window is its target. Tokens are mapped to
    integers by their position in the sorted set of distinct tokens.

    Args:
        notes: List of note tokens.
        durations: List of duration tokens, aligned with ``notes``.
        n_vocab_notes: Number of note classes; used to scale the inputs and
            as the width of the one-hot note targets.
        n_vocab_durations: Number of duration classes; used the same way.
        sequence_length: Number of tokens per input window.

    Returns:
        ``(notes_input, notes_output, durations_input, durations_output)``.
        The inputs have shape ``(n_samples, sequence_length, 1)`` and are
        divided by the respective vocabulary size. The outputs are one-hot
        arrays of shape ``(n_samples, n_vocab_*)``.

    Raises:
        ValueError: If the two streams differ in length or are not longer
            than ``sequence_length``.
    """
    if len(notes) != len(durations):
        raise ValueError(
            f"notes and durations must be aligned, got {len(notes)} and {len(durations)} tokens"
        )

    n_samples = len(notes) - sequence_length
    if n_samples <= 0:
        raise ValueError(
            f"need more than {sequence_length} tokens to build a sample, got {len(notes)}"
        )

    note_to_int = {name: idx for idx, name in enumerate(sorted(set(notes)))}
    duration_to_int = {dur: idx for idx, dur in enumerate(sorted(set(durations)))}

    # Encode each stream once instead of re-encoding every overlapping window.
    encoded_notes = [note_to_int[token] for token in notes]
    encoded_durations = [duration_to_int[token] for token in durations]

    notes_input = [encoded_notes[i:i + sequence_length] for i in range(n_samples)]
    durations_input = [encoded_durations[i:i + sequence_length] for i in range(n_samples)]

    # The target of window i is the token right after it.
    notes_output = encoded_notes[sequence_length:]
    durations_output = encoded_durations[sequence_length:]

    # Reshape input and output to fit into network
    notes_input = np.reshape(notes_input, (n_samples, sequence_length, 1))
    durations_input = np.reshape(durations_input, (n_samples, sequence_length, 1))

    # num_classes is explicit so the one-hot width always matches the softmax
    # heads, even when the highest-index token never occurs as a target.
    notes_output = keras.utils.to_categorical(notes_output, num_classes=n_vocab_notes)
    durations_output = keras.utils.to_categorical(durations_output, num_classes=n_vocab_durations)

    # Normalize input
    notes_input = notes_input / float(n_vocab_notes)
    durations_input = durations_input / float(n_vocab_durations)

    return notes_input, notes_output, durations_input, durations_output

In [80]:
def create_model_functional_Reg(notes, durations, n_notes, n_durations):
    """Build and compile the two-input, two-output stacked-LSTM model.

    The note and duration inputs (shaped like ``notes.shape[1:]`` and
    ``durations.shape[1:]``) are concatenated on the last axis, passed through
    two 512-unit LSTM layers and a 256-unit dense layer, each followed by 30%
    dropout. Two softmax heads named ``'notes'`` and ``'durations'`` produce
    the predictions.

    Args:
        notes: Array whose trailing dimensions define the note input shape.
        durations: Array whose trailing dimensions define the duration input shape.
        n_notes: Number of units in the note softmax head.
        n_durations: Number of units in the duration softmax head.

    Returns:
        The compiled Keras ``Model``.
    """
    note_input, duration_input = (
        layers.Input(shape=arr.shape[1:]) for arr in (notes, durations)
    )

    x = layers.Concatenate(axis=-1)([note_input, duration_input])

    # First recurrent layer emits the full sequence for the layer stacked on top.
    x = layers.LSTM(units=512, return_sequences=True)(x)
    x = layers.Dropout(rate=0.3)(x)

    # Second recurrent layer keeps only its final state: one prediction per sample.
    x = layers.LSTM(units=512, return_sequences=False)(x)
    x = layers.Dropout(rate=0.3)(x)

    x = layers.Dense(units=256)(x)
    x = layers.Dropout(rate=0.3)(x)

    heads = [
        layers.Dense(size, activation='softmax', name=label)(x)
        for label, size in (('notes', n_notes), ('durations', n_durations))
    ]

    model = Model(inputs=[note_input, duration_input], outputs=heads)
    model.compile(
        loss='categorical_crossentropy',
        optimizer='rmsprop',
        metrics=['accuracy'],
    )

    return model

In [107]:
def create_model_functional(notes, durations, n_notes, n_durations):
    """Build and compile the BiLSTM -> BiLSTM -> LSTM model with two heads.

    The note and duration inputs (shaped like ``notes.shape[1:]`` and
    ``durations.shape[1:]``) are concatenated on the last axis. They pass
    through two bidirectional 512-unit LSTM layers that return sequences and a
    final 512-unit LSTM; every recurrent layer is followed by 30% dropout.
    Softmax heads named ``'notes'`` and ``'durations'`` produce the predictions.

    Args:
        notes: Array whose trailing dimensions define the note input shape.
        durations: Array whose trailing dimensions define the duration input shape.
        n_notes: Number of units in the note softmax head.
        n_durations: Number of units in the duration softmax head.

    Returns:
        The compiled Keras ``Model``.
    """
    note_input, duration_input = (
        layers.Input(shape=arr.shape[1:]) for arr in (notes, durations)
    )

    x = layers.Concatenate(axis=-1)([note_input, duration_input])

    # Two identical bidirectional stages, each followed by dropout.
    for _ in range(2):
        x = layers.Bidirectional(layers.LSTM(units=512, return_sequences=True))(x)
        x = layers.Dropout(rate=0.3)(x)

    # Final recurrent layer returns only its last output.
    x = layers.LSTM(units=512)(x)
    x = layers.Dropout(rate=0.3)(x)

    heads = [
        layers.Dense(size, activation='softmax', name=label)(x)
        for label, size in (('notes', n_notes), ('durations', n_durations))
    ]

    model = Model(inputs=[note_input, duration_input], outputs=heads)
    model.compile(
        loss='categorical_crossentropy',
        optimizer='rmsprop',
        metrics=['accuracy'],
    )

    return model

In [108]:
def create_model_sequential(notes, n_vocab):
    """Build and compile a single-input stacked-LSTM next-token classifier.

    Args:
        notes: Array whose trailing dimensions (``notes.shape[1:]``) define the
            input shape of the first LSTM layer.
        n_vocab: Number of units in the final dense layer, i.e. the number of
            classes of the softmax output.

    Returns:
        The compiled Keras ``Sequential`` model.
    """
    model = keras.Sequential()

    # First recurrent layer emits the full sequence for the layer stacked on top.
    model.add(layers.LSTM(512, return_sequences=True, input_shape=notes.shape[1:]))
    model.add(layers.Dropout(0.3))
    # The last recurrent layer must collapse the time axis. With
    # return_sequences=True the model emits one distribution per timestep,
    # which cannot be trained against the one-target-per-window one-hot arrays
    # produced by preprocess_sequence. This matches create_model_functional_Reg.
    model.add(layers.LSTM(512, return_sequences=False))
    model.add(layers.Dropout(0.3))
    model.add(layers.Dense(256))
    model.add(layers.Dropout(0.3))
    model.add(layers.Dense(n_vocab))
    model.add(layers.Activation('softmax'))

    model.compile(loss='categorical_crossentropy', optimizer='rmsprop', metrics=['accuracy'])

    return model

In [109]:
def train_network():
    """Run the whole training pipeline end to end.

    Parses the MIDI corpus, derives the vocabulary sizes from the distinct
    tokens, builds the windowed training arrays, creates the model, prints its
    summary and trains it.
    """
    corpus_notes, corpus_durations = get_midi()

    # Vocabulary sizes are the number of distinct tokens in each stream.
    note_vocab_size = len(set(corpus_notes))
    duration_vocab_size = len(set(corpus_durations))

    note_x, note_y, duration_x, duration_y = preprocess_sequence(
        corpus_notes,
        corpus_durations,
        note_vocab_size,
        duration_vocab_size,
    )

    network = create_model_functional(
        note_x, duration_x, note_vocab_size, duration_vocab_size
    )
    network.summary()
    train(network, [note_x, duration_x], [note_y, duration_y])
    

In [110]:
def train(model, inputs, outputs, epochs=70, batch_size=64, save_freq=2130,
          weights_dir='./weights_bilstm_bilstm_lstm'):
    """Fit ``model`` and checkpoint it whenever the training loss improves.

    Args:
        model: Compiled Keras model to train.
        inputs: Model inputs, passed to ``model.fit`` unchanged.
        outputs: Training targets, passed to ``model.fit`` unchanged.
        epochs: Number of training epochs.
        batch_size: Mini-batch size.
        save_freq: Checkpoint frequency handed to ``ModelCheckpoint``. An
            integer is a number of batches; the default of 2130 produced one
            check every 10 epochs in the recorded run.
        weights_dir: Directory that receives the ``.hdf5`` checkpoints;
            created if it does not exist.

    Returns:
        Whatever ``model.fit`` returns (the Keras ``History`` object).
    """
    # Create the target directory up front so the first checkpoint cannot fail
    # on a missing path.
    os.makedirs(weights_dir, exist_ok=True)

    filepath = os.path.join(weights_dir, 'weights-{epoch:02d}-{loss:.4f}.hdf5')
    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath,
        save_freq=save_freq,
        monitor='loss',
        save_best_only=True,
        mode='min',
        verbose=1,
    )

    callbacks_list = [checkpoint]

    return model.fit(
        inputs,
        outputs,
        epochs=epochs,
        batch_size=batch_size,
        callbacks=callbacks_list,
        verbose=1,
    )

In [111]:
# Run the full pipeline: parse the MIDI corpus, build the model and train it.
# Checkpoints are written by the ModelCheckpoint callback configured in train().
train_network()

Parsing bethena.mid...
Parsing entertainer.mid...
Parsing magnetic.mid...
Parsing maple.mid...
Parsing original.mid...
Parsing peacherine.mid...
Parsing search.mid...
Parsing sugar-cane.mid...
Parsing sun-flower-slow-drag.mid...
Parsing winners.mid...
Model: "model_13"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_37 (InputLayer)           [(None, 32, 1)]      0                                            
__________________________________________________________________________________________________
input_38 (InputLayer)           [(None, 32, 1)]      0                                            
__________________________________________________________________________________________________
concatenate_18 (Concatenate)    (None, 32, 2)        0           input_37[0][0]                   
                                     

Epoch 26/70
Epoch 27/70
Epoch 28/70
Epoch 29/70
Epoch 30/70
Epoch 00030: loss improved from 2.96224 to 1.54210, saving model to ./weights_bilstm_bilstm_lstm/weights-30-1.5421.hdf5
Epoch 31/70
Epoch 32/70
Epoch 33/70
Epoch 34/70
Epoch 35/70
Epoch 36/70
Epoch 37/70
Epoch 38/70
Epoch 39/70
Epoch 40/70
Epoch 00040: loss improved from 1.54210 to 0.84176, saving model to ./weights_bilstm_bilstm_lstm/weights-40-0.8418.hdf5
Epoch 41/70
Epoch 42/70
Epoch 43/70
Epoch 44/70
Epoch 45/70
Epoch 46/70
Epoch 47/70
Epoch 48/70
Epoch 49/70
Epoch 50/70
Epoch 00050: loss improved from 0.84176 to 0.49629, saving model to ./weights_bilstm_bilstm_lstm/weights-50-0.4963.hdf5
Epoch 51/70
Epoch 52/70
Epoch 53/70
Epoch 54/70
Epoch 55/70
Epoch 56/70
Epoch 57/70
Epoch 58/70
Epoch 59/70
Epoch 60/70
Epoch 00060: loss improved from 0.49629 to 0.34475, saving model to ./weights_bilstm_bilstm_lstm/weights-60-0.3447.hdf5
Epoch 61/70
Epoch 62/70


Epoch 63/70
Epoch 64/70
Epoch 65/70
Epoch 66/70
Epoch 67/70
Epoch 68/70
Epoch 69/70
Epoch 70/70
Epoch 00070: loss improved from 0.34475 to 0.26072, saving model to ./weights_bilstm_bilstm_lstm/weights-70-0.2607.hdf5


In [170]:
def get_weights(filepath="./weights_bilstm_bilstm_lstm/"):
    """Return the path of the checkpoint with the highest epoch number.

    Checkpoints are expected to be named ``weights-<epoch>-<loss>.hdf5``. The
    epoch is compared numerically, so ``weights-100-...`` ranks above
    ``weights-70-...``; a plain string sort would rank it below. Files without
    a ``.hdf5``/``.h5`` extension are ignored.

    Args:
        filepath: Directory that contains the checkpoint files.

    Returns:
        The path (directory joined with file name) of the selected checkpoint.

    Raises:
        FileNotFoundError: If the directory is missing or holds no weight file.
    """
    def _epoch_of(name):
        # Second dash-separated field is the epoch; unparsable names rank lowest.
        fields = name.split('-')
        try:
            return int(fields[1])
        except (IndexError, ValueError):
            return -1

    candidates = [f for f in os.listdir(filepath) if f.endswith(('.hdf5', '.h5'))]
    if not candidates:
        raise FileNotFoundError(f"No weight files (.hdf5/.h5) found in {filepath!r}")

    # The file name is a tie-breaker so the choice is deterministic.
    latest = max(candidates, key=lambda f: (_epoch_of(f), f))
    return os.path.join(filepath, latest)

In [171]:
def prepare_predict_sequences(notes, durations, pitchnames, unique_durations, n_notes, n_durations):
    """Encode the token streams into 32-token windows for prediction.

    Args:
        notes: List of note tokens.
        durations: List of duration tokens.
        pitchnames: Ordered note vocabulary; a token's position is its integer code.
        unique_durations: Ordered duration vocabulary, used the same way.
        n_notes: Divisor applied to the integer note codes.
        n_durations: Divisor applied to the integer duration codes.

    Returns:
        ``(notes_input, normalized_notes, durations_input, normalized_durations)``.
        The ``*_input`` values are lists of integer-code windows; the
        ``normalized_*`` values are the same windows as arrays of shape
        ``(n_samples, 32, 1)`` divided by the respective vocabulary size.
    """
    window = 32

    note_codes = {name: code for code, name in enumerate(pitchnames)}
    duration_codes = {dur: code for code, dur in enumerate(unique_durations)}

    # One window per start position; the last `window` tokens start no window.
    starts = range(len(notes) - window)
    notes_input = [
        [note_codes[token] for token in notes[start:start + window]]
        for start in starts
    ]
    durations_input = [
        [duration_codes[token] for token in durations[start:start + window]]
        for start in starts
    ]

    n_samples = len(notes_input)
    target_shape = (n_samples, window, 1)

    normalized_notes = np.reshape(notes_input, target_shape) / float(n_notes)
    normalized_durations = np.reshape(durations_input, target_shape) / float(n_durations)

    return notes_input, normalized_notes, durations_input, normalized_durations

In [172]:
def generate_notes(model, notes, durations, pitchnames, unique_durations, n_generate=500):
    """Autoregressively generate note and duration tokens from a random seed.

    One seed position is drawn at random and the note window and duration
    window at that position are used together, so the seed is an aligned pair
    like the windows the model was trained on. On every step the model
    predicts the next note and duration, the arg-max classes are recorded, and
    both windows slide forward by one.

    Args:
        model: Object with a ``predict([notes, durations], verbose=0)`` method
            that returns a ``(note_probs, duration_probs)`` pair.
        notes: List of integer-code note windows (seed candidates).
        durations: List of integer-code duration windows (seed candidates).
        pitchnames: Ordered note vocabulary; index -> token.
        unique_durations: Ordered duration vocabulary; index -> token.
        n_generate: Number of tokens to generate.

    Returns:
        ``(note_output, duration_output)``: two lists of ``n_generate`` tokens.

    Raises:
        ValueError: If there is no seed window to start from.
    """
    int_to_note = dict(enumerate(pitchnames))
    int_to_duration = dict(enumerate(unique_durations))

    n_notes = len(pitchnames)
    n_durations = len(unique_durations)

    n_seeds = min(len(notes), len(durations))
    if n_seeds == 0:
        raise ValueError("generate_notes needs at least one seed sequence")

    # randint's upper bound is exclusive, so every seed (including the last
    # one, and the only one when n_seeds == 1) can be selected.
    start = np.random.randint(0, n_seeds)

    # Copy the seeds so the caller's windows are never mutated.
    pattern_note = list(notes[start])
    pattern_duration = list(durations[start])

    note_output = []
    duration_output = []

    for _ in range(n_generate):
        notes_input = np.reshape(pattern_note, (1, len(pattern_note), 1))
        notes_input = notes_input / n_notes

        durations_input = np.reshape(pattern_duration, (1, len(pattern_duration), 1))
        durations_input = durations_input / n_durations

        pred_note, pred_duration = model.predict([notes_input, durations_input], verbose=0)

        index_note = int(np.argmax(pred_note))
        index_duration = int(np.argmax(pred_duration))

        note_output.append(int_to_note[index_note])
        duration_output.append(int_to_duration[index_duration])

        # Slide both windows: drop the oldest code, append the prediction.
        pattern_note = pattern_note[1:] + [index_note]
        pattern_duration = pattern_duration[1:] + [index_duration]

    return note_output, duration_output

In [173]:
def check_duration(duration):
    """Convert a duration token to a float number of quarter lengths.

    Accepts anything ``float()`` accepts (e.g. ``'0.5'``, ``2``) as well as
    fraction strings of the form ``'num/denom'`` (e.g. ``'1/3'``).

    Args:
        duration: Duration token, typically a string.

    Returns:
        The duration as a ``float``.

    Raises:
        ValueError: If the token is neither a number nor a ``num/denom`` fraction.
        ZeroDivisionError: If the fraction's denominator is zero.
    """
    try:
        return float(duration)
    except ValueError:
        # Not a plain number; fall through to the fraction form.
        pass

    num, sep, denom = duration.partition('/')
    if not sep:
        raise ValueError(f"Unrecognised duration token: {duration!r}")

    return float(num) / float(denom)

In [187]:
def create_midi(prediction_output, fp='output_midi_2bilstm_lstm_60_5.mid'):
    """Turn predicted note/duration tokens into a MIDI file.

    Token handling:
      * tokens containing ``'.'`` or made only of digits are chords whose
        members are integer pitch values;
      * tokens containing ``'rest'`` are rests;
      * anything else is passed to ``note.Note`` as a pitch name.

    Every element is placed at the running offset, given its predicted length
    (when positive), and the offset is advanced by that length.

    Args:
        prediction_output: ``(notes_pred, durations_pred)`` pair of equally
            long token lists.
        fp: Path of the MIDI file to write.
    """
    notes_pred, durations_pred = prediction_output

    offset = 0
    output_notes = []

    for pattern, duration_token in zip(notes_pred, durations_pred):
        quarter_length = check_duration(duration_token)

        # if pattern is a chord
        if '.' in pattern or pattern.isdigit():
            chord_members = []
            for chord_note in pattern.split('.'):
                member = note.Note(int(chord_note))
                member.storedInstrument = instrument.Piano()
                chord_members.append(member)
            element = chord.Chord(chord_members)

        # if pattern is a rest
        elif 'rest' in pattern:
            # Built without arguments: the token only identifies the element
            # kind and is not a constructor value.
            element = note.Rest()
            element.storedInstrument = instrument.Piano()

        # if pattern is a note
        else:
            element = note.Note(pattern)
            element.storedInstrument = instrument.Piano()

        # Apply the predicted length to the element itself. Previously it only
        # advanced the offset, so every element kept its default length.
        # Non-positive predictions leave the default length in place.
        if quarter_length > 0:
            element.duration.quarterLength = quarter_length

        element.offset = offset
        output_notes.append(element)

        offset += quarter_length

    midi_stream = stream.Stream(output_notes)

    midi_stream.write('midi', fp=fp)

In [188]:
# Sample the progress using predictions to create simple musical pieces
def predict():
    """Generate a piece from the saved token streams and trained weights.

    Loads the pickled note and duration tokens, rebuilds the vocabularies,
    recreates the model, loads the checkpoint selected by ``get_weights``,
    generates tokens and writes them out as MIDI. Progress is printed before
    each stage.
    """
    def _unpickle(path):
        # Read one pickled token list from disk.
        with open(path, 'rb') as handle:
            return pickle.load(handle)

    notes = _unpickle('data/notes')
    durations = _unpickle('data/durations')

    # Vocabularies are the sorted distinct tokens of each stream.
    pitchnames = sorted(set(notes))
    unique_durations = sorted(set(durations))
    n_notes, n_durations = len(pitchnames), len(unique_durations)

    print("Getting weight path...")
    weight_path = get_weights()

    print("Preparing sequences for prediction...")
    sequences = prepare_predict_sequences(
        notes, durations, pitchnames, unique_durations, n_notes, n_durations
    )
    notes_input, normalized_notes, durations_input, normalized_durations = sequences

    print("Creating model...")
    model = create_model_functional(
        normalized_notes, normalized_durations, n_notes, n_durations
    )

    # Load model weights
    print("Loading weights...")
    model.load_weights(weight_path)

    print("Generating notes...")
    prediction_output = generate_notes(
        model, notes_input, durations_input, pitchnames, unique_durations
    )

    print("Writing to midi...")
    create_midi(prediction_output)
    

In [193]:
# Generate a new piece with the checkpoint returned by get_weights() and write it to a MIDI file.
predict()

Getting weight path...
Preparing sequences for prediction...
Creating model...
Loading weights...
Generating notes...
Writing to midi...


In [127]:
# Current "best-sounding" output (loops after about 30 seconds, probably due to overfitting or a lack of training examples).
# NOTE(review): this notebook only writes .mid files; the .mp3 was presumably rendered from the MIDI outside the notebook — confirm.
display.Audio('output_midi_2bilstm_lstm_60.mp3')

In [182]:
# Next "best-sounding" output (also loops, due to the lack of variety in the dataset; the model very likely overfits).
display.Audio('output_midi_2bilstm_lstm_60_2.mp3')

In [183]:
# Another "best-sounding" output (it loops as well).
display.Audio('output_midi_2bilstm_lstm_60_3.mp3')