In [None]:
# Mount Google Drive into the Colab VM. The dataset, the cached MFCC JSON and
# the query folders used by later cells are all addressed under /content/gdrive.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
!pip install note-seq

In [None]:
import json
import os
import math
import note_seq

In [None]:
import matplotlib.pyplot as plt
import librosa
import librosa.display
from tqdm import tqdm
import IPython.display as ipd
import numpy as np
import pandas as pd
import tensorflow as tf
from keras import backend as K
import keras
from keras.utils import losses_utils
from keras.models import Sequential, Model,load_model
from tensorflow.keras.optimizers import Adam
from keras.callbacks import EarlyStopping
from tensorflow.keras.models import Sequential
import numpy as np
from sklearn.model_selection import train_test_split
import shutil

In [None]:
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
# Root folder containing one sub-folder of audio clips per class.
DATASET_PATH = '/content/gdrive/MyDrive/genres_original'
# Cache file the extracted MFCC features are written to and read back from.
JSON_PATH = '/content/gdrive/MyDrive/data.json'
# Sampling rate (Hz) passed to librosa.load for both the dataset clips and the
# query clips. It was referenced by those cells but never defined, which raises
# NameError on a fresh kernel. 22050 Hz is librosa's own default load rate.
SAMPLE_RATE = 22050

In [None]:
# Names of every entry directly inside the dataset root; save_mfcc() only
# processes sub-folders whose name appears in this list.
list_dir = list(os.listdir(DATASET_PATH))

In [None]:
def save_mfcc(dataset_path, json_path, num_mfcc=13):
    """Extract MFCCs for every audio file under ``dataset_path`` and save them as JSON.

    Every folder found while walking ``dataset_path`` whose name appears in the
    module-level ``list_dir`` is treated as one class. Audio is loaded at the
    module-level ``SAMPLE_RATE``.

    Args:
        dataset_path: Root folder holding one sub-folder of audio files per class.
        json_path: Destination file for the JSON dump of the extracted features.
        num_mfcc: Number of MFCC coefficients computed per frame.

    Returns:
        dict with four keys. ``"mapping"`` lists the class names, and a label
        ``k`` refers to ``mapping[k]``. ``"labels"``, ``"mfcc"`` and
        ``"filename"`` are parallel lists with one entry per audio file; each
        MFCC entry is a nested list laid out as (frames, num_mfcc).
    """
    data = {
        "mapping": [],
        "labels": [],
        "mfcc": [],
        "filename": []
    }

    for dirpath, dirnames, filenames in os.walk(dataset_path):
        # os.walk order is filesystem-dependent; sorting in place makes the
        # traversal (and therefore the label ids) reproducible.
        dirnames.sort()

        # Skip the root itself. Compare by value: `is not` on strings only
        # checks object identity and is not a reliable equality test.
        if dirpath == dataset_path:
            continue

        # The class name is the folder name.
        semantic_label = os.path.basename(dirpath)
        if semantic_label not in list_dir:
            continue

        data["mapping"].append(semantic_label)
        # Use the position in `mapping` as the label. The os.walk counter also
        # advances for skipped folders, so it can drift out of sync with
        # `mapping` and produce label ids that point at the wrong class.
        label = len(data["mapping"]) - 1
        print("\nProcessing: {}".format(semantic_label))

        for f in sorted(filenames):
            file_path = os.path.join(dirpath, f)
            signal, sample_rate = librosa.load(file_path, sr=SAMPLE_RATE)

            # librosa >= 0.10 accepts the audio and sampling rate as keywords
            # only; the keyword form also works on older releases.
            mfcc = librosa.feature.mfcc(y=signal, sr=sample_rate, n_mfcc=num_mfcc)

            # Transpose to (frames, coefficients) before serialising.
            data["mfcc"].append(mfcc.T.tolist())
            data["labels"].append(label)
            data["filename"].append(f)

    # Save MFCCs to the JSON file.
    with open(json_path, "w") as fp:
        json.dump(data, fp, indent=4)

    return data

In [None]:
# Extract MFCC features for the whole dataset and cache them at JSON_PATH.
data = save_mfcc(dataset_path=DATASET_PATH, json_path=JSON_PATH)

In [None]:
# Reload the cached features. Read from JSON_PATH (the location save_mfcc wrote
# to) instead of repeating the literal path, so the two cannot drift apart.
with open(JSON_PATH, "r") as fp:
    data = json.load(fp)

In [None]:
# Reference MFCC shape, taken from the first sample; samples with any other
# shape are filtered out below so the features can be stacked into one array.
shape_array = np.shape(data['mfcc'][0])

In [None]:
# Indices of samples whose MFCC matrix does not have the reference shape.
remove = [
    idx
    for idx, sample_mfcc in enumerate(data['mfcc'])
    if np.array(sample_mfcc).shape != shape_array
]

In [None]:
# Drop the flagged samples from every parallel list. Deleting from the highest
# index down keeps the not-yet-processed indices valid.
for index in sorted(remove, reverse=True):
    for key in ('mfcc', 'labels', 'filename'):
        del data[key][index]

# The indices refer to the lists as they were before deletion. Empty the list
# so that re-running this cell cannot delete unrelated (valid) samples.
remove.clear()

In [None]:
# Stack the per-sample MFCC matrices and labels into arrays.
X = np.array(data["mfcc"])
y = np.array(data["labels"])

# Fixed seed so the held-out split, and every metric computed on it, is
# reproducible across kernel restarts.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.1, random_state=42
)

In [None]:
# Stacked-LSTM classifier over MFCC sequences. The input accepts sequences of
# any length with 13 features per step (matching save_mfcc's default num_mfcc).
# The final Dense layer emits 10 class probabilities -- presumably one per genre
# folder; confirm against len(data["mapping"]).
model = keras.models.Sequential([
    keras.layers.LSTM(128, return_sequences=True, input_shape=[None, 13]),
    keras.layers.LayerNormalization(),
    keras.layers.LSTM(64, return_sequences=True),
    keras.layers.LayerNormalization(),
    # Last recurrent layer returns only its final state: one 32-d vector per clip.
    keras.layers.LSTM(32),
    keras.layers.Dense(10, activation="softmax")
])

# Labels are integer class ids, hence the sparse variant of the loss.
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=keras.optimizers.Adam(),
    metrics=['accuracy']
)

# summary() prints the table itself and returns None; wrapping it in print()
# would emit a stray "None" after the table.
model.summary()

In [None]:
# Stop once validation accuracy has not improved by at least 0.001 for 8 epochs
# and roll back to the best weights seen.
es = EarlyStopping(
    monitor='val_accuracy', restore_best_weights=True, patience=8, min_delta=0.001
)

# The callback must be handed to fit(); previously it was created but never
# used, so training always ran the full 100 epochs.
# NOTE: the test split doubles as the validation set here, so val_accuracy is
# not an unbiased estimate of generalisation.
history = model.fit(
    X_train,
    y_train,
    validation_data=(X_test, y_test),
    batch_size=32,
    epochs=100,
    callbacks=[es],
)

In [None]:
# Backend function mapping the model's input to the output of its
# second-to-last layer, i.e. the layer feeding the final Dense softmax
# (the LSTM(32) layer in the Sequential model built in this notebook).
# These activations are used as per-clip embedding vectors.
func = K.function([model.input], [model.layers[-2].output])
# The function returns a list with one entry per requested output tensor.
outputs = func(X)
# Embeddings for every sample in X, in the same order as data['filename'].
# Presumably shaped (num_samples, 32) -- confirm against the model summary.
VECTORS = outputs[0]

In [1]:
# Folder holding the query recordings that are run through the model.
query_path = '/content/gdrive/MyDrive/query/'
query_names = os.listdir(query_path)

# Folder whose entry names (minus extension) select which queries are evaluated.
query_use = '/content/gdrive/MyDrive/query_output'
query_rel = os.listdir(query_use)

In [None]:
# Entry names from the query_output folder, truncated at the first '.'.
query_rel_name = [entry.split('.')[0] for entry in query_rel]

In [None]:
# One (query_file, matches) pair per processed query.
output_list = []
# Minimum cosine similarity for a dataset track to count as a match.
THRESH = 0.95


def similarity(query_vec):
    """Return the dataset tracks whose embedding is close to ``query_vec``.

    Args:
        query_vec: 2-D array of shape (1, embedding_dim) holding one query
            embedding, as produced by ``func(...)[0]``.

    Returns:
        A new list of ``(filename, cosine_similarity)`` tuples, one for every
        row of ``VECTORS`` scoring above ``THRESH``, ordered from most to least
        similar. A fresh list is built on every call, so results from different
        queries never share state.
    """
    # Score the query against every stored embedding in one vectorised call;
    # the result has shape (num_samples, 1).
    scores = cosine_similarity(VECTORS, query_vec)[:, 0]
    matches = [
        (data['filename'][i], float(score))
        for i, score in enumerate(scores)
        if score > THRESH
    ]
    # Rank best-first so a position in the list is a retrieval rank.
    matches.sort(key=lambda match: match[1], reverse=True)
    return matches


for query_file in tqdm(os.listdir(query_path)):
    # Only evaluate queries that have a counterpart in the query_output folder.
    if query_file.split('.')[0] not in query_rel_name:
        continue

    print(query_file)
    signal, sample_rate = librosa.load(os.path.join(query_path, query_file), sr=SAMPLE_RATE)
    # librosa >= 0.10 accepts the audio and sampling rate as keywords only.
    mfcc = librosa.feature.mfcc(y=signal, sr=sample_rate, n_mfcc=13)
    # Add a leading batch axis: the model consumes (batch, frames, n_mfcc).
    mfcc_query = mfcc.T[np.newaxis, ...]
    query_vec = func(mfcc_query)[0]
    output_list.append((query_file, similarity(query_vec)))

In [None]:
# For each query: position of the expected dataset file within its match list,
# or -1 when that file is not among the matches.
output_indexes = []
for query_file, output in output_list:
    # Expected filename: first two '_'-separated parts of the query name joined
    # with '.', plus a '.wav' extension.
    name_parts = query_file.split('_')[:2]
    query_file = '.'.join(name_parts) + '.wav'

    print('-'*30)
    print(f'QUERY:  {query_file}')
    print(f'BEST MATCHES ARE: {output}')

    outlist = [match[0] for match in output]
    try:
        best_match_index = outlist.index(query_file)
    except ValueError:
        best_match_index = -1

    print(f'CORRECT OUTPUT IN INDEX: {best_match_index}')
    output_indexes.append(best_match_index)