In [1]:
#https://github.com/kmsravindra/ML-AI-experiments/blob/master/AI/Neural%20Machine%20Translation/Neural%20machine%20translation%20-%20Encoder-Decoder%20seq2seq%20model.ipynb

from keras.models import Model, load_model
from keras.layers import Input, LSTM, Dense, Bidirectional, Concatenate
from keras import optimizers
from keras.callbacks import ModelCheckpoint
import numpy as np
import pandas as pd
from google.colab import files

Using TensorFlow backend.


In [2]:
# Training hyper-parameters consumed by model.fit() further down.
# NOTE(review): the recorded training log below shows 10 epochs, not 100 -
# confirm which value is intended before re-running.
batch_size = 128
epochs = 100

In [None]:
# Upload training inputs
# Calls Colab's files.upload(). The data-loading cell below reads
# 'split_inputs.npz', so that is presumably the file to pick here.
uploaded_file = files.upload()

In [None]:
# Upload training targets
# Calls Colab's files.upload(). The data-loading cell below reads
# 'split_targets.npz', so that is presumably the file to pick here.
uploaded_file2 = files.upload()

In [3]:
# Read the 'train' array out of each .npz archive. Later cells index both
# arrays with the same position, so inputs[i] is paired with targets[i].
with np.load('split_inputs.npz') as inputs_archive:
    inputs = inputs_archive['train']
with np.load('split_targets.npz') as targets_archive:
    targets = targets_archive['train']

# Report how many sequences were loaded on each side.
print("Number of training inputs",len(inputs))
print("Number of training targets",len(targets))

# Show the first input/target pair as a quick sanity check.
example_in, example_target = inputs[0], targets[0]
print("Input example:", example_in)
print("Target example:", example_target)

Number of training inputs 48
Number of training targets 48
Input example: [129 129 129 129 129 129 129 129 129 129 129 129  72 129 129 129  71 129
 129 129  69 129 129 129  67 129 129 129  67 129 129 129  69 129 129 129
  71 129 129 129  72 129 129 129  74 129 129 129  72 129 129 129  71 129
 129 129  69 129 129 129  71 129  72 129  69 129 129 129 129 129 129 129
  67 129 129 129  72 129 129 129  71 129 129 129  72 129 129 129  74 129
 129 129  74 129 129 129  76 129 129 129  74 129  72 129  74 129 129 129
  67 129 129 129  69 129 129 129  71 129 129 129  72 129  74 129  76 129
  77 129]
Target example: [129 129 129 129 129 129 129 129 129 129 129 129  67 129 129 129  67 129
 129 129  66 129 129 129  62 129 129 129  64 129  62 129  60 129 129 129
  62 129 129 129  64 129 129 129  67 129 129 129 129 129  66 129  67 129
 129 129  66 129 129 129  67 129 129 129 129 129 129 129  66 129 129 129
  62 129 129 129  67 129 129 129  65 129 129 129  67 129 129 129  67 129
 129 129  67 129 129 129

In [4]:
input_sequences = list(inputs)
num_samples = len(inputs)

# Wrap every target sequence in marker tokens: 200 opens it, 201 closes it.
target_sequences = [[200] + list(targets[idx]) + [201] for idx in range(num_samples)]

# Gather the distinct symbols seen on each side (the target side includes the
# two marker tokens because they are already part of target_sequences).
input_set = set()
target_set = set()
for idx in range(num_samples):
    input_set.update(input_sequences[idx])
    target_set.update(target_sequences[idx])

# From here on the "sets" are sorted lists, so every symbol has a stable position.
input_set = sorted(input_set)
target_set = sorted(target_set)
print("Set of input notes:\n", input_set)
print("Set of target notes:\n", target_set)
print(len(input_set))

Set of input notes:
 [53, 55, 56, 57, 58, 59, 60, 62, 63, 64, 65, 67, 68, 69, 70, 71, 72, 74, 75, 76, 77, 129]
Set of target notes:
 [43, 44, 45, 46, 47, 48, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 62, 63, 64, 65, 66, 67, 68, 69, 129, 200, 201]
22


In [5]:
# Two-way lookup for the input vocabulary.
# index -> note: the position of each note in the sorted input_set list.
input_index_to_note_dict = dict(enumerate(input_set))

# note -> index: the inverse mapping, used when one-hot encoding sequences.
input_note_to_index_dict = {
    note: index for index, note in input_index_to_note_dict.items()
}

In [6]:
# Two-way lookup for the target vocabulary (includes the 200/201 marker tokens).
# index -> note: the position of each note in the sorted target_set list.
target_index_to_note_dict = dict(enumerate(target_set))

# note -> index: the inverse mapping, used when one-hot encoding sequences.
target_note_to_index_dict = {
    note: index for index, note in target_index_to_note_dict.items()
}

In [7]:
# Longest sequence on each side; these become the time dimension of the
# one-hot tensors allocated in the next cell.
max_len_inputs = max(map(len, input_sequences))
max_len_targets = max(map(len, target_sequences))
print(max_len_inputs)
print(max_len_targets)

128
130


In [8]:
# One-hot tensors, laid out as (sample, timestep, vocabulary position).
# The decoder input and the training target share the same shape.
target_tensor_shape = (num_samples, max_len_targets, len(target_set))

tokenized_input_sequences = np.zeros(
    (num_samples, max_len_inputs, len(input_set)), dtype='float32'
)
tokenized_target_sequences = np.zeros(target_tensor_shape, dtype='float32')
target_data = np.zeros(target_tensor_shape, dtype='float32')

In [9]:
for sample_idx in range(num_samples):
    # Encoder side: mark the vocabulary slot of every input note.
    for step, note_value in enumerate(input_sequences[sample_idx]):
        tokenized_input_sequences[sample_idx, step, input_note_to_index_dict[note_value]] = 1

    # Decoder side: the same note is written twice, once as decoder input at
    # its own timestep and once as the training target one step earlier.
    for step, note_value in enumerate(target_sequences[sample_idx]):
        class_idx = target_note_to_index_dict[note_value]
        tokenized_target_sequences[sample_idx, step, class_idx] = 1

        # target_data runs one timestep ahead, so the first (start) token
        # never appears in it.
        if step == 0:
            continue
        target_data[sample_idx, step - 1, class_idx] = 1

In [10]:
# Encoder: a bidirectional LSTM over one-hot input notes. Only its final
# states are used downstream, as the decoder's initial state.

encoder_input = Input(shape=(None, len(input_set)))
encoder_LSTM = Bidirectional(LSTM(256, return_state=True))

# The wrapped layer yields its output followed by four state tensors.
encoder_outputs, *encoder_direction_states = encoder_LSTM(encoder_input)
encoder_h_forward, encoder_c_forward, encoder_h_back, encoder_c_back = encoder_direction_states

# Join the two directions so each state is a single 2*256-wide tensor.
encoder_h = Concatenate()([encoder_h_forward, encoder_h_back])
encoder_c = Concatenate()([encoder_c_forward, encoder_c_back])
encoder_states = [encoder_h, encoder_c]

In [11]:
# Inspect the per-direction hidden states and the concatenated h/c states
# to confirm that the state width doubles after concatenation.
for state_tensor in (encoder_h_forward, encoder_h_back, encoder_h, encoder_c):
    print(state_tensor)

Tensor("bidirectional_1/while/Exit_2:0", shape=(?, 256), dtype=float32)
Tensor("bidirectional_1/while_1/Exit_2:0", shape=(?, 256), dtype=float32)
Tensor("concatenate_1/concat:0", shape=(?, 512), dtype=float32)
Tensor("concatenate_2/concat:0", shape=(?, 512), dtype=float32)


In [12]:
# Decoder: an LSTM started from the encoder's final states, followed by a
# softmax over the target vocabulary at every timestep.

decoder_input = Input(shape=(None, len(target_set)))

# The width is 2 * 256 so the decoder state matches the concatenated
# forward + backward encoder states.
decoder_LSTM = LSTM(2 * 256, return_sequences=True, return_state=True)
decoder_hidden_seq, _unused_h, _unused_c = decoder_LSTM(
    decoder_input, initial_state=encoder_states
)

decoder_dense = Dense(len(target_set), activation='softmax')
decoder_out = decoder_dense(decoder_hidden_seq)

In [13]:
# Training model: (encoder input, decoder input) -> per-timestep note distribution.
model = Model(inputs=[encoder_input, decoder_input], outputs=[decoder_out])
model.compile(optimizer='adam', loss='categorical_crossentropy')

# The decoder is fed the target sequence and trained to predict target_data,
# which is the same sequence shifted one step ahead.
training_inputs = [tokenized_input_sequences, tokenized_target_sequences]
fit_options = dict(batch_size=batch_size, epochs=epochs, validation_split=0.2)
model.fit(x=training_inputs, y=target_data, **fit_options)

# Save the trained model to disk.
model.save("trained_model.h5")

Train on 38 samples, validate on 10 samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


  '. They will not be included '


In [16]:
# Inference models for testing
#
# Both models reuse the layers built for training (encoder_LSTM, decoder_LSTM,
# decoder_dense), so they share the trained weights.

# Encoder inference model: one-hot input sequence -> [h, c] states.
encoder_model_inf = Model(encoder_input, encoder_states)

# Decoder inference model: runs the decoder from explicitly supplied states
# so that decoding can proceed one timestep at a time.
# 256*2 matches the width of the decoder LSTM state.
decoder_state_input_h = Input(shape=(256*2,))
decoder_state_input_c = Input(shape=(256*2,))
decoder_input_states = [decoder_state_input_h, decoder_state_input_c]

# Use a distinct name for the inference output. Rebinding `decoder_out` here
# would overwrite the training graph's output tensor, so re-running the
# training cell afterwards would build its Model from the wrong tensor.
decoder_out_inf, decoder_h, decoder_c = decoder_LSTM(decoder_input,
                                                     initial_state=decoder_input_states)

decoder_states = [decoder_h, decoder_c]

decoder_out_inf = decoder_dense(decoder_out_inf)

decoder_model_inf = Model(inputs=[decoder_input] + decoder_input_states,
                          outputs=[decoder_out_inf] + decoder_states)

In [17]:
def decode_seq(inp_seq):
    """Greedily decode one target sequence for a one-hot encoded input.

    The encoder inference model turns ``inp_seq`` into the initial decoder
    states. The decoder is then run one step at a time, starting from the
    start token (200) and feeding back the highest-scoring note each step.

    Decoding stops once the end token (201) has been produced or once the
    result is longer than ``max_len_targets``. The end token, when produced,
    is part of the returned sequence.

    Args:
        inp_seq: encoder input passed straight to ``encoder_model_inf.predict``.

    Returns:
        numpy array holding the decoded note values, in order.
    """
    vocab_size = len(target_set)

    def one_hot(index):
        # Single-timestep decoder input with only `index` switched on.
        step_input = np.zeros((1, 1, vocab_size))
        step_input[0, 0, index] = 1
        return step_input

    # The decoder starts from the encoder's final states and the start token.
    states_val = encoder_model_inf.predict(inp_seq)
    target_seq = one_hot(target_note_to_index_dict[200])

    decoded_sequence = []
    while True:
        probabilities, state_h, state_c = decoder_model_inf.predict(x=[target_seq] + states_val)
        best_index = np.argmax(probabilities[0, -1, :])
        sampled_note = target_index_to_note_dict[best_index]
        decoded_sequence.append(sampled_note)

        if sampled_note == 201 or len(decoded_sequence) > max_len_targets:
            break

        # Feed the chosen note and the updated states into the next step.
        target_seq = one_hot(best_index)
        states_val = [state_h, state_c]

    return np.array(decoded_sequence)

In [None]:
# Upload test data
# `files` is already imported from google.colab in the notebook's first cell,
# so it is not re-imported here. The next cell reads 'test_data.npz', which is
# presumably the file to pick in the upload prompt.
uploaded_file = files.upload()

In [22]:
# Load the test sequences; the archive stores them under the 'train' key.
with np.load('test_data.npz') as data:
    test_data = data['train']
# Only if using the inputs from training set and some notes are missing
#test_data = test_data[0:len(test_data):2]

# One-hot encode with the *training* input vocabulary. A note that never
# appeared in training has no entry in input_note_to_index_dict and raises KeyError.
max_len_test_data = max(len(seq) for seq in test_data)
tokenized_inputs = np.zeros(
    (len(test_data), max_len_test_data, len(input_set)), dtype='float32'
)

for row, test_sequence in enumerate(test_data):
    for step, note_value in enumerate(test_sequence):
        tokenized_inputs[row, step, input_note_to_index_dict[note_value]] = 1

In [None]:
# Save all samples as file.
# Input sequences on even index and decoded sequences on odd index
samples = []
for seq_index in range(len(test_data)):
    # Slice rather than index so the batch dimension of size 1 is kept.
    inp_seq = tokenized_inputs[seq_index:seq_index+1]
    decoded_sequence = decode_seq(inp_seq)
    samples.append(test_data[seq_index])
    samples.append(decoded_sequence)

# Decoded sequences can differ in length from the inputs and from each other.
# np.array() on such a ragged list raises ValueError on recent NumPy releases,
# so build an object array explicitly in that case. When every sequence has
# the same length, the result is the same regular array as before.
if len({len(seq) for seq in samples}) <= 1:
    samples = np.array(samples)
else:
    ragged_samples = np.empty(len(samples), dtype=object)
    for position, seq in enumerate(samples):
        ragged_samples[position] = seq
    samples = ragged_samples

# An object array is stored pickled, so reading it back needs
# np.load(..., allow_pickle=True).
np.savez('sample_pairs.npz', samples=samples)

In [None]:
# Download test pairs
# Passes 'sample_pairs.npz', the archive written by the preceding cell's
# np.savez call, to Colab's files.download().
files.download('sample_pairs.npz')