In [1]:
from pathlib import Path

# Anchor every path to the directory the notebook is run from, so no
# machine-specific absolute path is hardcoded.
PROJECT_DIR = Path().resolve()
DATA_DIR = PROJECT_DIR / "data"

# Base URL and file name of the dataset archive; they are concatenated to form
# the download URL.
DATA_DOWNLOAD_ROOT = "https://github.com/czhuang/JSB-Chorales-dataset/raw/master/"
DATA_FILENAME = "Jsb16thSeparated.npz"

In [2]:
import tensorflow as tf
from tensorflow import keras

# Fetch the dataset archive through Keras' download helper and keep the local
# path it returns.
# NOTE(review): DATA_DIR is an absolute path, so the file presumably lands in
# the project's data directory rather than the default Keras cache -- confirm
# against the keras.utils.get_file documentation.
DATA_FILEPATH = keras.utils.get_file(
    DATA_FILENAME,
    DATA_DOWNLOAD_ROOT + DATA_FILENAME,
    cache_subdir=DATA_DIR
)

In [3]:
# Evaluates to True when TensorFlow lists at least one physical GPU device.
bool(tf.config.list_physical_devices('GPU'))

True

In [4]:
import numpy as np
from collections import namedtuple

def load_data(path=None):
    """Load every array stored in an ``.npz`` archive into a named tuple.

    Args:
        path: Location of the ``.npz`` archive. When omitted, the notebook-level
            ``DATA_FILEPATH`` is used (looked up at call time, so re-running
            the download cell is picked up without redefining this function).

    Returns:
        A ``Dataset`` named tuple with one field per array in the archive.
        Field order is the order reported by ``dir(archive.f)``.
    """
    if path is None:
        path = DATA_FILEPATH
    # NOTE(review): allow_pickle=True lets NumPy unpickle objects, which can
    # execute arbitrary code -- only load archives from a trusted source.
    with np.load(path, mmap_mode="r", allow_pickle=True, encoding="latin1") as archive:
        names = dir(archive.f)
        # Materialize the arrays while the archive is still open.
        arrays = {name: getattr(archive.f, name) for name in names}
    Dataset = namedtuple("Dataset", names)
    return Dataset(**arrays)

# Load the archive once; later cells read its splits as data.train,
# data.valid and data.test.
data = load_data()

In [5]:
# Replace missing entries (NaN) with 0.0 in place, then collect every distinct
# note value that occurs anywhere in the three splits.
unique_notes = set()
for split_name in ("train", "valid", "test"):
    for chorale in getattr(data, split_name):
        nan_mask = np.isnan(chorale)
        chorale[nan_mask] = 0.0
        unique_notes.update(note for chord in chorale for note in chord)
print(unique_notes)

{0.0, 36.0, 37.0, 38.0, 39.0, 40.0, 41.0, 42.0, 43.0, 44.0, 45.0, 46.0, 47.0, 48.0, 49.0, 50.0, 51.0, 52.0, 53.0, 54.0, 55.0, 56.0, 57.0, 58.0, 59.0, 60.0, 61.0, 62.0, 63.0, 64.0, 65.0, 66.0, 67.0, 68.0, 69.0, 70.0, 71.0, 72.0, 73.0, 74.0, 75.0, 76.0, 77.0, 78.0, 79.0, 80.0, 81.0}


In [6]:
from IPython.display import Audio
import pretty_midi
from pretty_midi import PrettyMIDI, Instrument, Note

def play_chorale(chorale, tempo=120, sample_rate=44100):
    """Synthesize a chorale and show an audio player for it in the notebook.

    Args:
        chorale: Iterable of chords, each an iterable of note pitches given as
            MIDI note numbers. Entries that are 0 (or NaN) are treated as
            rests and produce no sound.
        tempo: Time steps (chords) per minute; each chord lasts
            ``60 / tempo`` seconds.
        sample_rate: Sampling rate in Hz, used both to synthesize the waveform
            and to play it back.

    Returns:
        Whatever ``display(...)`` returns for the audio widget.
    """
    note_duration = 60 / tempo
    note_velocity = 100
    program = pretty_midi.instrument_name_to_program("Acoustic Grand Piano")
    instrument = Instrument(program)

    note_time = 0.0
    for chord in chorale:
        for note in chord:
            # float() accepts NumPy scalars and eager scalar tensors alike.
            pitch = float(note)
            # `not pitch > 0` is also True for NaN, so both rest encodings
            # are skipped instead of being emitted as MIDI pitch 0.
            if not pitch > 0:
                continue
            instrument.notes.append(
                Note(note_velocity, int(round(pitch)), note_time, note_time + note_duration),
            )
        note_time += note_duration

    midi = PrettyMIDI()
    midi.instruments.append(instrument)
    # Synthesize at the same rate the player uses; otherwise a non-default
    # sample_rate would change the pitch and speed of the playback.
    return display(Audio(midi.synthesize(fs=sample_rate), rate=sample_rate))

In [7]:
# play_chorale(data.train[0])

In [8]:
def create_label(data):
    """Split a sequence into next-step inputs and targets.

    Returns:
        A pair ``(inputs, targets)``: ``inputs`` is ``data`` without its last
        element and ``targets`` is ``data`` without its first element.
    """
    inputs, targets = data[:-1], data[1:]
    return inputs, targets

def create_windows(data, window_size, window_shift, drop_remainder=True):
    """Slide a window over ``data`` and emit each window as a single tensor.

    Each window spans ``window_size + 1`` consecutive elements and successive
    windows start ``window_shift`` elements apart.

    Args:
        data: Tensor-like sequence to slice along its first axis.
        window_size: Window length, not counting the one extra element.
        window_shift: Offset between the starts of consecutive windows.
        drop_remainder: Forwarded to ``Dataset.window``.

    Returns:
        A ``tf.data.Dataset`` whose elements are the batched windows.
    """
    span = window_size + 1

    def to_tensor(window):
        # Each window is itself a dataset; batching it yields one tensor.
        return window.batch(span)

    windows = tf.data.Dataset.from_tensor_slices(data).window(
        span, window_shift, drop_remainder=drop_remainder
    )
    return windows.flat_map(to_tensor)

def flatten_data(data):
    """Collapse ``data`` into a one-dimensional tensor."""
    flat_shape = [-1]
    return tf.reshape(data, flat_shape)

In [9]:
def create_dataset(data, batch_size=32, window_size=32, window_shift=8,
                   preprocessing_function=None, shuffle_buffer_size=8,
                   cache=False, prefetch=None):
    """Turn a collection of variable-length chorales into a batched pipeline.

    Each chorale is flattened to a note sequence, cut into overlapping
    windows, optionally preprocessed, and split into (input, target) pairs.

    Args:
        data: Sequence of chorales; lengths may differ between chorales.
        batch_size: Number of windows per batch.
        window_size: Window length of inputs/targets (windows are cut one
            element longer so they can be split into input and target).
        window_shift: Offset between the starts of consecutive windows.
        preprocessing_function: Optional callable mapped over every window
            before it is split into input and target.
        shuffle_buffer_size: Shuffle buffer size; falsy disables shuffling.
        cache: When truthy, cache the windowed examples.
        prefetch: Prefetch buffer size. ``True`` selects ``tf.data.AUTOTUNE``;
            falsy disables prefetching.

    Returns:
        A ``tf.data.Dataset`` yielding ``(inputs, targets)`` batches.
    """
    data = tf.ragged.constant(data, ragged_rank=1)
    data = tf.cast(data, dtype=tf.float32)
    dataset = tf.data.Dataset.from_tensor_slices(data)
    dataset = dataset.map(flatten_data)
    dataset = dataset.flat_map(lambda notes: create_windows(notes, window_size, window_shift))
    if preprocessing_function:
        dataset = dataset.map(preprocessing_function)
    dataset = dataset.map(create_label)
    if cache:
        # Cache before shuffling: caching after shuffle() would replay the
        # same order on every epoch.
        dataset = dataset.cache()
    if shuffle_buffer_size:
        dataset = dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.batch(batch_size)
    if prefetch:
        # Dataset.prefetch() requires a buffer size; calling it with no
        # argument raises a TypeError.
        buffer_size = tf.data.AUTOTUNE if prefetch is True else prefetch
        dataset = dataset.prefetch(buffer_size)
    return dataset

In [10]:
def preprocess(data, normalize=False):
    """Shift note values so the lowest non-zero note becomes 1.

    Zero entries are kept at 0; every other value ``v`` becomes
    ``v - lowest_note + 1``, where ``lowest_note`` is the smallest non-zero
    member of the notebook-level ``unique_notes`` set.

    Args:
        data: Float tensor of note values.
        normalize: When true, standardize the shifted values with the mean and
            standard deviation of ``data`` itself.

    Returns:
        A tensor with the same shape as ``data``.
    """
    # Computed once per call instead of once per element.
    lowest_note = min(unique_notes - {0.0})
    shifted = data - lowest_note + 1
    # Vectorized replacement for a per-element tf.map_fn with a Python `if`.
    data = tf.where(tf.equal(data, 0), tf.zeros_like(data), shifted)
    if normalize:
        data = (data - tf.math.reduce_mean(data)) / tf.math.reduce_std(data)
    return data

In [11]:
# Build the three input pipelines with identical settings, one per split.
train_data, valid_data, test_data = (
    create_dataset(split, preprocessing_function=preprocess)
    for split in (data.train, data.valid, data.test)
)

In [12]:
from tensorflow.keras import layers

class Conv1D_BN(layers.Layer):
    """1-D convolution optionally followed by batch normalization.

    The activation is applied inside the convolution, so batch normalization
    (when enabled) runs on the activated output.

    Args:
        filters: Number of convolution filters.
        kernel_size: Convolution kernel size.
        strides: Convolution stride.
        padding: Convolution padding mode.
        dilation_rate: Convolution dilation rate.
        activation: Activation identifier or callable for the convolution.
        batch_norm: Whether to append a ``BatchNormalization`` layer.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, filters, kernel_size=4, strides=1,
                 padding="causal", dilation_rate=1,
                 activation="relu", batch_norm=True, **kwargs):
        super().__init__(**kwargs)
        self.filters = filters
        self.kernel_size = kernel_size
        self.strides = strides
        self.padding = padding
        self.dilation_rate = dilation_rate
        self.activation = keras.activations.get(activation)
        self.batch_norm = batch_norm
        self.layers = [
            layers.Conv1D(filters, kernel_size, strides, padding,
                          dilation_rate=dilation_rate, activation=activation),
        ]
        if self.batch_norm:
            self.layers.append(layers.BatchNormalization())

    def call(self, inputs):
        """Run ``inputs`` through the sub-layers in order."""
        Z = inputs
        for layer in self.layers:
            Z = layer(Z)
        return Z

    def get_config(self):
        """Return the constructor arguments in a serializable form."""
        base_config = super().get_config()
        return {
            **base_config,
            "filters": self.filters,
            "kernel_size": self.kernel_size,
            "strides": self.strides,
            "padding": self.padding,
            "dilation_rate": self.dilation_rate,
            # Store the serialized identifier rather than the callable itself.
            "activation": keras.activations.serialize(self.activation),
            "batch_norm": self.batch_norm,
        }

In [13]:
import keras_tuner as kt

def create_model(hp):
    """Build and compile the note-prediction model for one tuner trial.

    The network is an embedding, a stack of ``Conv1D_BN`` blocks with doubling
    dilation rates, two sequence-returning LSTMs and a softmax output with one
    unit per entry of ``unique_notes``. Only the LSTM recurrent dropout rate
    is taken from ``hp``.

    Args:
        hp: Hyperparameter container providing ``Float``.

    Returns:
        The compiled ``keras`` model.
    """
    num_classes = len(unique_notes)
    # (filters, dilation_rate) for each convolutional block, in order.
    conv_stack = [(32, 1), (48, 2), (64, 4), (96, 8), (128, 16), (196, 32)]

    model = keras.models.Sequential()
    model.add(layers.Embedding(input_dim=num_classes, output_dim=8, input_shape=[None]))
    for filters, dilation in conv_stack:
        model.add(Conv1D_BN(filters, dilation_rate=dilation))

    dropout_rate = hp.Float('dropout_rate', min_value=0.2, max_value=0.5, default=0.35)
    for units in (256, 512):
        model.add(layers.LSTM(units, return_sequences=True, recurrent_dropout=dropout_rate))
    model.add(layers.Dense(num_classes, activation=keras.activations.softmax))

    model.compile(
        loss=keras.losses.SparseCategoricalCrossentropy(),
        optimizer=keras.optimizers.Nadam(learning_rate=1e-3),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )
    return model

In [14]:
# tuner = kt.BayesianOptimization(create_model,
#                                 objective='val_sparse_categorical_accuracy',
#                                 max_trials=10,
#                                 directory='tuner-logs',
#                                 project_name='bayesian-optimization')

In [15]:
# tuner.search(train_data, epochs=30, validation_data=valid_data)

In [16]:
# File the model is saved to (in the commented-out tuner cell) and loaded from.
MODEL_PATH = PROJECT_DIR / "models" / "model.h5"

In [17]:
# model = tuner.get_best_models(num_models=1)[0]
# model.save(MODEL_PATH)

In [18]:
# Restore the saved model; the custom Conv1D_BN class is passed in so Keras
# can rebuild those layers.
# NOTE(review): the cells that would create MODEL_PATH are commented out, so
# this presumably fails on a fresh checkout unless the file already exists.
model = keras.models.load_model(MODEL_PATH,
                                custom_objects={
                                    "Conv1D_BN": Conv1D_BN
                                })



In [19]:
# Score the loaded model on the test pipeline.
# NOTE(review): the two values are presumably [loss, sparse categorical
# accuracy], matching the compile() call in create_model -- confirm.
model.evaluate(test_data)



[0.7183104753494263, 0.7939797043800354]

In [20]:
def predict_and_choose(model, chords, num_options, scale_logits):
    """Sample the next note from the model's prediction for the last step.

    Args:
        model: Model whose ``predict`` returns per-step class probabilities.
        chords: Batch holding a single note sequence.
        num_options: When ``None``, sample from all classes; otherwise sample
            only among the ``num_options`` most likely classes.
        scale_logits: Divisor applied to the log-probabilities before
            sampling (acts as a temperature).

    Returns:
        A float32 tensor of shape ``[1, 1]`` holding the chosen class index.
        Both branches return the same dtype so the result can be concatenated
        onto a float32 note sequence.
    """
    # Keep only the last time step of the first sequence: shape [1, classes].
    probabilities = model.predict(chords, verbose=0)[0, -1:]
    logits = tf.math.log(probabilities) / scale_logits
    if num_options is None:
        choice = tf.random.categorical(logits, num_samples=1)
        return tf.cast(choice, tf.float32)
    # Never ask for more candidates than there are classes.
    k = min(num_options, logits.shape[-1])
    top_logits, top_notes = tf.math.top_k(logits, k=k)
    # Sample a position among the candidates, then map it back to a class.
    pick = tf.random.categorical(top_logits, num_samples=1)
    chosen = tf.gather(top_notes, pick, batch_dims=1)
    return tf.cast(chosen, tf.float32)

def generate_chorale(model, initial_chords, length, num_options=None, scale_logits=1):
    """Extend a seed of chords by sampling notes from the model one at a time.

    Args:
        model: Model passed through to ``predict_and_choose``.
        initial_chords: 2-D array-like of seed chords, one row per chord; the
            row width sets the number of voices per generated chord.
        length: Number of chords to generate after the seed.
        num_options: Passed through to ``predict_and_choose``.
        scale_logits: Passed through to ``predict_and_choose``.

    Returns:
        A float32 tensor of shape ``[-1, voices]`` containing the seed chords
        followed by the generated chords, mapped back to the original note
        range (zeros stay zero).
    """
    seed = tf.constant(initial_chords, dtype=tf.float32)
    # Use the seed's chord width rather than assuming four voices.
    voices = seed.shape[-1]
    notes = tf.map_fn(preprocess, seed)
    notes = tf.reshape(notes, [1, -1])
    for _ in range(length * voices):
        next_note = predict_and_choose(model, notes, num_options, scale_logits)
        # Guard against an integer result so the concat dtypes always match.
        next_note = tf.cast(next_note, tf.float32)
        notes = tf.concat([notes, next_note], axis=1)
    # Undo the shift applied by preprocess; zeros are left untouched.
    lowest_note = min(unique_notes - {0.0})
    notes = tf.where(notes == 0, notes, notes + lowest_note - 1)
    return tf.reshape(notes, shape=[-1, voices])

In [21]:
# Continue each of the first five test chorales from its first eight chords.
chorales = []
for seed_chorale in data.test[:5]:
    generated = generate_chorale(model, initial_chords=seed_chorale[:8], length=40, scale_logits=1.8)
    chorales.append(generated)

In [22]:
# for chorale in chorales:
#     play_chorale(chorale)