In [56]:
# Print the installed TensorFlow version so the environment this notebook ran in is recorded.
import tensorflow as tf
print(tf.__version__)

2.9.2


# **Data Preparation**

In [None]:
import os
import numpy as np

# Directory holding the sample arrays. Files ending in "ans.npy" are targets,
# every other ".npy" file is a model input (e.g. "4on6.mid.npy" / "4on6.mid.ans.npy").
PATH = "test_input"
file_dir = sorted(os.listdir(PATH))
X_data = list()
y_data = list()
max_length = -1
for file in file_dir:
    if not file.endswith(".npy"):
        # np.load cannot parse unrelated directory entries (hidden files, sub-folders, ...)
        continue
    loaded_file = np.load(os.path.join(PATH, file))
    # if the file name ends with "ans.npy" it is a target, otherwise an input
    if file.endswith("ans.npy"):
        y_data.append(loaded_file)
    else:
        X_data.append(loaded_file)
    max_length = max(max_length, loaded_file.shape[0])

# Inputs and targets are paired by position in the sorted listing, so the counts must agree.
assert len(X_data) == len(y_data), (
    "found %d input files but %d target files in %r" % (len(X_data), len(y_data), PATH)
)

max_length += 1 # extra step to denote the end point of the song

# add 0 to the end of each sample (time axis only) to make them the same length
for i in range(len(X_data)):
    pad_steps = max_length - X_data[i].shape[0]
    X_data[i] = np.pad(X_data[i], ((0, pad_steps), (0, 0)), 'constant')
for i in range(len(y_data)):
    og_len = y_data[i].shape[0] # original length
    y_data[i] = np.pad(y_data[i], ((0, max_length - og_len), (0, 0)), 'constant')

    # extend the feature axis by 2 columns: [start indicator, end indicator]
    denote_dim = np.zeros((max_length, 2))
    # Flag the end from the first padded step onwards. Starting at og_len (not
    # og_len + 1) guarantees that even the longest piece, whose only padded step
    # is the extra one added above, receives an end marker.
    denote_dim[og_len:, 1] = 1
    # axis=1 appends the two columns to every time step; appending along axis 0
    # would try to stack rows of different widths and raise a ValueError.
    y_data[i] = np.append(y_data[i], denote_dim, axis=1)

# Column positions of the start / end indicators inside a target row.
# Assumes the loaded targets have 52 pitch columns -- TODO confirm against the data.
start_point_index = 52
end_point_index = 53

X_data = np.array(X_data)
y_data = np.array(y_data)

print("X_data shape: (train_size, max_length, input_size)", X_data.shape)
print("y_data shape: (train_size, max_length, n_pitch+2)", y_data.shape)

# **Model**
Structure:
* https://blog.paperspace.com/seq-to-seq-attention-mechanism-keras/ (major)
* https://blog.keras.io/a-ten-minute-introduction-to-sequence-to-sequence-learning-in-keras.html (minor)

Ref:
* https://alvinntnu.github.io/python-notes/nlp/seq-to-seq-attention-addition.html
* https://keras.io/api/layers/attention_layers/attention/
* https://www.youtube.com/watch?v=B3uws4cLcFw

## *Parameter*

In [71]:
# ---- Feature layout ----
n_pitch = 52
input_size = 42
target_size = 2 + n_pitch  # n_pitch (52) pitch columns + start/end indicator columns (2)

# Names produced by the data-preparation cell, re-stated here so this cell
# lists everything the model depends on.
max_length = max_length  # music piece length
start_point_index = start_point_index
end_point_index = end_point_index

# ---- Model / training sizes ----
units = 1024  # hidden units
BATCH_SIZE = 64
train_size = len(X_data)
steps_per_epoch = train_size // BATCH_SIZE


## *Design*

In [72]:
import tensorflow as tf
from keras.layers import AdditiveAttention, Attention
from keras.layers import Input, Concatenate, Dense, LSTM, Embedding, GRU

class Encoder(tf.keras.Model):
  """GRU encoder: maps an input sequence to per-step outputs plus a final state.

  Args:
    vocab_size: accepted for interface compatibility; not used by this class
      (no embedding layer is created).
    units: number of GRU units.
    batch_size: stored on the instance; not used by the layers defined here.
  """

  def __init__(self, vocab_size, units, batch_size):
    super().__init__()
    self.units = units
    self.batch_size = batch_size

    # return_sequences=True -> one output per time step (consumed by attention);
    # return_state=True     -> additionally return the last hidden state.
    # glorot_uniform initialises the recurrent kernel, i.e. the weights used for
    # the linear transformation of the recurrent state.
    self.gru = tf.keras.layers.GRU(
        self.units,
        recurrent_initializer='glorot_uniform',
        return_state=True,
        return_sequences=True,
    )

  def call(self, x, hidden=None):
    """Run the GRU over `x`, optionally starting from `hidden`.

    Returns:
      (output, state): the per-step outputs and the final hidden state.
    """
    sequence_output, final_state = self.gru(x, initial_state=hidden)
    return sequence_output, final_state

class Decoder(tf.keras.Model):
  """Single-step GRU decoder with dot-product attention over the encoder outputs.

  Args:
    output_dim: size of the per-step output vector produced by the final Dense layer.
    units: number of GRU units; `hidden` passed to `call` must have this width.
    batch_size: stored on the instance; not used by the layers defined here.
  """

  def __init__(self, output_dim, units, batch_size):
    super(Decoder, self).__init__()
    self.batch_size = batch_size
    self.units = units
    self.gru = tf.keras.layers.GRU(self.units,
                                   return_sequences=True,
                                   return_state=True,
                                   recurrent_initializer='glorot_uniform')
    # No activation: the decoder emits raw scores (logits), not probabilities.
    self.fc = tf.keras.layers.Dense(output_dim)

    # Used for attention. The first positional argument of Attention is
    # `use_scale`, not a unit count, so the former `Attention(self.units)` simply
    # enabled the learned scale. Spell that out; the layer's weights are unchanged.
    self.attention = Attention(use_scale=True) # or AdditiveAttention(use_scale=True)

  def call(self, x, hidden, enc_output):
    """Decode one time step.

    Args:
      x: decoder input for this step, shape (batch_size, 1, target_size).
      hidden: previous hidden state, shape (batch_size, units).
      enc_output: encoder outputs, shape (batch_size, max_length, units).

    Returns:
      (x, state, attention_weights):
        x: step output (logits), shape (batch_size, 1, output_dim).
        state: new GRU hidden state, shape (batch_size, units).
        attention_weights: shape (batch_size, 1, max_length).
    """
    # The hidden state is the attention query; expand it to (batch_size, 1, units).
    # context_vector shape == (batch_size, 1, units)
    # attention_weights shape == (batch_size, 1, max_length)
    context_vector, attention_weights = self.attention(
        [tf.expand_dims(hidden, 1), enc_output], return_attention_scores=True)

    # x shape after concatenation == (batch_size, 1, units + target_size)
    x = tf.concat([context_vector, x], axis=-1)

    # Start the GRU from the previous hidden state. Without initial_state the GRU
    # restarts from zeros on every call, so no recurrent memory would be carried
    # from one decoding step to the next.
    output, state = self.gru(x, initial_state=hidden)

    # Dense acts on the last axis, so the single time-step axis is preserved:
    # output shape (batch_size, 1, units) -> x shape (batch_size, 1, output_dim)
    x = self.fc(output)
    return x, state, attention_weights

### Test Encoder

In [73]:
# Random batches shaped like the real inputs/targets, used to smoke-test the layers.
example_input_batch = tf.random.uniform(
    shape=(BATCH_SIZE, max_length, input_size))
example_target_batch = tf.random.uniform(
    shape=(BATCH_SIZE, max_length, target_size))

In [74]:
encoder = Encoder(input_size, units, BATCH_SIZE)

# `hidden` is left at None here, so the GRU chooses its own initial state.
# To continue from an existing state instead, pass it as the second argument:
#   encoder(example_input_batch, sample_hidden)
sample_output, sample_hidden = encoder(example_input_batch)

print('Encoder output shape: (batch size, max_length, units) {}'.format(sample_output.shape))
print('Encoder hidden state shape: (batch size, units) {}'.format(sample_hidden.shape))

Encoder output shape: (batch size, max_length, units) (64, 100, 1024)
Encoder hidden state shape: (batch size, units) (64, 1024)


### Test Attention

In [75]:
# Attention takes `use_scale` as its first positional argument, not a size:
# the former `Attention(10)` only switched the learned scale on. State it explicitly.
attention_layer = Attention(use_scale=True) # or AdditiveAttention(use_scale=True)

# Query: the final encoder state expanded to (batch_size, 1, units); value: all encoder outputs.
attention_context, attention_weights = attention_layer(
    [tf.expand_dims(sample_hidden, 1), sample_output],
    return_attention_scores=True)

print("Attention context shape: (batch size, 1 ,units) {}".format(attention_context.shape))
print("Attention weights shape: (batch_size, 1, max_length) {}".format(attention_weights.shape))

Attention context shape: (batch size, 1 ,units) (64, 1, 1024)
Attention weights shape: (batch_size, 1, max_length) (64, 1, 100)


### Test Decoder

In [76]:
decoder = Decoder(target_size, units, BATCH_SIZE)

# One decoding step on a random input, reusing the encoder smoke-test results
# as the hidden state and the attention memory.
sample_step_input = tf.random.uniform((BATCH_SIZE, 1, target_size))
sample_decoder_output, _, _ = decoder(sample_step_input, sample_hidden, sample_output)

print('Decoder output shape: (batch_size, 1, target_size) {}'.format(sample_decoder_output.shape))

Decoder output shape: (batch_size, 1, target_size) (64, 1, 52)


## *Build*

In [77]:
from keras.models import Model
from keras.layers import Lambda
from keras import backend as K

# ---- Encoder: sees every time step of the input in a single call ----
encoder_inputs = Input(shape=(max_length, input_size))
encoder = Encoder(input_size, units, BATCH_SIZE)
encoder_outputs, states = encoder(encoder_inputs)  # states: final hidden state of the encoder GRU

# ---- Decoder: consumes exactly one time step per call ----
decoder_inputs = Input(shape=(1, target_size))
decoder = Decoder(target_size, units, BATCH_SIZE)

# Unroll the decoder for max_length steps. Each step's output is re-injected as
# the next step's input, and the returned state replaces the previous one.
all_outputs = []
inputs = decoder_inputs
for _ in range(max_length):
    outputs, states, att_w = decoder(inputs, states, encoder_outputs)
    all_outputs.append(outputs)  # keep the prediction; all steps are joined below
    inputs = outputs

# Join the per-step predictions along the time axis
decoder_outputs = Lambda(lambda steps: K.concatenate(steps, axis=1))(all_outputs)

# Two-input model: [encoder sequence, first decoder step] -> full output sequence
model = Model(inputs=[encoder_inputs, decoder_inputs], outputs=decoder_outputs)

## *Training*

In [78]:
def masked_loss_function(y_true, y_pred):
    """Categorical cross-entropy averaged over the non-masked time steps.

    Args:
        y_true: targets, indexed as (batch, time, feature).
        y_pred: raw decoder scores (logits) with the same layout; the decoder's
            final Dense layer has no activation, hence from_logits=True.

    Returns:
        Scalar tensor: summed per-step loss divided by the number of unmasked
        steps (0 when every step is masked).
    """
    # A step is treated as padding when its target row sums to -target_size.
    # NOTE(review): the data-preparation cell pads with 0 and sets the end flag
    # to 1, so this sentinel never occurs there and the mask is currently all
    # ones -- confirm the intended padding marker.
    mask = tf.math.not_equal(tf.reduce_sum(y_true, axis=2), -1 * target_size)  # false if it is a padding time step
    loss = tf.losses.categorical_crossentropy(y_true, y_pred, from_logits=True)
    mask = tf.cast(mask, loss.dtype)
    loss *= mask
    # Average over unmasked steps only; reduce_mean would also count the zeroed
    # padding steps in the denominator and dilute the loss.
    return tf.math.divide_no_nan(tf.reduce_sum(loss), tf.reduce_sum(mask))

def masked_accuracy(y_true, y_pred):
    """Categorical accuracy averaged over the non-masked time steps.

    Args:
        y_true: targets, indexed as (batch, time, feature).
        y_pred: model scores with the same layout (only the argmax is used).

    Returns:
        Scalar tensor: number of correct unmasked steps divided by the number of
        unmasked steps (0 when every step is masked).
    """
    # A step is treated as padding when its target row sums to -target_size.
    # NOTE(review): the data-preparation cell pads with 0 and sets the end flag
    # to 1, so this sentinel never occurs there -- confirm the padding marker.
    mask = tf.math.not_equal(tf.reduce_sum(y_true, axis=2), -1 * target_size)  # false if it is a padding time step
    acc = tf.metrics.categorical_accuracy(y_true, y_pred)
    mask = tf.cast(mask, acc.dtype)
    acc *= mask
    # Divide by the number of unmasked steps so padding does not count as "wrong".
    return tf.math.divide_no_nan(tf.reduce_sum(acc), tf.reduce_sum(mask))

In [80]:
# Adamax optimiser with the masked loss/metric defined above. The metric is
# logged under its function name, "masked_accuracy".
model.compile(
    optimizer='adamax',
    loss=masked_loss_function,
    metrics=[masked_accuracy],
)

In [None]:
# Keep only the weights of the epoch with the best validation accuracy.
checkpoint_filepath = './checkpoint'
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=True,
    # The model is compiled with the `masked_accuracy` function as its only
    # metric, so the validation value is logged as "val_masked_accuracy".
    # Monitoring "val_accuracy" never finds a value and nothing gets saved.
    monitor='val_masked_accuracy',
    mode='max',
    save_best_only=True,
    )

In [None]:
# First decoder step for every sample: a one-hot "start" indicator.
decoder_input_data = np.zeros((train_size, 1, target_size))
decoder_input_data[:, 0, start_point_index] = 1

# Train the model. steps_per_epoch is intentionally not passed: it was computed
# from the full train_size, but validation_split holds out 20% of the samples,
# so the training data could not supply that many batches (and the value is 0
# when there are fewer than BATCH_SIZE samples). Keras infers the step count
# from the arrays and batch_size instead.
history = model.fit([X_data, decoder_input_data],
                    y_data,
                    batch_size=BATCH_SIZE,
                    validation_split=0.2,
                    callbacks=[model_checkpoint_callback],
                    )

In [None]:
# Restore the weights stored at checkpoint_filepath, the path the ModelCheckpoint callback writes to.
model.load_weights(checkpoint_filepath)

In [None]:
import matplotlib.pyplot as plt
import time

# Training curves recorded by the last model.fit call. The accuracy metric is
# logged under the name of the metric function ("masked_accuracy"); the key
# "categorical_accuracy" does not exist in the history and raised a KeyError.
training_log = model.history.history
plt.plot(training_log['loss'], label='loss')
plt.plot(training_log['masked_accuracy'], label='masked_accuracy')
plt.xlabel('epoch')
plt.legend();

## *Predict*

In [None]:
# NOTE(review): X_test is not defined in any cell shown in this notebook -- confirm where it comes from.
# NOTE(review): the model is built with two inputs ([encoder_inputs, decoder_inputs]) and is trained
# on [X_data, decoder_input_data]; confirm that X_test supplies both parts.
pred_test = model.predict(X_test)
# Index of the highest score along axis 2 (presumably the feature axis of the prediction -- verify).
pred_test = np.argmax(pred_test, axis=2)

# **Midi Format Output**

In [None]:
import midi_np_translation.output2midi as output2midi
# Export the ground truth and the model prediction for one piece as MIDI files.
PATH = "test_input"
# load np file
test_file = np.load(PATH + "/" + "4on6.mid.npy")
test_file_truth = np.load(PATH + "/" + "4on6.mid.ans.npy")
output2midi.output_to_midi(bass_ndarr=test_file_truth, output_path="4on6_truth.mid")
# NOTE(review): slice_per_step is not defined in any cell shown in this notebook -- confirm its source
# and that its result matches the two inputs the model was built with.
test_result = model.predict(slice_per_step(test_file))
# NOTE(review): after argmax the prediction holds class indices, whereas the truth array above is passed
# to output_to_midi as loaded -- confirm output_to_midi accepts both forms.
test_result = np.argmax(test_result, axis=2)
output2midi.output_to_midi(bass_ndarr=test_result, output_path="4on6_result.mid")