# Training a Conditional Variational Autoencoder to generate music

In [1]:
import os

import numpy as np
import pretty_midi
import pypianoroll

import scipy.sparse
from scipy.sparse import coo_matrix, save_npz, load_npz

import pickle

from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow.keras import Model, Input, Sequential
from tensorflow.keras.layers import InputLayer, Flatten, Reshape
from tensorflow.keras.layers import Dense, Conv2D, Conv2DTranspose
from tensorflow.keras.layers import Conv1D, Conv1DTranspose
from tensorflow.keras.layers import Conv3D, Conv3DTranspose
from tensorflow.keras.layers import ConvLSTM1D
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import GRU, LSTM
from tensorflow.keras.layers import Lambda
from tensorflow.keras.layers import Concatenate, concatenate
from tensorflow.keras.layers import BatchNormalization as BatchNorm
from tensorflow.keras.layers import ReLU as Relu

from keras import backend as K

import math

import datetime
from IPython import display

import pygame
import time

pygame 2.1.2 (SDL 2.0.18, Python 3.9.7)
Hello from the pygame community. https://www.pygame.org/contribute.html


## Preparing data

The data used in this notebook is the Youtube Piano dataset, which consists of about 10,800 MIDI files with a single track of a piano. Prior to the execution of this notebook, I converted these MIDI files into piano rolls (sampled with frequency 1/100), which were then saved as COO scipy sparse matrices. Although the COO matrices take up more space than the MIDI files, I found that it was better for performance to save the piano rolls and then load them into memory, rather than load the MIDI files into memory and then have to constantly convert them to piano rolls while training, or to get training samples from disk.

In [2]:
DATA_ROOT_DIR = 'C:/_local/data_sets/audio/youtube_piano/'

MIDI_FILES_DIR = DATA_ROOT_DIR + 'midis/'
PIANO_ROLL_DIR = DATA_ROOT_DIR + 'piano_rolls/'
PIANO_ROLL_MASKS_DIR = DATA_ROOT_DIR + 'piano_roll_masks/'

MODELS_ROOT_DIR = 'C:/_local/py/yt_piano_music_gen/models/'
CVAE_DIR = MODELS_ROOT_DIR + 'cvae/'
NEW_VAE_DIR = MODELS_ROOT_DIR + 'new_cvae/'
MELODY_MODELS_DIR = MODELS_ROOT_DIR + 'melody_models/'
CCVAE_MODELS_DIR = MODELS_ROOT_DIR + 'cond_conv_vae/'
MELODY_PREDICTION_MODELS_DIR = MODELS_ROOT_DIR + 'melody_predictor/'

OUTPUTS_ROOT_DIR = 'C:/_local/py/yt_piano_music_gen/outputs/'
NOTE_OUTPUT_DIR = OUTPUTS_ROOT_DIR + 'notes/'
SIMPLE_SEQUENCE_OUTPUT_DIR = OUTPUTS_ROOT_DIR + 'simple_sequences/'
MELODY_OUTPUT_DIR = OUTPUTS_ROOT_DIR + 'melodies/'
TEMP_OUTPUT_PATH = OUTPUTS_ROOT_DIR + 'temp.mid'

In [3]:
PIANO_ROLL_PATHS = np.array([PIANO_ROLL_DIR+f for f in os.listdir(PIANO_ROLL_DIR)])
MIDI_FILE_PATHS = np.array([MIDI_FILES_DIR+f for f in os.listdir(MIDI_FILES_DIR)])
CURRENT_NUM_PIANO_ROLLS = len(PIANO_ROLL_PATHS)

In [145]:
NUMBER_OF_SAMPLES_SUBSET = 1200 # <-- Number of piano rolls to randomly sample for training/testing

MELODY_INPUT_TIME_S = 6.4 # <-- Length of inputs for melody model in seconds
MELODY_INPUT_NUM_SEQUENCES = int(MELODY_INPUT_TIME_S * 100 // 64) # <-- Number of SEQUENCE_LENGTH sequences as input for melody model
MELODY_BUFFER_S = .6 # <-- Extra time for masking piano rolls

TRIM_START = True # <-- Only consider sequences after first note played
TRIM_END = True # <-- Only consider sequences before last note played

SEQUENCE_LENGTH = 64 # <-- length of input / target sequences in 1/100 seconds
LATENT_DIM = 128
BATCH_SIZE = 32

In [5]:
MASK_TIME_STRING = '-'.join(str(round(MELODY_INPUT_TIME_S+MELODY_BUFFER_S, 1)).split('.'))
PIANO_ROLLS_MASK_PATH = PIANO_ROLL_MASKS_DIR + f'melody_{MASK_TIME_STRING}_s_{CURRENT_NUM_PIANO_ROLLS}'
PIANO_ROLLS_MASK = None
MASKED_PIANO_ROLLS_PATHS = None

def get_if_piano_roll_long_enough(piano_roll_path):
    
    pr = load_npz(piano_roll_path)
    return pr.col[-1] - pr.col[0] > MELODY_INPUT_TIME_S + MELODY_BUFFER_S

if not os.path.isfile(PIANO_ROLLS_MASK_PATH):
    
    mask = [get_if_piano_roll_long_enough(prp) for prp in PIANO_ROLL_PATHS]
    
    with open(PIANO_ROLLS_MASK_PATH, 'wb') as f:
        pickle.dump(mask, f)
        
with open(PIANO_ROLLS_MASK_PATH, 'rb') as f:
    PIANO_ROLLS_MASK = pickle.load(f)

In [6]:
# Utility functions for converting scipy-sparse matrices to tf.sparse.SparseTensor
def scipy_sparse_to_sparse_tensor(scipy_sparse):
    
    indices = np.mat([scipy_sparse.row, scipy_sparse.col]).transpose()
    return tf.cast(tf.sparse.SparseTensor(indices, 
                                          scipy_sparse.data, 
                                          scipy_sparse.shape),
                    dtype=tf.float32
                   )

def list_of_scipy_sparse_to_list_sparse_tensor(list_scipy_sparse):
    return [scipy_sparse_to_sparse_tensor(s) for s in list_scipy_sparse]

def piano_roll_path_to_sparse_tensor(piano_roll_path):
    
    s = load_npz(piano_roll_path)   
    return scipy_sparse_to_sparse_tensor(s)

In [7]:
print(f'total number of piano rolls: {len(PIANO_ROLL_PATHS)}')

piano_rolls_size_gb = 0
for pr in PIANO_ROLL_PATHS:                      # kb   # mb   # gb
    piano_rolls_size_gb += os.path.getsize(pr) / 1000 / 1000 / 1000
    
print(f'total size of piano rolls: {round(piano_rolls_size_gb, 3)} gb\n')



PIANO_ROLL_PATHS_MASKED = PIANO_ROLL_PATHS[PIANO_ROLLS_MASK]

print(f'number of piano rolls in subset: {len(PIANO_ROLL_PATHS_MASKED)}')

piano_rolls_size_gb = 0
for pr in PIANO_ROLL_PATHS_MASKED:                      # kb   # mb   # gb
    piano_rolls_size_gb += os.path.getsize(pr) / 1000 / 1000 / 1000
    
print(f'total size of piano rolls: {round(piano_rolls_size_gb, 3)} gb')

total number of piano rolls: 2000
total size of piano rolls: 0.582 gb

number of piano rolls in subset: 1403
total size of piano rolls: 0.417 gb


The following cell randomly selects NUMBER_OF_PIANO_ROLLS_SUBSET piano rolls to train and test our CVAE on. Since each piano roll has at least a few thousand time-steps (so at least a few thousand sequences for training), this should be enough data for training.

In [8]:
np.random.seed(1)

PIANO_ROLL_PATHS_SUBSET = np.random.choice(PIANO_ROLL_PATHS_MASKED, size=1200, replace=False)

print(f'Number of piano rolls: {len(PIANO_ROLL_PATHS_SUBSET)}')

piano_rolls_size_gb = 0
for pr in PIANO_ROLL_PATHS_SUBSET:               # kb   # mb   # gb
    piano_rolls_size_gb += os.path.getsize(pr) / 1000 / 1000 / 1000
    
print(f'total size of piano rolls: {round(piano_rolls_size_gb, 3)} gb')

Number of piano rolls: 1200
total size of piano rolls: 0.343 gb


In [9]:
def get_piano_rolls_nonzero_mean_std(piano_roll_paths):
    
    nz_sums = []
    total_nnz = 0
    
    sparse = None
        
    for prp in piano_roll_paths:
        sparse = load_npz(prp)
        nz_sums.append(sparse.data.sum())
        total_nnz += sparse.nnz
    
    mean = (np.array(nz_sums) / total_nnz).sum()
    
    def get_std_disc(x, u):
        return (((x - u)**2)/total_nnz).sum()
    
    nz_std_disc_sum = 0
    for prp in piano_roll_paths:
        sparse = load_npz(prp)
        nz_std_disc_sum += get_std_disc(sparse.data, mean)
    
    std = nz_std_disc_sum ** (.5)
    
    return mean, std

def get_sparse_mats_mean_std(sparse_matrices):
    
    sums = []
    total_size = 0
        
    for s in sparse_matrices:
        sums.append(s.A.sum())
        total_size += s.shape[0]*s.shape[1]
    
    mean = (np.array(sums) / total_size).sum()
    
    def get_std_disc(x, u):
        return (((x - u)**2)/total_size).sum()
    
    std_disc_sum = 0
    for s in sparse_matrices:
        std_disc_sum += get_std_disc(s.A.reshape(-1,1), mean)
    
    std = nz_std_disc_sum ** (.5)
    
    return mean, std

def normal_sparse_matrix(sparse_matrix, mean, std, inplace):
    
    if inplace:
        sparse_matrix.data = (sparse_matrix.data - mean) / std
    else:
        sparse_matrix_new = sparse_matrix
        sparse_matrix_new.data = (sparse_matrix_new.data - mean) / std
        return sparse_matrix_new

def normalize_list_of_sparse_matrices(sparse_matrices):
    
    mean, std = get_sparse_mats_nonzero_mean_std(sparse_matrices)
    
    for s in sparse_matrices:    
        s.data = (s.data - mean) / std
        
    return sparse_matrices

def piano_roll_paths_to_norm_sparse_tensors(piano_roll_paths):
    
    mean, std = get_piano_rolls_nonzero_mean_std(piano_roll_paths)
    
    sparse_tensors = []
    for prp in piano_roll_paths:
        
        sparse_matrix = load_npz(prp)
        normal_sparse_matrix(sparse_matrix, mean, std, True)
        sparse_tensors.append(scipy_sparse_to_sparse_tensor(sparse_matrix))
        
    return sparse_tensors

def piano_roll_paths_to_scaled_sparse_tensors(piano_roll_paths):
    
    sparse_tensors = []
    for prp in piano_roll_paths:
        
        sparse_matrix = load_npz(prp)
        sparse_matrix.data[sparse_matrix.data > 127] = 127.
        sparse_matrix.data = sparse_matrix.data / 127.
        
        sparse_tensors.append(scipy_sparse_to_sparse_tensor(sparse_matrix))
        
    return sparse_tensors
    

In [10]:
# Now load the NUMBER_OF_PIANO_ROLLS_SUBSET piano rolls into memory

PIANO_ROLLS = np.array(piano_roll_paths_to_scaled_sparse_tensors(PIANO_ROLL_PATHS_SUBSET))

In [11]:
PIANO_ROLLS_TRAIN, PIANO_ROLLS_TEST = train_test_split(PIANO_ROLLS, random_state=2, test_size=200)

In [12]:
print(f'training piano rolls: {PIANO_ROLLS_TRAIN.shape[0]}')
print(f'testing piano rolls: {PIANO_ROLLS_TEST.shape[0]}')

training piano rolls: 1000
testing piano rolls: 200


## Piano Roll helper functions

In [13]:
def piano_roll_to_pretty_midi(pr, ctrl=None, constant_tempo=None, constant_velocity=100):
    '''
    Parameters
    ----------
    pr    : NumPy array of size (t, 128)
    ctrl  : list of length t with - 
              - binary values 0 and 1, where 1 denotes a note onset (for monophonic)
              - 0 and pitch values, where pitch values denote a note onset (for polyphonic)
        
    Returns
    -------
    pm    : `pretty_midi.PrettyMIDI` object
        The converted :class:`pretty_midi.PrettyMIDI` instance.
    '''
    beat_resolution = 4
    pm = pretty_midi.PrettyMIDI()

    if constant_tempo is None:
        constant_tempo = 128
    time_step_size = 60. / constant_tempo / beat_resolution

    instrument = pretty_midi.Instrument(program=0, is_drum=False, name="test")
    clipped = pr.astype(np.uint8)
    binarized = (clipped > 0)
    padded = np.pad(binarized, ((1, 1), (0, 0)), 'constant')
    diff = np.diff(padded.astype(np.int8), axis=0)
    
    positives = np.nonzero((diff > 0).T)
    pitches = positives[0]
    note_ons = positives[1]
    note_on_times = time_step_size * note_ons
    note_offs = np.nonzero((diff < 0).T)[1]
    note_off_times = time_step_size * note_offs
    
    if ctrl is None:
        for idx, pitch in enumerate(pitches):
            velocity = np.mean(clipped[note_ons[idx]:note_offs[idx], pitch])
            note = pretty_midi.Note(
                velocity=int(velocity), pitch=pitch,
                start=note_on_times[idx], end=note_off_times[idx])
            instrument.notes.append(note)
    
    else:
        pairs = []
        for idx, pitch in enumerate(pitches):
            note_on, note_off = note_ons[idx], note_offs[idx]
            true_ons = ctrl[note_ons[idx]:note_offs[idx]]
            on_idx = [i for i in range(len(true_ons)) if true_ons[i] == 1]  # if polyphonic, change 1 to pitch value
            on_idx.pop(0)  # remove 1st onset token

            cur_note_on = note_on
            while on_idx:
                cur_note_off = note_on + on_idx[0]
                pairs.append((pitch, cur_note_on, cur_note_off))
                cur_note_on = cur_note_off
                on_idx.pop(0)
            pairs.append((pitch, cur_note_on, note_off))   
        
        for idx, p in enumerate(pairs):
            pitch, start, end = p
            velocity = np.mean(clipped[start:end, pitch])
            note = pretty_midi.Note(
                velocity=int(velocity), pitch=pitch,
                start=start*time_step_size, end=end*time_step_size)
            instrument.notes.append(note)

    instrument.notes.sort(key=lambda x: x.start)
    pm.instruments.append(instrument)
    
    return pm

In [14]:
def piano_roll_to_pretty_midi(piano_roll, fs=100):
    
    notes, frames = piano_roll.shape
    pm = pretty_midi.PrettyMIDI()
    instrument = pretty_midi.Instrument(program=0)

    # pad 1 column of zeros so we can acknowledge inital and ending events
    piano_roll = np.pad(piano_roll, [(0, 0), (1, 1)], 'constant')

    # use changes in velocities to find note on / note off events
    velocity_changes = np.nonzero(np.diff(piano_roll).T)

    # keep track on velocities and note on times
    prev_velocities = np.zeros(notes, dtype=int)
    note_on_time = np.zeros(notes)

    for time, note in zip(*velocity_changes):
        # use time + 1 because of padding above
        velocity = piano_roll[note, time + 1]
        time = time / fs
        if velocity > 0:
            if prev_velocities[note] == 0:
                note_on_time[note] = time
                prev_velocities[note] = velocity
        else:
            pm_note = pretty_midi.Note(
                velocity=prev_velocities[note],
                pitch=note,
                start=note_on_time[note],
                end=time)
            instrument.notes.append(pm_note)
            prev_velocities[note] = 0
    pm.instruments.append(instrument)
    return pm    
    

In [15]:
def play_piano_roll(piano_roll, buffer_time=0, threshold=.4, temp_path=TEMP_OUTPUT_PATH):
    
    if isinstance(piano_roll, tf.sparse.SparseTensor):
        piano_roll = tf.sparse.to_dense(piano_roll).numpy()
    
    if piano_roll.max() < 1.5:
        piano_roll *= 127.
    
    piano_roll[piano_roll > 127] = 126
    #piano_roll[piano_roll <= 127*threshold] = 0
    piano_roll = piano_roll.astype('uint8').round()
            
    midi = piano_roll_to_pretty_midi(piano_roll)
    midi.write(temp_path)
    
    sleep_time = buffer_time + piano_roll.shape[-1] / 100
    
    pygame.mixer.init()
    pygame.mixer.music.load(temp_path)
    
    pygame.mixer.music.play()
    time.sleep(sleep_time)
    
    pygame.mixer.music.stop()
    

In [16]:
play_piano_roll(tf.sparse.slice(PIANO_ROLLS_TRAIN[577], start=[0, 0], size=[128, 1000]))

In [164]:
def play_samples_from_batch(batch, number_of_samples=None, shuffle=None, temp_path=TEMP_OUTPUT_PATH):
    
    if batch.max() <= 2:
        batch *= 127.
        
    batch[batch > 127] = 127
    batch[batch < 0] = 0
    batch = batch.round().astype('uint8')
    batch = batch.squeeze()
        
    if number_of_samples is None:
        number_of_samples = batch.shape[0]
        
    if shuffle is None:
        shuffle = False
        
    if shuffle:
        steps = np.random.choice(np.arange(batch.shape[0]), size=number_of_samples, replace=False)
        steps = sorted(steps)
        
    else: 
        steps = range(number_of_samples)
        
    sleep_time = batch.shape[-1] / 100
        
    for s in steps:
        
        display.clear_output(wait=False)
        print(f'sample # {s}')
        
        pr = batch[s].squeeze()
        midi = piano_roll_to_pretty_midi(pr)
        
        midi.write(temp_path)

        pygame.mixer.init()
        pygame.mixer.music.load(temp_path)
        pygame.mixer.music.play()

        time.sleep(sleep_time)
        pygame.mixer.music.stop()
    

In [18]:
def play_inputs_and_outputs(inputs, outputs, number_of_samples, shuffle):
    
    # Inputs is ndarray shaped batch_size x number_of_melodies x 128 x sequence_length
    # Outputs is ndarray batch_size x 128 x sequence_length
        
    if shuffle:
        possible_sample_indices = range(outputs.shape[0])
        steps = np.random.choice(possible_sample_indices, size=number_of_samples, replace=False)
        
    else: 
        steps = range(number_of_samples)
        
    input_time = inputs.shape[1]*inputs.shape[-1] / 100
    output_time = outputs.shape[2] / 100
    total_time = input_time + output_time + .3
    
    def np_unstack(array):
        nps = [array[b] for b in range(array.shape[0])]
        return nps
    
    for s in steps:
        
        display.clear_output(wait=False)
        print(f'sample # {s}')
        
        sample_input = inputs[s]
        sample_input = np_unstack(sample_input)
        sample_input.append(outputs[s])
        
        pr = np.concatenate(sample_input, axis=-1)
        midi = piano_roll_to_pretty_midi(pr)
                
        midi.write(TEMP_OUTPUT_PATH)

        pygame.mixer.init()
        pygame.mixer.music.load(TEMP_OUTPUT_PATH)
        
        pygame.mixer.music.play()
        time.sleep(total_time)
        
        pygame.mixer.music.stop() 

## Creating TF Datasets

In [146]:
class NoteTargetGenerator:    
    
    def __init__(self, sparse_tensors, yield_target, sequence_length, seed=None):
        
        self.sparse_tensors = sparse_tensors
        self.num_tensors = len(self.sparse_tensors)
        self.yield_target = yield_target
        self.sequence_length = sequence_length
        self.seed = seed
        
    def __iter__(self):
        
        if self.seed is not None:
            np.random.seed(self.seed)
        
        while True:
            
            sparse_tensor = np.random.choice(self.sparse_tensors, size=1)[0]
        
            last_start = (sparse_tensor.shape[1] - 2 * self.sequence_length - 3)
            note_start = np.random.choice(np.arange(last_start), size=1)[0]

            note = tf.sparse.slice(sparse_tensor,
                                   start=[0, note_start],
                                   size=[128, self.sequence_length]
                                  )
            
            #yield tf.sparse.to_dense(note)
            yield tf.expand_dims(tf.sparse.to_dense(note), axis=-1)
            
            
    def __call__(self):
        return self.__iter__()
        

In [147]:
DATA_SIZE_OF_GENERATORS = 50

train_sub_generators = [NoteTargetGenerator(PIANO_ROLLS_TRAIN[i*DATA_SIZE_OF_GENERATORS:(i+1)*DATA_SIZE_OF_GENERATORS], False, SEQUENCE_LENGTH, i)
                        for i in range(1 + PIANO_ROLLS_TRAIN.shape[0] // DATA_SIZE_OF_GENERATORS)
                        if i*DATA_SIZE_OF_GENERATORS < PIANO_ROLLS_TRAIN.shape[0]]

test_sub_generators = [NoteTargetGenerator(PIANO_ROLLS_TEST[i*DATA_SIZE_OF_GENERATORS:(i+1)*DATA_SIZE_OF_GENERATORS], False, SEQUENCE_LENGTH, i)
                        for i in range(1 + PIANO_ROLLS_TEST.shape[0] // DATA_SIZE_OF_GENERATORS)
                        if i*DATA_SIZE_OF_GENERATORS < PIANO_ROLLS_TEST.shape[0]]

cvae_gen_output_signature = tf.TensorSpec(shape=(128, SEQUENCE_LENGTH, 1))

def get_sub_dataset(sub_generator, spec, batch_size, prefetch_size):
    
    return (tf.data.Dataset
            .from_generator(sub_generator, output_signature=spec)
            .batch(batch_size, drop_remainder=True)
            .prefetch(prefetch_size)
           )

cvae_train_sub_datasets = [get_sub_dataset(g, cvae_gen_output_signature, BATCH_SIZE, 10)
                           for g in train_sub_generators]
cvae_test_sub_datasets = [get_sub_dataset(g, cvae_gen_output_signature, BATCH_SIZE, 10)
                           for g in test_sub_generators]

cvae_train_dataset = tf.data.Dataset.sample_from_datasets(cvae_train_sub_datasets).prefetch(64)
cvae_test_dataset = tf.data.Dataset.sample_from_datasets(cvae_test_sub_datasets).prefetch(64)

In [148]:
%%time
np.random.seed(333)
sample_input = None
for x in cvae_test_dataset.take(1):
    sample_input = x

Wall time: 106 ms


In [149]:
sample_input.shape

TensorShape([32, 128, 64, 1])

In [150]:
sample_input.numpy().max()*127.

112.99999910593033

## Training the CVAE to produce notes

First, we train the CVAE to learn the distribution of pianoroll values which correspond to notes/chords, and how to generate them. It is trained on batches of 128 x SEQUENCE_LENGTH tensors.

In [65]:
class CVAE(tf.keras.Model):
    
    def __init__(self, latent_dim, sequence_length):
        super(CVAE, self).__init__()
        
        self.latent_dim = latent_dim
        self.sequence_length = sequence_length
        
        #encoder_input = Input(shape=(128, self.sequence_length), name='encoder_input')
        #encoder_reshape = Reshape(target_shape=(128, self.sequence_length, 1), name='encoder_reshape')(encoder_input)
        
        encoder_input = Input(shape=(128, self.sequence_length, 1), name='encoder_input')
        
        encoder_conv_1 = Conv2D(filters=64,kernel_size=(4, 4), strides=(4, 4), padding='valid', activation='relu',name='encoder_conv2d_1')(encoder_input)
        encoder_conv_2 = Conv2D(filters=128, kernel_size=(4, 4), strides=(4, 4), padding='valid', activation='relu', name='encoder_conv2d_2')(encoder_conv_1)
        encoder_conv_3 = Conv2D(filters=256, kernel_size=(8, 4), strides=(8, 4), padding='valid', activation='relu', name='encoder_conv2d_3')(encoder_conv_2)
        encoder_flatten = Flatten(name='encoder_flatten')(encoder_conv_3)
        
        encoder_mean = Dense(self.latent_dim, name='encoder_mean')(encoder_flatten)
        encoder_var = Dense(self.latent_dim, name='encoder_variance')(encoder_flatten)
        
        self.encoder = Model(encoder_input, [encoder_mean, encoder_var], name='encoder')
                
        decoder_input = Input(shape=(self.latent_dim), name='decoder_input')
        decoder_dense = Dense(units=256, activation='relu', name='decoder_dense')(decoder_input)
        x = Reshape(target_shape=(1, 1, 256), name='decoder_reshape')(decoder_dense)
        x = Conv2DTranspose(filters=128, kernel_size=(8, 2), strides=(8, 2), padding='valid', activation='relu', name='decoder_conv2dtranspose_1')(x)
        x = Conv2DTranspose(filters=64, kernel_size=(4, 4), strides=(4, 4), padding='valid', activation='relu', name='decoder_conv2dtranspose_2')(x)
        x = Conv2DTranspose(filters=1, kernel_size=(4, 4), strides=(4, 4), padding='valid', name='decoder_conv2dtranspose_3')(x)
        
        decoder_output = x
        #decoder_output = Reshape(target_shape=(128, self.sequence_length), name='decoder_output')(x)
        
        self.decoder = Model(decoder_input, decoder_output, name='decoder')
        
    def compile(self, optimizer):
        super(CVAE, self).compile()
        
        self.optimizer = optimizer
        self.encoder.compile(optimizer=self.optimizer)
        self.decoder.compile(optimizer=self.optimizer)
        
    def call(self, inputs, is_training=False):
        
        inputs_is_list = isinstance(inputs, list)
        
        if inputs_is_list and is_training:
            return [self.train_step(x_y) for x_y in inputs]
        
        elif inputs_is_list and not is_training:
            return [self.test_step(x_y) for x_y in inputs]
        
        elif not inputs_is_list and is_training:
            return self.train_step(x_y)
        
        elif not inputs_is_list and not is_training:
            return self.test_step(x_y)
        
    def log_normal_pdf(self, sample, mean, logvar, raxis=1):
          
        ln2pi = tf.math.log(2. * np.pi)
        x = -.5 * ((sample - mean) ** 2. * tf.exp(-logvar) + logvar + ln2pi)
        return tf.reduce_sum(x, axis=raxis)
        
    def compute_loss(self, x):
                
        mean, logvar = self.encoder(x)
        z = self.reparameterize(mean, logvar)
        x_logit = self.decode(z)
                
        # tf crossentropy
        cross_ent = tf.nn.sigmoid_cross_entropy_with_logits(
            logits=x_logit, labels=x
        )
        
        # K crossentropy
        #cross_entropy = K.binary_crossentropy(target=x, output=x_logit)    
        
        logpx_z = -tf.reduce_sum(cross_ent, axis=[1, 2, 3])
        logpz = self.log_normal_pdf(z, 0., 0.)
        logqz_x = self.log_normal_pdf(z, mean, logvar)
        return -tf.reduce_mean(logpx_z + logpz - logqz_x)    
    
    @tf.function
    def train_step(self, x):
        
        # print(tf.executing_eagerly()) --> False
                
        with tf.GradientTape() as tape:
            loss = self.compute_loss(x)
        
        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        return {'loss': loss}
    
    def get_latent_vect(self, x):
        
        mean, logvar = self.encoder(x)
        return self.reparameterize(mean, logvar)
    
    def test_step(self, x):
        
        loss = self.compute_loss(x)
        return {'loss': loss}
    
    def reparameterize(self, mean, logvar):
        eps = tf.random.normal(shape=mean.shape)
        return eps * tf.exp(logvar * .5) + mean
        
    def decode(self, z, apply_sigmoid=False):
        logits = self.decoder(z)
        if apply_sigmoid:
            return tf.sigmoid(logits)
        return logits
        
    @tf.function
    def sample(self, eps=None):
        if eps is None:
            eps = tf.random.normal(shape=(100, self.latent_dim))
        return self.decode(eps, apply_sigmoid=True)
    
    def generate(self, x):
        
        if isinstance(x, tf.sparse.SparseTensor):
            x = tf.sparse.to_dense(x)
            
        mean, logvar = self.encoder(x)
        z = self.reparameterize(mean, logvar)
        return self.sample(z)            
    

In [66]:
cvae = CVAE(LATENT_DIM, SEQUENCE_LENGTH)
cvae.compile(tf.keras.optimizers.Adam(1e-3))

In [88]:
cvae.encoder.load_weights('C:/_local/py/yt_piano_music_gen/models/cvae/encoder_64_z_64_seq_15_epochs_288_loss.hdf5')
cvae.decoder.load_weights('C:/_local/py/yt_piano_music_gen/models/cvae/decoder_64_z_64_seq_15_epochs_288_loss.hdf5')

In [67]:
cvae.encoder.summary()

Model: "encoder"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_input (InputLayer)     [(None, 128, 50, 1)  0           []                               
                                ]                                                                 
                                                                                                  
 encoder_conv2d_1 (Conv2D)      (None, 32, 13, 64)   1088        ['encoder_input[0][0]']          
                                                                                                  
 encoder_conv2d_2 (Conv2D)      (None, 8, 4, 128)    131200      ['encoder_conv2d_1[0][0]']       
                                                                                                  
 encoder_conv2d_3 (Conv2D)      (None, 1, 2, 256)    524544      ['encoder_conv2d_2[0][0]'] 

In [68]:
cvae.decoder.summary()

Model: "decoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 decoder_input (InputLayer)  [(None, 16)]              0         
                                                                 
 decoder_dense (Dense)       (None, 256)               4352      
                                                                 
 decoder_reshape (Reshape)   (None, 1, 2, 128)         0         
                                                                 
 decoder_conv2dtranspose_1 (  (None, 8, 4, 128)        262272    
 Conv2DTranspose)                                                
                                                                 
 decoder_conv2dtranspose_2 (  (None, 32, 16, 64)       131136    
 Conv2DTranspose)                                                
                                                                 
 decoder_conv2dtranspose_3 (  (None, 128, 64, 1)       1025

In [86]:
class CVAECheckpointCallback(tf.keras.callbacks.Callback):
    
    def __init__(self, model_dir):
        
        self.model_dir = model_dir
        
        self.best_loss = np.Inf
        self.best_epoch = 0
        
        self.best_encoder = None
        self.best_decoder = None
        self.best_model = None
                        
    def on_epoch_end(self, epoch, loss_dict):
                        
        if loss_dict['loss'] < self.best_loss:
            
            self.best_epoch = epoch
            self.best_loss = loss_dict['loss']   
            
            self.best_encoder = self.model.encoder
            self.best_decoder = self.model.decoder
            self.best_model = self.model
    
    def on_train_end(self, loss_dict):
        
        encoder_file_name = f'encoder_{self.best_epoch}_epochs_{round(self.best_loss)}_loss'
        decoder_file_name = f'decoder_{self.best_epoch}_epochs_{round(self.best_loss)}_loss'
        cvae_file_name = f'cvae_{self.best_epoch}_epochs_{round(self.best_loss)}_loss'
        
        encoder_save_path = self.model_dir + encoder_file_name
        decoder_save_path = self.model_dir + decoder_file_name
        cvae_save_path = self.model_dir + cvae_file_name
        
        self.best_encoder.save(encoder_save_path)
        self.best_decoder.save(decoder_save_path)
        self.model.save(cvae_save_path)
    
cvae_ckpt_clbk = CVAECheckpointCallback(CVAE_DIR)

In [87]:
cvae_reduce_lr_clbk = tf.keras.callbacks.ReduceLROnPlateau(monitor='loss', factor=.1, min_delta=5, patience=3)
cvae_early_stop_clbk = tf.keras.callbacks.EarlyStopping(monitor='loss', min_delta=5, patience=7)

In [89]:
cvae.fit(x=cvae_train_dataset, batch_size=BATCH_SIZE,
         epochs=10, steps_per_epoch=1000,
         validation_data=cvae_test_dataset, validation_steps=150,
         shuffle=False,
         callbacks=[cvae_ckpt_clbk, 
                    cvae_reduce_lr_clbk, 
                    cvae_early_stop_clbk]
        )

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
INFO:tensorflow:Assets written to: C:/_local/py/yt_piano_music_gen/models/cvae/5_epochs_151_loss\assets
INFO:tensorflow:Assets written to: C:/_local/py/yt_piano_music_gen/models/cvae/5_epochs_151_loss\assets


<keras.callbacks.History at 0x1359d7bf820>

In [109]:
play_samples_from_batch(sample_input.numpy(), 15, True)

sample # 29


In [90]:
sample_output = cvae.generate(sample_input).numpy()

In [101]:
play_samples_from_batch(sample_output, 15, True)

sample # 30


In [None]:
#BEST_ENCODER_PATH = 'C:/_local/py/yt_piano_music_gen/models/cvae/encoder_5_epochs_466_elbo.hdf5'
#BEST_DECODER_PATH = 'C:/_local/py/yt_piano_music_gen/models/cvae/decoder_5_epochs_466_elbo.hdf5'

#BEST_ENCODER_PATH = 'C:/_local/py/yt_piano_music_gen/models/cvae/seq_32_encoder_15_epochs_275_elbo.hdf5'
#BEST_DECODER_PATH = 'C:/_local/py/yt_piano_music_gen/models/cvae/seq_32_decoder_15_epochs_275_elbo.hdf5'

#BEST_ENCODER_PATH = 'C:/_local/py/yt_piano_music_gen/models/cvae/seq_64_encoder_2_epochs_317_elbo.hdf5'
#BEST_DECODER_PATH = 'C:/_local/py/yt_piano_music_gen/models/cvae/seq_32_decoder_2_epochs_317_elbo.hdf5'

cvae = CVAE(CVAE_LATENT_DIM, SEQUENCE_LENGTH)
cvae.compile(tf.keras.optimizers.Adam(1e-3))

cvae.load_encoder(BEST_ENCODER_PATH)
cvae.load_decoder(BEST_DECODER_PATH)

In [145]:
cvae(sample_input).numpy().max()*127.

115.31005674600601

In [60]:
# Play some randomly generated samples
# np.random.seed(5)
sample_number = 15
sample_output = cvae.sample().numpy()*127.

play_samples(sample_output, sample_number)    

sample # 14


In [None]:
# Playing samples reconstructed from sample inputs
np.random.seed(6)
sample_output = cvae.generate(sample_input).numpy()*127.

play_samples(sample_output, 15)

## New note CVAE

In [151]:
optimizer = tf.keras.optimizers.Adam(1e-3)

X = Input(shape=(128, SEQUENCE_LENGTH), name='input')
X_add_channel = Reshape(target_shape=(128, SEQUENCE_LENGTH, 1), name='encoder_input_reshape')(X)

encoder_conv_1 = Conv2D(filters=64, kernel_size=(4, 4), strides=(4, 4), 
                        activation='relu', padding='same', name='conv2d_1')(X_add_channel)

#encoder_relu_1 = Relu(name='encoder_relu_1')(encoder_batch_norm_1)
#encoder_batch_norm_1 = BatchNorm(name='encoder_batch_norm_1')(encoder_conv_1)

encoder_conv_2 = Conv2D(filters=128, kernel_size=(4, 4), strides=(4, 4), 
                        activation='relu', padding='same', name='conv2d_2')(encoder_conv_1)

#encoder_batch_norm_2 = BatchNorm(name='encoder_batch_norm_2')(encoder_conv_2)
#encoder_relu_2 = Relu(name='encoder_relu_2')(encoder_batch_norm_2)

encoder_conv_3 = Conv2D(filters=256, kernel_size=(8, 4), strides=(8, 4), 
                        activation='relu', padding='same', name='conv2d_3')(encoder_conv_2)

#encoder_batch_norm_3 = BatchNorm(name='encoder_batch_norm_3')(encoder_conv_3)
#encoder_relu_3 = Relu(name='encoder_relu_3')(encoder_batch_norm_3)

encoder_flatten = Flatten(name='encoder_flatten')(encoder_conv_3)

encoder_dense_1 = Dense(256, activation='relu', name='encoder_dense_1')(encoder_flatten)
encoder_dense_2 = Dense(256, activation='relu', name='encoder_dense_2')(encoder_dense_1)
encoder_dense_3 = Dense(256, activation='relu', name='encoder_dense_3')(encoder_dense_2)

encoder_mu = Dense(LATENT_DIM, activation='linear', name='encoder_mu')(encoder_dense_3)
encoder_log_sigma = Dense(LATENT_DIM, activation='linear', name='encoder_log_sigma')(encoder_dense_3)

encoder = Model(X, encoder_mu, name='encoder')
encoder.compile(optimizer=optimizer)

In [152]:
encoder.summary()

Model: "encoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input (InputLayer)          [(None, 128, 64)]         0         
                                                                 
 encoder_input_reshape (Resh  (None, 128, 64, 1)       0         
 ape)                                                            
                                                                 
 conv2d_1 (Conv2D)           (None, 32, 16, 64)        1088      
                                                                 
 conv2d_2 (Conv2D)           (None, 8, 4, 128)         131200    
                                                                 
 conv2d_3 (Conv2D)           (None, 1, 1, 256)         1048832   
                                                                 
 encoder_flatten (Flatten)   (None, 256)               0         
                                                           

In [153]:
# Reparameterization and concatenation layers / specification
def sample_and_reparameterize(mu_logsigma):
    mu, log_sigma = mu_logsigma
    
    b, k = mu.shape
    
    eps = K.random_normal(shape=(k,))
    return mu + K.exp(log_sigma * .5) * eps

Z = Lambda(sample_and_reparameterize, output_shape=(LATENT_DIM,),
           name='sample_and_reparameterize')([encoder_mu, encoder_log_sigma])

In [154]:
# Standard decoder declaration

decoder_dense_1_layer = Dense(256, activation='relu', name='decoder_dense_1')
decoder_dense_2_layer = Dense(256, activation='relu', name='decoder_dense_2')
decoder_dense_3_layer = Dense(256, activation='relu', name='decoder_dense_3')
decoder_reshape_1_layer = Reshape(target_shape=(1, 1, 256), name='reshape')

decoder_conv_1_layer = Conv2DTranspose(filters=128, kernel_size=(8, 4), strides=(8, 4), 
                                       activation='relu', padding='valid', 
                                       name='conv2dtranspose_1')

decoder_conv_2_layer = Conv2DTranspose(filters=64, kernel_size=(4, 4), strides=(4, 4), 
                                       activation='relu', padding='valid', 
                                       name='conv2dtranspose_2')

decoder_conv_3_layer = Conv2DTranspose(filters=1, kernel_size=(4, 4), strides=(4, 4), 
                                       activation='sigmoid', padding='valid', name='conv2dtranspose_3')

decoder_output_layer = Reshape(target_shape=(128, SEQUENCE_LENGTH), name='decoder_output')

d1 = decoder_dense_1_layer(Z)
d2 = decoder_dense_2_layer(d1)
d3 = decoder_dense_3_layer(d2)
d4 = decoder_reshape_1_layer(d3)
d5 = decoder_conv_1_layer(d4)
d6 = decoder_conv_2_layer(d5)
d7 = decoder_conv_3_layer(d6)
cvae_output = decoder_output_layer(d7)

decoder_input = Input(shape=(LATENT_DIM,), name='decoder_input')
decoder_dense_1 = decoder_dense_1_layer(decoder_input)
decoder_dense_2 = decoder_dense_2_layer(decoder_dense_1)
decoder_dense_3 = decoder_dense_3_layer(decoder_dense_2)
decoder_reshape_1 = decoder_reshape_1_layer(decoder_dense_3)
decoder_conv_1 = decoder_conv_1_layer(decoder_reshape_1)
decoder_conv_2 = decoder_conv_2_layer(decoder_conv_1)
decoder_conv_3 = decoder_conv_3_layer(decoder_conv_2)
decoder_output = decoder_output_layer(decoder_conv_3)

decoder = Model(decoder_input, decoder_output, name='decoder')
decoder.compile(optimizer=optimizer)

cvae = Model(X, cvae_output, name='cvae')

In [155]:
decoder.summary()

Model: "decoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 decoder_input (InputLayer)  [(None, 128)]             0         
                                                                 
 decoder_dense_1 (Dense)     (None, 256)               33024     
                                                                 
 decoder_dense_2 (Dense)     (None, 256)               65792     
                                                                 
 decoder_dense_3 (Dense)     (None, 256)               65792     
                                                                 
 reshape (Reshape)           (None, 1, 1, 256)         0         
                                                                 
 conv2dtranspose_1 (Conv2DTr  (None, 8, 4, 128)        1048704   
 anspose)                                                        
                                                           

In [156]:
cvae.summary()

Model: "cvae"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input (InputLayer)             [(None, 128, 64)]    0           []                               
                                                                                                  
 encoder_input_reshape (Reshape  (None, 128, 64, 1)  0           ['input[0][0]']                  
 )                                                                                                
                                                                                                  
 conv2d_1 (Conv2D)              (None, 32, 16, 64)   1088        ['encoder_input_reshape[0][0]']  
                                                                                                  
 conv2d_2 (Conv2D)              (None, 8, 4, 128)    131200      ['conv2d_1[0][0]']            

In [157]:
# Keras Loss functions

def reconstruction_loss(x_true, x_pred):
    
    cross_entropy = K.binary_crossentropy(target=x_true, output=x_pred)    
    return K.sum(cross_entropy, axis=[1, 2])

def kl_divergence(emu, els):
    
    kl_2 = K.exp(els) + K.square(emu) - 1. - els
    kl = .5 * K.sum(kl_2, axis=-1)
    return kl #*10

# Tensorflow loss functions

#def reconstruction_loss(x_true, x_pred):
    
#    cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=x_true, logits=x_pred)    
#    return tf.reduce_sum(cross_entropy, axis=-1)

#def kl_divergence(emu, els):
    
#    kl_2 = tf.exp(els) + tf.square(emu) - 1. - els
#    return .5 * tf.reduce_sum(kl_2, axis=-1)


def vae_loss(x_true, x_pred, emu, els):
    
    recon_loss = reconstruction_loss(x_true, x_pred)
    kl_loss = kl_divergence(emu, els)
    
    return recon_loss + kl_loss

In [158]:
if len(cvae.losses) == 0:
    cvae.add_loss(vae_loss(X, cvae_output, encoder_mu, encoder_log_sigma))
    
if len(cvae.metrics) == 0:
    cvae.add_metric(reconstruction_loss(X, cvae_output), name='reconstruction_loss')
    cvae.add_metric(kl_divergence(encoder_mu, encoder_log_sigma), name='kl_divergence')
    
cvae.compile(optimizer=tf.keras.optimizers.Adam(1e-3), loss=None)

In [159]:
class VAECheckpointCallback(tf.keras.callbacks.Callback):
    
    def __init__(self, model_dir):
        
        self.encoder_dir = model_dir + 'encoder/'
        self.decoder_dir = model_dir + 'decoder/'
        
        self.best_encoder = None
        self.best_decoder = None
        
        self.best_epoch = 0
        self.best_loss = np.Inf
        
    def on_epoch_end(self, epoch, loss_dict):
                        
        if loss_dict['loss'] < self.best_loss:
            
            self.best_epoch = epoch
            self.best_loss = loss_dict['loss']   
            
            global encoder
            global decoder
            
            self.best_encoder = encoder
            self.best_decoder = decoder
    
    def on_train_end(self, loss_dict):
        
        encoder_file_name = f'encoder_{self.best_epoch}_epochs_{round(self.best_loss)}_loss'
        decoder_file_name = f'decoder_{self.best_epoch}_epochs_{round(self.best_loss)}_loss'
        
        encoder_save_path = self.encoder_dir + encoder_file_name
        decoder_save_path = self.decoder_dir + decoder_file_name
        
        self.best_encoder.save(encoder_save_path)
        self.best_decoder.save(decoder_save_path)

vae_ckpt_clbk = VAECheckpointCallback(NEW_VAE_DIR)
vae_reduce_lr_clbk = tf.keras.callbacks.ReduceLROnPlateau(monitor='loss', min_delta=3, patience=3, factor=.1)
vae_early_stop_clbk = tf.keras.callbacks.EarlyStopping(monitor='loss', min_delta=3, patience=6)

vae_clbks = [vae_ckpt_clbk, 
             vae_reduce_lr_clbk, 
             vae_early_stop_clbk
            ]

In [None]:
cvae.fit(x=cvae_train_dataset, shuffle=False,
         batch_size=BATCH_SIZE, steps_per_epoch=1024, 
         epochs=20,
         validation_data=cvae_test_dataset, validation_steps=128,
         callbacks=vae_clbks
        )

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20

In [229]:
min_val_loss = round(min(cvae.history.history['val_loss']))

ENCODER_DIR = 'C:/_local/py/yt_piano_music_gen/models/new_cvae/encoder/'
ENCODER_FILE_NAME = f'encoder_s_{SEQUENCE_LENGTH}_z_{LATENT_DIM}_epochs_30_loss_{min_val_loss}'
ENCODER_PATH = ENCODER_DIR + ENCODER_FILE_NAME

DECODER_DIR = 'C:/_local/py/yt_piano_music_gen/models/new_cvae/decoder/'
DECODER_FILE_NAME = f'decoder_seq_{SEQUENCE_LENGTH}_z_{LATENT_DIM}_epochs_30_loss_{min_val_loss}'
DECODER_PATH = DECODER_DIR + DECODER_FILE_NAME

encoder.save(ENCODER_PATH)
decoder.save(DECODER_PATH)

INFO:tensorflow:Assets written to: C:/_local/py/yt_piano_music_gen/models/new_cvae/encoder/encoder_s_32_z_128_epochs_30_loss_242\assets
INFO:tensorflow:Assets written to: C:/_local/py/yt_piano_music_gen/models/new_cvae/decoder/decoder_seq_32_z_128_epochs_30_loss_242\assets


In [165]:
def generate(decoder, num_to_generate):
    
    eps = tf.random.normal(shape=(num_to_generate, LATENT_DIM))
    logits = decoder(eps)
    return logits

In [166]:
sample_output = tf.reshape(generate(decoder, 100), shape=(100, 128, SEQUENCE_LENGTH))
sample_output = sample_output.numpy() * 127.

In [167]:
sample_output.max()

77.16903

In [168]:
play_samples_from_batch(sample_output, 25, True)

sample # 99


## Tham VAE

In [None]:
class MyVAE(tf.keras.Model):
    
    def __init__(self, latent_dim, sequence_length):
        super(ThamVAE, self).__init__()
        
        encoder_input = Input(shape=(128, self.sequence_length), name='encoder_input')
        encoder_reshape = Reshape(target_shape=(128, self.sequence_length, 1), name='encoder_reshape')(encoder_input)
        encoder_conv_1 = Conv2D(filters=64,kernel_size=(4, 4), strides=(4, 4), activation='relu',name='encoder_conv2d_1')(encoder_reshape)
        encoder_conv_2 = Conv2D(filters=128, kernel_size=(4, 4), strides=(4, 4), activation='relu', name='encoder_conv2d_2')(encoder_conv_1)
        encoder_conv_3 = Conv2D(filters=256, kernel_size=(8, 2), strides=(8, 2), activation='relu', name='encoder_conv2d_3')(encoder_conv_2)
        encoder_flatten = Flatten(name='encoder_flatten')(encoder_conv_3)
        
        encoder_output = Dense(self.latent_dim+1, name='decoder_output')(encoder_flatten)
        
        self.encoder = Model(encoder_input, encoder_output, name='encoder')
                
        decoder_input = Input(shape=(self.latent_dim), name='decoder_input')
        decoder_dense = Dense(units=256, activation='relu', name='decoder_dense')(decoder_input)
        x = Reshape(target_shape=(1, 1, 256), name='decoder_reshape')(decoder_dense)
        x = Conv2DTranspose(filters=128, kernel_size=(8, 4), strides=(8, 4), padding='same', activation='relu', name='decoder_conv2dtranspose_1')(x)
        x = Conv2DTranspose(filters=64, kernel_size=(4, 4), strides=(4, 4), padding='same', activation='relu', name='decoder_conv2dtranspose_2')(x)
        x = Conv2DTranspose(filters=1, kernel_size=(4, 4), strides=(4, 4), padding='same', activation='linear', name='decoder_conv2dtranspose_3')(x)
        decoder_output = Reshape(target_shape=(128, self.sequence_length), name='decoder_output')(x)
        
        self.decoder = Model(decoder_input, decoder_output, name='decoder')
     
    def sample_and_reparameterize(self, mu_log_sigma, n_samples):
        
        b, k = mu_log_sigma.shape
        k -= 1
        
        mu, sigma = mu_sigma[:, :-1], tf.exp(mu_sigma[:, -1])
        eps = tf.random.normal(shape=(b, n_samples, k))
        return eps * tf.reshape(sigma, (b, 1, 1)) + tf.reshape(mu, (b, 1, k))
    
    def log_p_x(self, x, mu_x, sigma_x):
        
        b, n = x.shape[:2]
        
        x = tf.reshape(x, (b, 1, -1))
        _, _, p = x.shape
        
        square_error = (x - tf.reshape(mu_x, (b, n, -1)))**2 / (2*sigma_x**2)
        
        log_sigma = tf.log(sigma_x)
        log_sigma = tf.reduce_sum(log_sigma, axis=2)
        log_sigma = tf.reduce_mean(log_sigma, axis=[0, 1])
        
        return -square_error - log_sigma
        
    def kl_q_p(self, z, mu_log_sigma):
        
        b, n, k = z.shape
        mu_q, log_sigma_q = mu_log_sigma
        
        log_p = -.5 * tf.square(z)
        
        log_q = -.5 * tf.square(z - tf.reshape(mu_q, (b, 1, k)))
        log_q /= tf.square(tf.reshape(tf.exp(log_sigma_q), (b, 1, 1)))
        log_q -= tf.reshape(log_sigma_q, (b, 1, -1))
        
        kl = tf.reduce_sum(log_q - log_p, axis=2)
        return tf.reduce_mean(kl, axis=[1, 1])
        
    def elbo(self, x, n=1):
        
        mu, log_sigma = self.encoder(x)
        Z = self.sample_and_reparameterize(mu, log_sigma, n)
        mu_x = self.decoder(Z)
        
        return log_p_x(x, mu_x, )
        

# Training CVAE on sequence of notes

## Creating TF Datasets of sequences of latent vectors

In [19]:
ENCODER_DIR = 'C:/_local/py/yt_piano_music_gen/models/new_cvae/encoder/'
DECODER_DIR = 'C:/_local/py/yt_piano_music_gen/models/new_cvae/decoder/'

epochs = 30

ENCODER_FILE_NAME = 'encoder_7_epochs_235_loss'
DECODER_FILE_NAME = 'decoder_7_epochs_235_loss'

#ENCODER_FILE_NAME = f'encoder_s_{SEQUENCE_LENGTH}_z_{LATENT_DIM}_epochs_{epochs}_loss_242'
#DECODER_FILE_NAME = f'decoder_seq_{SEQUENCE_LENGTH}_z_{LATENT_DIM}_epochs_{epochs}_loss_242'

ENCODER_PATH = ENCODER_DIR + ENCODER_FILE_NAME
DECODER_PATH = DECODER_DIR + DECODER_FILE_NAME

encoder = tf.keras.models.load_model(ENCODER_PATH)
decoder = tf.keras.models.load_model(DECODER_PATH)

In [20]:
encoder.summary()

Model: "encoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input (InputLayer)          [(32, 128, 32)]           0         
                                                                 
 encoder_input_reshape (Resh  (32, 128, 32, 1)         0         
 ape)                                                            
                                                                 
 conv2d_1 (Conv2D)           (32, 32, 8, 64)           1088      
                                                                 
 conv2d_2 (Conv2D)           (32, 8, 2, 128)           131200    
                                                                 
 conv2d_3 (Conv2D)           (32, 1, 1, 256)           524544    
                                                                 
 encoder_flatten (Flatten)   (32, 256)                 0         
                                                           

In [116]:
decoder.summary()

Model: "decoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 decoder_input (InputLayer)  [(None, 16)]              0         
                                                                 
 decoder_dense_1 (Dense)     (None, 256)               4352      
                                                                 
 decoder_dense_2 (Dense)     (None, 256)               65792     
                                                                 
 decoder_dense_3 (Dense)     (None, 256)               65792     
                                                                 
 reshape (Reshape)           (None, 1, 1, 256)         0         
                                                                 
 conv2dtranspose_1 (Conv2DTr  (None, 8, 2, 128)        524416    
 anspose)                                                        
                                                           

In [101]:
class MelodyTargetSequenceLatentVectorsGenerator:
    
    def __init__(self, sparse_tensors, 
                 melody_sequence_length, melody_number_of_sequences, 
                 target_sequence_length,
                 encoder=None, seed=None):
        
        self.sparse_tensors = sparse_tensors
        self.num_tensors = len(self.sparse_tensors)
        
        self.melody_sequence_length = melody_sequence_length
        self.melody_number_of_sequences = melody_number_of_sequences
        self.melody_total_length = self.melody_sequence_length * self.melody_number_of_sequences
        
        self.target_sequence_length = target_sequence_length
        
        self.encoder = encoder
        self.return_latent_vectors = self.encoder is not None
        
        self.seed = seed
        
    def __iter__(self):
        
        if self.seed is not None:
            np.random.seed(self.seed)
        
        while True:
            
            sparse_tensor = self.sparse_tensors[np.random.randint(0, self.num_tensors)]
                                    
            last_start = (sparse_tensor.shape[1] - self.melody_total_length - self.target_sequence_length - 3)
            melody_start = np.random.randint(0, last_start)
            target_start = melody_start + self.melody_total_length
            
            melody = tf.sparse.slice(sparse_tensor,
                                     start=[0, melody_start],
                                     size=[128, self.melody_total_length]
                                    )
            
            melody = tf.sparse.to_dense(melody)
            
            melody = tf.split(melody, axis=1, num_or_size_splits=self.melody_number_of_sequences)
            melody = tf.stack(melody)
            
            target = tf.sparse.slice(sparse_tensor,
                                     start=[0, target_start],
                                     size=[128, self.target_sequence_length]
                                    )
            
            target = tf.sparse.to_dense(target)
            #target = tf.expand_dims(target, 0)
                
            if self.return_latent_vectors:
                yield  self.encoder(melody), tf.squeeze(self.encoder(target))
            
            else:
                yield melody, target
            
                    
    def __call__(self):
        return self.__iter__()
    

In [102]:
DATA_SIZE_OF_GENERATORS = 50

MELODY_NUMBER_OF_SEQUENCES = 8

MELODY_TOTAL_LENGTH = MELODY_NUMBER_OF_SEQUENCES * SEQUENCE_LENGTH
TARGET_TOTAL_LENGTH = SEQUENCE_LENGTH

train_sub_generators = [MelodyTargetSequenceLatentVectorsGenerator(
                        PIANO_ROLLS_TRAIN[i*DATA_SIZE_OF_GENERATORS:(i+1)*DATA_SIZE_OF_GENERATORS], 
                        SEQUENCE_LENGTH, MELODY_NUMBER_OF_SEQUENCES, SEQUENCE_LENGTH, seed=i)
                        
                        for i in range(1 + PIANO_ROLLS_TRAIN.shape[0] // DATA_SIZE_OF_GENERATORS)
                        if i*DATA_SIZE_OF_GENERATORS < PIANO_ROLLS_TRAIN.shape[0]]

test_sub_generators = [MelodyTargetSequenceLatentVectorsGenerator(
                       PIANO_ROLLS_TEST[i*DATA_SIZE_OF_GENERATORS:(i+1)*DATA_SIZE_OF_GENERATORS], 
                       SEQUENCE_LENGTH, MELODY_NUMBER_OF_SEQUENCES, SEQUENCE_LENGTH, seed=i)
                       
                       for i in range(1 + PIANO_ROLLS_TEST.shape[0] // DATA_SIZE_OF_GENERATORS)
                       if i*DATA_SIZE_OF_GENERATORS < PIANO_ROLLS_TEST.shape[0]]

melody_gen_output_signature_latent = (tf.TensorSpec(shape=(MELODY_NUMBER_OF_SEQUENCES, LATENT_DIM)),
                                   tf.TensorSpec(shape=(LATENT_DIM)))

melody_gen_output_signature_roll = (tf.TensorSpec(shape=(MELODY_NUMBER_OF_SEQUENCES, 128, SEQUENCE_LENGTH)),
                                 tf.TensorSpec(shape=(128, SEQUENCE_LENGTH)))

melody_gen_output_signature = melody_gen_output_signature_roll

def get_sub_dataset(sub_generator, spec, batch_size, prefetch_size):
    
    return (tf.data.Dataset
            .from_generator(sub_generator, output_signature=spec)
            .batch(batch_size, drop_remainder=True)
            .prefetch(prefetch_size)
           )

melody_train_sub_datasets = [get_sub_dataset(g, melody_gen_output_signature, BATCH_SIZE, 5)
                          for g in train_sub_generators]
melody_test_sub_datasets = [get_sub_dataset(g, melody_gen_output_signature, BATCH_SIZE, 5)
                         for g in test_sub_generators]

melody_train_dataset = tf.data.Dataset.sample_from_datasets(melody_train_sub_datasets).prefetch(64)
melody_test_dataset = tf.data.Dataset.sample_from_datasets(melody_test_sub_datasets).prefetch(64)

In [111]:
%%time
sample_input = None
for x in melody_train_dataset.take(1):
    sample_input = x

Wall time: 250 ms


In [112]:
melody_train_dataset.element_spec

(TensorSpec(shape=(32, 8, 128, 32), dtype=tf.float32, name=None),
 TensorSpec(shape=(32, 128, 32), dtype=tf.float32, name=None))

In [113]:
sample_input[0].numpy().max()*127.

107.00000196695328

## Creating and training melody predictor model

To-do:<br>
#1. Compare computing loss via latent vector MSE vs cross-entropy between input target pianoroll and decoded predicted latent vectors <br>
#2. Consider creating model which learns autoencoding next target piano roll given melody

In [119]:
# Handles encoding of sequences into latent vectors
class VAEMelodyPredictior(tf.keras.Model):
    
    def __init__(self, latent_dim, batch_size,
                 sequence_length, melody_number_of_sequences, 
                 encoder, decoder):
        super(VAEMelodyPredictior, self).__init__()
        
        self.latent_dim = latent_dim
        self.batch_size = batch_size
        
        self.sequence_length = sequence_length
        self.melody_number_of_sequences = melody_number_of_sequences
        
        self.encoder = encoder
        self.decoder = decoder
        
        # Set encoder to non-trainable
        for l, _ in enumerate(self.encoder.layers):
            self.encoder.get_layer(index=l).trainable = False
        
        # Set decoder to non-trainable
        for l, _ in enumerate(self.decoder.layers):
            self.decoder.get_layer(index=l).trainable = False
            
        #rnn_input = Input(shape=(self.melody_number_of_sequences, self.latent_dim), name='rnn_input')
        
        #rnn_lstm_1 = LSTM(128, activation='relu', return_sequences=True, name='rnn_lstm_1')(rnn_input)
        #rnn_lstm_2 = LSTM(128, activation='relu', name='rnn_lstm_2')(rnn_lstm_1)
        
        #rnn_dense_1 = Dense(128, activation='relu', name='rnn_dense_1')(rnn_lstm_2)
        
        #rnn_output = Dense(self.latent_dim, activation='relu', name='rnn_output')(rnn_dense_1)
        
        # MLP Definition
        mlp_input = Input(shape=(self.melody_number_of_sequences, self.latent_dim), 
                          batch_size=self.batch_size, name='dense_input')
        mlp_flatten = Flatten(name='mlp_flatten')(mlp_input)
        
        mlp_dense_1 = Dense(256, activation='relu', kernel_regularizer='l2', name='dense_1')(mlp_flatten)
        #mlp_dropout_1 = Dropout(.2, name='dropout_1')(mlp_dense_1)
        
        mlp_dense_2 = Dense(256, activation='relu', kernel_regularizer='l2', name='dense_2')(mlp_dense_1)
        #mlp_dropout_2 = Dropout(.2, name='dropout_2')(mlp_dense_2)
        
        mlp_dense_3 = Dense(256, activation='relu', kernel_regularizer='l2', name='dense_3')(mlp_dense_2)
        #mlp_dropout_3 = Dropout(.5, name='dropout_3')(mlp_dense_3)
        
        mlp_dense_4 = Dense(256, activation='relu', kernel_regularizer='l2', name='dense_4')(mlp_dense_3)
        mlp_dropout_4 = Dropout(.2, name='mlp_dropout_4')(mlp_dense_3)
        
        mlp_output = Dense(self.latent_dim, activation='linear', name='mlp_output')(mlp_dense_4)
        
        # Model declaration
        self.model = Model(mlp_input, mlp_output, name='model')
        
    def compile(self, optimizer):
        super(VAEMelodyPredictior, self).compile()
        
        self.optimizer = optimizer
        self.model.compile(optimizer=self.optimizer)
        
    def melody_to_stacked_tensor(self, x):
        
        # x has shape 128 x time_steps
        
        x_truncate = x.shape[-1] - x.shape[-1] % self.sequence_length
        x_num_splits = x[:, :x_truncate].shape[-1] // self.sequence_length
        
        x_split = tf.split(x[:x_truncate], num_or_size_splits=x_num_splits)
        return tf.stack(x_split)
        
    def encode_batch(self, B):
        
        # B has shape batch_size x melody_num_sequences x 128 x sequence_length
        
        #obs = [self.encoder(B[b])[0] for b in range(B.shape[0])] # <-- using CVAE class
        obs = [self.encoder(B[b]) for b in range(B.shape[0])] # <-- using standalone encoder/decoder
        
        # returns tensor shaped batch_size x melody_num_sequences x latent_dim
        return tf.stack(obs)
    
    def encode_melody(self, melody):
        
        # melody has shape melody_num_sequences x 128 x sequence_length
        latent_vectors = [self.encoder(tf.expand_dims(melody[m], -1)) for m in range(m.shape[0])]
        return tf.concat(latent_vectors, axis=0)
        
    def compute_loss(self, x_y):
        
        x, y = x_y
        
        x_latent_vectors = self.encode_batch(x)
        y_latent_vectors = self.encoder(y)
        
        y_pred_latent_vectors = self.model(x_latent_vectors)
        y_pred = self.decoder(y_pred_latent_vectors)
        
        # Reconstruction loss between piano rolls
        #cross_entropy = K.binary_crossentropy(target=y, output=y_pred)    
        #return K.sum(cross_entropy, axis=[1, 2])
        
        # MSE between latent vectors
        return tf.keras.losses.MeanSquaredError()(y_latent_vectors, y_pred_latent_vectors)
        
    @tf.function
    def train_step(self, x_y):
        
        with tf.GradientTape() as tape:
            loss = self.compute_loss(x_y)
        
        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        return {'mse': loss}
    
    @tf.function
    def test_step(self, x_y):
        
        loss = self.compute_loss(x_y)
        return {'mse': loss}
    
    def call(self, inputs, is_training=False):
        
        inputs_is_list = isinstance(inputs, list)
        
        if inputs_is_list and is_training:
            return [self.train_step(x_y) for x_y in inputs]
        
        elif inputs_is_list and not is_training:
            return [self.test_step(x_y) for x_y in inputs]
        
        elif not inputs_is_list and is_training:
            return self.train_step(x_y)
        
        elif not inputs_is_list and not is_training:
            return self.test_step(x_y)
        
    def predict_piano_roll(self, x_piano_rolls):
        
        if len(x_piano_rolls.shape) == 3:
            x_piano_rolls = np.expand_dims(x_piano_rolls, axis=0)
            
        if len(x_piano_rolls.shape) == 2:
            x_piano_rolls = self.melody_to_stacked_tensor(x_piano_rolls)
        
        x_latent = self.encode_batch(x_piano_rolls)
            
        y_pred_latent = self.model(x_latent)
        
        piano_roll_pred = self.decoder(y_pred_latent).numpy() * 127.
        piano_roll_pred[piano_roll_pred > 127] = 127
        piano_roll_pred[piano_roll_pred < 0] = 0
        return piano_roll_pred.round().astype('uint8')
        

In [120]:
melody_vae = VAEMelodyPredictior(LATENT_DIM, BATCH_SIZE, SEQUENCE_LENGTH, MELODY_NUMBER_OF_SEQUENCES, encoder, decoder)

#melody_vae = VAEMelodyPredictior(LATENT_DIM, BATCH_SIZE, 
#                                 SEQUENCE_LENGTH, MELODY_NUMBER_OF_SEQUENCES, 
#                                 cvae.encoder, cvae.decoder)

melody_vae.compile(tf.keras.optimizers.Adam(1e-3))

In [121]:
melody_vae.model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_input (InputLayer)    [(32, 8, 16)]             0         
                                                                 
 mlp_flatten (Flatten)       (32, 128)                 0         
                                                                 
 dense_1 (Dense)             (32, 256)                 33024     
                                                                 
 dense_2 (Dense)             (32, 256)                 65792     
                                                                 
 dense_3 (Dense)             (32, 256)                 65792     
                                                                 
 dense_4 (Dense)             (32, 256)                 65792     
                                                                 
 mlp_output (Dense)          (32, 16)                  4112  

In [122]:
class MelodyVaeCheckpointCallback(tf.keras.callbacks.Callback):
    
    def __init__(self, model_dir):
        
        self.model_dir = model_dir        
        self.best_loss = np.Inf
        
    def on_epoch_end(self, epoch, loss_dict):
                
        if loss_dict['mse'] < self.best_loss:
            
            self.latent_dim = self.model.latent_dim
            self.seq_length = self.model.sequence_length
            self.num_mel = self.model.melody_number_of_sequences
            
            self.best_epoch = epoch
            self.best_loss = loss_dict['mse']   
            
            self.best_model = self.model.model
    
    def on_train_end(self, loss_dict):
        
        model_file_name = (f'seq_{self.seq_length}_z_{self.latent_dim}_mel_{self.num_mel}_'
                           f'epochs_{self.best_epoch}_mse_{round(self.best_loss, 3)}'
                          )
        model_file_path = self.model_dir + model_file_name
        
        self.best_model.save(model_file_path)   

melody_vae_ckpt_clbk = MelodyVaeCheckpointCallback(MELODY_PREDICTION_MODELS_DIR)
melody_vae_reduce_lr_clbk = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_mse', patience=3, min_delta=.005, factor=.1)
melody_vae_early_stop_clbk = tf.keras.callbacks.EarlyStopping(monitor='val_mse', patience=6, min_delta=.005)

melody_vae_callbacks = [#melody_vae_ckpt_clbk, 
                        melody_vae_reduce_lr_clbk, 
                        melody_vae_early_stop_clbk
                       ]

In [124]:
melody_vae.fit(x=melody_train_dataset, batch_size=BATCH_SIZE, shuffle=False,
               epochs=10, steps_per_epoch=1024,
               validation_data=melody_test_dataset, validation_steps=128,
               callbacks=melody_vae_callbacks
              )

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x17ca6ef1fa0>

In [None]:
MELODY_VAE_MODEL_DIR = 'C:/_local/py/yt_piano_music_gen/models/melody_predictor/128_z_16_seq_5_epochs_0.16_mse'
melody_vae.model.save(MELODY_VAE_MODEL_DIR)

#melody_vae.model = tf.keras.models.load_model(MELODY_VAE_MODEL_DIR)

## Generating music

In [125]:
sample_output_batch = melody_vae.predict_piano_roll(sample_input[0])

In [126]:
print(sample_output_batch.shape)
print(sample_output_batch.max())

(32, 128, 32)
42


In [127]:
play_samples_from_batch(sample_output_batch, 7, True)

sample # 24


In [128]:
sample_piano_roll = sample_input[0][6].numpy()*127.
sample_piano_roll = sample_piano_roll.round()
sample_piano_roll = np.concatenate([sample_piano_roll[s] for s in range(sample_piano_roll.shape[0])], axis=-1)

In [129]:
sample_output = melody_vae.predict_piano_roll(sample_input[0][6]).squeeze()

In [130]:
print(sample_piano_roll.shape)
print(sample_piano_roll.max())

(128, 256)
103.0


In [131]:
print(sample_output.shape)
print(sample_output.min())
print(sample_output.max())

(128, 32)
0
18


In [132]:
%%time
play_piano_roll(sample_piano_roll, 0)

Wall time: 2.69 s


In [133]:
play_piano_roll(sample_output, 0)

In [134]:
sample_song = np.concatenate([sample_piano_roll, sample_output.squeeze()], axis=1)

In [135]:
sample_song.shape

(128, 288)

In [136]:
%%time
play_piano_roll(sample_song, 0)

Wall time: 3.07 s


In [137]:
def generate_song_from_input(input_array, model, sequence_length, number_of_sequences):
    
    # input_array is 1 x M x 128 x sequence_length with elements in [0, 127]
    
    if input_array.shape[-1] != sequence_length or input_array.shape[-2] != 128:
        print('bad input shape: bad sequence_length or number of pitches')
        return
    
    if input_array.max() <= 1.5:
        input_array *= 127.
    
    input_array[input_array > 127] = 127
    input_array[input_array < 0] = 0
    input_array = input_array.round().astype('uint8')
           
    total_number_of_sequences = input_array.shape[0] + number_of_sequences
    
    piano_roll = np.zeros(shape=(total_number_of_sequences, 128, sequence_length))
    
    x_start = 0
    x_end = input_array.shape[0]
    piano_roll[x_start:x_end, :, :] = input_array
    
    y_start = x_end
    y_end = y_start + 1
    
    for n in range(number_of_sequences):
        
        x = piano_roll[x_start:x_end, :, :]          
        y = model.predict_piano_roll(x).squeeze()
        
        piano_roll[y_start:y_end, :, :] = y
        
        x_start += 1
        x_end += 1
        
        y_start += 1
        y_end += 1
        
    piano_roll_list = [piano_roll[s] for s in range(piano_roll.shape[0])]
    return np.concatenate(piano_roll_list, axis=-1)
    

In [140]:
sample_song = generate_song_from_input(sample_input[0][5].numpy(), melody_vae, SEQUENCE_LENGTH, 5)

In [141]:
print(sample_song.shape)
print(sample_song.max())
print(sample_song.min())

(128, 416)
127.0
0.0


In [142]:
play_piano_roll(sample_song)

## VAE with melody prediction model

In [186]:
model_input = Input(batch_shape=rnn_train_dataset.element_spec[0].shape, name='model_input')
model_input

<KerasTensor: shape=(32, 1, 128, 32) dtype=float32 (created by layer 'model_input')>

In [254]:
# Encoder definition
encoder_input = Reshape(target_shape=model_input.shape[1:]+(1,), name='encoder_input_reshape')(model_input)

encoder_conv_1 = Conv3D(filters=64, kernel_size=(1, 4, 4), strides=(1, 4, 4), 
                        activation='relu',name='encoder_conv2d_1')(encoder_input)

encoder_conv_2 = Conv3D(filters=128, kernel_size=(1, 4, 4), strides=(1, 4, 4), 
                        activation='relu', name='encoder_conv2d_2')(encoder_conv_1)

encoder_conv_3 = Conv3D(filters=256, kernel_size=(1, 8, 2), strides=(1, 8, 2), 
                        activation='linear', name='encoder_conv2d_3')(encoder_conv_2)

encoder_batch_norm = BatchNorm(name='encoder_batch_norm_3')(encoder_conv_3)
encoder_relu_3 = Relu(name='encoder_relu_3')(encoder_batch_norm)

encoder_flatten = Flatten(name='encoder_flatten')(encoder_relu_3)

encoder_dense_1 = Dense(128, activation='relu', name='encoder_dense_1')(encoder_flatten)

encoder_mu = Dense(LATENT_DIM, activation='linear', name='encoder_mu')(encoder_dense_1)
encoder_log_sigma = Dense(LATENT_DIM, activation='linear', name='variance')(encoder_dense_1)

encoder = Model(encoder_input, encoder_mu, name='encoder')

In [255]:
encoder.summary()

Model: "encoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_5 (InputLayer)        [(32, 1, 128, 32, 1)]     0         
                                                                 
 encoder_conv2d_1 (Conv3D)   (32, 1, 32, 8, 64)        1088      
                                                                 
 encoder_conv2d_2 (Conv3D)   (32, 1, 8, 2, 128)        131200    
                                                                 
 encoder_conv2d_3 (Conv3D)   (32, 1, 1, 1, 256)        524544    
                                                                 
 encoder_batch_norm_3 (Batch  (32, 1, 1, 1, 256)       1024      
 Normalization)                                                  
                                                                 
 encoder_relu_3 (ReLU)       (32, 1, 1, 1, 256)        0         
                                                           

In [256]:
# Reparameterization and concatenation layers / specification
def sample_and_reparameterize(mu_logsigma):
    mu, log_sigma = mu_logsigma
    eps = K.random_normal(shape=(LATENT_DIM,))
    return mu + K.exp(log_sigma * .5) * eps

Z = Lambda(sample_and_reparameterize, output_shape=(LATENT_DIM,), 
           name='sample_and_reparameterize')([encoder_mu, encoder_log_sigma])

In [257]:
# Melody model definition

melody_dense_1_layer = Dense(128, activation='relu', name='melody_dense_1')
melody_dense_2_layer = Dense(128, activation='relu', name='melody_dense_2')
melody_dense_3_layer = Dense(128, activation='linear', name='melody_dense_3')

m1 = melody_dense_1_layer(Z)
m2 = melody_dense_2_layer(m1)
melody_output = melody_dense_3_layer(m2)

melody_input = Input(batch_shape=(BATCH_SIZE, LATENT_DIM), name='melody_input')
melody_dense_1 = melody_dense_1_layer(melody_input)
melody_dense_2 = melody_dense_2_layer(melody_dense_1)
melody_dense_3 = melody_dense_3_layer(melody_dense_2)

melody_model = Model(melody_input, melody_dense_3, name='melody_model')

In [258]:
melody_model.summary()

Model: "melody_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 melody_input (InputLayer)   [(32, 128)]               0         
                                                                 
 melody_dense_1 (Dense)      (32, 128)                 16512     
                                                                 
 melody_dense_2 (Dense)      (32, 128)                 16512     
                                                                 
 melody_dense_3 (Dense)      (32, 128)                 16512     
                                                                 
Total params: 49,536
Trainable params: 49,536
Non-trainable params: 0
_________________________________________________________________


In [259]:
# Decoder definition

decoder_dense_1_layer = Dense(128, activation='relu', name='decoder_dense_1')
decoder_dense_2_layer = Dense(128, activation='relu', name='decoder_dense_2')
decoder_reshape_1_layer = Reshape(target_shape=(1, 1, 128), name='reshape')

decoder_conv_1_layer = Conv2DTranspose(filters=128, kernel_size=(8, 2), strides=(8, 2), 
                                       activation='relu', padding='same', 
                                       name='conv2dtranspose_1')

decoder_conv_2_layer = Conv2DTranspose(filters=64, kernel_size=(4, 4), strides=(4, 4), 
                                       activation='relu', padding='same', 
                                       name='conv2dtranspose_2')

decoder_conv_3_layer = Conv2DTranspose(filters=1, kernel_size=(4, 4), strides=(4, 4), 
                                       activation='relu', padding='same', name='conv2dtranspose_3')

decoder_output_layer = Reshape(target_shape=(128, SEQUENCE_LENGTH), name='decoder_output')

d1 = decoder_dense_1_layer(melody_dense_3)
d2 = decoder_dense_2_layer(d1)
d3 = decoder_reshape_1_layer(d2)
d4 = decoder_conv_1_layer(d3)
d5 = decoder_conv_2_layer(d4)
d6 = decoder_conv_3_layer(d5)
melody_cvae_output = decoder_output_layer(d6)

decoder_input = Input(shape=(LATENT_DIM,), name='decoder_input')
decoder_dense_1 = decoder_dense_1_layer(decoder_input)
decoder_dense_2 = decoder_dense_2_layer(decoder_dense_1)
decoder_reshape_1 = decoder_reshape_1_layer(decoder_dense_2)
decoder_conv_1 = decoder_conv_1_layer(decoder_reshape_1)
decoder_conv_2 = decoder_conv_2_layer(decoder_conv_1)
decoder_conv_3 = decoder_conv_3_layer(decoder_conv_2)
decoder_output = decoder_output_layer(decoder_conv_3)

decoder = Model(decoder_input, decoder_output, name='decoder')

melody_cvae = Model(model_input, cvae_output, name='cvae')

In [229]:
decoder.summary()

Model: "decoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 decoder_input (InputLayer)  [(None, 128)]             0         
                                                                 
 decoder_dense_1 (Dense)     multiple                  16512     
                                                                 
 decoder_dense_2 (Dense)     multiple                  16512     
                                                                 
 reshape (Reshape)           multiple                  0         
                                                                 
 conv2dtranspose_1 (Conv2DTr  multiple                 262272    
 anspose)                                                        
                                                                 
 conv2dtranspose_2 (Conv2DTr  multiple                 131136    
 anspose)                                                  

In [231]:
decoder.output

<KerasTensor: shape=(None, 128, 32) dtype=float32 (created by layer 'decoder_output')>

In [230]:
melody_cvae.summary()

Model: "cvae"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 model_input (InputLayer)       [(32, 1, 128, 32)]   0           []                               
                                                                                                  
 encoder_input_reshape (Reshape  (32, 1, 128, 32, 1)  0          ['model_input[0][0]']            
 )                                                                                                
                                                                                                  
 encoder_conv2d_1 (Conv3D)      (32, 1, 32, 8, 64)   1088        ['encoder_input_reshape[0][0]']  
                                                                                                  
 encoder_conv2d_2 (Conv3D)      (32, 1, 8, 2, 128)   131200      ['encoder_conv2d_1[0][0]']    

In [24]:
class CVAE(tf.keras.Model):
    
    def __init__(self, latent_dim, sequence_length, batch_size):
        super(CVAE, self).__init__()
        
        self.latent_dim = latent_dim
        self.sequence_length = sequence_length
        self.batch_size = batch_size
        
        # Encoder definition
        self.X = Input(shape=(128, self.sequence_length), batch_size=self.batch_size, name='input')
        X_add_channel = Reshape(target_shape=(128, self.sequence_length, 1), name='encoder_input_reshape')(self.X)

        encoder_conv_1 = Conv2D(filters=64, kernel_size=(4, 4), strides=(4, 4), 
                                activation='relu', padding='valid', name='conv2d_1')(X_add_channel)

        #encoder_relu_1 = Relu(name='encoder_relu_1')(encoder_batch_norm_1)
        #encoder_batch_norm_1 = BatchNorm(name='encoder_batch_norm_1')(encoder_conv_1)

        encoder_conv_2 = Conv2D(filters=128, kernel_size=(4, 4), strides=(4, 4), 
                                activation='relu', padding='valid', name='conv2d_2')(encoder_conv_1)

        #encoder_batch_norm_2 = BatchNorm(name='encoder_batch_norm_2')(encoder_conv_2)
        #encoder_relu_2 = Relu(name='encoder_relu_2')(encoder_batch_norm_2)

        encoder_conv_3 = Conv2D(filters=256, kernel_size=(8, 2), strides=(8, 2), 
                                activation='relu', padding='valid', name='conv2d_3')(encoder_conv_2)

        #encoder_batch_norm_3 = BatchNorm(name='encoder_batch_norm_3')(encoder_conv_3)
        #encoder_relu_3 = Relu(name='encoder_relu_3')(encoder_batch_norm_3)

        encoder_flatten = Flatten(name='encoder_flatten')(encoder_conv_3)

        #encoder_dense_1 = Dense(256, activation='relu', name='encoder_dense_1')(encoder_flatten)
        #encoder_dense_2 = Dense(256, activation='relu', name='encoder_dense_2')(encoder_dense_1)
        #encoder_dense_3 = Dense(256, activation='relu', name='encoder_dense_3')(encoder_dense_2)

        self.encoder_mu = Dense(self.latent_dim, activation='linear', name='encoder_mu')(encoder_flatten)
        self.encoder_log_sigma = Dense(self.latent_dim, activation='linear', name='encoder_log_sigma')(encoder_flatten)

        self.encoder = Model(self.X, self.encoder_mu, name='encoder')
        
        # Sample and reparameterize layer
        Z = Lambda(self.sample_and_reparameterize, output_shape=(self.latent_dim,), batch_size=self.batch_size,
                   name='sample_and_reparameterize')([self.encoder_mu, self.encoder_log_sigma])
        
        # Decoder definition
        decoder_dense_1_layer = Dense(256, activation='relu', name='decoder_dense_1')
        #decoder_dense_2_layer = Dense(256, activation='relu', name='decoder_dense_2')
        #decoder_dense_3_layer = Dense(256, activation='relu', name='decoder_dense_3')
        decoder_reshape_1_layer = Reshape(target_shape=(1, 1, 256), name='reshape')

        decoder_conv_1_layer = Conv2DTranspose(filters=128, kernel_size=(8, 2), strides=(8, 2), 
                                               activation='relu', padding='valid', 
                                               name='conv2dtranspose_1')

        decoder_conv_2_layer = Conv2DTranspose(filters=64, kernel_size=(4, 4), strides=(4, 4), 
                                               activation='relu', padding='valid', 
                                               name='conv2dtranspose_2')

        decoder_conv_3_layer = Conv2DTranspose(filters=1, kernel_size=(4, 4), strides=(4, 4), 
                                               activation='sigmoid', padding='valid', 
                                               name='conv2dtranspose_3')

        decoder_output_layer = Reshape(target_shape=(128, self.sequence_length), name='decoder_output')

        d1 = decoder_dense_1_layer(Z)
        #d2 = decoder_dense_2_layer(d1)
        #d3 = decoder_dense_3_layer(d2)
        d4 = decoder_reshape_1_layer(d1)
        d5 = decoder_conv_1_layer(d4)
        d6 = decoder_conv_2_layer(d5)
        d7 = decoder_conv_3_layer(d6)
        self.cvae_output = decoder_output_layer(d7)

        decoder_input = Input(shape=(self.latent_dim,), batch_size=self.batch_size, name='decoder_input')
        decoder_dense_1 = decoder_dense_1_layer(decoder_input)
        #decoder_dense_2 = decoder_dense_2_layer(decoder_dense_1)
        #decoder_dense_3 = decoder_dense_3_layer(decoder_dense_2)
        decoder_reshape_1 = decoder_reshape_1_layer(decoder_dense_1)
        decoder_conv_1 = decoder_conv_1_layer(decoder_reshape_1)
        decoder_conv_2 = decoder_conv_2_layer(decoder_conv_1)
        decoder_conv_3 = decoder_conv_3_layer(decoder_conv_2)
        decoder_output = decoder_output_layer(decoder_conv_3)

        self.decoder = Model(decoder_input, decoder_output, name='decoder')

        self.cvae = Model(self.X, self.cvae_output, name='cvae')
                  
        self.cvae.add_loss(self.vae_loss(self.X, self.cvae_output, self.encoder_mu, self.encoder_log_sigma))
        self.cvae.add_metric(self.reconstruction_loss(self.X, self.cvae_output), name='reconstruction_loss')
        self.cvae.add_metric(self.kl_divergence(self.encoder_mu, self.encoder_log_sigma), name='kl_divergence')
        
    def compile(self, optimizer):
        super(CVAE, self).compile()

        self.encoder.compile(optimizer, loss=None)            
        self.decoder.compile(optimizer, loss=None)
        self.cvae.compile(optimizer, loss=None)            

    def sample_and_reparameterize(self, mu_log_sigma):

        mu, log_sigma = mu_log_sigma

        eps = K.random_normal(shape=(self.latent_dim,))
        return mu + K.exp(log_sigma * .5) * eps

    def reconstruction_loss(self, x_true, x_pred):

        cross_entropy = K.binary_crossentropy(target=x_true, output=x_pred)    
        return K.sum(cross_entropy, axis=[1, 2])

    def kl_divergence(self, emu, els):

        kl_2 = K.exp(els) + K.square(emu) - 1. - els
        kl = .5 * K.sum(kl_2, axis=-1)
        return kl

    def vae_loss(self, x_true, x_pred, emu, els):

        recon_loss = self.reconstruction_loss(x_true, x_pred)
        kl_loss = self.kl_divergence(emu, els)

        return recon_loss + kl_loss
    
    def call(self, inputs, is_training=False):
        
        inputs_is_list = isinstance(inputs, list)
        
        if inputs_is_list and is_training:
            return [self.train_step(x_y) for x_y in inputs]
        
        elif inputs_is_list and not is_training:
            return [self.test_step(x_y) for x_y in inputs]
        
        elif not inputs_is_list and is_training:
            return self.train_step(inputs)
        
        elif not inputs_is_list and not is_training:
            return self.test_step(inputs)

In [25]:
cvae = CVAE(LATENT_DIM, SEQUENCE_LENGTH, BATCH_SIZE)
cvae.compile(tf.keras.optimizers.Adam(1e-3))

In [None]:
cvae.fit(x=cvae_train_dataset, shuffle=False,
         batch_size=BATCH_SIZE, steps_per_epoch=128, 
         epochs=1,
         validation_data=cvae_test_dataset, validation_steps=16,
         #callbacks=vae_clbks
        )