In [1]:
!pip install pretty_midi -q

[K     |████████████████████████████████| 5.6 MB 5.4 MB/s 
[K     |████████████████████████████████| 51 kB 6.5 MB/s 
[?25h  Building wheel for pretty-midi (setup.py) ... [?25l[?25hdone


In [2]:
import torch
import numpy as np
import tensorflow as tf
import pretty_midi
import os
import pathlib
import random
from torch.utils.data import Dataset
import matplotlib.pyplot as plt
from IPython.display import Audio
from tqdm.notebook import tqdm
import collections

In [3]:
!unzip best_model.zip -d best_model

Archive:  best_model.zip
   creating: best_model/best_model/
  inflating: best_model/best_model/saved_model.pb  
   creating: best_model/best_model/variables/
  inflating: best_model/best_model/variables/variables.data-00000-of-00001  
  inflating: best_model/best_model/variables/variables.index  


In [4]:
# Done by: Kamil

# Seed every random number generator the notebook imports so reruns are reproducible.
random_seed = 42
random.seed(random_seed)
tf.random.set_seed(random_seed)
np.random.seed(random_seed)

# Stable alias of the former tf.data.experimental.AUTOTUNE (already used elsewhere in this notebook).
AUTO = tf.data.AUTOTUNE

# Detect a TPU; TPUClusterResolver raises ValueError when none is reachable.
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    print('Running on TPU ', tpu.master())
except ValueError:
    tpu = None
    print('Not running on TPU')
if tpu:
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    # tf.distribute.experimental.TPUStrategy is deprecated in favour of tf.distribute.TPUStrategy.
    strategy = tf.distribute.TPUStrategy(tpu)
    # Global batch size: 16 samples per replica.
    BATCH_SIZE = 16 * strategy.num_replicas_in_sync
    print('batch size =', BATCH_SIZE)
else:
    # Default (no-op) strategy for CPU / single GPU.
    strategy = tf.distribute.get_strategy()
    BATCH_SIZE = 64

print("REPLICAS: ", strategy.num_replicas_in_sync)

# Size of the pitch vocabulary (also used to scale pitches into [0, 1)).
NUMBER_OF_PIANO_NOTES = 128
# Number of notes in one model input window.
SEQ_LENGTH = 100

Running on TPU  grpc://10.28.20.218:8470




batch size = 128
REPLICAS:  8


In [11]:
# Done by: Evgeny
# Helper functions

# Get all filenames from `directory`
def get_filenames(directory):
    """Return the joined paths of all regular files directly inside `directory`.

    Sub-directories are ignored; the order is whatever `os.listdir` yields.
    """
    candidates = (os.path.join(directory, entry) for entry in os.listdir(directory))
    return [path for path in candidates if os.path.isfile(path)]

# Load saved npz file
def load_roll(path):
    """Open the NumPy archive at `path` and return whatever `np.load` yields."""
    # NOTE(review): allow_pickle=True lets the file execute arbitrary code on
    # load; only use this on archives you created yourself.
    archive = np.load(path, allow_pickle=True)
    return archive

# Save preprocessed rolls in npz file
def save_roll(array_map, path):
    """Write every array of `array_map` into one compressed archive at `path`.

    Each key of `array_map` becomes the name of the corresponding array inside
    the archive.
    """
    named_arrays = dict(array_map)
    np.savez_compressed(path, **named_arrays)

# Save preprocessed rolls from dataset by parts in few npz files
def save_string_notes(directory, name, min_notes=100, part_size=5000):
    """Extract the pitch sequence of every MIDI file in `directory` and save them in parts.

    For each file the notes of the first instrument are sorted by start time
    and their pitches are stored under the key `str(index)`, where `index` is
    the file's position in `get_filenames(directory)`. Files without any
    instrument, or with `min_notes` notes or fewer, are skipped.

    Every `part_size` collected pieces are written to their own archive named
    `<stem>_<part number><extension>` (derived from `name`), so that a later
    part no longer overwrites an earlier one.

    Args:
        directory: folder containing the MIDI files.
        name: base path of the output archives.
        min_notes: a piece must have more than this many notes to be kept.
        part_size: number of pieces stored per archive.
    """
    stem, extension = os.path.splitext(name)
    saved_rolls = {}
    files_saved = 0

    def flush():
        # Write the pieces collected so far to the next numbered part.
        nonlocal saved_rolls, files_saved
        files_saved += 1
        part_path = f"{stem}_{files_saved}{extension}"
        save_roll(saved_rolls, part_path)
        print(f"Saved file {part_path}")
        saved_rolls = {}

    for index, filename in enumerate(tqdm(get_filenames(directory))):
        pm = pretty_midi.PrettyMIDI(filename)
        try:
            instrument = pm.instruments[0]
        except IndexError:
            print("Skipped")
            continue
        sorted_notes = sorted(instrument.notes, key=lambda note: note.start)
        if len(sorted_notes) <= min_notes:
            continue
        saved_rolls[str(index)] = np.array([note.pitch for note in sorted_notes])
        if len(saved_rolls) == part_size:
            flush()
    # Whatever is left over forms the final, smaller part.
    if saved_rolls:
        flush()

In [None]:
# Done by: Evgeny
# Open the preprocessed note archive through load_roll (np.load with pickling enabled).
# NOTE(review): presumably one pitch array per piece, as written by save_string_notes —
# confirm for this merged file.
data = load_roll('/kaggle/input/maestro-merged-npz/maestro_notes.npz')

In [None]:
# Done by: Evgeny

# Flatten every stored array and join them into one float column vector.
# A single concatenate is linear in the number of notes, whereas np.append
# inside a loop copies the whole accumulated buffer on every iteration.
# The leading empty float array keeps the float64 dtype and makes an empty
# archive yield an empty result instead of an error.
all_notes = np.concatenate(
    [np.empty(0)] + [np.ravel(item) for item in tqdm(data.values())]
).reshape(-1, 1)

In [None]:
# Done by: Evgeny

full_dataset = tf.data.Dataset.from_tensor_slices(all_notes)
train_size = int(0.8 * len(all_notes))
test_size = int(0.2 * len(all_notes))

# Keep the notes in their original order. Training windows are cut from
# consecutive elements of this dataset, so shuffling individual notes here
# would turn every window into a random bag of pitches with no melody left
# to learn. Any shuffling has to happen on whole windows instead.
train_dataset = full_dataset
# train_dataset = full_dataset.take(train_size)
# test_dataset = full_dataset.skip(train_size)

In [None]:
# Done by: Evgeny

# Function for generating (input, target) samples

def create_sequences(dataset, seq_length, vocab_size):
  """Turn a stream of notes into (input window, next note) samples.

  Every run of `seq_length + 1` consecutive elements of `dataset` becomes one
  sample: the first `seq_length` elements divided by `vocab_size` are the
  input, and the last element, left unscaled, is the label.
  """
  window_size = seq_length + 1

  def to_tensor(window):
    # Each window is itself a small dataset; batching it whole yields one tensor.
    return window.batch(window_size, drop_remainder=True)

  def to_sample(chunk):
    features = chunk[:-1] / [vocab_size]
    target = chunk[-1]
    return features, target

  sliding = dataset.window(window_size, shift=1, stride=1, drop_remainder=True)
  chunks = sliding.flat_map(to_tensor)
  return chunks.map(to_sample, num_parallel_calls=tf.data.AUTOTUNE)

In [None]:
# Done by: Evgeny

train_seq = create_sequences(train_dataset, SEQ_LENGTH, NUMBER_OF_PIANO_NOTES)
# test_seq = create_sequences(train_dataset, SEQ_LENGTH, NUMBER_OF_PIANO_NOTES)

# Order of the stages matters:
#   * cache() goes before repeat(): a cache placed after an endless repeat()
#     never finishes filling and only keeps growing in memory.
#   * shuffle() works on whole (window, label) samples, so each sample keeps
#     its internal note order while the batches are decorrelated.
train_loader = (train_seq
            .cache()
            .shuffle(buffer_size=10_000)  # bounded buffer to limit memory use
            .repeat()
            .batch(BATCH_SIZE, drop_remainder=True)
            .prefetch(tf.data.AUTOTUNE))

# test_loader = (test_seq
#             .cache()
#             .repeat()
#             .batch(BATCH_SIZE, drop_remainder=True)
#             .prefetch(tf.data.AUTOTUNE))

In [5]:
# Done by: Kamil

# One window of SEQ_LENGTH notes, one feature (the scaled pitch) per note.
input_shape = (SEQ_LENGTH, 1)
loss = tf.keras.losses.SparseCategoricalCrossentropy()
opt = tf.keras.optimizers.RMSprop(learning_rate=0.0005)

# 1 -> three stacked 512-unit LSTMs with a dense head; anything else -> two 128-unit LSTMs.
model_type = 1

# Both variants end in a softmax over all NUMBER_OF_PIANO_NOTES pitches.
with strategy.scope():
    if model_type == 1:
        model = tf.keras.Sequential([
            tf.keras.layers.LSTM(512, input_shape=input_shape, recurrent_dropout=0.3, return_sequences=True),
            tf.keras.layers.LSTM(512, return_sequences=True, recurrent_dropout=0.3),
            tf.keras.layers.LSTM(512),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.3),
            tf.keras.layers.Dense(256),
            tf.keras.layers.Activation('relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.3),
            tf.keras.layers.Dense(NUMBER_OF_PIANO_NOTES),
            tf.keras.layers.Activation('softmax'),
        ])
    else:
        model = tf.keras.Sequential([
            tf.keras.layers.LSTM(128, input_shape=input_shape, return_sequences=True),
            tf.keras.layers.LSTM(128),
            tf.keras.layers.Dropout(0.1),
            tf.keras.layers.Dense(NUMBER_OF_PIANO_NOTES),
            tf.keras.layers.Activation('softmax'),
        ])

if not tpu:
    model.compile(loss=loss, optimizer=opt)
else:
    # On TPU, run several training steps per call into the device.
    steps_per_execution = 32
    model.compile(loss=loss, optimizer=opt, steps_per_execution=steps_per_execution)
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm (LSTM)                 (None, 100, 512)          1052672   
                                                                 
 lstm_1 (LSTM)               (None, 100, 512)          2099200   
                                                                 
 lstm_2 (LSTM)               (None, 512)               2099200   
                                                                 
 batch_normalization (BatchN  (None, 512)              2048      
 ormalization)                                                   
                                                                 
 dropout (Dropout)           (None, 512)               0         
                                                                 
 dense (Dense)               (None, 256)               131328    
                                                        

In [9]:
# Done by: Kamil

# Load model from `best_model` directory

with strategy.scope():
    # The archive extracted above nests the SavedModel one level down
    # (best_model/best_model/saved_model.pb), whereas model.save('./best_model')
    # writes it at the top level. Load from whichever directory holds it;
    # if neither does, fall through so load_model reports the missing path.
    model_dir = 'best_model'
    nested_dir = os.path.join(model_dir, 'best_model')
    if (not os.path.isfile(os.path.join(model_dir, 'saved_model.pb'))
            and os.path.isfile(os.path.join(nested_dir, 'saved_model.pb'))):
        model_dir = nested_dir

    # Read the files from the local host rather than from the TPU worker.
    load_locally = tf.saved_model.LoadOptions(experimental_io_device='/job:localhost')
    model = tf.keras.models.load_model(model_dir, options=load_locally)
    steps_per_execution = 32
    model.compile(loss=loss, optimizer=opt, steps_per_execution=steps_per_execution)



In [None]:
# Done by: Kamil

# Conditional checkpoints

if tpu:
    # On TPU, checkpoints are written through the local host.
    save_locally = tf.saved_model.SaveOptions(experimental_io_device='/job:localhost')
    checkpoints = tf.keras.callbacks.ModelCheckpoint('tpu_checkpoints', options=save_locally)
else:
    checkpoints = tf.keras.callbacks.ModelCheckpoint(filepath='training_checkpoints/ckpt_{epoch}', save_weights_only=True)

# Stop when the training loss has not improved for 15 epochs and roll back to
# the best weights. No trailing comma here: it would turn `es` into a
# one-element tuple instead of a callback.
es = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=15, verbose=2, restore_best_weights=True)

callbacks = [
    checkpoints,
    es,
]

epochs = 100

In [None]:
# Done by: Kamil

# Train model

history = model.fit(
    train_loader,
    epochs=epochs,
    # The loader repeats forever, so the epoch length is set explicitly:
    # one epoch is a tenth of the dataset's batches.
    steps_per_epoch=len(train_dataset) // BATCH_SIZE // 10,
    callbacks=callbacks,
)

# Training-loss curve, one point per epoch.
plt.plot(history.epoch, history.history['loss'], label='train loss')
plt.legend()
plt.show()

In [None]:
# Done by: Kamil

# Save model

# Write the files through the local host rather than from the accelerator worker.
save_locally = tf.saved_model.SaveOptions(experimental_io_device='/job:localhost')
with strategy.scope():
    model.save('./best_model', options=save_locally)

In [7]:
# Done by: Kamil and Evgeny
def predict_next_note(notes, keras_model, temperature, min_pitch=40, max_pitch=80) -> int:
  """Sample the next pitch from the model's predicted distribution.

  Args:
    notes: one input window for the model, without the batch dimension.
    keras_model: model whose `predict` returns one row of per-pitch
      probabilities (the networks in this notebook end in a softmax).
    temperature: positive sampling temperature; values above 1 flatten the
      distribution, values below 1 sharpen it.
    min_pitch: exclusive lower bound of the pitches that may be returned.
    max_pitch: exclusive upper bound of the pitches that may be returned.

  Returns:
    A pitch `p` with `min_pitch < p < max_pitch`.

  Raises:
    ValueError: if no pitch index lies strictly between the two bounds.
  """
  assert temperature > 0

  notes = tf.expand_dims(notes, 0)
  probabilities = np.asarray(keras_model.predict(notes, verbose=0), dtype=np.float64)

  # tf.random.categorical expects unnormalised log-probabilities. The model
  # emits probabilities, so take the log first (the epsilon guards log(0));
  # feeding raw probabilities would make the sampling almost uniform.
  pitch_logits = np.log(probabilities + 1e-9) / temperature

  # Rule out-of-range pitches out up front instead of re-sampling until one
  # happens to fall inside the range: same conditional distribution, but a
  # single draw and no risk of a very long loop.
  pitch_ids = np.arange(pitch_logits.shape[-1])
  allowed = (pitch_ids > min_pitch) & (pitch_ids < max_pitch)
  if not allowed.any():
    raise ValueError(f"no pitch lies strictly between {min_pitch} and {max_pitch}")
  pitch_logits = np.where(allowed, pitch_logits, -1e9).astype(np.float32)

  pitch = tf.random.categorical(pitch_logits, num_samples=1)
  return int(tf.squeeze(pitch))

def predict_sequence(seed, model, num_predictions, temperature):
    """Generate `num_predictions` pitches by repeatedly extending `seed`.

    Args:
        seed: array of shape (window length, 1) holding pitches divided by
            NUMBER_OF_PIANO_NOTES; it is not modified.
        model: model passed on to `predict_next_note`.
        num_predictions: number of notes to generate.
        temperature: sampling temperature passed on to `predict_next_note`.

    Returns:
        A 1-D array with the generated (unscaled, integer) pitches.
    """
    current_sequence = np.asarray(seed, dtype=float)
    generated_sequence = []
    for _ in tqdm(range(num_predictions)):
        pitch = predict_next_note(current_sequence, model, temperature)
        generated_sequence.append(pitch)

        # The seed and the training inputs are pitches divided by
        # NUMBER_OF_PIANO_NOTES, so the generated pitch must be scaled the
        # same way before it is fed back; otherwise the window mixes values
        # in [0, 1) with raw pitches the model has never seen.
        scaled_pitch = pitch / NUMBER_OF_PIANO_NOTES

        # Slide the window: drop the oldest note, append the new one.
        current_sequence = np.append(current_sequence[1:], [[scaled_pitch]], axis=0)

    return np.array(generated_sequence)


def notes_to_midi(notes, out_file, is_original: bool=False, velocity = 100, step = 0.2, duration = 0.4):
  """Render a pitch sequence as a single-instrument piano MIDI file.

  Notes are placed `step` seconds apart (the first one starts at `step`),
  each lasting `duration` seconds at the given `velocity`. When `is_original`
  is True the values are multiplied by NUMBER_OF_PIANO_NOTES before being
  truncated to an integer pitch; otherwise they are truncated as they are.
  The result is written to `out_file` and the PrettyMIDI object is returned.
  """
  piano = pretty_midi.Instrument(
      program=pretty_midi.instrument_name_to_program("Acoustic Grand Piano"))
  midi = pretty_midi.PrettyMIDI()

  onset = 0
  for value in notes:
    onset = float(onset + step)
    pitch = int(value * NUMBER_OF_PIANO_NOTES) if is_original is True else int(value)
    piano.notes.append(
        pretty_midi.Note(
            velocity=velocity,
            pitch=pitch,
            start=onset,
            end=float(onset + duration),
        )
    )

  midi.instruments.append(piano)
  midi.write(out_file)
  return midi

def midi_to_notes(path, min_notes=100):
    """Read a MIDI file and return the pitches of its first instrument.

    Args:
        path: path of the MIDI file.
        min_notes: the piece must contain more than this many notes.

    Returns:
        A 1-D array of pitches ordered by note start time, or None (after
        printing "Skipped") when the file has no instruments.

    Raises:
        ValueError: if the first instrument has `min_notes` notes or fewer.
            Such files previously failed with an accidental TypeError.
    """
    pm = pretty_midi.PrettyMIDI(path)
    try:
        instrument = pm.instruments[0]
    except IndexError:
        print("Skipped")
        return None
    sorted_notes = sorted(instrument.notes, key=lambda note: note.start)
    if len(sorted_notes) <= min_notes:
        raise ValueError(
            f"{path} has {len(sorted_notes)} notes, more than {min_notes} are required"
        )
    return np.array([note.pitch for note in sorted_notes])

In [18]:
# Done by: Kamil

# seed = midi_to_notes('/kaggle/input/mozart-v2/Wolfgang Amadeus Mozart/Mozart Wolfgang Amadeus Fantasia in C minor K.475 Ui9pyxdVX6Y.mid')
# seed = seed[100:200].reshape(-1, 1) / NUMBER_OF_PIANO_NOTES

# Random seed window: 100 pitches drawn from [40, 100), scaled like the model inputs.
seed = np.random.randint(low=40, high=100, size=(100, 1)) / NUMBER_OF_PIANO_NOTES

# Write the seed itself, then 400 generated notes, as MIDI files.
original = notes_to_midi(seed, 'original_test.mid', is_original=True)
generated = notes_to_midi(
    predict_sequence(seed, model, num_predictions=400, temperature=1),
    'generated_test.mid',
    step=0.25,
    duration=0.4,
)

  0%|          | 0/400 [00:00<?, ?it/s]

In [None]:
# Done by: Kamil

from datetime import datetime

def generate_for_folder(path, seq_length):
  """Generate continuations for every MIDI file directly inside `path`.

  For each file a 100-note seed is cut at a random offset in [50, 150) and
  `seq_length` new notes are generated from it at temperatures 1 and 2.
  Results are written to `<temperature>_GENERATED_<HH_MM_SS>_<name>.mid` in
  the working directory. Files that cannot be read, or that are too short to
  provide a full seed, are reported and skipped.

  Args:
    path: folder containing the MIDI files.
    seq_length: number of notes to generate per file and temperature.
  """
  for filename in os.listdir(path):
    f = os.path.join(path, filename)
    if not os.path.isfile(f):
      continue

    try:
      notes = midi_to_notes(f)
    except Exception as err:
      # Report why a file is skipped instead of swallowing the error; unlike a
      # bare `except`, this lets KeyboardInterrupt stop the loop.
      print(f"Skipped {f}: {err}")
      continue
    if notes is None:
      # midi_to_notes found no instrument and already reported the skip.
      continue

    start = np.random.randint(50, 150)
    seed = notes[start:start+100].reshape(-1, 1) / NUMBER_OF_PIANO_NOTES
    if seed.shape[0] < 100:
      print(f"Discarded {f.split(' ')[-1]} {seed.shape}")
      continue

    current_time = datetime.now().strftime("_%H_%M_%S")
    print(f'Predicting {f}')
    name = f.split(' ')[-1].split('.')[0]
    for temperature in (1, 2):
      notes_to_midi(
          predict_sequence(seed, model, num_predictions = seq_length, temperature = temperature),
          f"{temperature}_GENERATED{current_time}_{name}.mid",
      )


## TEST PURPOSES ##
## For every MIDI file in `path`, generate `seq_length` new notes at temperatures 1 and 2,
## seeded with 100 notes taken from a random position in that file.
generate_for_folder(path="/test", seq_length=200)