In [None]:
# %reset

In [1]:
from google.colab import drive
# Mount Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): the dataset and model paths used later are relative ("./drive/MyDrive/...");
# presumably the working directory is /content - confirm before running elsewhere.
drive.mount('/content/drive')

Mounted at /content/drive


<h2>Preprocess Dataset</h2>
<h6>Get the dataset<br>
Split it into train and test data<br>
Split every song matrix into one-second segments</h6>

In [3]:
import numpy as np
import os

# Locations of the full chromagram dataset and of the cached, already-segmented train/test arrays.
DATASET_PATH = "./drive/MyDrive/chromagram_dataset/all.npy"
DATASET_TRAIN = "./drive/MyDrive/chromagram_dataset/train.npy"
DATASET_TEST = "./drive/MyDrive/chromagram_dataset/test.npy"

# When both cached splits already exist the raw dataset is not loaded at all.
saved_data = os.path.exists(DATASET_TRAIN) and os.path.exists(DATASET_TEST)
if not saved_data:
  # NOTE(review): allow_pickle=True can execute arbitrary code while loading;
  # only use it with dataset files you trust.
  with open(DATASET_PATH, 'rb') as f:
      data = np.load(f, allow_pickle=True)
  # A bare expression inside an `if` block is never displayed by the notebook,
  # so the shape is printed explicitly.
  print(data.shape)

In [4]:
from sklearn.model_selection import train_test_split


if not saved_data:
  # Only one array needs to be split: the original passed `data` twice and threw
  # the second pair away. With the same random_state the partition is unchanged.
  data_train, data_test = train_test_split(data, test_size=0.2, random_state=42)
  del data  # release the unsplit dataset
  # Expressions inside an `if` block are not displayed by the notebook, so print the shapes.
  print(data_train.shape, data_test.shape, data_train[0].shape)

In [5]:
import librosa
import tensorflow as tf

In [6]:
# Framing parameters used to map seconds to chromagram frame indexes.
sample_rate = 16000
hop_length = int(sample_rate * 0.1)   # 0.1 s worth of samples between frames
n_fft = int(sample_rate * 0.2)        # 0.2 s worth of samples per analysis window
DIFF = 1                              # step, in seconds, between consecutive cut points
allowed_duration = 10000              # cut points are generated for seconds 1 .. allowed_duration - 1
special_value = -10                   # padding value, also used as the Masking layer's mask_value

# Frame index at which each whole second starts. `sr` is passed by keyword because
# recent librosa releases make every argument after `times` keyword-only.
frame_sec_indexes = [
    librosa.time_to_frames(second, sr=sample_rate, n_fft=n_fft, hop_length=hop_length)
    for second in range(1, allowed_duration, DIFF)
]

# Largest number of frames between two consecutive cut points; every segment is
# padded to this length. Cast to a plain int so it can be used directly in shapes.
max_frames_in_diff = int(np.diff(frame_sec_indexes).max())

In [7]:
def clean_frame_matrix(feature: np.ndarray) -> np.ndarray:
    """Pad (or clip) one segment to the fixed ``(max_frames_in_diff, 12)`` shape.

    Args:
        feature: 2-D array holding one segment (rows x columns).

    Returns:
        A float32 array of shape ``(max_frames_in_diff, 12)``. The top-left corner
        holds ``feature``; every remaining cell is ``special_value``. An empty
        segment therefore yields a matrix made only of padding, and rows or
        columns beyond the target shape are dropped instead of raising a
        broadcast error.
    """
    full_matrix = np.full((max_frames_in_diff, 12), special_value, dtype=np.float32)
    # Clip to the target shape so an oversized segment cannot break the assignment.
    rows = min(feature.shape[0], full_matrix.shape[0])
    cols = min(feature.shape[1], full_matrix.shape[1])
    full_matrix[:rows, :cols] = feature[:rows, :cols]

    return full_matrix

def split_features(features):
    """Cut every song into per-second segments and stack them into one array.

    Each song is split at the entries of ``frame_sec_indexes`` that fall inside
    it, and every resulting piece is padded with ``clean_frame_matrix``.

    Args:
        features: sequence of 2-D song matrices (frames x 12).

    Returns:
        Array of shape ``(total_segments, max_frames_in_diff, 12)``. The result is
        float64 because the float64 seed array takes part in the concatenation;
        an empty ``features`` gives shape ``(0, max_frames_in_diff, 12)``.
    """
    # Collect per-song chunks and concatenate once at the end: calling np.append
    # inside the loop copies the whole accumulated array on every iteration.
    chunks = [np.empty((0, max_frames_in_diff, 12))]
    for i, song in enumerate(features):
        cut_points = [each for each in frame_sec_indexes if each < len(song)]
        segments = np.split(song, cut_points)
        chunks.append(np.array([clean_frame_matrix(each) for each in segments]))
        print(f'\r{i} done', end='\r')

    return np.concatenate(chunks, axis=0)


In [8]:
# Training segments: reuse the cached array when it exists, otherwise build and cache it.
if saved_data:
  with open(DATASET_TRAIN, 'rb') as f:
      data_train = np.load(f, allow_pickle=True)
else:
  data_train = split_features(data_train)
  with open(DATASET_TRAIN, 'wb') as f:
    np.save(f, data_train)
  print(data_train.shape)

In [9]:
# Test segments: reuse the cached array when it exists, otherwise build and cache it.
if saved_data:
  with open(DATASET_TEST, 'rb') as f:
      data_test = np.load(f, allow_pickle=True)
else:
  data_test = split_features(data_test)
  with open(DATASET_TEST, 'wb') as f:
    np.save(f, data_test)
  print(data_test.shape)

<h2>seq2seq model</h2>


In [14]:
from keras.layers import LSTM, GRU, Dense, Input, RepeatVector, TimeDistributed, Masking, Activation
from keras.models import Model
from keras import backend as K
from keras.utils.generic_utils import get_custom_objects

# Clear the global Keras state so that re-running the cells rebuilds the model from scratch.
tf.keras.backend.clear_session()


def binary_activation(x):
  """Hard threshold: 1.0 where ``x`` is greater than 0, otherwise 0.0 (Keras float type)."""
  is_positive = K.greater(x, 0)
  return K.cast(is_positive, K.floatx())


# Register the activation by name so it can be resolved by Keras under 'binary_activation'.
get_custom_objects().update({'binary_activation': Activation(binary_activation)})


In [18]:

# ---------------------------------------------------------------------------
# Training graph: segment -> 2 stacked encoder LSTMs -> 24-value binary code
#                 -> 2 stacked decoder LSTMs -> reconstructed segment
# ---------------------------------------------------------------------------

# encoder layers
encoder_inputs = Input(shape=(max_frames_in_diff, 12))
# Time steps made entirely of `special_value` (the padding) are masked out.
masking = Masking(mask_value=special_value)(encoder_inputs)
encoder_lstm = LSTM(64, return_state=True, return_sequences=True)
encoder_outputs, state_h, state_c = encoder_lstm(masking)
encoder_lstm2 = LSTM(24, return_state=True)
encoder_outputs2, state_h2, state_c2 = encoder_lstm2(encoder_outputs)
# NOTE(review): the hard threshold presumably passes no useful gradient, so the encoder
# would be trained only through the states handed to the decoder below - confirm intended.
activation_layer = Activation(binary_activation)
encoder_outputs2 = activation_layer(encoder_outputs2)
encoder_states = [state_h, state_c]
encoder_states2 = [state_h2, state_c2]

# decoder input: the binary code repeated once per output time step
decoder_inputs = RepeatVector(max_frames_in_diff)(encoder_outputs2)

# decoder layers (mirror of the encoder: 24 units first, then 64 units)
decoder_lstm = LSTM(24, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(decoder_inputs, initial_state=encoder_states2)
decoder_lstm2 = LSTM(64, return_sequences=True, return_state=True)
decoder_outputs2, _, _ = decoder_lstm2(decoder_outputs, initial_state=encoder_states)
decoder_time = TimeDistributed(Dense(12, activation='softmax'))
decoder_outputs2 = decoder_time(decoder_outputs2)

# define model
model = Model(encoder_inputs, decoder_outputs2)

# define encoder model (segment -> binary code)
encoder_model = Model(encoder_inputs, encoder_outputs2)

# ---------------------------------------------------------------------------
# Inference decoder: reuses the trained decoder layers on externally supplied
# code sequences and initial states.
# ---------------------------------------------------------------------------
decoder_state_input_h, decoder_state_input_c = Input(shape=(64,)), Input(shape=(64,))
decoder_state_input_h2, decoder_state_input_c2 = Input(shape=(24,)), Input(shape=(24,))
# Same number of time steps as the training graph instead of a hard-coded 10.
decoder_inputs_layer = Input(shape=(max_frames_in_diff, 24))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c,
                         decoder_state_input_h2, decoder_state_input_c2]
# Run BOTH decoder LSTMs in the same order as the training graph. Previously the
# input was fed straight into `decoder_lstm2`, skipping `decoder_lstm` and leaving
# the 24-unit state inputs unused, so the inference decoder did not match training.
decoder_hidden, _, _ = decoder_lstm(decoder_inputs_layer, initial_state=decoder_states_inputs[2:])
decoder_outputs, state_h, state_c = decoder_lstm2(decoder_hidden, initial_state=decoder_states_inputs[:2])
decoder_states = [state_h, state_c]
decoder_outputs = decoder_time(decoder_outputs)
decoder_model = Model([decoder_inputs_layer] + decoder_states_inputs, [decoder_outputs] + decoder_states)

In [19]:
# Compile the autoencoder with Adam and a Huber reconstruction loss, then show its layers.
optimizer = tf.keras.optimizers.Adam()
reconstruction_loss = tf.keras.losses.Huber()
model.compile(optimizer=optimizer, loss=reconstruction_loss, metrics=['accuracy'])
model.summary()

Model: "model_2"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_15 (InputLayer)           [(None, 10, 12)]     0                                            
__________________________________________________________________________________________________
masking_3 (Masking)             (None, 10, 12)       0           input_15[0][0]                   
__________________________________________________________________________________________________
lstm_11 (LSTM)                  [(None, 10, 64), (No 19712       masking_3[0][0]                  
__________________________________________________________________________________________________
lstm_12 (LSTM)                  [(None, 24), (None,  8544        lstm_11[0][0]                    
____________________________________________________________________________________________

In [22]:
# Autoencoder training: the input segments are also the reconstruction targets.
# `workers` only applies to generator / keras.utils.Sequence inputs, so it had no
# effect on these in-memory arrays (and newer Keras versions reject it).
model.fit(data_train, data_train, epochs=10, validation_split=0.2)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7fdcbeaca050>

In [24]:
# Evaluate reconstruction on the held-out segments (inputs double as targets);
# returns the loss followed by the compiled metrics.
model.evaluate(data_test, data_test)



[0.07751147449016571, 0.9395154714584351]

In [25]:
# Encode a single test segment (a slice keeps the batch dimension) and take its code.
sample_batch = data_test[1001:1002]
result = encoder_model.predict(sample_batch)[0]
result.shape

(24,)

In [26]:
# Display the binarized encoder output obtained for the selected test segment.
result

array([1., 0., 1., 1., 0., 1., 1., 1., 1., 0., 1., 0., 0., 0., 0., 0., 0.,
       1., 1., 0., 0., 0., 0., 1.], dtype=float32)

In [28]:
# save model, encoder and decoder: one architecture JSON + one HDF5 weights file each
MODEL_DIR = "./drive/MyDrive/fingerprint_model"
# open() and save_weights() fail when the target folder is missing, so create it first.
os.makedirs(MODEL_DIR, exist_ok=True)


def save_model_artifacts(keras_model, name):
    """Write ``keras_model`` to ``MODEL_DIR`` as ``<name>.json`` and ``<name>.h5``.

    Args:
        keras_model: model exposing ``to_json()`` and ``save_weights()``.
        name: file stem shared by the JSON and the weights file.
    """
    with open(f"{MODEL_DIR}/{name}.json", "w") as json_file:
        json_file.write(keras_model.to_json())
    # serialize weights to HDF5
    keras_model.save_weights(f"{MODEL_DIR}/{name}.h5")


for _model_to_save, _file_stem in [(model, "fingerprint_model"),
                                   (encoder_model, "encoder_model"),
                                   (decoder_model, "decoder_model")]:
    save_model_artifacts(_model_to_save, _file_stem)

In [30]:
# load encoder
from keras.models import model_from_json
# `with` guarantees the file is closed even if reading fails.
with open('./drive/MyDrive/fingerprint_model/encoder_model.json', 'r') as json_file:
    loaded_model_json = json_file.read()
# The custom activation must be supplied so the saved architecture can be rebuilt.
loaded_model = model_from_json(loaded_model_json,
                               custom_objects={'binary_activation': Activation(binary_activation)})
# load weights into new model
loaded_model.load_weights("./drive/MyDrive/fingerprint_model/encoder_model.h5")
print("Loaded model from disk")
# Sanity check on the same single segment used before saving. The previous slice
# (1001:10002) ran about 9000 segments through the model just to show the first one.
loaded_model.predict(data_test[1001:1002])[0]

Loaded model from disk


array([1., 0., 1., 1., 0., 1., 1., 1., 1., 0., 1., 0., 0., 0., 0., 0., 0.,
       1., 1., 0., 0., 0., 0., 1.], dtype=float32)