In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix, precision_recall_fscore_support, multilabel_confusion_matrix, plot_confusion_matrix, classification_report
import os
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from tqdm import trange
import time
import pprint
import datetime
import argparse
from scipy.stats import gmean
import yaml
import tensorflow as tf

from keras.optimizers import Adam
from keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

import utils
from featureExtractor import load_audio_file, get_mel_spectrogram, modify_file_variable_length
from dataLoader import get_label_files, DataGeneratorPatch, PatchGeneratorPerFile
from model import CNN_LeakyReLU, CNN_LSTM_LeakyReLU, CNN_LSTM_Att_LeakyReLU, CNN_LSTM_Att_ReLU
import test

import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
# Global matplotlib setting: use Times New Roman as the font family for figure text.
plt.rcParams["font.family"] = "Times New Roman"

### Classification reports

In [None]:
# Load the experiment configuration from params.yaml in the working directory.
# yaml.safe_load is used instead of a bare yaml.load(stream):
#   * yaml.load without an explicit Loader raises TypeError on PyYAML >= 6;
#   * safe_load only constructs plain Python scalars/containers, never arbitrary objects.
# NOTE(review): if params.yaml relies on Python-specific YAML tags, safe_load will reject
# them — switch to yaml.load(f, Loader=yaml.FullLoader) in that case.
# The context manager makes sure the file handle is closed after parsing.
with open("params.yaml") as params_file:
    params = yaml.safe_load(params_file)

# Top-level sections of the configuration used throughout the notebook.
params_dataset = params['dataset']
params_extract = params['extract']
params_learn = params['learn']
params_pred = params['predictive']

# File-name suffixes: feature files end in suffix_in + '.data'; the matching label
# file name is obtained by replacing suffix_in with suffix_out.
suffix_in = params['suffix'].get('in')
suffix_out = params['suffix'].get('out')

# Derived setting: fs * audio_len_s truncated to an int
# (presumably sampling rate x clip duration in seconds — confirm against params.yaml).
params_extract['audio_len_samples'] = int(params_extract.get('fs') * params_extract.get('audio_len_s'))

In [None]:
# ======================================================== PATHS FOR DATA, FEATURES and GROUND TRUTH
# Root folder of the dataset, as given in the configuration.
path_root_data = params_dataset.get('dataset_path')

# Base folders and sub-folder names; the full paths are assembled right below.
params_path = {
    'path_to_features': os.path.join(path_root_data, 'features'),
    'featuredir_tr': 'audio_train_varup2/',
    'featuredir_te': 'audio_test_varup2/',
    'path_to_dataset': path_root_data,
    'audiodir_tr': 'train/',
    'audiodir_te': 'test/',
    'audio_shapedir_tr': 'audio_train_shapes/',
    'audio_shapedir_te': 'audio_test_shapes/',
    'gt_files': os.path.join(path_root_data, 'Metadata'),
}

# (key of the full path, key of its base folder, key of its sub-folder)
_derived_paths = (
    ('featurepath_tr', 'path_to_features', 'featuredir_tr'),
    ('featurepath_te', 'path_to_features', 'featuredir_te'),
    ('audiopath_tr', 'path_to_dataset', 'audiodir_tr'),
    ('audiopath_te', 'path_to_dataset', 'audiodir_te'),
    ('audio_shapepath_tr', 'path_to_dataset', 'audio_shapedir_tr'),
    ('audio_shapepath_te', 'path_to_dataset', 'audio_shapedir_te'),
)
for _full_key, _base_key, _dir_key in _derived_paths:
    params_path[_full_key] = os.path.join(params_path[_base_key], params_path[_dir_key])

# Ground-truth CSV files for the two splits.
_gt_dir = params_path['gt_files']
params_files = {
    'gt_test': os.path.join(_gt_dir, 'Drill_Dataset_Test.csv'),
    'gt_train': os.path.join(_gt_dir, 'Drill_Dataset_Train.csv'),
}

# ============================================= print all params to keep record in output file
_report = (
    ('params_files=', params_files),
    ('params_extract=', params_extract),
    ('params_learn=', params_learn),
    ('params_pred=', params_pred),
)
for _header, _mapping in _report:
    print(_header)
    pprint.pprint(_mapping, width=1, indent=4)
print('\n')

In [None]:
# Ground-truth tables for the train and test splits, plus their file-name lists.
train_csv = pd.read_csv(params_files['gt_train'])
test_csv = pd.read_csv(params_files['gt_test'])
filelist_audio_tr = train_csv['fname'].tolist()
filelist_audio_te = test_csv['fname'].tolist()

# '<train audio folder><file name>' -> class label
_audio_dir_tr = params_path['audiopath_tr']
file_to_label = {
    _audio_dir_tr + fname: label
    for fname, label in zip(train_csv['fname'].values, train_csv['label'].values)
}

# Distinct class labels of the train set, in sorted order.
list_labels = sorted(set(train_csv['label'].values))

# Label <-> integer index lookups; the index is the label's position in list_labels.
label_to_int = {label: index for index, label in enumerate(list_labels)}
int_to_label = dict(enumerate(list_labels))

# Same keys as file_to_label, with the label replaced by its integer index.
file_to_int = {path: label_to_int[label] for path, label in file_to_label.items()}

In [None]:
def _has_label_file(feature_name):
    """Return True if the label file derived from ``feature_name`` exists in the train feature folder."""
    label_name = feature_name.replace(suffix_in, suffix_out)
    return os.path.isfile(os.path.join(params_path.get('featurepath_tr'), label_name))


# Train feature files: names ending in suffix_in + '.data' that also have a label file.
ff_list_tr = [name for name in os.listdir(params_path.get('featurepath_tr'))
              if name.endswith(suffix_in + '.data') and _has_label_file(name)]

# Labels for those files, loaded by the project helper.
labels_audio_train = get_label_files(
    filelist=ff_list_tr,
    dire=params_path.get('featurepath_tr'),
    suffix_in=suffix_in,
    suffix_out=suffix_out,
)

_n_clips, _n_labels = len(ff_list_tr), len(labels_audio_train)
print('Number of clips considered as train set: {0}'.format(_n_clips))
print('Number of labels loaded for train set: {0}'.format(_n_labels))

In [None]:
# Train/validation split of the feature files, stratified on labels_audio_train,
# with a fixed random_state so the split is repeatable.
tr_files, val_files = train_test_split(
    ff_list_tr,
    test_size=params_learn.get('val_split'),
    stratify=labels_audio_train,
    random_state=42,
)

# Arguments shared by the train and validation patch generators.
_patch_gen_kwargs = {
    'feature_dir': params_path.get('featurepath_tr'),
    'params_learn': params_learn,
    'params_extract': params_extract,
    'suffix_in': '_mel',
    'suffix_out': '_label',
    'floatx': np.float32,
}

tr_gen_patch = DataGeneratorPatch(file_list=tr_files, **_patch_gen_kwargs)

# The validation generator receives the scaler held by the train generator.
val_gen_patch = DataGeneratorPatch(file_list=val_files,
                                   scaler=tr_gen_patch.scaler,
                                   **_patch_gen_kwargs)



In [None]:
from model import (
    CNN_LeakyReLU,
    CNN_LSTM_LeakyReLU,
    CNN_LSTM_Att_LeakyReLU,
    CNN_LSTM_Att_ReLU,
)

# Instantiate CNN_LSTM_Att_LeakyReLU from the loaded configuration, restore its
# weights from disk and print the layer summary.
_weights_path = 'weights/dumy.hdf5'
model2 = CNN_LSTM_Att_LeakyReLU(params_learn=params_learn, params_extract=params_extract)
model2.load_weights(_weights_path)
model2.summary()

In [None]:
# Predict a class for every test feature file with model2, aggregate the per-patch
# predictions into one prediction per file, and score the result against the ground truth.
print('\nCompute predictions on test set:==================================================\n')

# Validate the aggregation setting up front instead of printing a message inside the loop
# and then carrying on with an undefined (or stale) preds_file.
# NOTE(review): the original read this from `params_recog`, a name that is never defined in
# this notebook; the 'predictive' section (params_pred) is assumed to be the intended
# source — confirm against params.yaml.
aggregate_method = params_pred.get('aggregate')
if aggregate_method != 'gmean':
    raise ValueError('unknown aggregation method for prediction: {0!r}'.format(aggregate_method))

list_preds = []  # not used in this cell; kept in case another cell expects the name

# Test feature files; this list fixes the row order of te_preds and of the `pred` table.
te_files = [f for f in os.listdir(params_path.get('featurepath_te')) if f.endswith(suffix_in + '.data')]

# One row of class scores per test file (every row is filled by the loop below).
te_preds = np.empty((len(te_files), params_learn.get('n_classes')))

# Per-file patch generator, given the scaler held by the train generator.
te_gen_patch = PatchGeneratorPerFile(feature_dir=params_path.get('featurepath_te'),
                                     file_list=te_files,
                                     params_extract=params_extract,
                                     suffix_in='_mel',
                                     floatx=np.float32,
                                     scaler=tr_gen_patch.scaler
                                     )

# NOTE(review): assumes successive get_patches_file() calls walk file_list in order, so that
# row i of te_preds belongs to te_files[i] — confirm in dataLoader.PatchGeneratorPerFile.
for i in trange(len(te_files), miniters=int(len(te_files) / 100), ascii=True, desc="Predicting..."):
    patches_file = te_gen_patch.get_patches_file()

    # float64 keeps the precision the original list round-trip (.tolist() -> np.array) produced.
    preds_patch = np.asarray(model2.predict(patches_file), dtype=np.float64)

    # Geometric mean over the patches of the file -> one score per class.
    preds_file = gmean(preds_patch, axis=0)
    te_preds[i, :] = preds_file


list_labels = np.array(list_labels)
pred_label_files_int = np.argmax(te_preds, axis=1)
pred_labels = [int_to_label[x] for x in pred_label_files_int]

# Derive the .wav names from te_files itself rather than listing the directory a second
# time, so the names are guaranteed to be in the same order as the rows of te_preds.
te_files_wav = [f.replace(suffix_in + '.data', '.wav') for f in te_files]
pred = pd.DataFrame(te_files_wav, columns=["fname"])
pred['label'] = pred_labels

print('\nEvaluate ACC and print score============================================================================')

# read ground truth
gt_test = pd.read_csv(params_files.get('gt_test'))

# NOTE(review): `params_ctrl` was never defined in this notebook, so the Evaluator call raised
# NameError on a fresh kernel. It is assumed to live under a 'ctrl' key of params.yaml, with an
# empty dict as fallback — confirm what test.Evaluator expects.
params_ctrl = params.get('ctrl', {})

# init Evaluator object
evaluator = test.Evaluator(gt_test, pred, list_labels, params_ctrl, params_files)

print('\n=============================ACCURACY===============================================================')
print('=============================ACCURACY===============================================================\n')
evaluator.evaluate_acc()
evaluator.evaluate_acc_classwise()
evaluator.print_summary_eval()

In [None]:
# Display the ground-truth table read from the test CSV.
gt_test

In [None]:
# Copy of the ground truth ordered by file name (gt_test itself is left untouched).
gt_test1 = gt_test.sort_values(by="fname")
gt_test1

In [None]:
# Display the predictions table (columns: fname, label).
pred

In [None]:
# Copy of the predictions ordered by file name, to line up row by row with gt_test1.
pred1 = pred.sort_values(["fname"])

# The metric cells compare gt_test1['label'] with pred1['label'] purely by position.
# That is only valid if both frames list exactly the same files in the same order;
# otherwise labels get paired with the wrong clips without any error. Check it here.
if gt_test1['fname'].tolist() != pred1['fname'].tolist():
    raise ValueError('gt_test1 and pred1 do not list the same file names in the same order; '
                     'comparing their labels by position would be misaligned')
pred1

In [None]:
from sklearn.metrics import confusion_matrix

In [None]:
# Confusion matrix of ground-truth vs predicted labels, paired by row position.
# scikit-learn convention: rows are true classes, columns are predicted classes.
cm = confusion_matrix(y_true=gt_test1['label'], y_pred=pred1['label'])
print(cm)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sn

# Row-normalise the confusion matrix: each ground-truth class (row of cm) sums to 100 %.
cmn = cm.astype('float') / cm.sum(axis=1, keepdims=True) * 100

# Display names for the tick labels.
# NOTE(review): assumes exactly three classes whose sorted order is Broken, Normal, Other — confirm.
class_names = ["Broken", "Normal", "Other"]

fig, ax = plt.subplots(figsize=(8, 5.5))

# Plot cmn as-is, NOT transposed: heatmap puts matrix rows on the y axis and columns on the
# x axis, and cm has true classes as rows and predicted classes as columns. The previous
# cmn.T swapped the two while the axis labels below stayed the same, so every off-diagonal
# cell was shown under the wrong (ground truth, predicted) pair.
sn.heatmap(cmn, cmap='flare', annot=True, square=True, linecolor='black', linewidths=0.75,
           ax=ax, fmt='.2f', annot_kws={'size': 16})
ax.set_xlabel('Predicted', fontsize=18, fontweight='bold')
ax.xaxis.set_label_position('bottom')
ax.xaxis.set_ticklabels(class_names, fontsize=16)
ax.set_ylabel('Ground Truth', fontsize=18, fontweight='bold')
ax.yaxis.set_ticklabels(class_names, fontsize=16)
# plt.title('Confusion matrix', fontsize=20, fontweight='bold')
plt.tight_layout()
# plt.savefig("results/AugmentedDataset18Aug_Split_183_early_att_ori.png", bbox_inches='tight', dpi=300)
# plt.show()

In [None]:
# Text report from scikit-learn's classification_report for the test set,
# comparing ground-truth and predicted labels paired by row position.
cr2 = classification_report(y_true=gt_test1['label'], y_pred=pred1['label'])
print(cr2)