In [None]:
import warnings
warnings.filterwarnings('ignore')

import os, shutil
from time import time
import pandas as pd
import numpy as np

# if gpu ready
import tensorflow as tf
import tensorflow.keras.utils as ku
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense, Dropout
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler, EarlyStopping

from utils import midi_download
from utils import utils

In [None]:
# Prompt for the source URL and for a sub-folder name under ./midi_download,
# echo both, then hand them to midi_download.get_midi.
input_url = input('input_url:')
print(input_url)
midi_dir = os.path.join(os.getcwd(), 'midi_download', input('midi_dir:'))
print(midi_dir)
midi_download.get_midi(input_url, midi_dir)

In [None]:
# GPU ready
# Stop here, with an explicit message, when TensorFlow cannot see a GPU.
# NOTE(review): tf.test.is_gpu_available is marked deprecated in TF 2.x in favour of
# tf.config.list_physical_devices('GPU') — switch once the target TF version is confirmed.
assert tf.test.is_gpu_available(), 'TensorFlow cannot see a GPU; aborting before training'

In [None]:
# ----------------------------------------------------------------------------
# Set model version
# model_version = 'model.'+str(time())
model_version = input('model version:').strip()
print(model_version)

# Guard the rmtree below: an empty version makes out_dir resolve to the shared
# 'models' folder itself, and '.', '..' or a value containing path separators
# points somewhere other than a direct child of it.
if (not model_version
        or model_version in ('.', '..')
        or os.path.basename(model_version) != model_version):
    raise ValueError(
        'model version must be a plain, non-empty folder name, got {!r}'.format(model_version))

# Setup folder to output model
model_dir = os.path.join(os.getcwd(), 'models')
os.makedirs(model_dir, exist_ok=True)

# Start from an empty folder for this version: any previous content is removed.
out_dir = os.path.join(model_dir, model_version)
if os.path.isdir(out_dir):
    shutil.rmtree(out_dir)
os.mkdir(out_dir)

data_dir = os.path.join(out_dir, 'data')
os.mkdir(data_dir)

In [None]:
# Select the MIDI source folder (a sub-folder of ./midi_download) and let
# utils.split_midi_train_test work with it and the train/test folders under data_dir.
# midi_dir  = os.path.join(os.getcwd(), 'midi_all')
midi_dir = os.path.join(os.getcwd(), 'midi_download', input('midi_dir:'))
print(midi_dir)
train_dir, test_dir = (os.path.join(data_dir, part) for part in ('train', 'test'))
utils.split_midi_train_test(midi_dir, train_dir, test_dir)

In [None]:
# Sanity check: convert one randomly chosen test file to its note string and
# write that string back out as MIDI.
sample_fn    = os.path.join(test_dir, np.random.choice(os.listdir(test_dir)))
output_notes = utils.midi_2_notes(sample_fn)
print(output_notes)

# Write the result into data_dir rather than test_dir: files are sampled from
# os.listdir(test_dir) (here and when picking a generation seed), so a generated
# file stored there could be picked up as if it were real test data.
out_fn = os.path.join(data_dir, 'test_out.mid')
utils.notes_2_midi(output_notes, out_fn, simple=True)

In [None]:
# Build the training corpus: utils.midi_2_csv converts all the valid MIDI files
# in train_dir into a single CSV file, corpus.csv, inside data_dir.
out_fn = os.path.join(data_dir, 'corpus.csv')
utils.midi_2_csv(train_dir, out_fn, small_f=False)

In [None]:
#--------- train -------------------------------------------
# Load the corpus CSV (no header row; the single column is named 'notes').
corpus_fn = os.path.join(data_dir, 'corpus.csv')
assert os.path.exists(corpus_fn)
df_corpus = pd.read_csv(corpus_fn, header=None, names=['notes'])

# Join every row into one space-separated string, then tokenise on single spaces.
corpus_train = ' '.join(df_corpus['notes'].tolist())
raw_notes = corpus_train.split(' ')

# Vocabulary: sorted unique tokens; a token's position in `notes` is its integer code.
notes = sorted(set(raw_notes))
note_2_int = {note: code for code, note in enumerate(notes)}

n_notes, n_vocab = len(raw_notes), len(notes)
print('Total number of notes in training corpus: {}, number of unique note: {}'.format(n_notes, n_vocab))

In [None]:
# Peek at the start of the token list; displaying all of raw_notes would dump
# the entire training corpus into the cell output.
raw_notes[:50]

In [None]:
# Display the note -> integer code lookup table.
note_2_int

In [None]:
# Inverse lookup (integer code -> note), built from the same ordering of `notes`.
int_2_note = dict(enumerate(notes))

In [None]:
# Display the integer code -> note lookup table.
int_2_note

- We split the entire training corpus into subsequences of 100 notes (an arbitrary fixed length).

- Each training pattern consists of 100 time steps of one note each (X), followed by a single output note (y). When creating these sequences, we slide this window along the training corpus one note at a time, so that every note (except, of course, the first 100) gets a chance to be learned from the 100 notes that preceded it.

In [None]:
# Build integer-encoded (input window, next note) training pairs.
seq_length = 100

# Encode the corpus once, then slice windows out of the encoded list.
encoded_notes = [note_2_int[note] for note in raw_notes]
window_starts = range(n_notes - seq_length)

# dataX[k] holds seq_length consecutive codes; dataY[k] is the code right after them.
dataX = [encoded_notes[k:k + seq_length] for k in window_starts]
dataY = [encoded_notes[k + seq_length] for k in window_starts]

n_patterns = len(dataX)
print('Total Patterns: {}'.format(n_patterns))

In [None]:
# reshape X to be [samples, time steps, features]
X = np.reshape(dataX, (n_patterns, seq_length, 1))
# normalize: scale the integer codes by the vocabulary size
X = X / float(n_vocab)
# one hot encode the output variable
# num_classes is pinned to the vocabulary size: left to inference, the width
# would be max(dataY) + 1, which is smaller than n_vocab whenever the
# highest-coded note never occurs as a target.
y = ku.to_categorical(dataY, num_classes=n_vocab)

In [None]:
# Shape of the input tensor, reshaped above to (n_patterns, seq_length, 1).
X.shape

In [None]:
# Shape of the one-hot targets: one row per pattern, one column per class.
y.shape

In [None]:
# Two stacked 256-unit LSTM layers, each followed by 20% dropout, feeding a
# softmax layer with one output per one-hot target column.
n_timesteps, n_features = X.shape[1], X.shape[2]
model = Sequential([
    # return_sequences=True so the second LSTM receives the full sequence
    LSTM(256, input_shape=(n_timesteps, n_features), return_sequences=True),
    Dropout(0.2),
    LSTM(256),
    Dropout(0.2),
    Dense(y.shape[1], activation='softmax'),
])
model.compile(loss='categorical_crossentropy', optimizer='adam')

In [None]:
# ----------------------------------------------------------------------------
# Callback: checkpoint
# ModelCheckpoint comes from the notebook's top import cell, so the notebook's
# dependencies are all declared in one place.
# The callback monitors the training loss and, with save_best_only=True and
# mode='min', writes the checkpoint file only when that loss improves.
fn = os.path.join(out_dir, model_version+'_checkpoint_epoch.hdf5')
checkpoint_epoch = ModelCheckpoint(fn, monitor='loss', verbose=1, save_best_only=True, mode='min')

In [None]:
# Train for 100 epochs with batches of 128, checkpointing through checkpoint_epoch.
history = model.fit(
    X,
    y,
    epochs=100,
    batch_size=128,
    callbacks=[checkpoint_epoch],
)

### Generate Notes

In [None]:
# Pick a random file from test_dir and use its notes as the generation seed.
seed_fn    = os.path.join(test_dir, np.random.choice(os.listdir(test_dir)))
# seed_fn    = os.path.join(os.getcwd(), 'midi', 'fur-elise.mid')
print(seed_fn)
seed_notes = utils.midi_2_notes(seed_fn).split(' ')

# note_2_int only covers the training corpus, so a test file can contain notes
# it cannot encode; drop those instead of failing with a KeyError.
known_notes = [note for note in seed_notes if note in note_2_int]
n_unknown   = len(seed_notes) - len(known_notes)
if n_unknown:
    print('Skipped {} seed notes that are not in the training vocabulary'.format(n_unknown))

# The model expects exactly seq_length time steps.
if len(known_notes) < seq_length:
    raise ValueError('Seed file {} provides only {} usable notes; {} are required'.format(
        seed_fn, len(known_notes), seq_length))

# start      = np.random.randint(0, len(known_notes)-seq_length)
start      = 0
pattern    = [note_2_int[note] for note in known_notes[start:start + seq_length]]
pattern

In [None]:
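# Alternative seed (disabled): uncomment to start from seq_length randomly
# drawn note codes instead of the notes of a test file.
# pattern = []
# for i in range(seq_length):
#     pattern.append(np.random.choice(range(len(notes))))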

In [None]:
# Decode the seed pattern back into a space-separated note string and show it.
# Note: `start` is rebound here from the integer offset to the seed string.
seed_tokens = (int_2_note[code] for code in pattern)
start = ' '.join(seed_tokens)
print('Seed: {}'.format(start))

In [None]:
# generate notes
# Work on a copy of the seed window: `pattern` is left untouched, so re-running
# this cell restarts from the same seed instead of from a shifted window.
n_generate = 500
window     = list(pattern)
generated  = []
for i in range(n_generate):
    # Same preprocessing as the training inputs: (1, time steps, 1), codes / n_vocab.
    x = np.reshape(window, (1, len(window), 1))
    x = x / float(n_vocab)
    prediction = model.predict(x, verbose=0)
    # Greedy choice: take the code with the highest predicted probability.
    index = int(np.argmax(prediction))
    result = int_2_note[index]
    generated.append(result)
    # Slide the window: drop the oldest code, append the new prediction.
    window = window[1:] + [index]
print('Done')

In [None]:
# Final note string: the seed followed by the generated notes.
# To keep only the generated part, use: output_notes = ' '.join(generated)
generated_text = ' '.join(generated)
output_notes = '{} {}'.format(start, generated_text)
print(output_notes)

In [None]:
# Write the note string as a MIDI file in out_dir; the current timestamp in the
# file name keeps successive generations apart.
fn = os.path.join(out_dir, 'generated.{}.mid'.format(time()))
utils.notes_2_midi(output_notes, fn, simple=True)

In [None]:
# ----------------------------------------------------------------------------
# Save the model to out_dir, removing any existing file of the same name first
fn = os.path.join(out_dir, '{}.hdf5'.format(model_version))
if os.path.exists(fn):
    os.remove(fn)
model.save(fn)

# Dump history.history (from model.fit) next to the model via utils.pkl_dump
fn = os.path.join(out_dir, '{}.history.pkl'.format(model_version))
utils.pkl_dump(history.history, fn)

# Copy demo.ipynb from the working directory into out_dir, replacing any old copy
src_fn, trg_fn = (os.path.join(folder, 'demo.ipynb') for folder in (os.getcwd(), out_dir))
if os.path.exists(trg_fn):
    os.remove(trg_fn)
shutil.copy(src_fn, trg_fn)