# Training a Variational Autoencoder to Generate Classical Music

In [1]:
import os

import numpy as np

import scipy.sparse
from scipy.sparse import coo_matrix, save_npz, load_npz

from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow.keras import Model, Input, Sequential
from tensorflow.keras.layers import InputLayer, Flatten, Reshape
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Conv2D, Conv2DTranspose
from tensorflow.keras.layers import ConvLSTM2D
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import LSTM, GRU, Bidirectional
from tensorflow.keras.layers import BatchNormalization as BatchNorm
from tensorflow.keras.layers import ReLU as Relu
from tensorflow.keras.layers import Lambda, Concatenate

import math

import datetime
from IPython import display

import time
import re
from datetime import datetime

from matplotlib import pyplot as plt

import absl.logging

from google.colab import drive

In [2]:
!apt-get update -qq && apt-get install -qq libfluidsynth1 fluid-soundfont-gm build-essential libasound2-dev libjack-dev
!pip install -qU pyfluidsynth pretty_midi
!pip install music21
!pip install pypianoroll

import pretty_midi
import fluidsynth
import pypianoroll

Selecting previously unselected package fluid-soundfont-gm.
(Reading database ... 155455 files and directories currently installed.)
Preparing to unpack .../fluid-soundfont-gm_3.1-5.1_all.deb ...
Unpacking fluid-soundfont-gm (3.1-5.1) ...
Selecting previously unselected package libfluidsynth1:amd64.
Preparing to unpack .../libfluidsynth1_1.1.9-1_amd64.deb ...
Unpacking libfluidsynth1:amd64 (1.1.9-1) ...
Setting up fluid-soundfont-gm (3.1-5.1) ...
Setting up libfluidsynth1:amd64 (1.1.9-1) ...
Processing triggers for libc-bin (2.27-3ubuntu1.3) ...
/sbin/ldconfig.real: /usr/local/lib/python3.7/dist-packages/ideep4py/lib/libmkldnn.so.0 is not a symbolic link

[K     |████████████████████████████████| 5.6 MB 7.6 MB/s 
[K     |████████████████████████████████| 51 kB 4.5 MB/s 
[?25h  Building wheel for pretty-midi (setup.py) ... [?25l[?25hdone
Collecting pypianoroll
  Downloading pypianoroll-1.0.4-py3-none-any.whl (26 kB)
Installing collected packages: pypianoroll
Successfully installed 

In [3]:
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
tf.config.list_physical_devices('GPU')

[]

## Preparing data

This notebook uses MIDI files of piano music from the Maestro dataset. Prior to the execution of this notebook, I converted these MIDI files into piano rolls which were then saved as COO scipy sparse matrices. A subset of these piano rolls are then loaded into memory as dense matrices.

In [4]:
# Project root directory
PROJECT_ROOT_DIR = os.path.join('/content', 'drive', 'MyDrive', 'colab', 'music_vae')

# Models root directory
MODELS_ROOT_DIR = os.path.join(PROJECT_ROOT_DIR, 'models')
HIERARCHAL_VAE_DIR = os.path.join(MODELS_ROOT_DIR, 'hierarchal_vae')

if not os.path.exists(HIERARCHAL_VAE_DIR):
    os.mkdir(HIERARCHAL_VAE_DIR)

# Data directory
DATA_ROOT_DIR = os.path.join(PROJECT_ROOT_DIR, 'maestro_v3.0.0_sparse', 'maestro_v3.0.0_sparse')
TRAIN_DIR = os.path.join(DATA_ROOT_DIR, 'train')
VALID_DIR = os.path.join(DATA_ROOT_DIR, 'validation')
TEST_DIR = os.path.join(DATA_ROOT_DIR, 'test')

# Output directories
TEMP_OUTPUT_PATH = os.path.join(PROJECT_ROOT_DIR, 'temp.mid') 

In [5]:
_SAMPLING_RATE = 16000 # <-- 
FS = 1 / 100 # <-- Each timestep in a midi converted from piano roll is 1 / 100

In [6]:
TRAINING_PIANO_ROLL_PATHS = sorted([os.path.join(TRAIN_DIR, f) for f in os.listdir(TRAIN_DIR)])
VALIDATION_PIANO_ROLL_PATHS = sorted([os.path.join(VALID_DIR, f) for f in os.listdir(VALID_DIR)])
TESTING_PIANO_ROLL_PATHS = sorted([os.path.join(TEST_DIR, f) for f in os.listdir(TEST_DIR)])

In [7]:
LATENT_DIM = 32 # <-- Number of dimensions to encode piano rolls into
BATCH_SIZE = 32 # <-- Batch size for training

In [8]:
# Utility functions for conversions loading and processing data
def list_of_scipy_sparse_to_list_sparse_tensor(list_scipy_sparse):
    return [scipy_sparse_to_sparse_tensor(s) for s in list_scipy_sparse]

def piano_roll_to_sparse_tensor(piano_roll):
    
    scipy_sparse = coo_matrix(piano_roll)
    return scipy_sparse_to_sparse_tensor(scipy_sparse)

def piano_roll_path_to_sparse_tensor(piano_roll_path):
    
    s = load_npz(piano_roll_path)   
    return scipy_sparse_to_sparse_tensor(s)

def scipy_sparse_to_sparse_tensor(scipy_sparse):
    
    indices = np.mat([scipy_sparse.row, scipy_sparse.col]).transpose()
    return tf.cast(tf.sparse.SparseTensor(indices, 
                                          scipy_sparse.data, 
                                          scipy_sparse.shape),
                    dtype=tf.float32
                   )

def piano_roll_path_to_dense_tensor(piano_roll_path):
    sparse_tensor = piano_roll_path_to_sparse_tensor(piano_roll_path)    
    return tf.sparse.to_dense(sparse_tensor)

def process_array(unprocessed_array, sequence_length, pitch_bounds, num_bins, 
                  add_beginning_flag=True, min_velocity_to_quantize=0, 
                  constant_velocity=False, constant_velocity_threshold=64, default_velocity=77,
                 ):
    
    array = unprocessed_array.copy()
    
    assert 0 <= array.min()
    assert array.max() <= 127
     
    if array.max() <= 1:
        array *= 127.
        
    pitch_lower_bound, pitch_upper_bound = pitch_bounds
    
    assert array.shape[0] == 128, 'processing tensor without 128 pitches'
    array = array[pitch_lower_bound : pitch_upper_bound+int(add_beginning_flag), :]
    
    if not constant_velocity:
        
        if num_bins != 0:
            
            array[array < min_velocity_to_quantize] = 0
            q = num_bins / (127 - min_velocity_to_quantize)
            array = (array * q).round() / q
                    
    else:
        array[array < constant_velocity_threshold] = 0
        array[constant_velocity_threshold <= array] = default_velocity
        
    array /= 127.
        
    if add_beginning_flag:
        flags = [1] * sequence_length + [0] * (array.shape[-1] - sequence_length)
    else:
        flags = [0] * array.shape[-1]
        
    return np.vstack([array, flags])        
        

def load_piano_rolls(piano_roll_paths, is_tensor_data, process_array_kwargs={}):
            
    if is_tensor_data:
        
        arrays = [process_array( load_npz(path).A, **process_array_kwargs )
                  for path in piano_roll_paths]
        
        return np.hstack(arrays)
        
    else:
        
        tensors = [piano_roll_path_to_sparse_tensor(path) for path in piano_roll_paths]
        return tensors      
    

In [9]:
SEQUENCE_LENGTH = 32 # <-- length of input / target sequences in 1/100 seconds

QUANTIZE_NUM_BINS = 0 # <-- Number of bins for quantization of velocities (setting to 0 or 128 does not quantize)
MIN_VELOCITY_TO_QUANTIZE = 20 # <-- All velocities less than this are not quantized, are clipped to 0
MAX_VELOCITY_TO_QUANTIZE = 115 # <-- All velocities greater than this are not quantized, are clipped to this value

PIANO_PITCH_BOUNDS = (20, 109) # Bounds for the MIDI  representation of the range of possible pitches a piano can play
# I added 2 pitches so there are 90 dimensions, which makes for better conv kernels / strides
# We can get rid of pitches outside this range since they are always 0 in the piano rolls
NUM_PIANO_PITCHES = PIANO_PITCH_BOUNDS[1] - PIANO_PITCH_BOUNDS[0] + 1 

ADD_BEGINNING_FLAG = True # Whether to each piano roll timestep as being within SEQUENCE_LENGTH of the song beginning

CONSTANT_VELOCITY = False # <-- Whether to binarize velocities
CONSTANT_VELOCITY_THRESHOLD = 20 # <-- Threshold for velocity binarization
DEFAULT_VELOCITY = 77 # <-- Value to set nonzero velocities to when binarizing

# kwargs for processing piano rolls
process_array_kwargs = {'sequence_length': SEQUENCE_LENGTH,
                        'num_bins': QUANTIZE_NUM_BINS,
                        'min_velocity_to_quantize': MIN_VELOCITY_TO_QUANTIZE,
                        'pitch_bounds': PIANO_PITCH_BOUNDS,
                        'add_beginning_flag': True,                         
                        'constant_velocity': False,
                        'constant_velocity_threshold': 35,
                        'default_velocity': 77
                       }

SPARSE_MAX_SONGS = 30 # <-- Max number of tensors that I'll load into memory in dense format
NUM_SONGS_TO_TRAIN = 20 # <-- Number of songs to load into memory for training
TRAINING_TENSOR_DATA = NUM_SONGS_TO_TRAIN <= SPARSE_MAX_SONGS

GET_VALIDATION_DATA = True
if GET_VALIDATION_DATA:
    NUM_SONGS_TO_VALID = 5 # <-- Number of songs to load into memory for validation
    VALIDATION_TENSOR_DATA = NUM_SONGS_TO_VALID <= SPARSE_MAX_SONGS
    
GET_TESTING_DATA = False
if GET_TESTING_DATA:
    NUM_SONGS_TO_TEST = 1
    VALIDATION_TENSOR_DATA = NUM_SONGS_TO_VALID <= SPARSE_MAX_SONGS

In [10]:
np.random.seed(36)
train_piano_roll_paths = np.random.choice(TRAINING_PIANO_ROLL_PATHS, size=NUM_SONGS_TO_TRAIN, replace=False)

%time PIANO_ROLLS_TRAIN = load_piano_rolls(train_piano_roll_paths, TRAINING_TENSOR_DATA, process_array_kwargs)
    
if GET_VALIDATION_DATA:
    valid_piano_roll_paths = np.random.choice(VALIDATION_PIANO_ROLL_PATHS, size=NUM_SONGS_TO_VALID, replace=False)
    %time PIANO_ROLLS_VALID = load_piano_rolls(valid_piano_roll_paths, VALIDATION_TENSOR_DATA, process_array_kwargs)
    
if GET_TESTING_DATA:
    test_piano_roll_paths = np.random.choice(TESTING_PIANO_ROLL_PATHS, size=NUM_SONGS_TO_TEST, replace=False)
    %time PIANO_ROLLS_TEST = load_piano_rolls(test_piano_roll_paths, TESTING_TENSOR_DATA, process_array_kwargs)

CPU times: user 2.59 s, sys: 1.43 s, total: 4.01 s
Wall time: 14.7 s
CPU times: user 282 ms, sys: 32.1 ms, total: 314 ms
Wall time: 1.94 s


In [11]:
if TRAINING_TENSOR_DATA:
    print(f'training piano rolls tensor shape: {PIANO_ROLLS_TRAIN.shape}')
else:
    print(f'training piano rolls sample tensor shape: {PIANO_ROLLS_TRAIN[0].shape[-1]}')
    
if GET_VALIDATION_DATA:
    if VALIDATION_TENSOR_DATA:
        print(f'validation piano rolls tensor shape: {PIANO_ROLLS_VALID.shape}')
    else:
        print(f'validation piano rolls sample tensor shape: {PIANO_ROLLS_VALID[0].shape[-1]}')
        
if GET_TESTING_DATA:
    if TESTING_TENSOR_DATA:
        print(f'testing piano rolls tensor shape: {PIANO_ROLLS_TEST.shape}')
    else:
        print(f'testing piano rolls sample tensor shape: {PIANO_ROLLS_TEST[0].shape[-1]}')

training piano rolls tensor shape: (91, 1407330)
validation piano rolls tensor shape: (91, 136488)


In [12]:
# To make sure each piano roll in training is sufficiently loud (e.g., no significant periods of silence)
# we will calculate the the distribution of mean velocities for each SEQUENCE_LENGTH interval

def get_velocity_quantiles(tensor, quantiles, sequence_length=SEQUENCE_LENGTH):
        
    idx = np.arange(tensor.shape[-1], step=sequence_length)
    subarrays = np.split(tensor, indices_or_sections=idx, axis=-1)
    subarrays = [s for s in subarrays if s.shape[-1] == sequence_length]
    mean_velocities = np.array([np.mean(seq) for seq in subarrays])
    return np.quantile(mean_velocities, quantiles)
    
quantiles = [.01, .05, .1, .15, .2]
velocity_quantiles = get_velocity_quantiles(PIANO_ROLLS_TRAIN, quantiles)

for q, vq in zip(quantiles, velocity_quantiles):
    a = (1 - q) * 100
    q_str = str(a)
    if a % 10 == 1:
        q_str += 'st'
    elif a % 10 == 2:
        q_str += 'nd'
    elif a % 10 == 3:
        q_str += 'rd'
    else:
        q_str += 'th'
        
    print(f'{q_str} percentile quietness: {round(vq, 6)} --> {round(vq*127, 3)}')
    
QUIETNESS_THRESHOLD = velocity_quantiles[2]

99.0th percentile quietness: 0.0 --> 0.0
95.0th percentile quietness: 0.002698 --> 0.343
90.0th percentile quietness: 0.004721 --> 0.6
85.0th percentile quietness: 0.0065 --> 0.825
80.0th percentile quietness: 0.00822 --> 1.044


In [13]:
QUIETNESS_MASK = [
    PIANO_ROLLS_TRAIN[:-1, t:t+SEQUENCE_LENGTH].mean() > QUIETNESS_THRESHOLD 
    for t in range(PIANO_ROLLS_TRAIN.shape[-1] - SEQUENCE_LENGTH) 
]

## Piano Roll helper functions

In [14]:
def piano_roll_to_pretty_midi(piano_roll_array, instrument_program_number, fs=100):
    
    assert isinstance(piano_roll_array, tf.Tensor) or isinstance(piano_roll_array, np.ndarray), 'bad input type'
    
    if isinstance(piano_roll_array, tf.Tensor):
        piano_roll = piano_roll_array.numpy().copy().squeeze()
    else:
        piano_roll = piano_roll_array.copy().squeeze()
        
    piano_roll = piano_roll.clip(0, 127)
    assert 0 <= piano_roll.min() and piano_roll.max() <= 127, 'piano roll has velocity not in [0, 127]'
    assert len(piano_roll.shape) == 2, f'expected ndim=2 but piano roll has ndim={len(piano_roll.shape)}'
    
    global NUM_PIANO_PITCHES
    if piano_roll.shape[0] != 128:
        
        if piano_roll.shape[0] == NUM_PIANO_PITCHES + 1:
            piano_roll = piano_roll[:-1, :]
            
        pad_num_pitches = (128 - NUM_PIANO_PITCHES) // 2
        top_pad = np.zeros( shape=( pad_num_pitches, piano_roll.shape[-1] ) )
        bot_pad = top_pad.copy()
        
        piano_roll = np.concatenate([bot_pad, piano_roll.copy(), top_pad], axis=0) 
                
    assert piano_roll.shape[0] == 128, f'have {piano_roll.shape[0]} != 128 pitches after padding'
    
    if piano_roll.max() <= 1:
        piano_roll *= 127.

    piano_roll[piano_roll < 0] = 0
    piano_roll[piano_roll > 127] = 127
    piano_roll = piano_roll.round().astype('uint8')
    
    notes, frames = piano_roll.shape
    pm = pretty_midi.PrettyMIDI()
    instrument = pretty_midi.Instrument(program=instrument_program_number)

    # pad 1 column of zeros so we can acknowledge inital and ending events
    piano_roll = np.pad(piano_roll, [(0, 0), (1, 1)], 'constant')

    # use changes in velocities to find note on / note off events
    velocity_changes = np.nonzero(np.diff(piano_roll).T)

    # keep track on velocities and note on times
    prev_velocities = np.zeros(notes, dtype=int)
    note_on_time = np.zeros(notes)

    for time, note in zip(*velocity_changes):
        # use time + 1 because of padding above
        velocity = piano_roll[note, time + 1]
        time = time / fs
        if velocity > 0:
            if prev_velocities[note] == 0:
                note_on_time[note] = time
                prev_velocities[note] = velocity
        else:
            pm_note = pretty_midi.Note(
                velocity=prev_velocities[note],
                pitch=note,
                start=note_on_time[note],
                end=time)
            instrument.notes.append(pm_note)
            prev_velocities[note] = 0
    pm.instruments.append(instrument)
    return pm    

def play_piano_roll(piano_roll_array, buffer_time=0, instrument_program_number=0, temp_path=TEMP_OUTPUT_PATH):
    
    midi = piano_roll_to_pretty_midi(piano_roll_array, instrument_program_number)
    midi.write(temp_path)
    
    sleep_time = piano_roll_array.shape[-1] / 100 + buffer_time
    
    pygame.mixer.init()
    pygame.mixer.music.load(temp_path)
    
    pygame.mixer.music.play()
    time.sleep(sleep_time)
    
    pygame.mixer.music.stop()

def display_audio(pm, seconds=30):
    
    global _SAMPLING_RATE
    
    waveform = pm.fluidsynth(fs=_SAMPLING_RATE)
    
    # Take a sample of the generated waveform to mitigate kernel resets
    waveform_short = waveform[:seconds*_SAMPLING_RATE]
    
    return display.Audio(waveform_short, rate=_SAMPLING_RATE)
    
def display_piano_roll_audio(piano_roll, instrument_program_number=0):
    
    midi = piano_roll_to_pretty_midi(piano_roll, instrument_program_number)
    return display_audio(midi)
    
    
def play_samples_from_batch(batch_array, number_of_samples=5, instrument_program_number=0,
                            buffer=0, shuffle=True, temp_path=TEMP_OUTPUT_PATH):
    
    assert isinstance(batch_array, tf.Tensor) or isinstance(batch_array, np.ndarray), 'invalid batch array type'
    assert len(batch_array.shape) in (3, 4), f'batch_array ndim={len(batch_array.shape)} not in (3, 4)'
    
    if isinstance(batch_array, tf.Tensor):
        batch = batch_array.numpy().copy()
    else:
        batch = batch_array.copy()
        
    # batch_array has shape batch_size x number_of_melodies x NUM_PIANO_PITCHES(+1) x time_steps
    #                    or batch_size x NUM_PIANO_PITCHES(+1) x time_steps
    
    if len(batch.shape) == 3:
        batch = np.expand_dims(batch, axis=1)
        
    if batch.shape[-2] == NUM_PIANO_PITCHES + 1:
        batch = batch[:, :, :-1, :]
        
    if shuffle:
        steps = np.random.choice(np.arange(batch.shape[0]), size=number_of_samples, replace=False)
        steps = sorted(steps)
        
    else: 
        steps = range(number_of_samples)
                
    for s in steps:
        
        display.clear_output(wait=False)
        print(f'sample # {s}')
        
        piano_roll = batch[s]
        piano_roll = [piano_roll[m] for m in range(piano_roll.shape[0])]
        piano_roll = np.concatenate(piano_roll, -1)
        
        play_piano_roll(piano_roll, buffer, instrument_program_number, temp_path)
    
def play_inputs_and_outputs(inputs_array, outputs_array, number_of_samples, instrument_program_number=0,
                            buffer=0., shuffle=True, midi_path=TEMP_OUTPUT_PATH):
    
    assert isinstance(inputs_array, tf.Tensor) or isinstance(inputs_array, np.ndarray), 'bad input type'
    assert isinstance(outputs_array, tf.Tensor) or isinstance(outputs_array, np.ndarray), 'bad output type'
    
    if isinstance(inputs_array, tf.Tensor):
        inputs = inputs_array.numpy().copy()
    else:
        inputs = inputs_array.copy()
           
    if isinstance(outputs_array, tf.Tensor):
        outputs = outputs_array.numpy().copy()
    else:
        outputs = outputs_array.copy()
    
    # Inputs is ndarray shaped batch_size x number_of_melodies x NUM_PIANO_PITCHES x sequence_length
    # Outputs is ndarray batch_size x NUM_PIANO_PITCHES x sequence_length   
    
    if len(inputs.shape) == 3:
        inputs = np.expand_dims(inputs, 1)
        
    if shuffle:
        possible_sample_indices = range(outputs.shape[0])
        steps = sorted(np.random.choice(possible_sample_indices, size=number_of_samples, replace=False))
        
    else: 
        steps = range(number_of_samples)
    
    for s in steps:
        
        display.clear_output(wait=False)
        
        sample_input = inputs[s]
        sample_input = [sample_input[m] for m in range(sample_input.shape[0])]
        sample_input = np.concatenate(sample_input, -1)       
        
        sample_output = outputs[s]
        
        print(f'sample #{s}')
        
        print('input')
        play_piano_roll(sample_input, buffer, instrument_program_number, temp_path=midi_path)
        
        print('output')
        play_piano_roll(sample_output, buffer, instrument_program_number, temp_path=midi_path)

def concatenate_piano_rolls(piano_rolls_raw, zero_buf):
    
    if isinstance(piano_rolls_raw, tf.Tensor):
        piano_rolls = piano_rolls_raw.numpy().copy()
            
    elif isinstance(piano_rolls_raw, np.ndarray):
        piano_rolls = piano_rolls_raw.copy()
        
    elif isinstance(piano_rolls_raw, list) and isinstance(piano_rolls_raw[0], tf.Tensor):
        piano_rolls = [pr.numpy().squeeze().copy() for pr in piano_rolls_raw]
        
    elif isinstance(piano_rolls_raw, list) and isinstance(piano_rolls_raw[0], np.ndarray):
        piano_rolls = [pr.squeeze().copy() for pr in piano_rolls_raw]
        
    else:
        assert 1 == 0, 'bad input type'
        return
        
    if not isinstance(piano_rolls, list):
        
        assert len(piano_rolls.shape) == 3, 'piano roll does not have dimension for different sequences'
        piano_rolls = [piano_rolls[p] for p in range(piano_rolls.shape[0])]
        
    if len(piano_rolls) == 1:
        return piano_rolls[0]
        
    assert all(len(pr.shape) == 2 for pr in piano_rolls), 'a piano roll does not have 2 dimensions'
    assert all(piano_rolls[0].shape[0] == pr.shape[0] for pr in piano_rolls), 'piano rolls have differing numbers of notes'
                
    num_notes = piano_rolls[0].shape[0]
    
    if zero_buf:
        return np.concatenate([piano_rolls[p//2] if p % 2 == 0 else np.zeros(shape=(num_notes, 1))
                               for p in range(2*len(piano_rolls)-1)], axis=-1)    
    
    else:
        return np.concatenate([piano_rolls[p] for p in range(len(piano_rolls))], axis=-1)  
    
def write_piano_roll_to_storage(piano_roll_raw, output_dir, file_name, instrument_program_number=0):
    
    assert isinstance(piano_roll_raw, tf.Tensor) or isinstance(piano_roll_raw, np.ndarray)
    
    if isinstance(piano_roll_raw, tf.Tensor):
        pr = piano_roll_raw.numpy().copy()
    else:
        pr = piano_roll_raw.copy()
        
    assert len(pr.shape) == 2, f'piano roll has ndim={len(pr.shape)} but expects ndim=2'
        
    global NUM_PIANO_PITCHES
    if pr.shape[0] == NUM_PIANO_PITCHES:
            
        pad_num_pitches = (128 - NUM_PIANO_PITCHES) // 2
        top_pad = np.zeros( shape=( pad_num_pitches, pr.shape[-1] ) )
        bot_pad = top_pad.copy()
        
        pr = np.concatenate([bot_pad, pr.copy(), top_pad], axis=0) 
                
    assert pr.shape[0] == NUM_PIANO_PITCHES, f'have {pr.shape[0]} != {NUM_PIANO_PITCHES} pitches after padding'
        
    if pr.max() <= 5:
        pr *= 127.

    pr[pr > 127] = 127
    pr = pr.round().astype('uint8')
            
    midi = piano_roll_to_pretty_midi(pr, instrument_program_number)
    
    file_path = output_dir + file_name
    midi.write(file_path)

def write_piano_roll_batch_to_storage(piano_roll_batch_raw, output_dir, instrument_program_number=0):
    
    assert isinstance(piano_roll_batch_raw, tf.Tensor) or isinstance(piano_roll_batch_raw, np.ndarray)
    
    if isinstance(piano_roll_batch_raw, tf.Tensor):
        piano_roll_batch = piano_roll_batch_raw.numpy().copy()
        
    else:
        piano_roll_batch = piano_roll_batch_raw.copy()
        
    assert len(piano_roll_batch.shape) == 3, 'incompatible shape'
    
    for p in range(piano_roll_batch.shape[0]):
        
        pr = piano_roll_batch[p]
        
        file_name = f'piano_roll_{p+1}.mid'
        write_piano_roll_to_storage(pr, output_dir, file_name, instrument_program_number)
    
def generate_song(model, sequence_length, number_of_sequences, zero_buf):
    
    global NUM_PIANO_PITCHES
    k = model.latent_dim
    
    eps = np.random.normal(size=(1, k))
    seed = model.decoder(eps).numpy()
        
    piano_roll = np.zeros(shape=(number_of_sequences, NUM_PIANO_PITCHES, sequence_length))
    piano_roll[0] = seed
    
    for s in range(number_of_sequences - 1):
        
        input_piano_roll = np.expand_dims(piano_roll[s], 0)
        input_piano_roll = np.expand_dims(input_piano_roll, 0)
        
        piano_roll[s+1] = model.predict_piano_roll(input_piano_roll)
        
    piano_roll_list = [piano_roll[p] for p in range(piano_roll.shape[0])]
    return np.concatenate(piano_roll_list, -1)

def generate_song_from_input(input_array, model, sequence_length, number_of_sequences, zero_buf):
    
    if isinstance(input_array, tf.Tensor):
        input_piano_roll = input_array.numpy().copy()
    else:
        input_piano_roll = input_array.copy()
        
    global NUM_PIANO_PITCHES
            
    assert input_piano_roll.shape[-1] == sequence_length, 'inputs sequence length does not match sequence length'
    assert input_piano_roll.shape[-2] == NUM_PIANO_PITCHES, 'input piano roll has incorrect number of pitches'
        
    if len(input_piano_roll.shape) == 2:
        input_piano_roll = np.expand_dims(input_piano_roll, 0)
        
    # input_piano_roll <--- [num_melodies, PIANO_NUM_PITCHES, sequence_length]
    
    input_piano_roll[input_piano_roll > 1] = 1
    input_piano_roll[input_piano_roll < 0] = 0
    
    flat_input_piano_roll = [input_piano_roll[p] for p in range(input_piano_roll.shape[0])]
    flat_input_piano_roll = np.concatenate(flat_input_piano_roll, axis=-1)
    
    # flat_input_piano_roll <--- [NUM_PIANO_PITCHES, num_melodies x sequence_length]
           
    total_number_of_sequences = input_piano_roll.shape[0] + number_of_sequences
    
    piano_roll = np.zeros(shape=(total_number_of_sequences, NUM_PIANO_PITCHES, sequence_length))
    
    x_start_index = 0
    y_index = input_piano_roll.shape[0]
    
    piano_roll[x_start_index:y_index, :, :] = input_piano_roll
    
    piano_roll_list = [flat_input_piano_roll]
        
    for n in range(number_of_sequences):
                
        x = piano_roll[x_start_index : y_index, :, :]
        x = np.expand_dims(x, 0)
        
        y_pred = model.predict_piano_roll(x)
        
        piano_roll[y_index, :, :] = y_pred
        piano_roll_list.append(y_pred)
        
        x_start_index += 1
        y_index += 1
    
    return concatenate_piano_rolls(piano_roll_list, zero_buf)

def generate_songs_from_input_batch(input_batch, model, sequence_length, number_of_sequences, zero_buf):
    
    # input_batch <--- [b, num_melodies, NUM_PIANO_PITCHES, sequence_length]
    
    if isinstance(input_array, tf.Tensor):
        input_piano_rolls = input_batch.numpy().copy()
    else:
        input_piano_rolls = input_batch.copy()
        
    B = input_piano_rolls.shape[0]
            
    return [generate_song_from_input(input_piano_rolls[b]) for b in range(B)]

## Training VAE to Produce Single Notes

### TF Datasets Of Single Sequence

In [None]:
class NoteGenerator:    
    
    # Generator which yields randomly selected piano roll of length sequence_length
    # with mean velocity >= quietness_threshold
    
    def __init__(self, data, sequence_length, batch_size, seed=None, quietness_mask=None):
        
        self.sequence_length = sequence_length 
        self.seed = seed
        self.batch_size = batch_size
        
        self.data_is_dense = not isinstance(data, list)
                                
        # Data is single dense tensor
        if self.data_is_dense: 
            
            self.data = data.copy()
            quietness_mask = quietness_mask if quietness_mask is not None else [True] * self.data.shape[-1]
            
            def is_t_usable(t):
                c1 = self.data[-1, t] == 0
                c2 = quietness_mask[t]
                return c1 and c2 and self.data[-1, t+self.sequence_length] == 0
            
            self.usable_indices = np.array([t
                                            for t in range(self.data.shape[-1] - self.sequence_length)
                                            if is_t_usable(t)
                                           ])
            self.n = len(self.usable_indices)
            
            self.data = self.data[:-1, :]
            self.p = self.data.shape[0]
            
            self.total_obs_per_epoch = self.n - (self.n % self.batch_size) - 1
            
            self.batch_beg_id = 0
            self.batch_end_id = self.batch_size
            
        # Data is list of sparse tensors
        else:
            self.data = data
            self.num_tensors = len(self.tensors)
            self.num_piano_pitches = self.data[0].shape[0]
                
    def __iter__(self):
        
        if self.seed is not None:
            np.random.seed(self.seed)
        
        while True:
            
            if self.data_is_dense:
                
                if self.batch_beg_id == 0:
                    np.random.shuffle(self.usable_indices)
                    
                batch_obs_beg_idx = self.usable_indices[self.batch_beg_id:self.batch_end_id]
                batch_obs_end_idx = batch_obs_beg_idx + self.sequence_length
                
                batch_data = np.stack([self.data[:, obs_beg_id:obs_end_id] 
                                       for obs_beg_id, obs_end_id in zip(batch_obs_beg_idx, batch_obs_end_idx)])
                    
                self.batch_beg_id += self.batch_size
                self.batch_end_id += self.batch_size
                
                if self.batch_end_id >= self.total_obs_per_epoch:
                    self.batch_beg_id = 0
                    self.batch_end_id = self.batch_size
                    
                yield batch_data
                
            else:
                
                sparse_tensor = self.sparse_tensors[np.random.randint(0, self.num_tensors)]
                last_start = (sparse_tensor.shape[1] - 2 * self.sequence_length - 3)

                note_start = np.random.randint(0, last_start)
                note_start -= note_start % 32

                note = tf.sparse.slice(sparse_tensor,
                                       start=[0, note_start],
                                       size=[128, self.sequence_length]
                                      )
                
                note = tf.sparse.to_dense(note)
                
                if note.shape != (self.num_piano_pitches, self.sequence_length):
                    continue

                yield note
            
    def __call__(self):
        return self.__iter__()
        

In [None]:
class SamplerGenerator:
    
    def __init__(self, generators, sample_size=3000, seed=None):
        
        self.generators = [iter(gen) for gen in generators]
        self.num_generators = len(self.generators)
        
        self.sample_size = sample_size
        self.seed = seed
        
    def __iter__(self):
        
        if self.seed is not None:
            np.random.seed(seed)
        
        while True:
            
            g_idx = np.random.randint(0, self.num_generators, size=self.sample_size) 
            
            for g in g_idx:
                yield next(self.generators[g])
                
    def __call__(self):
        return self.__iter__()

In [None]:
SPARSE_DATA_SIZE_OF_GENERATORS = 50
NUMBER_OF_SUB_GENERATORS = 25

NOTE_GENERATOR_OUTPUT_SIGNATURE = tf.TensorSpec(shape=(BATCH_SIZE, NUM_PIANO_PITCHES, SEQUENCE_LENGTH))
NOTE_TRAIN_GENERATOR = NoteGenerator(PIANO_ROLLS_TRAIN, SEQUENCE_LENGTH, BATCH_SIZE, 11, QUIETNESS_MASK)
note_train_ds_card = NOTE_TRAIN_GENERATOR.total_obs_per_epoch // NOTE_TRAIN_GENERATOR.batch_size - 1

note_train_dataset = (tf.data.Dataset
                      .from_generator(NOTE_TRAIN_GENERATOR, output_signature=NOTE_GENERATOR_OUTPUT_SIGNATURE)
                      #.cache()
                      .prefetch(tf.data.AUTOTUNE)
                      .apply(tf.data.experimental.assert_cardinality(note_train_ds_card))
                     )

'''# Data is a single tensor
if TRAINING_TENSOR_DATA:
    
    note_train_sub_generators = [
        NoteGenerator(PIANO_ROLLS_TRAIN, SEQUENCE_LENGTH, BATCH_SIZE, 11*g, QUIETNESS_MASK)
        for g in range(NUMBER_OF_SUB_GENERATORS)
    ]
    
    note_train_ds_card = note_train_sub_generators[0].total_obs_per_epoch // note_train_sub_generators[0].batch_size - 1
    
    note_train_generator = SamplerGenerator(note_train_sub_generators, 7)
    note_train_dataset = (tf.data.Dataset
                          .from_generator(note_train_generator, output_signature=NOTE_GENERATOR_OUTPUT_SIGNATURE)
                          .cache()
                          .prefetch(tf.data.AUTOTUNE)
                          .apply(tf.data.experimental.assert_cardinality(note_train_ds_card))
                         )''';

In [None]:
%%time
np.random.seed(333)
note_sample_input = None
number_to_take = 500
for x in note_train_dataset.take(number_to_take):
    note_sample_input = x
    
print(f'number of batches in training data: {note_train_ds_card}')
print(f'sample batch shape --- {note_sample_input.shape}')
print(f'sample batch max velocity --- {round(note_sample_input.numpy().max() * 127)}')
print(f'sample batch mean velocity --- {note_sample_input.numpy().mean() * 127}')
print(f'sample batch number of unique velocities --- {len(np.unique(note_sample_input.numpy()))}')
print(f'\ntime to take {number_to_take} batches')

number of batches in data: 20489
sample batch shape --- (32, 90, 32)
sample batch max velocity --- 110
sample batch mean velocity --- 3.1962562538683414
sample batch number of unique velocities --- 75

time to take 500 batches
CPU times: user 762 ms, sys: 280 ms, total: 1.04 s
Wall time: 1.48 s


First, we train the CVAE to learn the distribution of pianoroll values which correspond to notes/chords, and how to generate them. It is trained on batches of 128 x SEQUENCE_LENGTH tensors.

In [None]:
class MyVAE(tf.keras.Model):
    
    # Class of variational autoencoders for encoding / decoding / generating piano rolls
    
    def __init__(self, latent_dim, input_shape):
        super(MyVAE, self).__init__()
        
        self.latent_dim = latent_dim
        
        self.input_batch_shape = input_shape
        self.batch_size, self.num_pitches, self.sequence_length = input_shape
                
        conv_1_kernel_size, conv_1_strides = (5, 8), (5, 4)
        conv_2_kernel_size, conv_2_strides = (3, 2), (3, 2)
        conv_3_kernel_size, conv_3_strides = (2, 2), (2, 2)
        
        # Encoder definition
        encoder_input = Input(shape=(self.num_pitches, self.sequence_length), name='encoder_input')
        encoder_reshape = Reshape(target_shape=(self.num_pitches, self.sequence_length, 1), 
                                  name='encoder_reshape')(encoder_input)
        
        encoder_conv_1 = Conv2D(filters=64, kernel_size=conv_1_kernel_size, strides=conv_1_strides, 
                                padding='same', activation='relu', name='encoder_conv2d_1')(encoder_reshape)
        encoder_conv_2 = Conv2D(filters=128, kernel_size=conv_2_kernel_size, strides=conv_2_strides, 
                                padding='same', activation='relu', name='encoder_conv2d_2')(encoder_conv_1)
        encoder_conv_3 = Conv2D(filters=256, kernel_size=conv_3_kernel_size, strides=conv_3_strides, 
                                padding='same', activation='relu', name='encoder_conv2d_3')(encoder_conv_2)
        
        down_sampled_shape = encoder_conv_3.get_shape()[1:]
        
        encoder_flatten = Flatten(name='encoder_flatten')(encoder_conv_3)
        downsampled_total_nodes = encoder_flatten.get_shape()[-1]
                
        encoder_mean = Dense(self.latent_dim, activation='linear', name='encoder_mu')(encoder_flatten)
        encoder_log_sigma = Dense(self.latent_dim, activation='linear', name='encoder_log_sigma')(encoder_flatten)
        
        self.encoder = Model(encoder_input, [encoder_mean, encoder_log_sigma], name='encoder')
        
        # Decoder definition
        decoder_input = Input(shape=(self.latent_dim), name='decoder_input')
        decoder_dense = Dense(units=downsampled_total_nodes, activation='relu', name='decoder_dense')(decoder_input)
        decoder_reshape = Reshape(target_shape=down_sampled_shape)(decoder_dense)
        decoder_conv_1 = Conv2DTranspose(filters=128, kernel_size=conv_3_kernel_size, 
                                         strides=conv_3_strides, padding='same', 
                                         activation='relu', name='decoder_conv2dtranspose_1')(decoder_reshape)
        decoder_conv_2 = Conv2DTranspose(filters=64, kernel_size=conv_2_kernel_size, 
                                         strides=conv_2_strides, padding='same', 
                                         activation='relu', name='decoder_conv2dtranspose_2')(decoder_conv_1)
        decoder_conv_3 = Conv2DTranspose(filters=1, kernel_size=conv_1_kernel_size, 
                                         strides=conv_1_strides, padding='same', 
                                         activation='linear', name='decoder_conv2dtranspose_3')(decoder_conv_2)
        decoder_output = Reshape(target_shape=(self.num_pitches, self.sequence_length), 
                                 name='decoder_output')(decoder_conv_3)
        
        self.decoder = Model(decoder_input, decoder_output, name='decoder')
        
        # Scaling parameter for each note, timestep pair 
        self.log_sig_x = tf.Variable(tf.zeros(shape=(self.num_pitches*self.sequence_length)), 
                                     trainable=True, name='log_sig_x')
        self.binary_crossentropy = tf.keras.losses.BinaryCrossentropy(
            from_logits=False, reduction=tf.keras.losses.Reduction.NONE
        )
        
    def compile(self, optimizer, loss_weight=1):
        super(MyVAE, self).compile()
                
        self.optimizer = optimizer 
        self.loss_weight = loss_weight
        
        self.encoder.compile(self.optimizer, loss=None)
        self.decoder.compile(self.optimizer, loss=None)

    def sample_and_reparameterize(self, mu_log_sigma, n_samples):
        
        mu, log_sigma = mu_log_sigma
        sigma = tf.exp(log_sigma)
        
        b, k = mu.shape
        
        mu = tf.reshape(mu, (b, 1, k))
        sigma = tf.reshape(sigma, (b, 1, k))
        
        eps = tf.random.normal(shape=(b, n_samples, k))
        return eps * sigma + mu
    
    def decode(self, z):
                
        b, n, k = z.shape        
        z = tf.reshape(z, (b*n, -1))
        
        mu_x = self.decoder(z)        
        mu_x = tf.reshape(mu_x, (b, n, -1))
        return mu_x
    
    def generate_piano_rolls(self, n_samples):
        
        eps = tf.random.normal(shape=(n_samples, self.latent_dim))
        return (self.decoder(eps)).numpy().clip(0, 1)
    
    def encode_and_decode(self, x):
        
        mu_log_sigma = self.encoder(x)
        z = self.sample_and_reparameterize(mu_log_sigma, 1)
        return self.decoder(tf.squeeze(z))
    
    def reconstruct_piano_roll(self, x):
        
        reconstruction = self.encode_and_decode(x).numpy()
        reconstruction = reconstruction.clip(0, 1)
        return reconstruction * 127

    def log_p_x(self, x, mu_x, sigma_x):
        
        # x.shape = [batch_size, 128, seq_len]
        # mu_x.shape [batch_size, 1, 128, t]
        # sigma_x.shape = (b * 128 * seq_len)
        
        b, n = mu_x.shape[:2]
        
        x = tf.reshape(x, (b, n, -1))
        mu_x = tf.reshape(mu_x, (b, n, -1))
        
        square_error_numerator = tf.square(x - mu_x)
        square_error_denominator = 2 * tf.square(sigma_x)
        square_error = tf.divide(square_error_numerator, square_error_denominator)
                
        log_p_x = -( square_error + tf.exp(sigma_x) )
        log_p_x = tf.reduce_sum(log_p_x, axis=2)
        log_p_x = tf.reduce_mean(log_p_x, axis=[0, 1])
        
        return log_p_x
        
    def kl_q_p(self, z, mu_log_sigma):
        
        b, n, k = z.shape
        
        mu_q, log_sigma_q = mu_log_sigma
        sigma_q = tf.exp(log_sigma_q)
        mu_q = tf.reshape(mu_q, (b, 1, k))
        sigma_q = tf.reshape(sigma_q, (b, 1, k))
       
        log_p = -.5 * tf.square(z)
        
        log_q = -.5 * tf.square(z - mu_q)
        log_q = tf.divide(log_q, tf.square(sigma_q))
        log_q = log_q - tf.reshape(log_sigma_q, (b, 1, -1))
                          
        kl = tf.reduce_sum(log_q - log_p, axis=2)
        return tf.reduce_mean(kl, axis=[0, 1])      
       
    def elbo(self, x, n=1):
        
        mu_q, log_sigma_q = self.encoder(x)
        z = self.sample_and_reparameterize([mu_q, log_sigma_q], n)
        mu_x = self.decode(z)
                
        kl = self.kl_q_p(z, [mu_q, log_sigma_q])
        sigma_x = tf.exp(self.log_sig_x)
        return self.log_p_x(x, mu_x, sigma_x) - kl
    
    def compute_loss(self, x):
        return -self.elbo(x)
    
    def train_step(self, x):
        
        with tf.GradientTape() as tape:
            loss = self.compute_loss(x)
            
        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        return {'loss': loss}
    
    def test_step(self, x):
        
        loss = self.compute_loss(x)
        return {'loss': loss}
    
    def call(self, inputs, is_training=False):
        
        inputs_is_list = isinstance(inputs, list)
        
        if inputs_is_list and is_training:
            return [self.train_step(x_y) for x_y in inputs]
        
        elif inputs_is_list and not is_training:
            return [self.test_step(x_y) for x_y in inputs]
        
        elif not inputs_is_list and is_training:
            return self.train_step(inputs)
        
        elif not inputs_is_list and not is_training:
            return self.test_step(inputs)
        
    def save(self, folder_path):
        
        self.encoder.save(folder_path + 'encoder')
        self.decoder.save(folder_path + 'decoder')
        
        sig = self.log_sig_x.value().numpy()
        np.save(folder_path + 'log_sig_x', sig)
        
    def load(self, folder_path):
        
        paths = [folder_path + f for f in os.listdir(folder_path)]
        
        encoder_path = None
        decoder_path = None
        log_sig_x_path = None
        
        for p in paths:
            
            if 'encoder' in p:
                encoder_path = p
            if 'decoder' in p:
                decoder_path = p
            if 'log_sig' in p:
                log_sig_x_path = p
        
        log_sig_x = np.load(log_sig_x_path)            
        
        self.encoder = tf.keras.models.load_model(encoder_path)
        self.decoder = tf.keras.models.load_model(decoder_path)
        self.log_sig_x = tf.Variable(initial_value=log_sig_x, trainable=True)
        
cvae = MyVAE(LATENT_DIM, (BATCH_SIZE, NUM_PIANO_PITCHES, SEQUENCE_LENGTH))
cvae.compile(tf.keras.optimizers.Adam(1e-3))

model_trained_epochs = 0
model_z = LATENT_DIM
model_seq_len = SEQUENCE_LENGTH
model_loss = np.Inf

In [None]:
note_model_names = ['z_16_seq_32_epochs_3_loss_4654.966/',
                    'z_22_seq_32_epochs_4_loss_4674.546/',
                    'z_29_seq_64_epochs_5_loss_9435.579/',
                    'z_64_seq_64_epochs_5_loss_9269.704/',
                    'z_32_seq_32_epochs_17_loss_3218.005/'
                   ]

# Load in model

note_load_model_name = note_model_names[-1]
note_load_model_flag = False

if note_load_model_flag and os.path.exists(CVAE_DIR + note_load_model_name):

    print(f'loading {note_load_model_name}')
    cvae.load(CVAE_DIR + note_load_model_name)
    
    model_trained_epochs = int(re.sub('_|epochs', '', re.search('epochs_\d+', note_load_model_name).group(0)))
    model_z = int(re.sub('_|z', '', re.search('z_\d+', note_load_model_name).group(0)))
    model_seq_len = int(re.sub('_|seq', '', re.search('seq_\d+', note_load_model_name).group(0)))
    model_loss = float(re.search('\d+\.\d+', note_load_model_name).group(0))
    
cvae.encoder.summary()
cvae.decoder.summary()

Model: "encoder"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_input (InputLayer)     [(None, 90, 32)]     0           []                               
                                                                                                  
 encoder_reshape (Reshape)      (None, 90, 32, 1)    0           ['encoder_input[0][0]']          
                                                                                                  
 encoder_conv2d_1 (Conv2D)      (None, 18, 8, 64)    2624        ['encoder_reshape[0][0]']        
                                                                                                  
 encoder_conv2d_2 (Conv2D)      (None, 6, 4, 128)    49280       ['encoder_conv2d_1[0][0]']       
                                                                                            

In [None]:
test_encode_sample_output = cvae.encoder(note_sample_input)
print(f'mean encode output shape: {test_encode_sample_output[0].shape}')
print(f'var encode output shape: {test_encode_sample_output[1].shape}')

test_sar_sample_output = cvae.sample_and_reparameterize(test_encode_sample_output, 1)
print(f'sample/repar output shape: {test_sar_sample_output.shape}')

test_decode_sample_output = cvae.decode(test_sar_sample_output)
print(f'decode output shape: {test_decode_sample_output.shape}')

test_generate_piano_rolls_output = cvae.generate_piano_rolls(2)
print(f'generate output shape: {test_generate_piano_rolls_output.shape}')

test_logpx_sample_output = cvae.log_p_x(note_sample_input, tf.expand_dims(test_decode_sample_output, 1), cvae.log_sig_x)
print(f'logpx output shape: {test_logpx_sample_output.shape}')

test_klqp_sample_output = cvae.kl_q_p(test_sar_sample_output, test_encode_sample_output)
print(f'klqp output shape: {test_klqp_sample_output.shape}')

test_elbo_sample_output = cvae.elbo(note_sample_input)
print(f'elbo output shape: {test_elbo_sample_output.shape}')

mean encode output shape: (32, 32)
var encode output shape: (32, 32)
sample/repar output shape: (32, 1, 32)
decode output shape: (32, 1, 2880)
generate output shape: (2, 90, 32)
logpx output shape: ()
klqp output shape: ()
elbo output shape: ()


In [None]:
clbk_monitor_val = 'val_loss' if GET_VALIDATION_DATA else 'loss'

vae_reduce_lr_clbk = tf.keras.callbacks.ReduceLROnPlateau(monitor='loss', patience=2, factor=.1)
vae_early_stop_clbk = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=4)
#vae_ckpt_clbk = tf.keras.callbacks.ModelCheckpoint(monitor='loss', save_best_only=True, )

def vae_learning_rate_schedule(epoch, lr):
    
    if epoch <= 4:
        return lr
    
    return lr * .975

vae_lr_scheduler = tf.keras.callbacks.LearningRateScheduler(vae_learning_rate_schedule)

#TEMP_LOGS_DIR_ROOT = 'C:/_local/jupyter_notebooks/deep_learning/audio_generation/logs/fit/'
#TEMP_LOGS_DIR = TEMP_LOGS_DIR_ROOT + datetime.now().strftime("%Y%m%d-%H%M%S")
#tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=TEMP_LOGS_DIR)

In [None]:
vae_clbks = [vae_reduce_lr_clbk, 
             vae_early_stop_clbk,
             #vae_lr_scheduler,
             #tensorboard_callback
            ]

epochs_to_train = 3

cvae_history = cvae.fit(x=note_train_dataset, epochs=epochs_to_train, steps_per_epoch=10,
                        callbacks=vae_clbks,
                        shuffle=False,
                        #validation_data=note_valid_dataset, validation_steps=valid_steps_per_epoch,
                       ).history

model_trained_epochs += len(cvae_history['loss'])

model_z_str = f'z_{model_z}'
model_seq_len_str = f'_seq_{model_seq_len}'
model_epochs_str = f'_epochs_{model_trained_epochs}'

if 'val_loss' in cvae_history.keys():
    model_loss = round(cvae_history['val_loss'][-1], 3)
    model_loss_str = f'_val_loss_{model_loss}/'
else:
    model_loss = round(cvae_history['loss'][-1], 3)
    model_loss_str = f'_loss_{model_loss}/'

model_name =  model_seq_len_str + model_epochs_str + model_loss_str
model_dir = os.path.join(model_z_str, model_name)
model_save_path = os.path.join(CVAE_DIR, model_dir)
cvae.save(model_save_path)

Epoch 1/3
Epoch 2/3
Epoch 3/3


NameError: ignored

In [None]:
#%tensorboard --logdir logs

### Sample Outputs from Note VAE

In [None]:
play_samples_from_batch(note_sample_input, 5)

sample # 25


In [None]:
# Test how well the VAE reconstructs data
sample_output = cvae.reconstruct_piano_roll(note_sample_input)
play_inputs_and_outputs(note_sample_input, sample_output, 5)

sample #31
input
output


In [None]:
# Test how well the VAE generates piano rolls
sample_generated_output = cvae.generate_piano_rolls(10)
play_samples_from_batch(sample_generated_output, 10)

sample # 9


In [None]:
write_piano_roll_batch_to_storage(sample_generated_output, NOTE_OUTPUT_DIR)

## Training VAE on sequence of notes

In [15]:
class MelodyTargetGenerator:    
    
    # Generator which yields randomly selected piano roll of length sequence_length
    # with mean velocity >= quietness_threshold
    
    def __init__(self, data, sequence_length, batch_size, num_melodies, 
                 seed=None, quietness_mask=None, split_melodies=False):
        
        self.sequence_length = sequence_length 
        self.batch_size = batch_size
        self.num_melodies = num_melodies
        self.seed = seed
        self.split_melodies = split_melodies
        
        self.melody_length = self.sequence_length * self.num_melodies
        self.total_length = 2 * self.melody_length

        self.data_is_dense = not isinstance(data, list)
                                
        # Data is single dense tensor
        if self.data_is_dense: 
            
            self.data = data.copy()
            
            if quietness_mask is None:
                quietness_mask = ([True] * self.data.shape[-1]) - self.total_length
                        
            def is_t_usable(t):
                c1 = self.data[-1, t] == 0
                c2 = quietness_mask[t]
                return c1 and c2 and self.data[-1, t+self.total_length] == 0
            
            self.usable_indices = np.array(
                [t for t in range(self.data.shape[-1] - self.total_length) if is_t_usable(t)]
            )
            self.n = len(self.usable_indices)
            
            self.data = self.data[:-1, :]
            self.p = self.data.shape[0]
            
            self.total_obs_per_epoch = self.n - (self.n % self.batch_size)
            
            self.batch_beg_id = 0
            self.batch_end_id = self.batch_size
            
        # Data is list of sparse tensors
        else:
            self.data = data
            self.num_tensors = len(self.tensors)
            self.num_piano_pitches = self.data[0].shape[0]
                
    def __iter__(self):
        
        if self.seed is not None:
            np.random.seed(self.seed)
        
        while True:
            
            if self.data_is_dense:
                
                if self.batch_beg_id == 0:
                    np.random.shuffle(self.usable_indices)
                    
                batch_x_beg_idx = self.usable_indices[self.batch_beg_id:self.batch_end_id]

                if self.split_melodies:

                    batch_x = np.zeros(shape=(self.batch_size, self.num_melodies, 90, self.sequence_length))
                    batch_y = batch_x.copy()

                    for b, x_beg_idx in enumerate(batch_x_beg_idx):

                        x = self.data[:, x_beg_idx:x_beg_idx+self.melody_length]
                        y = self.data[:, x_beg_idx+self.melody_length:x_beg_idx+2*self.melody_length]

                        sequence_idx = range(stop=self.melody_length, step=self.sequence_length)
                        for s, seq_idx in enumerate(sequence_idx):
                            batch_x[b, s, :, :] = x[:, seq_idx:seq_idx+self.sequence_length]
                            batch_y[b, s, :, :] = y[:, seq_idx:seq_idx+self.sequence_length]

                else:

                    batch_x_end_idx = batch_x_beg_idx + self.melody_length
                    batch_y_end_idx = batch_x_end_idx + self.melody_length

                    batch_x = np.stack(
                        [self.data[:, x_beg_id:x_end_id] 
                         for x_beg_id, x_end_id in zip(batch_x_beg_idx, batch_x_end_idx) 
                        ], axis=0
                    )

                    batch_y = np.stack(
                        [self.data[:, x_end_id:y_end_id] 
                         for x_end_id, y_end_id in zip(batch_x_end_idx, batch_y_end_idx) 
                        ], axis=0
                    ) 

                self.batch_beg_id += self.batch_size
                self.batch_end_id += self.batch_size
                
                if self.batch_end_id >= self.total_obs_per_epoch:
                    self.batch_beg_id = 0
                    self.batch_end_id = self.batch_size
                    
                yield tf.constant(batch_x), tf.constant(batch_y)
                
            else:
                
                sparse_tensor = self.sparse_tensors[np.random.randint(0, self.num_tensors)]
                last_start = (sparse_tensor.shape[1] - 2 * self.sequence_length - 3)

                note_start = np.random.randint(0, last_start)
                note_start -= note_start % 32

                note = tf.sparse.slice(sparse_tensor,
                                       start=[0, note_start],
                                       size=[128, self.sequence_length]
                                      )
                
                note = tf.sparse.to_dense(note)
                
                if note.shape != (self.num_piano_pitches, self.sequence_length):
                    continue

                yield note
            
    def __call__(self):
        return self.__iter__()
        

In [16]:
NUMBER_OF_MELODIES = 10
MELODY_SEQUENCE_LENGTH = 64

# Generate single melody
MELODY_GENERATOR_OUTPUT_SIGNATURE = (
    tf.TensorSpec( shape=(BATCH_SIZE, 90, NUMBER_OF_MELODIES*MELODY_SEQUENCE_LENGTH) ),
    tf.TensorSpec( shape=(BATCH_SIZE, 90, NUMBER_OF_MELODIES*MELODY_SEQUENCE_LENGTH) )
)

# Training data
melody_train_target_generator = MelodyTargetGenerator(
    PIANO_ROLLS_TRAIN, MELODY_SEQUENCE_LENGTH, BATCH_SIZE, NUMBER_OF_MELODIES, 13, QUIETNESS_MASK, False
)

melody_train_ds_card = melody_train_target_generator.total_obs_per_epoch // melody_train_target_generator.batch_size - 1
    
melody_train_dataset = (tf.data.Dataset
                        .from_generator(melody_train_target_generator, output_signature=MELODY_GENERATOR_OUTPUT_SIGNATURE)
                        .prefetch(tf.data.AUTOTUNE)
                        .apply(tf.data.experimental.assert_cardinality(melody_train_ds_card))
                       )

# Validation data
melody_valid_target_generator = MelodyTargetGenerator(
    PIANO_ROLLS_VALID, MELODY_SEQUENCE_LENGTH, BATCH_SIZE, NUMBER_OF_MELODIES, 15, QUIETNESS_MASK, False
)

melody_valid_ds_card = melody_valid_target_generator.total_obs_per_epoch // melody_valid_target_generator.batch_size - 1
    
melody_valid_dataset = (tf.data.Dataset
                        .from_generator(melody_valid_target_generator, output_signature=MELODY_GENERATOR_OUTPUT_SIGNATURE)
                        .prefetch(tf.data.AUTOTUNE)
                        .apply(tf.data.experimental.assert_cardinality(melody_valid_ds_card))
                       )

In [17]:
%%time
np.random.seed(333)
melody_sample_input = None
number_to_take = 500
for _ in melody_train_dataset.take(number_to_take):
    x, y = _
    
print(f'total number of batches in data: {melody_train_ds_card}')
print(f'sample batch_x shape: {x.shape} ----- sample batch_y shape: {y.shape}')
print(f'sample batch_x max velocity: {round(x.numpy().max() * 127)} ----- ', end='')
print(f'sample batch_y max velocity: {round(y.numpy().max() * 127)}')
print(f'sample batch_x mean velocity: {x.numpy().mean() * 127} ----- ', end='')
print(f'sample batch_y mean velocity: {y.numpy().mean() * 127}')
print(f'sample batch_x number of unique velocities: {len(np.unique(x.numpy()))} ----- ', end='')
print(f'sample batch_y number of unique velocities: {len(np.unique(y.numpy()))}')
print(f'\ntime to take {number_to_take} batches')

total number of batches in data: 39597
sample batch_x shape: (32, 90, 640) ----- sample batch_y shape: (32, 90, 640)
sample batch_x max velocity: 108 ----- sample batch_y max velocity: 110
sample batch_x mean velocity: 2.8549476750195026 ----- sample batch_y mean velocity: 2.4796190410852432
sample batch_x number of unique velocities: 97 ----- sample batch_y number of unique velocities: 102

time to take 500 batches
CPU times: user 10.1 s, sys: 290 ms, total: 10.4 s
Wall time: 10.5 s


### Creating TF Datasets of Sequences of Notes

## Training Hierarchal Model on Sequences of Notes

In [18]:
class HierarchalDecoder(tf.keras.layers.Layer):
    
    def __init__(self, conductor_units, out_steps, conv_params, dropout_val=.3,
                 conductor_activation='linear', conv_activation='relu', name='conductor'):
        super(HierarchalDecoder, self).__init__(name=name)
        
        self.conductor_units = conductor_units
        self.out_steps = out_steps
        self.conv_params = conv_params
        self.conductor_activation = conductor_activation
        self.conv_activation = conv_activation
        
        self.conductor_layers = [
            Dense(self.conductor_units, activation=self.conductor_activation) for _ in range(self.out_steps)
        ]

        conv_1_filters, conv_1_kernel_size, conv_1_strides = self.conv_params[0]
        self.conv_1_layers = [
            Conv2DTranspose(1, 
                   kernel_size=conv_1_kernel_size, 
                   strides=conv_1_strides, 
                   activation=self.conv_activation, 
                   name=f'conv2d_1_{_}'
                  )
            for _ in range(self.out_steps)
        ]

        conv_2_filters, conv_2_kernel_size, conv_2_strides = self.conv_params[1]
        self.conv_2_layers = [
            Conv2DTranspose(64, 
                   kernel_size=conv_2_kernel_size, 
                   strides=conv_2_strides, 
                   activation=self.conv_activation,
                   name=f'conv2d_2_{_}'
                  )
            for _ in range(self.out_steps)
        ]

        conv_3_filters, conv_3_kernel_size, conv_3_strides = self.conv_params[2]
        self.conv_3_layers = [
            Conv2DTranspose(128, 
                   kernel_size=conv_3_kernel_size, 
                   strides=conv_3_strides, 
                   activation=self.conv_activation,
                   name=f'conv_2d_3_{_}'
                 )
            for _ in range(self.out_steps)
        ]

        self.reshape_layer = Reshape(target_shape=(1, 1, -1))
        self.dropout_layer = Dropout(dropout_val)

    def decode(self, conductor_tensor, idx):

        output_tensor = self.conv_3_layers[idx](conductor_tensor)
        output_tensor = self.conv_2_layers[idx](output_tensor)
        output_tensor = self.conv_1_layers[idx](output_tensor)

        return output_tensor

    def call(self, latent_vector, training=True): 

        conductor_tensor = self.conductor_layers[0](latent_vector)
        output_tensor = self.reshape_layer(conductor_tensor)
        output_tensor = self.decode(output_tensor, 0)
        output_tensors = [output_tensor]

        for step in range(1, self.out_steps):

            previous_conductor_tensor = conductor_tensor

            latent_conductor_state = tf.concat([latent_vector, previous_conductor_tensor], axis=-1)
            conductor_tensor = self.dropout_layer(latent_conductor_state, training=training)
            conductor_tensor = self.conductor_layers[step](conductor_tensor)

            output_tensor = self.reshape_layer(conductor_tensor)
            output_tensor = self.decode(output_tensor, step)
            output_tensors.append(output_tensor)

        return tf.concat(output_tensors, axis=-2)

In [19]:
class HierarchalVAE(tf.keras.Model):
    
    def __init__(self, training_data_element_spec, latent_dim, num_decoder_seq):
        super(HierarchalVAE, self).__init__()
        
        self.batch_size, self.num_pitches, self.melody_length = training_data_element_spec[0].shape
        self.k = latent_dim
        self.num_decoder_seq = num_decoder_seq

        assert self.melody_length % self.num_decoder_seq == 0
        self.sub_sequence_length = self.melody_length / self.num_decoder_seq
        
        ######################################### Encoder definition #########################################

        encoder_input = Input(shape=(self.num_pitches, self.melody_length), name='encoder_input')
        encoder_reshape = Reshape(target_shape=(self.num_pitches, self.melody_length, 1), 
                                  name='encoder_reshape')(encoder_input)

        encoder_conv_1_kernel_size, encoder_conv_1_strides = (5, 5), (5, 5)
        encoder_conv_2_kernel_size, encoder_conv_2_strides = (3, 4), (3, 4)
        encoder_conv_3_kernel_size, encoder_conv_3_strides = (2, 2), (2, 2)
        
        encoder_conv_1 = Conv2D(filters=64, kernel_size=encoder_conv_1_kernel_size, strides=encoder_conv_1_strides, 
                                padding='same', activation='relu', name='encoder_conv2d_1')(encoder_reshape)
        encoder_conv_2 = Conv2D(filters=128, kernel_size=encoder_conv_2_kernel_size, strides=encoder_conv_2_strides, 
                                padding='same', activation='relu', name='encoder_conv2d_2')(encoder_conv_1)
        encoder_conv_3 = Conv2D(filters=256, kernel_size=encoder_conv_3_kernel_size, strides=encoder_conv_3_strides, 
                                padding='same', activation='relu', name='encoder_conv2d_3')(encoder_conv_2)
        
        down_sampled_shape = encoder_conv_3.get_shape()[1:]
        
        encoder_flatten = Flatten(name='encoder_flatten')(encoder_conv_3)
        downsampled_total_nodes = encoder_flatten.get_shape()[-1]
                
        encoder_mu = Dense(self.k, activation='linear', name='encoder_mu')(encoder_flatten)
        encoder_log_sigma = Dense(self.k, activation='linear', name='encoder_log_sigma')(encoder_flatten)
        
        self.encoder = tf.keras.Model(encoder_input, [encoder_mu, encoder_log_sigma], name='encoder')
        
        ######################################### Decoder definition #########################################

        self.conductor_units = 128
        
        decoder_conv_1_params = (64,  (5, 4), (5, 4))
        decoder_conv_2_params = (128, (3, 4), (3, 4))
        decoder_conv_3_params = (256, (6, 4), (6, 4))

        decoder_conv_params = (decoder_conv_1_params, decoder_conv_2_params, decoder_conv_3_params)

        decoder_input = Input(shape=(self.k), name='decoder_input')
        decoder_output = HierarchalDecoder(
            self.conductor_units, self.num_decoder_seq, decoder_conv_params
        )(decoder_input)
        decoder_output = Reshape(target_shape=(self.num_pitches, self.melody_length), name='output')(decoder_output)
        self.decoder = tf.keras.Model(decoder_input, decoder_output, name='decoder')

        self.log_sig_y = tf.Variable(
            initial_value=tf.zeros( shape=(self.num_pitches, self.melody_length) ), trainable=True
        )
    
    def sample_and_reparameterize(self, mu_log_sigma):
        
        mu, log_sigma = mu_log_sigma
        sigma = tf.exp(log_sigma)
                
        eps = tf.random.normal(shape=mu.shape)
        return eps * sigma + mu 
        
    def log_p_y(self, y, y_hat, sig_y):
                
        square_error_numerator = tf.square(y - y_hat)
        square_error_denominator = 2 * tf.square(sig_y)
        square_error = tf.divide(square_error_numerator, square_error_denominator)
                
        log_p_y = -( square_error + tf.exp(sig_y) )
        log_p_y = tf.reduce_sum(log_p_y, axis=1)
        log_p_y = tf.reduce_mean(log_p_y)
        
        return log_p_y
        
    def kl_q_p(self, z, mu_log_sigma):
                
        mu_q, log_sigma_q = mu_log_sigma
        sigma_q = tf.exp(log_sigma_q)

        log_p = -.5 * tf.square(z)
        
        log_q = -.5 * tf.square(z - mu_q)
        log_q = tf.divide(log_q, tf.square(sigma_q))
        log_q = log_q - log_sigma_q
                          
        kl = tf.reduce_sum(log_q - log_p, axis=1)
        return tf.reduce_mean(kl)      
       
    def elbo(self, x_y, training):

        x, y = x_y
        
        mu_q, log_sigma_q = self.encoder(x)
        z = self.sample_and_reparameterize( (mu_q, log_sigma_q) )
        y_hat = self.decoder(z, training=training)
        sigma_y = tf.exp(self.log_sig_y)
                
        kl_q_p = self.kl_q_p(z, (mu_q, log_sigma_q) )
        log_p_y = self.log_p_y(y, y_hat, sigma_y)
        return log_p_y - kl_q_p
    
    def compute_loss(self, x_y, training):
        return -self.elbo(x_y, training)   
    
    def compile(self, optimizer=tf.keras.optimizers.Adam(1e-3)):
        super(HierarchalVAE, self).compile()
        
        self.optimizer = optimizer
        
        self.encoder.compile(optimizer=optimizer, loss=None)
        self.decoder.compile(optimizer=optimizer, loss=None)
        
    def call(self, inputs, is_training=False):
        
        inputs_is_list = isinstance(inputs, list)
        
        if inputs_is_list and is_training:
            return [self.train_step(x_y) for x_y in inputs]
        
        elif inputs_is_list and not is_training:
            return [self.predict_next_melody(x_y) for x_y in inputs]
        
        elif not inputs_is_list and is_training:
            return self.train_step(inputs)
        
        elif not inputs_is_list and not is_training:
            return self.predict_next_melody(inputs)

    def predict_next_melody(self, x):

        mu_q, log_sigma_q = self.encoder(x)
        z = self.sample_and_reparameterize( (mu_q, log_sigma_q) )
        y_hat = self.decoder(z, training=False)
        return y_hat

    def train_step(self, x_y):
        
        with tf.GradientTape() as tape:
            loss = self.compute_loss(x_y, training=True)
            
        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        return {'loss': loss}
    
    def test_step(self, x_y):
        
        loss = self.compute_loss(x_y, training=False)
        return {'loss': loss}
    
    def save(self, model_dir_path):
        
        if model_dir_path[-1] != '/':
            model_dir_path += '/'

        if not os.path.exists(model_dir_path):
            os.mkdir(model_dir_path)
        
        encoder_save_path = os.path.join(model_dir_path, 'encoder')
        decoder_save_path = os.path.join(model_dir_path, 'decoder')
        sigma_y_save_path = os.path.join(model_dir_path, 'sigma_y')
        
        tf.keras.models.save_model(self.encoder, encoder_save_path)
        tf.keras.models.save_model(self.decoder, decoder_save_path)
        np.save(sigma_y_save_path, self.log_sig_y.numpy())

    def load(self, model_dir_path):

        paths = [os.path.join(model_dir_path, f) for f in os.listdir(model_dir_path)]
        
        encoder_path = None
        decoder_path = None
        log_sig_y_path = None
        
        for p in paths:
            
            if 'encoder' in p:
                encoder_path = p
            if 'decoder' in p:
                decoder_path = p
            if 'sigma_y' in p:
                log_sig_y_path = p
        
        log_sig_y = np.load(log_sig_y_path)            
        
        self.encoder = tf.keras.models.load_model(encoder_path)
        self.decoder = tf.keras.models.load_model(decoder_path)
        self.log_sig_y = tf.Variable(initial_value=log_sig_y, trainable=True)

        epochs_str = re.search('\d+_epochs', model_dir_path.split('/')[-1]).group(0)
        trained_epochs = int(''.join([d for d in epochs_str if d.isdigit()]))

        learning_rate = self.encoder.optimizer.learning_rate
        init_lr = learning_rate.initial_learning_rate
        decay_steps = learning_rate.decay_steps
        decay_rate = learning_rate.decay_rate

        new_lr = init_lr * decay_rate ** (trained_epochs)

        lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
            initial_learning_rate = new_lr, 
            decay_steps = decay_steps, # Total number of steps per epoch
            decay_rate = decay_rate
        )

        optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
        self.compile(optimizer)

    def generate_melodies(self, num_melodies):

        z = tf.random.normal(shape=(num_melodies, 90, 640))
        return self.decoder(z, training=False)
    

In [20]:
hierarchal_vae = HierarchalVAE(melody_train_dataset.element_spec, 1024, 10)

hierarchal_vae_n_params = hierarchal_vae.encoder.count_params() + hierarchal_vae.decoder.count_params()

model_name = f'{MELODY_SEQUENCE_LENGTH}_s_{NUMBER_OF_MELODIES}_m_{str(hierarchal_vae_n_params)}_p'
model_root_dir = os.path.join(HIERARCHAL_VAE_DIR, model_name)

if not os.path.exists(model_root_dir):
    os.mkdir(model_root_dir)

LOAD_CHECKPOINT = True
if LOAD_CHECKPOINT and os.path.exists(model_root_dir) and len(os.listdir(model_root_dir)) > 0:

    t_max = 0
    last_checkpoint = ''
    for f in os.listdir(model_root_dir):
        path = os.path.join(model_root_dir, f)
        if os.path.getmtime(path) > t_max:
            last_checkpoint = path 

    hierarchal_vae.load(last_checkpoint)

else:

    STEPS_PER_EPOCH = melody_train_ds_card

    lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate = 1e-3, 
        decay_steps = STEPS_PER_EPOCH, # Total number of steps per epoch
        decay_rate = .845 # Decay factor per /decay_steps/ steps
                # ^^^^ Decays by about .5 per 4 epochs
    )

    optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
    hierarchal_vae.compile(optimizer)

hierarchal_vae.encoder.summary()
hierarchal_vae.decoder.summary()

Model: "encoder"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_input (InputLayer)     [(None, 90, 640)]    0           []                               
                                                                                                  
 encoder_reshape (Reshape)      (None, 90, 640, 1)   0           ['encoder_input[0][0]']          
                                                                                                  
 encoder_conv2d_1 (Conv2D)      (None, 18, 128, 64)  1664        ['encoder_reshape[0][0]']        
                                                                                                  
 encoder_conv2d_2 (Conv2D)      (None, 6, 32, 128)   98432       ['encoder_conv2d_1[0][0]']       
                                                                                            

In [None]:
history = hierarchal_vae.fit(melody_train_dataset, epochs=1, shuffle=False, #steps_per_epoch=5
                             validation_data=melody_valid_dataset, validation_steps=1000
                            ).history

ckpt_str = f'{len(history["loss"])}_epochs_{str(history["loss"][-1])[:6]}_loss'
ckpt_dir = os.path.join(model_root_dir, ckpt_str)

hierarchal_vae.save(ckpt_dir)



In [27]:
sample_output = hierarchal_vae(x)
display_piano_roll_audio(sample_output[3])