In [78]:
#Import necessary libraries.
#Standard library
import csv
import math#For rounding.
import os
import random
import random as python_random
from datetime import datetime

#Numerical, audio and data-handling libraries
import librosa
import numpy as np
import pandas as pd
import tensorflow as tf

#Libraries for extracting and labelling data.
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical

#Libraries for implementation of CNN.
from keras.layers import Conv2D, MaxPooling2D, Dropout, Dense, Flatten
from keras.models import Sequential

#Libraries for training the model
from tensorflow.keras.callbacks import ModelCheckpoint

In [79]:
#Variable parameters

#Total number of metadata rows (audio clips) that are loaded.
#The last VALIDATION_DATASET_SIZE of them are held back and only used for the
#final "test data" accuracy; the rest are split into training/validation data.
v_dataset_size = 45_000
VALIDATION_DATASET_SIZE = 5_000

#NOTE(review): not referenced in the visible cells; presumably the number of
#hyperparameter combinations tried per n_mfcc value during tuning - confirm.
COMBINATION_COUNT_FOR_EACH_MFCC = 30

#Seed used for numpy, tensorflow and python's random module to reproduce output
RANDOM_SEED_VALUE = 10

#2 possible outputs, chorus and vanilla
NUM_OF_CLASSES = 2

In [80]:
#Global Parameters

#Sampling rate (Hz) every audio file is resampled to when it is loaded.
SAMPLE_RATE = 44100

#Folder paths
#Each folder is built with os.path.join(name, '') so that it ends with the
#separator of the current platform ('\\' on Windows, '/' elsewhere). The rest of
#the notebook appends file names with '+', so the trailing separator is required.
DATA_FOLDER = os.path.join('data232', '')
METADATA_FOLDER = os.path.join('metadata232', '')
METADATA_FILENAME = 'metadata232.csv'
TESTDATA_FOLDER = os.path.join('testdata230', '')
MODEL_FOLDER = os.path.join('tunedmodel00', '')
SYNTH_DATA_FOLDER = os.path.join('testdatasynth00', '')

#Number of metadata rows read per chunk by pd.read_csv
CSV_READ_CHUNK_SIZE = 2000

#Maximum length (in seconds) of an input soundclip that is used.
#Longer audio is truncated to its first SAMPLE_LENGTH_SEC seconds before the
#features are extracted.
SAMPLE_LENGTH_SEC = 2

In [81]:
#Each parameter below is a list of 4 values.
#Index i of every list belongs to the i-th of the 4 parameter combinations that
#gave the highest accuracy values.
#E.g.: To use the 2nd combination, pick the 2nd value from each list:
#v_n_mfcc_vals[1], h_layers_count_vals[1], ... etc.

#Variable parameters - PROD
#Number of MFCC coefficients extracted per audio clip
v_n_mfcc_vals = [128] * 4

#Variable hyperparameters (training)
h_epochs_vals = [5, 5, 5, 10]
h_batch_size_vals = [32, 64, 16, 16]

#CNN hyperparameters
#Convolutional layers
h_layers_count_vals = [4, 4, 5, 4]
#Passed to Conv2D(filters=...), i.e. the number of filters of the first layer
h_filter_size_vals = [32] * 4
h_kernel_size_vals = [(3, 3)] * 4
h_strides_vals = [(1, 1)] * 4
h_activation_function_vals = ['relu'] * 4
h_max_pooling_pool_size_vals = [(2, 2)] * 4
h_max_pooling_strides_vals = [(2, 2)] * 4
h_dropout_rate_vals = [0.6, 0.5, 0.2, 0.3]

#Flattening layer (dropout applied right after Flatten)
h_flatten_dropout_rate_vals = [0.4, 0.4, 0.4, 0.6]

#Output function
h_output_activation_function_vals = ['softmax'] * 4

#Model training parameters
h_loss_function_vals = [
    'binary_crossentropy',
    'binary_crossentropy',
    'categorical_crossentropy',
    'binary_crossentropy',
]
h_optimizer_vals = ['adam'] * 4

In [82]:
#Extract features from each audio file.
def features_extractor(audio_file_path,n_mfcc):
    """Load one audio file and return its MFCC feature matrix.

    Parameters:
        audio_file_path: path of the audio file to load.
        n_mfcc: number of MFCC coefficients to compute.

    Returns:
        The array produced by librosa.feature.mfcc for (at most) the first
        SAMPLE_LENGTH_SEC seconds of the file, resampled to SAMPLE_RATE.
    """
    #Selection of res_type => https://librosa.org/doc/main/generated/librosa.resample.html#librosa.resample
    #'kaiser_fast' is chosen because it is the faster method.
    signal, rate = librosa.load(audio_file_path, sr=SAMPLE_RATE, res_type='kaiser_fast')

    #Keep only the leading SAMPLE_LENGTH_SEC seconds of clips that are longer;
    #shorter clips are passed on unchanged.
    max_samples = rate*SAMPLE_LENGTH_SEC
    clip = signal if len(signal) <= max_samples else signal[:max_samples]

    return librosa.feature.mfcc(y=clip, sr=rate, n_mfcc=n_mfcc)

In [83]:
#Script to automate testing the trained model for each synthesized sound ..
#..and calculating the overall accuracy.

import os
import glob

def get_synth_accuracy(data_folder,model,n_mfcc,input_width,label_encoder=None):
    """Evaluate a trained model on the labelled .wav files of a folder.

    For easier identification the class of a file is taken from its name:
    -a non-chorus file name is in the format 'v_*.wav' (class 0)
    -a chorus file name is in the format 'c_*.wav' (class 1)
    Files whose name starts with anything else cannot be labelled; they are
    reported and skipped instead of being evaluated with a made-up class.

    Parameters:
        data_folder: folder prefix (including the trailing separator) that is
            searched for '*.wav' files.
        model: compiled Keras model; evaluate() must return [loss, accuracy].
        n_mfcc: number of MFCC coefficients to extract per file.
        input_width: number of feature columns the model expects; shorter
            feature matrices are padded to this width.
        label_encoder: fitted LabelEncoder that maps class labels to the
            output indices used during training. Defaults to the notebook-level
            `labelencoder`. It is only read (transform / inverse_transform),
            never re-fitted, so the training-time mapping stays intact.

    Returns:
        The accuracy of the model over all labelled files.

    Raises:
        ValueError: if the folder contains no 'v_*.wav' / 'c_*.wav' file.
    """
    if label_encoder is None:
        #Encoder fitted on the training labels in the training cell.
        label_encoder = labelencoder

    #File-name prefix (text before the first '_') -> class label
    prefix_to_class = {'v': 0, 'c': 1}

    wav_file_names = []
    X_synthtest = []
    y_synthtest = []

    #Picking only .wav files; sorted so the report order is deterministic.
    for wav_file_path in sorted(glob.glob(data_folder+'*.wav')):
        wav_file_name = os.path.basename(wav_file_path)
        wav_file_class = prefix_to_class.get(wav_file_name.split('_')[0])

        if wav_file_class is None:
            print(str(wav_file_name)+' || skipped: file name does not start with v_ or c_')
            continue

        wav_file_features = pad_features(features_extractor(wav_file_path,n_mfcc),
                                         n_mfcc,input_width)

        wav_file_names.append(wav_file_name)
        X_synthtest.append(wav_file_features)
        y_synthtest.append(wav_file_class)

    if not X_synthtest:
        raise ValueError('No v_*.wav or c_*.wav files found in '+str(data_folder))

    #Add the channel axis expected by the CNN: (files, n_mfcc, input_width, 1)
    X_synthtest = np.array(X_synthtest).reshape(len(X_synthtest),n_mfcc,input_width,1)

    #One-hot encode with the training-time class mapping. num_classes is given
    #explicitly so the width is right even when only one class is present.
    num_classes = len(label_encoder.classes_)
    y_synthtest = to_categorical(label_encoder.transform(y_synthtest),
                                 num_classes=num_classes)

    #Printing prediction for each file (one batched predict call for all files)
    predictions = model.predict(X_synthtest)
    for file_no, wav_file_name in enumerate(wav_file_names):
        label = predictions[file_no:file_no+1]
        classes_x = np.argmax(label,axis=1)
        prediction_class = label_encoder.inverse_transform(classes_x)
        print(str(wav_file_name)+' || label = '+str(label)+' || prediction = '+str(prediction_class))

    #evaluate() returns [loss, accuracy]; only the accuracy is reported.
    synth_accuracy = model.evaluate(X_synthtest,y_synthtest,verbose=0)

    return synth_accuracy[1]

In [84]:
#Idea for cleanly writing padding function is taken from the below 2 posts.
#https://stackoverflow.com/questions/59241216/padding-numpy-arrays-to-a-specific-size
#https://towardsdatascience.com/cnns-for-audio-classification-6244954665ab
def pad_features(feature_array,expected_height,expected_width):
    """Zero-pad a 2D feature array up to the expected height and width.

    The missing rows/columns are split between both sides; when the amount is
    odd, the extra row/column goes to the bottom/right. An axis that is already
    at least as large as expected is left untouched (nothing is cropped).

    Parameters:
        feature_array: 2D numpy array to pad.
        expected_height: minimum number of rows of the result.
        expected_width: minimum number of columns of the result.

    Returns:
        A new array holding feature_array surrounded by zeros.
    """
    missing_rows = max(expected_height-feature_array.shape[0],0)
    missing_cols = max(expected_width-feature_array.shape[1],0)

    #Smaller half goes before the data, the remainder after it.
    rows_before = missing_rows//2
    cols_before = missing_cols//2
    padding = ((rows_before,missing_rows-rows_before),
               (cols_before,missing_cols-cols_before))

    return np.pad(array=feature_array,pad_width=padding,mode='constant')

In [85]:
#Read stored metadata and get a Pandas dataframe.
#Only the columns listed in dtypes are loaded, each with an explicit dtype.
dtypes = dict(filename='str', chorus='int')
used_columns = list(dtypes)

file_path = METADATA_FOLDER+METADATA_FILENAME

#The file is parsed CSV_READ_CHUNK_SIZE rows at a time.
data_chunks = pd.read_csv(file_path,
                          chunksize=CSV_READ_CHUNK_SIZE,
                          dtype=dtypes,
                          usecols=used_columns)

# concatenate the chunks into a single DataFrame with one continuous index
df = pd.concat(data_chunks, ignore_index=True)

In [86]:
#Index of the parameter combination used in this cell. It selects one entry
#from every *_vals list (0-based, so 1 is the 2nd combination).
#NOTE: the evaluation cell still indexes v_n_mfcc_vals[1] directly; keep both
#in sync when another combination is selected.
PARAM_SET_INDEX = 1
selected_n_mfcc = v_n_mfcc_vals[PARAM_SET_INDEX]

#Initialization of extracted data
extracted_X = []
extracted_y = []

#Read metadata on the dataset to fetch .wav file names.
#Only the first v_dataset_size rows are used.
dataset_rows = df.head(v_dataset_size)
for file_name, class_label in zip(dataset_rows['filename'], dataset_rows['chorus']):
    features = features_extractor(DATA_FOLDER+file_name,selected_n_mfcc)
    extracted_X.append(features)
    extracted_y.append(class_label)

#Convert to numpy arrays
#NOTE: this needs every feature matrix to have the same width. Clips are only
#truncated (never padded) by features_extractor, so a clip shorter than
#SAMPLE_LENGTH_SEC would make the array ragged.
extracted_X = np.array(extracted_X)
extracted_y = np.array(extracted_y)

#Output classes (one-hot encoded); labelencoder is reused by get_synth_accuracy
labelencoder=LabelEncoder()
extracted_y=to_categorical(labelencoder.fit_transform(extracted_y))

print(extracted_X.shape)
print(extracted_y.shape)

#The last VALIDATION_DATASET_SIZE samples are kept aside for the final test
#accuracy. The remaining ones are split 80/20: X_train/y_train are used for
#fitting and X_test/y_test are the validation data passed to model.fit.
tuning_size = v_dataset_size-VALIDATION_DATASET_SIZE
X_train,X_test,y_train,y_test=train_test_split(extracted_X[:tuning_size],
                                               extracted_y[:tuning_size],
                                               test_size=0.2,random_state=1)

#Setting seed values to get reproducible outputs and same random values for each hyperparameter set.
np.random.seed(RANDOM_SEED_VALUE)
tf.random.set_seed(RANDOM_SEED_VALUE)
python_random.seed(RANDOM_SEED_VALUE)

#input shape of extracted mfcc 2D matrix: (n_mfcc, frames, 1 channel)
input_shape = (selected_n_mfcc,extracted_X.shape[2],1)

#Assignment of hyperparameters.
h_layers_count = h_layers_count_vals[PARAM_SET_INDEX]
h_filter_size = h_filter_size_vals[PARAM_SET_INDEX]
h_kernel_size = h_kernel_size_vals[PARAM_SET_INDEX]
h_strides = h_strides_vals[PARAM_SET_INDEX]
h_activation_function = h_activation_function_vals[PARAM_SET_INDEX]
h_max_pooling_pool_size = h_max_pooling_pool_size_vals[PARAM_SET_INDEX]
h_max_pooling_strides = h_max_pooling_strides_vals[PARAM_SET_INDEX]
h_dropout_rate = h_dropout_rate_vals[PARAM_SET_INDEX]
h_flatten_dropout_rate = h_flatten_dropout_rate_vals[PARAM_SET_INDEX]
h_output_activation_function = h_output_activation_function_vals[PARAM_SET_INDEX]
h_loss_function = h_loss_function_vals[PARAM_SET_INDEX]
h_optimizer = h_optimizer_vals[PARAM_SET_INDEX]
h_epochs = h_epochs_vals[PARAM_SET_INDEX]
h_batch_size = h_batch_size_vals[PARAM_SET_INDEX]

# create the model
model = Sequential()

#CNN Layer 1, directly after the input. padding='same' zero-pads the borders so
#that (with stride 1) the output keeps the height and width of the input.
#This first convolution is NOT followed by max-pooling or dropout: the pooling
#and dropout layers that used to be created here were never added to the model,
#so they are removed. The architecture is therefore unchanged.
#NOTE(review): confirm that leaving layer 1 without pooling/dropout is intended.
conv_layer1 = Conv2D(filters=h_filter_size,
                     kernel_size=h_kernel_size,
                     strides=h_strides,
                     padding='same',
                     data_format='channels_last',
                     activation=h_activation_function,
                     input_shape=input_shape)

model.add(conv_layer1)


#Add the remaining convolutional layers, each followed by max-pooling and
#dropout. The number of filters grows linearly with the layer number
#(h_filter_size*2, h_filter_size*3, ...).
for layer_no in range(1,h_layers_count):
    conv_layer_i = Conv2D(filters=h_filter_size*(layer_no+1),
                          kernel_size=h_kernel_size,
                          strides=h_strides,
                          padding='same',
                          data_format='channels_last',
                          activation=h_activation_function)
    print('h_max_pooling_pool_size =>'+str(h_max_pooling_pool_size))
    print('layer_no =>'+str(layer_no))
    print('h_max_pooling_strides =>'+str(h_max_pooling_strides))

    maxpool_layer_i = MaxPooling2D(pool_size=h_max_pooling_pool_size, strides=h_max_pooling_strides, padding='valid')
    dropout_layer_i = Dropout(rate=h_dropout_rate)

    model.add(conv_layer_i)
    model.add(maxpool_layer_i)
    model.add(dropout_layer_i)

# Add a flattening layer after the last dropout layer, with its own dropout
flatten_layer = Flatten()
dropout_layer = Dropout(rate=h_flatten_dropout_rate)

#Output Layer: one unit per class
output_layer=Dense(units=NUM_OF_CLASSES,activation=h_output_activation_function)

model.add(flatten_layer)
model.add(dropout_layer)
model.add(output_layer)

#Compiling the model. With metrics=['accuracy'], model.evaluate returns
#[loss, accuracy].
model.compile(loss=h_loss_function,metrics=['accuracy'],optimizer=h_optimizer)


#Save the model to disk whenever the monitored validation loss improves.
#NOTE: only the file on disk holds the best epoch; the in-memory `model` keeps
#the weights of the last epoch.
checkpointer = ModelCheckpoint(filepath=MODEL_FOLDER+'tuned_model_00'+'.hdf5',
                               verbose=1,
                               save_best_only=True)

start = datetime.now()

#Training the model
model.fit(X_train,
          y_train,
          batch_size=h_batch_size,
          epochs=h_epochs,
          validation_data=(X_test, y_test),
          callbacks=[checkpointer],
          verbose=1)


m_training_duration = datetime.now() - start
print('Training completed in time: ', m_training_duration)

(45000, 128, 173)
(45000, 2)
h_max_pooling_pool_size =>(2, 2)
layer_no =>1
h_max_pooling_strides =>(2, 2)
h_max_pooling_pool_size =>(2, 2)
layer_no =>2
h_max_pooling_strides =>(2, 2)
h_max_pooling_pool_size =>(2, 2)
layer_no =>3
h_max_pooling_strides =>(2, 2)
Epoch 1/5
Epoch 1: val_loss improved from inf to 0.01323, saving model to tunedmodel00\tuned_model_00.hdf5
Epoch 2/5
Epoch 2: val_loss improved from 0.01323 to 0.00542, saving model to tunedmodel00\tuned_model_00.hdf5
Epoch 3/5
Epoch 3: val_loss improved from 0.00542 to 0.00072, saving model to tunedmodel00\tuned_model_00.hdf5
Epoch 4/5
Epoch 4: val_loss improved from 0.00072 to 0.00027, saving model to tunedmodel00\tuned_model_00.hdf5
Epoch 5/5
Epoch 5: val_loss did not improve from 0.00027
Training completed in time:  3:08:02.325243


In [88]:
 #Measuring the performance of the trained model

#Validation accuracy: the split that was passed to model.fit as validation data.
#model.evaluate returns [loss, accuracy], so index 1 is the accuracy.
validation_accuracy = model.evaluate(X_test,y_test,verbose=0)
m_validation_accuracy = validation_accuracy[1]

#Test accuracy: the last VALIDATION_DATASET_SIZE samples, which were not part
#of the train/validation split.
holdout_start = v_dataset_size-VALIDATION_DATASET_SIZE
test_accuracy = model.evaluate(extracted_X[holdout_start:],
                               extracted_y[holdout_start:],
                               verbose=0)
m_test_accuracy = test_accuracy[1]

#Accuracy on the separately synthesized .wav files
m_synth_accuracy = get_synth_accuracy(data_folder=SYNTH_DATA_FOLDER,
                                      model=model,
                                      n_mfcc=v_n_mfcc_vals[1],
                                      input_width=extracted_X.shape[2])

c_1.wav || label = [[0.9565343  0.04346563]] || prediction = [0]
c_10.wav || label = [[3.1485415e-13 1.0000000e+00]] || prediction = [1]
c_11.wav || label = [[1.3725155e-14 1.0000000e+00]] || prediction = [1]
c_12.wav || label = [[5.5170164e-04 9.9944824e-01]] || prediction = [1]
c_13.wav || label = [[0.00503445 0.9949655 ]] || prediction = [1]
c_14.wav || label = [[3.351409e-08 1.000000e+00]] || prediction = [1]
c_15.wav || label = [[6.306799e-09 1.000000e+00]] || prediction = [1]
c_2.wav || label = [[9.9998689e-01 1.3061623e-05]] || prediction = [0]
c_3.wav || label = [[0.0080953 0.9919046]] || prediction = [1]
c_4.wav || label = [[1.6758225e-11 1.0000000e+00]] || prediction = [1]
c_5.wav || label = [[2.3912354e-09 1.0000000e+00]] || prediction = [1]
c_6.wav || label = [[2.373655e-12 1.000000e+00]] || prediction = [1]
c_7.wav || label = [[3.4896792e-14 1.0000000e+00]] || prediction = [1]
c_8.wav || label = [[9.9909854e-01 9.0148282e-04]] || prediction = [0]
c_9.wav || label = [[6.222

In [90]:
#Summary of the three accuracy figures measured above
print('Model Performance----')
print(f'Validation data accuracy = {m_validation_accuracy}')
print(f'Test data accuracy = {m_test_accuracy}')
print(f'Synth data accuracy = {m_synth_accuracy}')

Model Performance----
Validation data accuracy = 0.999875009059906
Test data accuracy = 0.9995999932289124
Synth data accuracy = 0.8999999761581421


In [91]:
#Print the layer-by-layer architecture of the trained model
#(layer types, output shapes and parameter counts).
model.summary()

Model: "sequential_6"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_24 (Conv2D)          (None, 128, 173, 32)      320       
                                                                 
 conv2d_25 (Conv2D)          (None, 128, 173, 64)      18496     
                                                                 
 max_pooling2d_25 (MaxPoolin  (None, 64, 86, 64)       0         
 g2D)                                                            
                                                                 
 dropout_31 (Dropout)        (None, 64, 86, 64)        0         
                                                                 
 conv2d_26 (Conv2D)          (None, 64, 86, 96)        55392     
                                                                 
 max_pooling2d_26 (MaxPoolin  (None, 32, 43, 96)       0         
 g2D)                                                 