In [1]:
%matplotlib inline
import glob
import keras
from keras.layers import *
from keras.models import *
from keras.optimizers import *
from keras import backend as K
from keras import metrics
import mido
import numpy as np
import random
import skimage.io
import tensorflow

Using TensorFlow backend.


ImportError: No module named skimage.io

In [2]:
# Width of the binary key vector built for every chord event (see encode_chords).
notes = 36
# Number of past events held in each input window (see prepare_chords).
lookback = 64

In [None]:
class VAE(object):
    """Sequence VAE with an auxiliary predictor head on the latent code.

    ``create`` builds four Keras models:

    * ``encoder``: input sequence -> sampled latent vector
    * ``decoder``: latent vector -> reconstructed sequence
    * ``sentiment_predictor``: latent vector -> 64 sigmoid outputs
    * ``autoencoder``: input sequence -> [reconstruction, prediction]

    The decoder and predictor layers are instantiated once and applied to both
    the standalone latent input and the encoder output, so the weights trained
    through ``autoencoder`` are the ones used by ``decoder`` and
    ``sentiment_predictor``.
    """

    def create(self, vocab_size=64, max_length=256, latent_rep_size=128, lr=0.001):
        """Build and compile all models.

        Args:
            vocab_size: width of each time step of the input.
            max_length: number of time steps of the input.
            latent_rep_size: dimensionality of the latent vector.
            lr: learning rate handed to the Adam optimizer.
        """
        self.encoder = None
        self.decoder = None
        self.sentiment_predictor = None
        self.autoencoder = None
        # Drop cached layers so that calling create() again starts from fresh weights.
        self._decoder_layers = None
        self._predictor_layers = None

        x = Input(shape=(max_length, vocab_size))

        vae_loss, encoded = self._build_encoder(x, latent_rep_size=latent_rep_size, max_length=max_length)
        self.encoder = Model(inputs=x, outputs=encoded)

        encoded_input = Input(shape=(latent_rep_size,))
        predicted_sentiment = self._build_sentiment_predictor(encoded_input)
        self.sentiment_predictor = Model(encoded_input, predicted_sentiment)

        decoded = self._build_decoder(encoded_input, vocab_size, max_length)
        self.decoder = Model(encoded_input, decoded)

        # These calls reuse the layers created above (second inbound node), which is
        # what ties the standalone models to the trained autoencoder.
        self.autoencoder = Model(inputs=x, outputs=[self._build_decoder(encoded, vocab_size, max_length), self._build_sentiment_predictor(encoded)])
        self.autoencoder.compile(optimizer=Adam(lr=lr),
                                 loss=[vae_loss, 'binary_crossentropy'],
                                 metrics=['accuracy'])

    def _build_encoder(self, x, latent_rep_size=128, max_length=None, epsilon_std=0.01):
        """Stack the encoder on ``x``.

        Returns:
            A ``(vae_loss, z)`` tuple: the loss function for the reconstruction
            output and the sampled latent tensor.
        """
        h = Bidirectional(LSTM(500, return_sequences=True, name='lstm_1'), merge_mode='concat')(x)
        h = Dropout(0.5, name='dropout_1')(h)
        h = Bidirectional(LSTM(500, return_sequences=False, name='lstm_2'), merge_mode='concat')(h)
        h = Dropout(0.5, name='dropout_2')(h)
        h = Dense(435, activation='relu', name='dense_1')(h)

        def sampling(args):
            # Reparameterization: z = mean + exp(log_var / 2) * epsilon.
            z_mean_, z_log_var_ = args
            batch_size = K.shape(z_mean_)[0]
            epsilon = K.random_normal(shape=(batch_size, latent_rep_size), mean=0., stddev=epsilon_std)
            return z_mean_ + K.exp(z_log_var_ / 2) * epsilon

        z_mean = Dense(latent_rep_size, name='z_mean', activation='linear')(h)
        z_log_var = Dense(latent_rep_size, name='z_log_var', activation='linear')(h)

        def vae_loss(x, x_decoded_mean):
            # Binary cross-entropy scaled by max_length, plus the KL term computed
            # from the z_mean / z_log_var tensors captured by this closure.
            x = K.flatten(x)
            x_decoded_mean = K.flatten(x_decoded_mean)
            xent_loss = max_length * metrics.binary_crossentropy(x, x_decoded_mean)
            kl_loss = - 0.5 * K.mean(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
            return xent_loss + kl_loss

        return (vae_loss, Lambda(sampling, output_shape=(latent_rep_size,), name='lambda')([z_mean, z_log_var]))

    def _build_decoder(self, encoded, vocab_size, max_length):
        """Apply the decoder stack to ``encoded`` and return the output tensor.

        The layers are created on the first call and cached on the instance;
        later calls reuse them, so ``vocab_size`` and ``max_length`` only take
        effect on the first call after ``create`` resets the cache.
        """
        layers = getattr(self, '_decoder_layers', None)
        if layers is None:
            layers = [
                RepeatVector(max_length),
                LSTM(500, return_sequences=True, name='dec_lstm_1'),
                LSTM(500, return_sequences=True, name='dec_lstm_2'),
                TimeDistributed(Dense(vocab_size, activation='sigmoid'), name='decoded_mean'),
            ]
            self._decoder_layers = layers

        h = encoded
        for layer in layers:
            h = layer(h)
        return h

    def _build_sentiment_predictor(self, encoded):
        """Apply the predictor head to ``encoded`` and return the output tensor.

        As with the decoder, the two Dense layers are created once and shared.
        """
        layers = getattr(self, '_predictor_layers', None)
        if layers is None:
            layers = [
                Dense(100, activation='linear'),
                Dense(64, activation='sigmoid', name='pred'),
            ]
            self._predictor_layers = layers

        h = encoded
        for layer in layers:
            h = layer(h)
        return h

In [None]:
from keras.callbacks import ModelCheckpoint
from keras.datasets import imdb
from keras.preprocessing.sequence import pad_sequences
import os

In [None]:
# Sequence length: used as maxlen for pad_sequences and as the VAE's max_length.
MAX_LENGTH = 256
# Passed as num_words to imdb.load_data and used as the one-hot width / vocab_size.
NUM_WORDS = 64

In [None]:
# Load the IMDB dataset through Keras, passing num_words=NUM_WORDS.
(X_train, y_train), (X_test, y_test) = imdb.load_data(num_words=NUM_WORDS)

# Quick sanity check of the training split's shapes.
print("Training data")
print(X_train.shape)
print(y_train.shape)

# Count the distinct token ids that occur anywhere in the training reviews.
print("Number of words:")
print(len(np.unique(np.hstack(X_train))))

In [None]:
# Bring every review to MAX_LENGTH tokens with pad_sequences.
# NOTE(review): this cell overwrites X_train / X_test in place, so re-running it
# without re-running the load cell subsamples the already subsampled data.
X_train = pad_sequences(X_train, maxlen=MAX_LENGTH)
X_test = pad_sequences(X_test, maxlen=MAX_LENGTH)

# Draw 2000 training and 1000 test rows without replacement.
# NOTE(review): no random seed is set in the visible cells, so the subset differs per run.
train_indices = np.random.choice(np.arange(X_train.shape[0]), 2000, replace=False)
test_indices = np.random.choice(np.arange(X_test.shape[0]), 1000, replace=False)

X_train = X_train[train_indices]
y_train = y_train[train_indices]

X_test = X_test[test_indices]
y_test = y_test[test_indices]

In [None]:
def one_hot(indices, depth):
    """One-hot encode an integer array.

    Args:
        indices: integer array of any shape with values in ``[0, depth)``.
        depth: size of the one-hot axis.

    Returns:
        Float array of shape ``indices.shape + (depth,)`` where
        ``result[..., k] == 1`` exactly when ``indices[...] == k``.
    """
    # Row k of the identity matrix is the one-hot vector for index k.
    return np.eye(depth)[indices]


X_train_one_hot = one_hot(X_train, NUM_WORDS)

x_test_one_hot = one_hot(X_test, NUM_WORDS)

In [None]:
def create_model_checkpoint(dir, model_name):
    """Create a ``ModelCheckpoint`` callback that saves the model after every epoch.

    Args:
        dir: directory the checkpoint files are written to; it is created
            (including missing parents) when it does not exist yet.
        model_name: prefix of the checkpoint file names.

    Returns:
        A ``ModelCheckpoint`` whose file name embeds the epoch number and the
        ``val_decoded_mean_acc`` / ``val_pred_loss`` metrics.
    """
    filepath = dir + '/' + \
               model_name + "-{epoch:02d}-{val_decoded_mean_acc:.2f}-{val_pred_loss:.2f}.h5"
    directory = os.path.dirname(filepath)

    # Explicit existence test instead of a bare except around os.stat; makedirs
    # also handles nested paths, which os.mkdir could not.
    if directory and not os.path.isdir(directory):
        os.makedirs(directory)

    checkpointer = ModelCheckpoint(filepath=filepath,
                                   verbose=1,
                                   save_best_only=False)

    return checkpointer

In [None]:
def train():
    """Build the VAE, print its summary and fit it on the module-level arrays.

    Reads the globals ``xdata``/``ydata`` (training) and ``xtest``/``ytest``
    (validation); the input doubles as the reconstruction target.
    """
    vae = VAE()
    vae.create(vocab_size=NUM_WORDS, max_length=MAX_LENGTH)
    autoencoder = vae.autoencoder
    autoencoder.summary()

    callbacks = [create_model_checkpoint('models', 'rnn_ae')]
    training_targets = {'decoded_mean': xdata, 'pred': ydata}
    validation_targets = {'decoded_mean': xtest, 'pred':  ytest}

    autoencoder.fit(
        x=xdata,
        y=training_targets,
        batch_size=10,
        epochs=10,
        callbacks=callbacks,
        validation_data=(xtest, validation_targets),
    )

In [None]:
# Random 0/1 array of shape (128, MAX_LENGTH, NUM_WORDS); train() uses it as input and reconstruction target.
xdata = np.random.randint(2, size=(128, MAX_LENGTH, NUM_WORDS))

In [None]:
# Random 0/1 array of shape (128, NUM_WORDS); train() uses it as the target of the 'pred' output.
ydata = np.random.randint(2, size=(128, NUM_WORDS))

In [None]:
# Random 0/1 array of shape (64, MAX_LENGTH, NUM_WORDS); validation input for train().
xtest = np.random.randint(2, size=(64, MAX_LENGTH, NUM_WORDS))

In [None]:
# Random 0/1 array of shape (64, NUM_WORDS); validation target of the 'pred' output in train().
ytest = np.random.randint(2, size=(64, NUM_WORDS))

In [None]:
# Run the training loop on the random placeholder arrays defined in the cells above.
train()

In [None]:
def chords_from_midi(midi_file):
    """Read the note on/off messages of the track(s) named 'Chords' from a MIDI file.

    Args:
        midi_file: path handed to ``mido.MidiFile``.

    Returns:
        ``np.ndarray`` with one row per message: ``(switch, note, velocity, time)``,
        where ``switch`` is 1 for ``note_on`` and 0 for ``note_off``.

    Raises:
        ValueError: if the file yields no such message.
    """
    midi = mido.MidiFile(midi_file)
    data = [
        (1 if message.type == 'note_on' else 0, message.note, message.velocity, message.time)
        for track in midi.tracks
        if track.name == 'Chords'
        for message in track
        if message.type in ('note_on', 'note_off')
    ]
    # Explicit check rather than `assert`, which is removed when Python runs with -O.
    if not data:
        raise ValueError("no note_on/note_off messages in a 'Chords' track of %s" % (midi_file,))
    return np.array(data)


def encode_chords(sequence):
    """Group raw note messages into chord events and binary-encode them.

    Consecutive rows of ``sequence`` belong to the same event as long as the
    switch and velocity stay the same and the row's time is 0; any other row
    starts a new event.

    Args:
        sequence: 2-D integer array with rows ``(switch, note, velocity, time)``,
            as returned by ``chords_from_midi``.

    Returns:
        ``[switches, keys, velocities, times]`` with one row per event:
        ``switches`` has shape ``(events, 1)``; ``keys`` has ``notes`` columns,
        with a 1 at ``note - min(note)`` for every note of the event;
        ``velocities`` holds the 8-bit and ``times`` the 16-bit binary digits
        of the event's velocity and time.
    """
    switches = []
    keys = []
    velocities = []
    times = []

    def flush(switch, key, velocity, time):
        # Store one finished event in the four parallel lists.
        switches.append(switch)
        keys.append(key)
        velocities.append([int(bit) for bit in format(int(velocity), '08b')])
        times.append([int(bit) for bit in format(int(time), '016b')])

    switch = None
    key = None
    velocity = None
    time = None
    minimum = min(sequence[:, 1])
    for item in sequence:
        if switch != item[0] or velocity != item[2] or item[3] != 0:
            # `key` is None only before the first row, when nothing is pending yet.
            if key is not None:
                flush(switch, key, velocity, time)
            key = np.zeros((notes,), dtype=int)
            time = item[3]
        switch = item[0]
        key[item[1] - minimum] = 1
        velocity = item[2]
    # The event still being accumulated when the input ends must be emitted too;
    # without this the final chord of every sequence was lost.
    if key is not None:
        flush(switch, key, velocity, time)
    return [np.array(switches)[:, np.newaxis], np.array(keys), np.array(velocities), np.array(times)]


def augment_chords(data):
    """Create every upward transposition of a chord progression that fits in ``notes`` keys.

    Args:
        data: ``[switches, keys, velocities, times]`` as returned by
            ``encode_chords``; all four must have the same length.

    Returns:
        A list with one ``[switches, keys_shifted, velocities, times]`` entry per
        shift ``0 .. notes - width``, where ``width`` is the index of the highest
        key set in any event plus one. Only the key matrix differs between
        entries; the other three arrays are the input objects themselves.
    """
    augmented = []
    assert len(set([len(data[0]), len(data[1]), len(data[2]), len(data[3])])) == 1
    events = len(data[0])
    keys = np.asarray(data[1])

    # Columns that are set in at least one event. Every event is inspected,
    # including the first one, which the previous range(1, events) loop skipped.
    if events:
        used = np.flatnonzero(np.any(keys, axis=0))
    else:
        used = np.array([], dtype=int)
    # With no key set at all there is nothing to shift: treat the full width as
    # occupied so exactly one (unchanged) copy is produced.
    maximum = int(used[-1]) + 1 if used.size else notes

    transpositions = notes - maximum + 1
    for shift in range(transpositions):
        progression = np.zeros((events, notes), dtype=int)
        if events:
            progression[:, shift:shift + maximum] = keys[:, :maximum]
        augmented.append([data[0], progression, data[2], data[3]])
    return augmented


def prepare_chords(data):
    """Build left-padded history windows (inputs) for every event of a progression.

    Args:
        data: ``[switches, keys, velocities, times]`` with equal lengths and
            widths 1, ``notes``, 8 and 16.

    Returns:
        ``(x, y)``: ``x`` is a list of four integer arrays of shape
        ``(events, lookback, width)``; row ``i`` holds the up to ``lookback``
        events preceding event ``i``, right-aligned and zero-padded on the left.
        ``y`` is ``data`` itself, i.e. event ``i`` is the target of window ``i``.
    """
    assert len(set([len(data[0]), len(data[1]), len(data[2]), len(data[3])])) == 1
    sequences = len(data[0])
    widths = (1, notes, 8, 16)
    x = [np.zeros((sequences, lookback, width), dtype=int) for width in widths]
    y = data
    for i in range(1, sequences):
        # Keep only the most recent `lookback` events; copying all i events
        # raised a shape error as soon as i exceeded the window length.
        span = min(i, lookback)
        for window, part in zip(x, data):
            window[i, -span:, :] = part[i - span:i, :]
    return x, y


def load_chords(midi_dir):
    """Load, encode, augment and window every ``*.mid`` / ``*.midi`` file in a directory.

    Files that fail at any stage are skipped with a message naming the error.

    Args:
        midi_dir: directory to scan (not recursive).

    Returns:
        A shuffled list of ``(x, y)`` pairs as produced by ``prepare_chords``,
        one per transposition of each successfully processed file.
    """
    all_data = []
    midi_files = sorted(glob.glob(os.path.join(midi_dir, '*.mid')) + glob.glob(os.path.join(midi_dir, '*.midi')))
    for midi_file in midi_files:
        try:
            # Build the full list first so a failure adds nothing from this file.
            data = [prepare_chords(x) for x in augment_chords(encode_chords(chords_from_midi(midi_file)))]
            all_data.extend(data)
        except Exception as error:
            # KeyboardInterrupt / SystemExit are not Exception subclasses and still propagate.
            print("Skipping %s (%s: %s)" % (midi_file, type(error).__name__, error))
    random.shuffle(all_data)
    return all_data


# def input_output(sequence):
#     x = []
#     y = []
#     for i in range(len(sequence)):
#         if i == 0:
#             x.append(np.zeros((1, 1, 1 + notes + 8 + 16)).astype(int))
#         elif i < lookback:
#             x.append(np.vstack([np.zeros((1, 1 + notes + 8 + 16)), sequence[:i, :]])[np.newaxis, :, :].astype(int))
#         else:
#             x.append(np.vstack([np.zeros((1, 1 + notes + 8 + 16)), sequence[i - lookback:i, :]])[np.newaxis, :, :].astype(int))
#         y.append(sequence[np.newaxis, i, :].astype(int))
#     return (x, y)


# def pad_sequences(data, length):
#     x = np.zeros((1, length, 1 + notes + 8 + 16))
#     y = np.zeros((1, 1 + notes + 8 + 16))
#     for i, seq in enumerate(data[0]):
#         seq_len = seq.shape[1]
#         pad_len = length - seq_len
#         for j in range(seq_len):
#             x[0, length - seq_len + j, :] = seq[0, j, :]
#         #y[0, i] = 
#     return (x, y)


def generator(data):
    """Yield ``(inputs, targets)`` pairs from ``data`` forever, restarting after each pass.

    Each element of ``data`` is indexed at positions 0 and 1.
    """
    while True:
        for pair in data:
            inputs, targets = pair[0], pair[1]
            yield (inputs, targets)

In [None]:
#data = pad_sequences(input_output(encode_chords(chords_from_midi('/home/santiago/Projects/MusicGenerator/data/midi/6.mid'))), 32)

In [None]:
a = chords_from_midi('/home/santiago/Projects/MusicGenerator/data/midi/6.mid')

In [None]:
a

In [None]:
b = encode_chords(a)

In [None]:
b[1].shape

In [None]:
b

In [None]:
z = augment_chords(b)

In [None]:
z

In [None]:
# augment_chords returns a list of datasets (one per transposition), while
# prepare_chords expects a single [switches, keys, velocities, times] dataset.
c = [prepare_chords(variant) for variant in z]

In [None]:
c

In [None]:
data = prepare_chords(encode_chords(chords_from_midi('/home/santiago/Projects/MusicGenerator/data/midi/6.mid')))

In [None]:
data = load_chords('/home/santiago/Projects/MusicGenerator/data/midi/')

In [None]:
midi_file = '/home/santiago/Projects/MusicGenerator/data/midi/6.mid'

In [None]:
data = [prepare_chords(x) for x in augment_chords(encode_chords(chords_from_midi(midi_file)))]

In [None]:
data

In [None]:
gen = generator(data)

In [None]:
gen

In [None]:
data[0]

In [None]:
class VAE(object):
    """Chord-event VAE with four inputs and a four-headed next-event predictor.

    ``create`` builds four Keras models:

    * ``encoder``: the four input windows -> sampled latent vector
    * ``decoder``: latent vector -> reconstructed concatenated window
    * ``predictor``: latent vector -> (switch, notes, velocity, time) heads
    * ``autoencoder``: inputs -> [reconstruction, switch, notes, velocity, time]

    The decoder and predictor layers are instantiated once and applied to both
    the standalone latent input and the encoder output, so the weights trained
    through ``autoencoder`` are the ones used by ``decoder`` and ``predictor``.
    """

    def create(self, data_width=1+notes+8+16, lookback=lookback, latent_rep_size=128, lr=0.001):
        """Build and compile all models.

        Args:
            data_width: width of the concatenated input, i.e. of each
                reconstructed time step.
            lookback: number of time steps per input window.
            latent_rep_size: dimensionality of the latent vector.
            lr: learning rate handed to the Adam optimizer.
        """
        self.encoder = None
        self.decoder = None
        self.predictor = None
        self.autoencoder = None
        # Drop cached layers so that calling create() again starts from fresh weights.
        self._decoder_layers = None
        self._predictor_layers = None

        switches_in = Input(shape=(lookback, 1), name='switches_in')
        notes_in = Input(shape=(lookback, notes), name='notes_in')
        velocities_in = Input(shape=(lookback, 8), name='velocities_in')
        times_in = Input(shape=(lookback, 16), name='times_in')
        model_inputs = [switches_in, notes_in, velocities_in, times_in]

        x = Concatenate(name='input_concat')(model_inputs)

        vae_loss, encoded = self._build_encoder(x, latent_rep_size=latent_rep_size, lookback=lookback)
        self.encoder = Model(inputs=model_inputs, outputs=encoded)

        encoded_input = Input(shape=(latent_rep_size,))
        predicted = self._build_predictor(encoded_input)
        self.predictor = Model(encoded_input, predicted)

        decoded = self._build_decoder(encoded_input, data_width, lookback)
        self.decoder = Model(encoded_input, decoded)

        # These calls reuse the layers created above (second inbound node), which is
        # what ties the standalone models to the trained autoencoder.
        # Output order: decoded_mean, switches_out, notes_out, velocities_out, times_out.
        autoencoder_outputs = [self._build_decoder(encoded, data_width, lookback)]
        autoencoder_outputs.extend(self._build_predictor(encoded))

        self.autoencoder = Model(inputs=model_inputs, outputs=autoencoder_outputs)
        self.autoencoder.compile(optimizer=Adam(lr=lr),
                                 loss=[vae_loss, 'binary_crossentropy', 'binary_crossentropy', 'binary_crossentropy', 'binary_crossentropy'],
                                 metrics=['accuracy'])

    def _build_encoder(self, x, latent_rep_size=128, lookback=None, epsilon_std=0.01):
        """Stack the encoder on ``x``.

        Returns:
            A ``(vae_loss, z)`` tuple: the loss function for the reconstruction
            output and the sampled latent tensor.
        """
        h = Bidirectional(LSTM(500, return_sequences=True), merge_mode='concat', name='bidirectional_1')(x)
        h = Dropout(0.5, name='dropout_1')(h)
        h = Bidirectional(LSTM(500, return_sequences=False), merge_mode='concat', name='bidirectional_2')(h)
        h = Dropout(0.5, name='dropout_2')(h)
        h = Dense(435, activation='relu', name='dense_1')(h)

        def sampling(args):
            # Reparameterization: z = mean + exp(log_var / 2) * epsilon.
            z_mean_, z_log_var_ = args
            batch_size = K.shape(z_mean_)[0]
            epsilon = K.random_normal(shape=(batch_size, latent_rep_size), mean=0., stddev=epsilon_std)
            return z_mean_ + K.exp(z_log_var_ / 2) * epsilon

        z_mean = Dense(latent_rep_size, name='z_mean', activation='linear')(h)
        z_log_var = Dense(latent_rep_size, name='z_log_var', activation='linear')(h)

        def vae_loss(x, x_decoded_mean):
            # Binary cross-entropy scaled by lookback, plus the KL term computed
            # from the z_mean / z_log_var tensors captured by this closure.
            x = K.flatten(x)
            x_decoded_mean = K.flatten(x_decoded_mean)
            xent_loss = lookback * metrics.binary_crossentropy(x, x_decoded_mean)
            kl_loss = - 0.5 * K.mean(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
            return xent_loss + kl_loss

        return (vae_loss, Lambda(sampling, output_shape=(latent_rep_size,), name='lambda')([z_mean, z_log_var]))

    def _build_decoder(self, encoded, data_width, lookback):
        """Apply the decoder stack to ``encoded`` and return the output tensor.

        The layers are created on the first call and cached on the instance;
        later calls reuse them, so ``data_width`` and ``lookback`` only take
        effect on the first call after ``create`` resets the cache.
        """
        layers = getattr(self, '_decoder_layers', None)
        if layers is None:
            layers = [
                RepeatVector(lookback, name='repeat_vector'),
                LSTM(500, return_sequences=True, name='dec_lstm_1'),
                LSTM(500, return_sequences=True, name='dec_lstm_2'),
                TimeDistributed(Dense(data_width, activation='sigmoid'), name='decoded_mean'),
            ]
            self._decoder_layers = layers

        h = encoded
        for layer in layers:
            h = layer(h)
        return h

    def _build_predictor(self, encoded):
        """Apply the predictor to ``encoded``.

        Returns:
            Tuple ``(switches_out, notes_out, velocities_out, times_out)`` of
            sigmoid output tensors with widths 1, ``notes``, 8 and 16. The
            hidden layer and the four heads are created once and shared.
        """
        layers = getattr(self, '_predictor_layers', None)
        if layers is None:
            hidden = Dense(100, activation='linear', name='dense_2')
            heads = (
                Dense(1, activation='sigmoid', name='switches_out'),
                Dense(notes, activation='sigmoid', name='notes_out'),
                Dense(8, activation='sigmoid', name='velocities_out'),
                Dense(16, activation='sigmoid', name='times_out'),
            )
            layers = (hidden, heads)
            self._predictor_layers = layers

        hidden, heads = layers
        h = hidden(encoded)
        return tuple(head(h) for head in heads)

In [None]:
model = VAE()

In [None]:
model.create()

In [None]:
model.autoencoder.summary()

In [None]:
# Targets must follow the autoencoder's output order: the reconstruction
# ('decoded_mean') first, then the four predictor heads.
# The reconstruction target is the model input itself: the four input windows
# joined along the feature axis, giving (sequences, lookback, data_width).
output_list = [np.concatenate(data[0][0], axis=2)]
output_list.extend(data[0][1])

In [None]:
data[0][1][1].shape

In [None]:
model.autoencoder.fit(data[0][0], output_list)

In [None]:
data[0][1]

In [None]:
data[0][1]