In [1]:
import os
from collections import defaultdict

import numpy as np

from keras import Sequential
from keras.utils import to_categorical
from keras.layers import LSTM, Dense, Dropout, Flatten, Activation

from pyknon.genmidi import Midi
from pyknon.music import NoteSeq, Note
from music21 import midi, stream, converter, note, chord, instrument

import wandb
from wandb.keras import WandbCallback

Using TensorFlow backend.


In [25]:
# Directory (relative path) holding the MIDI files that are loaded for training.
MIDI_DIR = 'Omnibook/Midi'
# Number of consecutive notes/rests in each training window; the element that
# follows the window is used as its label. Also the window fed to the model
# when generating a solo.
SEQ_LENGTH = 10

In [3]:
def make_midi(notes, name, filepath):
    """Write a sequence of note values to a MIDI file using pyknon.

    Every item of ``notes`` is wrapped in a pyknon ``Note``; the resulting
    list is sequenced onto track 0 of ``Midi(1, tempo=90)`` and written to
    ``filepath``.

    Args:
        notes: Iterable of values accepted by the pyknon ``Note`` constructor.
        name: Not used by this function.
        filepath: Destination path of the MIDI file.
    """
    pyknon_notes = []
    for value in notes:
        pyknon_notes.append(Note(value))

    # Local is called `midi_out` so it does not hide the music21 `midi` module.
    midi_out = Midi(1, tempo=90)
    midi_out.seq_notes(pyknon_notes, track=0)
    midi_out.write(filepath)
    
def play_midi(filepath):
    """Load a MIDI file and play it through music21's 'midi' show format.

    Args:
        filepath: Path of the MIDI file to play.
    """
    # Reuse load_midi instead of repeating the open/read/close sequence here.
    mf = load_midi(filepath)
    # Named `midi_stream` so the imported music21 `stream` module is not
    # shadowed inside this function.
    midi_stream = midi.translate.midiFileToStream(mf)
    midi_stream.show('midi')

def load_midi(filepath):
    """Read a MIDI file from disk into a music21 ``MidiFile``.

    Args:
        filepath: Path of the MIDI file to read.

    Returns:
        The ``midi.MidiFile`` object after its contents have been read and
        the underlying file has been closed.
    """
    mf = midi.MidiFile()
    mf.open(filepath)
    try:
        mf.read()
    finally:
        # Release the file handle even when parsing fails part-way through.
        mf.close()
    return mf

def load_midi_dir(path, extensions=('.mid', '.midi')):
    """Load every MIDI file found directly inside a directory.

    Args:
        path: Directory to scan (not recursive).
        extensions: File-name suffix, or iterable of suffixes, that a file
            must end with (case-insensitive) to be loaded. Pass ``None`` to
            load every regular file regardless of its name.

    Returns:
        A list with the result of ``load_midi`` for each selected file, in
        sorted file-name order.
    """
    if isinstance(extensions, str):
        extensions = (extensions,)
    suffixes = None
    if extensions is not None:
        suffixes = tuple(ext.lower() for ext in extensions)

    filepaths = []
    # os.listdir returns entries in arbitrary order; sorting keeps positional
    # lookups into the result stable across machines and runs.
    for filename in sorted(os.listdir(path)):
        filepath = os.path.join(path, filename)
        # Skip sub-directories and other non-regular entries.
        if not os.path.isfile(filepath):
            continue
        # Skip stray files (e.g. OS metadata) that are not MIDI.
        if suffixes is not None and not filename.lower().endswith(suffixes):
            continue
        filepaths.append(filepath)
    return [load_midi(fp) for fp in filepaths]

def get_pitch_range(streams):
    """Return the lowest and highest pitch found across ``streams``.

    Args:
        streams: Iterable of objects exposing a ``pitches`` iterable.

    Returns:
        A ``(lowest, highest)`` tuple taken from the distinct pitches.

    Raises:
        ValueError: If no pitches are found at all.
    """
    unique_pitches = set()
    for source in streams:
        for pitch in source.pitches:
            unique_pitches.add(pitch)
    lowest = min(unique_pitches)
    highest = max(unique_pitches)
    return lowest, highest

def get_notes(stream):
    """Return the ``notesAndRests`` of the first element of ``stream``.

    Presumably the first element is the part that carries the melody —
    TODO confirm against the MIDI files being loaded.
    """
    first_element = stream.elements[0]
    return first_element.notesAndRests

def get_durations(streams):
    """Collect the distinct ``duration.quarterLength`` values across ``streams``.

    Args:
        streams: Iterable of streams; each one is passed to ``get_notes``.

    Returns:
        A set of the quarter-length values of every note/rest encountered.
    """
    found = set()
    for source in streams:
        for element in get_notes(source):
            found.add(element.duration.quarterLength)
    return found

def build_indexes(pitches):
    """Build the two lookup tables between pitches and class indices.

    Pitches are numbered from 1 in the order given; index 0 is reserved for
    the string ``'rest'``.

    Args:
        pitches: Iterable of hashable pitch values.

    Returns:
        ``(pitch_to_index, index_to_pitch)`` dictionaries.
    """
    index_lookup = {}
    for position, pitch in enumerate(pitches, start=1):
        index_lookup[position] = pitch
    index_lookup[0] = 'rest'

    # Invert the mapping, walking it in the same insertion order.
    pitch_lookup = {}
    for position, pitch in index_lookup.items():
        pitch_lookup[pitch] = position
    return pitch_lookup, index_lookup

Load the MIDI files and convert each one to a music21 stream.

In [95]:
# Read every file in MIDI_DIR into a music21 MidiFile object ...
midi_files = load_midi_dir(MIDI_DIR)
# ... and translate each MidiFile into a music21 stream.
streams = [midi.translate.midiFileToStream(mf) for mf in midi_files]

Get the pitch range of the data and build the pitch ↔ index vocabulary (index 0 is reserved for rests).

In [5]:
# Lowest and highest pitch objects found across all loaded streams.
min_pitch, max_pitch = get_pitch_range(streams)
# Their MIDI note numbers.
min_pitch_midi = min_pitch.midi
max_pitch_midi = max_pitch.midi
# Every MIDI number in the inclusive range, whether or not it occurs in the data.
pitches = list(range(min_pitch_midi, max_pitch_midi + 1))
pitch_to_index, index_to_pitch = build_indexes(pitches)
# One class per pitch in the range plus one for 'rest'.
vocab_size = len(pitch_to_index)

Get the set of distinct note durations (in quarter lengths) that occur in the data.

In [6]:
# Distinct duration.quarterLength values of all notes/rests in the loaded streams.
durations = get_durations(streams)

In [10]:
def encode_note(note, pitch_to_index):
    """Map a single note or rest to its class index.

    Args:
        note: Object with an ``isRest`` flag and, when it is not a rest, a
            ``pitch.midi`` attribute.
        pitch_to_index: Mapping from MIDI number (and ``'rest'``) to index.

    Returns:
        The index stored under ``'rest'`` for rests, otherwise the index
        stored under the note's MIDI number.

    Raises:
        KeyError: If the lookup key is not present in ``pitch_to_index``.
    """
    lookup_key = 'rest' if note.isRest else note.pitch.midi
    return pitch_to_index[lookup_key]

def encode_notes(notes, pitch_to_index):
    """Encode each element of ``notes`` with ``encode_note``.

    Args:
        notes: Iterable of notes/rests.
        pitch_to_index: Mapping passed through to ``encode_note``.

    Returns:
        A list of class indices, one per input element, in input order.
    """
    encoded = []
    for item in notes:
        encoded.append(encode_note(item, pitch_to_index))
    return encoded

In [14]:
def make_training_sequences(streams, length, pitch_to_index):
    """Build sliding-window training examples from a collection of streams.

    For every stream, each run of ``length`` consecutive notes/rests becomes
    one input sequence and the element immediately after the run becomes its
    label. Streams that are too short to yield a window contribute nothing.

    Args:
        streams: Iterable of streams; each one is passed to ``get_notes``.
        length: Number of elements in each input window.
        pitch_to_index: Mapping used to encode notes and rests as indices.

    Returns:
        ``(training_sequences, labels)`` where ``training_sequences`` is a
        list of lists of indices and ``labels`` is a list of indices of the
        same length.
    """
    training_sequences = []
    labels = []
    for source in streams:
        notes = get_notes(source)
        window_count = len(notes) - length
        # Too short to produce a window; skip without encoding anything.
        if window_count <= 0:
            continue
        # Encode the tune once and slice the encoded list, instead of
        # re-encoding every note for each window it appears in.
        encoded = encode_notes(notes, pitch_to_index)
        for start in range(window_count):
            training_sequences.append(encoded[start:start + length])
            labels.append(encoded[start + length])
    return training_sequences, labels

In [26]:
# Sliding windows of SEQ_LENGTH encoded notes, each labelled with the index of the note that follows it.
training_data, training_labels = make_training_sequences(streams, SEQ_LENGTH, pitch_to_index)
# One-hot encode both inputs and labels over the vocab_size classes.
training_data = to_categorical(training_data, num_classes=vocab_size)
training_labels = to_categorical(training_labels, num_classes=vocab_size)

In [27]:
# Sanity check: data is (samples, SEQ_LENGTH, vocab_size), labels are (samples, vocab_size).
print("Training data shape:", training_data.shape)
print("Training labels shape:", training_labels.shape)

Training data shape: (22739, 10, 33)
Training labels shape: (22739, 33)


In [20]:
# Two stacked LSTM layers followed by a dense head with a softmax over the vocabulary.
model = Sequential()
# input_shape is taken from the one-hot training data (everything after the sample axis);
# return_sequences=True so the second LSTM receives the whole sequence.
model.add(LSTM(128, input_shape=training_data.shape[1:], return_sequences=True))
model.add(Dropout(0.5))
model.add(LSTM(128))
# NOTE(review): no activation is passed to Dense(512) — confirm that a layer
# without a non-linearity is intended here.
model.add(Dense(512))
model.add(Dropout(0.5))
# One output unit per class; the softmax turns them into a distribution over the next note.
model.add(Dense(vocab_size))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam')

# Start a Weights & Biases run before training begins.
wandb.init()

W&B Run: https://app.wandb.ai/pvarsh/bop-net/runs/kyne7r92
Call `%%wandb` in the cell containing your training loop to display live results.


W&B Run https://app.wandb.ai/pvarsh/bop-net/runs/kyne7r92

In [111]:
# Train for 10 epochs with 10% of the samples held out for validation;
# WandbCallback is attached so the run is reported to Weights & Biases.
model.fit(training_data, training_labels, epochs=10, batch_size=64, validation_split=0.1, callbacks=[WandbCallback()])

Train on 20465 samples, validate on 2274 samples
Epoch 1/10
Resuming run: https://app.wandb.ai/pvarsh/bop-net/runs/kyne7r92
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x1337b1a58>

In [29]:
def improvise(model, start_input, index_to_note, note_to_index, sequence_length, solo_length):
    """Generate a solo by repeatedly sampling the model's next-note distribution.

    The seed notes are encoded, then for ``solo_length`` steps the last
    ``sequence_length`` indices are one-hot encoded, fed to the model, and
    one index is sampled from the predicted distribution and appended.

    Args:
        model: Object whose ``predict`` returns one row of class
            probabilities for a batch of one one-hot sequence.
        start_input: Iterable of notes/rests used as the seed.
        index_to_note: Mapping from class index to note value; its size is
            used as the number of classes.
        note_to_index: Mapping used to encode ``start_input``.
        sequence_length: Number of most recent indices fed to the model.
        solo_length: Number of new notes to generate.

    Returns:
        A list of note values (seed followed by the generated notes), looked
        up through ``index_to_note``.
    """
    solo = list(encode_notes(start_input, note_to_index))
    vocab_size = len(index_to_note)

    for _ in range(solo_length):
        window = solo[-sequence_length:]
        network_input = to_categorical(window, num_classes=vocab_size)
        # Add the leading batch axis expected by predict().
        network_input = np.reshape(network_input, (1,) + network_input.shape)
        prediction = model.predict(network_input, verbose=0)
        # Model outputs are typically float32; renormalise in float64 so
        # np.random.choice's "probabilities sum to 1" check cannot fail on
        # rounding error.
        probabilities = np.array(prediction[0], dtype=np.float64)
        probabilities /= probabilities.sum()
        # Draw a scalar directly (no size argument) instead of calling int()
        # on a one-element array, which newer NumPy deprecates.
        next_index = int(np.random.choice(probabilities.shape[0], p=probabilities))
        solo.append(next_index)
    return [index_to_note[index] for index in solo]

In [45]:
def create_stream(notes, duration_type='eighth'):
    """Build a music21 stream from a list of note values.

    Args:
        notes: Iterable of note values; the string ``'rest'`` produces a
            rest, anything else is passed to ``note.Note``.
        duration_type: music21 duration type given to every note and rest.
            Defaults to ``'eighth'``, the value that was previously fixed.

    Returns:
        A ``stream.Stream`` with one element appended per input value.
    """
    melody = stream.Stream()
    for value in notes:
        if value == 'rest':
            element = note.Rest(type=duration_type)
        else:
            element = note.Note(value, type=duration_type)
        melody.append(element)
    return melody

In [113]:
# Seed the generator with the first SEQ_LENGTH*4 notes/rests of one loaded tune,
# then sample 100 further notes from the model.
# NOTE(review): which tune streams[7] refers to depends on the order in which
# load_midi_dir returned the files.
solo = improvise(
    model,
    start_input=get_notes(streams[7])[:SEQ_LENGTH*4],
    index_to_note=index_to_pitch,
    note_to_index=pitch_to_index,
    sequence_length=SEQ_LENGTH,
    solo_length=100,
)
# Turn the generated values into a music21 stream and play it in the notebook.
solo_stream = create_stream(solo)
solo_stream.show('midi')

In [114]:
# Save the generated solo as a MIDI file.
# NOTE(review): the file name says 13 epochs while the fit cell uses epochs=10 —
# confirm the name matches how the model was actually trained.
mf = midi.translate.streamToMidiFile(solo_stream)
mf.open('bloomdido_with_rests_13_epochs.mid', 'wb')
try:
    mf.write()
finally:
    # Close the file handle even if writing fails.
    mf.close()