In [None]:
import os
import numpy as np
from matplotlib import pyplot as plt
from scipy import stats, signal
from keras.models import Model, Sequential , load_model
from keras.layers import Conv2D,Conv1D, Dropout, MaxPooling2D, Flatten, Dense, Input , concatenate, MaxPooling1D, BatchNormalization, AveragePooling2D,AveragePooling1D
from keras.losses import categorical_crossentropy
from keras.utils import plot_model
from keras.optimizers import Adam, schedules
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import Normalizer , StandardScaler
from sklearn.metrics import roc_curve, roc_auc_score,auc , precision_recall_curve,  confusion_matrix, multilabel_confusion_matrix, average_precision_score, auc
from sklearn.utils import shuffle
from google.colab import drive

In [None]:
drive.mount('/content/gdrive')
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/gdrive
Mounted at /content/drive


In [None]:
# Yariv
feat_space = './drive/MyDrive/audio_colab/feats/'
# Tal
#feat_space = './drive/MyDrive/Voice Recording Database/Features/'


In [None]:
def get_names():

    #feats = ['mfcc', 'del-mfcc','del-del-mfcc', 'Spect', 'rms', 'dSdT', 'dS2dT2','zero_crossing_rate', 'spectral_centroid', 'spectral_bandwidth']
    feats = ['mfcc' ,'mel_spec_dev_2']
    dims = {
            'STFT':'image',
            'Spect':'image',
            'dSdT':'image',
            'mel_spec_dev_2':'image',
            'mfcc':'coeffs',
            'del-mfcc':'coeffs',
            'del-del-mfcc':'coeffs',
            'rms':'stats',
            'zero_crossing_rate':'stats',
            'spectral_centroid':'stats',
            'spectral_bandwidth':'stats'
            }
    return feats , dims

In [None]:
def sort_data(feat_space,best_features = None):

    X = {}
    Y = {}

    class_no = 0
    stat_names =[
            'spectral_centroid',
            'spectral_bandwidth',
            'rms',
            'zero_crossing_rate']

    for spect_path in os.listdir(feat_space):
        data = {}
        if '.npy' in spect_path and 'Copy' not in spect_path:
            label, ind = spect_path.split('_')

        obj = np.load(feat_space+spect_path, allow_pickle=True)

        if best_features == None:
            stats = obj[0,0]
            data['STFT'] = obj[0,1]
            data['mfcc'] = obj[0,2]
            data['del-mfcc'] = obj[0,3]
            data['del-del-mfcc'] = obj[0,4]  
            data['Spect'] = obj[0,5]
            data['dSdT'] = obj[0,6]
            data['dS2dT2'] = obj[0,7]

            for i in range(len(stat_names)):
                data[stat_names[i]] = stats[i,:]

        else:
            for feature in range(len(best_features) ):
                data[best_features[feature] ] = obj[0,feature ]
            
        if label not in Y.keys():
          Y[label] = class_no
          X[class_no] = []
          class_no += 1

        this_class = Y[label]
        X[this_class].append(data)

    return X , class_no


In [None]:
def split_test_train_custom(X_, class_no, ratio=0.6):

  X_train = []
  Y_train = []
  X_test = []
  Y_test = []
  
  for i in range(class_no):

      X_i = X_[i]

      available_samples = len(X_i)
      no_train_samples = int(available_samples*ratio)

      train_samples = np.random.choice(range(available_samples), no_train_samples, replace=False)

      X_train.extend([X_i[k] for k in train_samples])
      Y_train.extend([i for k in range(no_train_samples)])

      X_test.extend([X_i[k] for k in range(available_samples) if k not in train_samples])
      Y_test.extend([i for k in range(available_samples-no_train_samples)])


  return np.array(X_train), np.array(X_test), np.array(Y_train), np.array(Y_test)



In [None]:
def one_hot_encoder(_Y_train):

    classes = _Y_train.max()+1
    class_labels = np.zeros((classes))
    _Y_train2 = []

    for y in _Y_train:
        t = np.copy(class_labels)
        t[y] = 1
        _Y_train2.append(t)

    _Y_train2 = np.array(_Y_train2)

    return _Y_train2

In [None]:
def normalize_reshape(x_train, x_test, feats):
    Xtrain = {}
    Xtest = {}

    for f in feats:
        train_len = len(x_train)
        test_len = len(x_test)
        orig_shape = x_train[0][f].shape
        dims = len(orig_shape)
        norm = Normalizer()

        if dims > 1:
            h,w = orig_shape
            X0 = np.array([x[f].flatten() for x in x_train])
            Xf = np.array([x[f].flatten() for x in x_test])

            X0 = norm.fit_transform(X0).reshape(train_len, h, w)
            Xf = norm.transform(Xf).reshape(test_len, h, w)

        else:
            l = orig_shape[0]
            X0 = np.array([x[f] for x in x_train])
            Xf = np.array([x[f] for x in x_test])

            X0 = norm.fit_transform(X0).reshape(train_len, l, 1)
            Xf = norm.transform(Xf).reshape(test_len, l, 1)

        Xtrain[f] = X0
        Xtest[f] = Xf

    return Xtrain, Xtest


In [None]:
def get_model_block(inp_shape, dims, p=0.5):

    if dims=='image':

      inp = Input(shape= (inp_shape[0], inp_shape[1], 1))
      x = Conv2D(256, kernel_size=(3,3), activation='relu', input_shape=inp_shape)(inp)
      x = MaxPooling2D(pool_size=(2,2))(x)
      x = Dropout(0.5*p)(x)
      x = Conv2D(128, kernel_size=(3,3), activation='relu')(x)
      x = MaxPooling2D(pool_size=(2,2))(x)
      x = Dropout(0.75*p)(x)
      x = Conv2D(64, kernel_size=(3,3), activation='relu')(x)
      x = MaxPooling2D(pool_size=(2,2))(x)
      x = Dropout(p)(x)
      x = Conv2D(32, kernel_size=(3,3), activation='relu')(x)
      x = MaxPooling2D(pool_size=(2,2))(x)
      x = Dropout(2*p)(x)
      x = Conv2D(16, kernel_size=(3,3), activation='relu')(x)
      x = MaxPooling2D(pool_size=(2,2))(x)
      x = Dropout(2*p)(x)

      
      connection_layer = Flatten()(x)

    if dims=='coeffs':

      inp = Input(shape= (inp_shape[0], inp_shape[1], 1))
      x = Conv2D(128, kernel_size=(3,3), activation='relu', input_shape=inp_shape)(inp)
      x = MaxPooling2D(pool_size=(2,2))(x)
      x = Dropout(0.75*p)(x)
      x = Conv2D(64, kernel_size=(3,3), activation='relu')(x)
      x = MaxPooling2D(pool_size=(2,2))(x)
      x = Dropout(p)(x)
      x = Conv2D(32, kernel_size=(3,3), activation='relu')(x)
      x = MaxPooling2D(pool_size=(2,2))(x)
      x = Dropout(2*p)(x)
      #x = Conv2D(16, kernel_size=(3,3), activation='relu')(x)
      #x = Dropout(2*p)(x)
      
      connection_layer = Flatten()(x)

    if dims=='stats':
      inp = Input(shape = (inp_shape[0],1) )
      x = Conv1D(128, kernel_size=3, activation='relu', input_shape=inp_shape)(inp)
      x = MaxPooling1D(pool_size=2)(x)
      x = Dropout(2*p)(x)
      x = Conv1D(64, kernel_size=3, activation='relu')(x)
      x = MaxPooling1D(pool_size=2)(x)
      x = Dropout(2*p)(x)
      x = Conv1D(32, kernel_size=3, activation='relu')(x)
      x = MaxPooling1D(pool_size=2)(x)
      x = Dropout(p)(x)

      connection_layer = Flatten()(x)

    m = Model(inputs = inp,outputs = connection_layer)

    return m

def model_output(models,num_class,lr=1e-3, p=0.5):

    if len(models) > 1 :
      outputs = concatenate([ m.output for m in models ])
      inputs = [ m.input for m in models ] 

    else:
      outputs = concatenate([models[0].output,models[0].output])
      inputs = [models[0].input]

    output_layer = Dense(num_class, activation='softmax')(outputs)

    lr_schedule = schedules.ExponentialDecay(
    initial_learning_rate=lr,
    decay_steps= 27,
    decay_rate=1)

    opt = Adam(lr_schedule)
    m = Model(inputs = inputs ,outputs = output_layer)
    m.compile(loss=categorical_crossentropy, optimizer=opt, metrics=['accuracy'])
    return m 

In [None]:
def training_model(X_,dims,feats,class_no,hyper_params):

  best_feats = ['mfcc', 'mel_spec_dev_2']
  lr= hyper_params[0]
  batch_size= hyper_params[1]
  dropout_block = hyper_params[2]
  dropout_out= hyper_params[3]
  epochs= hyper_params[4]
  split = hyper_params[5]
  v = 1

  x_train_best = []
  x_test_best = []
  m_list = []
  x_train_i = []
  x_test_i = []

  x_train_round,x_test_round, y_train,y_test = split_test_train_custom(X_,class_no, ratio=split)

  y_train = one_hot_encoder(y_train)
  y_test = one_hot_encoder(y_test)
  x_train_round, x_test_round = normalize_reshape(x_train_round, x_test_round, feats)

  for f in best_feats:
          
    x_train_best.append(x_train_round[f])
    x_test_best.append(x_test_round[f])

    m_list.append(get_model_block(x_test_round[f][0].shape, dims[f], p=dropout_block))
  
  m = model_output(m_list, num_class=y_train.shape[-1],lr=lr, p=dropout_out)

  x_train_i.extend([block for block in x_train_best])
  x_test_i.extend([block for block in x_test_best])

  h = m.fit(x_train_i, y_train, epochs=epochs, batch_size=batch_size,verbose=v,shuffle=True)
  loss, acc = m.evaluate(x_test_i, y_test)
  y_hat = m.predict(x_test_i)


  print('train acc:' + str(max(h.history['accuracy'])))
  print('test loss:' + str(loss) )
  print('test acc:' + str(acc) )

  #m.save_weights(filepath+'trained_model', overwrite=True, save_format=None, options=None)
    
  return m, y_test, y_hat, acc, loss

In [None]:
    feats, dims = get_names()
    best_feat = ['mfcc','mel_spec_dev_2']

    X , class_no = sort_data(feat_space,best_feat)

    


In [None]:
acc = 0
while acc < 0.93 :
  lr = 1e-3
  batch_size = np.random.randint(2,8)*2
  dropout_block = np.random.uniform(0.1,0.2)
  dropout_out = np.random.uniform(0.5,0.7)
  epochs = np.random.randint(40,60)
  split = np.random.uniform(0.7,0.75)
  hyper_params = [lr,batch_size,dropout_block,dropout_out,epochs,split] 
  m, y_test, y_hat, acc, loss_round = training_model(X,dims,feats,class_no,hyper_params)

m.save('./drive/MyDrive/audio_colab/best_model')

  


In [None]:
reconstruct_model = load_model('./drive/MyDrive/audio_colab/best_model')


In [None]:
m.summary()

In [None]:
x_train_best = []
x_test_best = []
m_list = []
x_train_i = []
x_test_i = []

x_train_round,x_test_round, y_train,y_test = split_test_train_custom(X,class_no, ratio=split)

y_train = one_hot_encoder(y_train)
y_test = one_hot_encoder(y_test)
x_train_round, x_test_round = normalize_reshape(x_train_round, x_test_round, feats)
x_train_i.extend([block for block in x_train_round])
x_test_i.extend([block for block in x_test_round])

for f in feats:          
    x_train_best.append(x_train_round[f])
    x_test_best.append(x_test_round[f])

loss, acc = reconstruct_model.evaluate(x_test_best,y_test)
y_hat = reconstruct_model.predict(x_test_best)
print(acc)



0.9523809552192688


In [None]:
print(hyper_params)

[0.001, 12, 0.19002944681105188, 0.5318713254603384, 47, 0.7495872915887234]


In [None]:
def ROC_curve(y_test,y_hat):

  fpr = dict()
  tpr = dict()
  thresholds = dict()
  roc_auc = dict()
  average_auc= dict()
  
  classes = np.max(y_test.argmax(axis=1))+1

  #for i in range(classes):
    #fpr[i], tpr[i], thresholds[i] = roc_curve(y_test[:, i], y_hat[:, i])
    #roc_auc[i] = auc(fpr[i], tpr[i])
   
  fpr["micro"], tpr["micro"], thresholds = roc_curve(y_test.ravel(), y_hat.ravel() )
  average_auc["micro"] = auc(fpr["micro"], tpr["micro"])
  plt.plot(fpr['micro'],tpr['micro'])  
  
  plt.figure(1, figsize = (10,15) )
  plt.plot([0, 1], [0, 1], 'k--')
  plt.xlabel('False positive rate')
  plt.ylabel('True positive rate')
  plt.title('ROC curve -  Average auc score, micro-averaged over all classes: AUC={0:0.2f}'.format(average_auc["micro"]))
  plt.show()

  return fpr, tpr, thresholds, classes

In [None]:
def PRC_curve(y_test,y_hat):

  precision = dict()
  recall = dict()
  thresholds = dict()
  average_precision = dict()
  classes = np.max(y_test.argmax(axis=1))+1

  #for i in range(classes):
    #precision[i], recall[i], thresholds[i] = precision_recall_curve(y_test[:, i], y_hat[:, i])
  precision["micro"], recall["micro"], thresholds = precision_recall_curve(y_test.ravel(), y_hat.ravel() )
  average_precision["micro"] = average_precision_score(y_test, y_hat,average="micro")
  plt.plot(recall['micro'],precision['micro'])
  plt.ylim([0.0, 1.05])
  plt.xlim([0.0, 1.0])
  plt.xlabel('Recall')
  plt.ylabel('Precision')
  plt.title('PRC curve - Average precision score, micro-averaged over all classes: AP={0:0.2f}'.format(average_precision["micro"]))
  plt.show()
  #plt.figure(2,figsize = (10,15) )
  #[ plt.plot( recall[i],precision[i], label='class '+str(i)) for i in range(round(classes/2) ) ] 
  #[ plt.plot( recall[i],precision[i],'--', label='class '+str(i)) for i in range(round(classes/2),classes ) ] 
  return precision, recall, thresholds

In [None]:
def operating_points(y,y_hat,classes):

  best_tpr_threshold = 0
  best_ppv_threshold = 0
  best_acc_threshold = 0
  TPR_round = 0
  PPV_round = 0
  ACC_round = 0
  thr = 0
  done = False

  while not done:
    
    y_hat_round = y_hat > thr
    cnf_matrix = confusion_matrix(y_test.argmax(axis = 1),y_hat_round.argmax(axis =1) )

    FP = cnf_matrix.sum(axis=0) - np.diag(cnf_matrix) 
    FN = cnf_matrix.sum(axis=1) - np.diag(cnf_matrix)
    TP = np.diag(cnf_matrix)
    TN = cnf_matrix.sum() - (FP + FN + TP)

    FP = FP.astype(float)
    FN = FN.astype(float)
    TP = TP.astype(float)
    TN = TN.astype(float)

    # INITIALIZE step
    # Sensitivity, hit rate, recall, or true positive rate
    if np.sum(TP+FN) != 0 :
      TPR = TP/(TP+FN)
      TPR = np.mean(TPR)

    else:
      TPR = 0

    # Precision or positive predictive value
    if np.sum(TP+FP) != 0 :
      PPV = TP/(TP+FP)
      PPV = np.mean(PPV)

    else:
      PPV = 0

    # Overall accuracy for each class
    if np.sum(TP+FP+FN+TN) != 0 :
      ACC = (TP+TN)/(TP+FP+FN+TN)
      ACC = np.mean(ACC)

    else:
      ACC = 0

    #UPDATE step
    if ACC > ACC_round:
      ACC_round = ACC
      best_acc_threshold = thr

    if PPV > PPV_round:
      PPV_round = PPV
      best_ppv_threshold = thr

    if TPR > TPR_round:
      TPR_round = TPR
      best_tpr_threshold = thr
    
    if thr > 0.99:
      done = True
      print('done')
      print('ACC score: '+str(ACC_round)+', thr: '+str(best_acc_threshold))
      print('TPR score: '+str(TPR_round)+', thr: '+str(best_tpr_threshold))
      print('PPV score: '+str(PPV_round)+', thr: '+str(best_ppv_threshold))
    
    thr +=0.001
  
  #tpr confusion matrix
  y_hat_tpr = y_hat > best_tpr_threshold

  tpr_cnf = confusion_matrix(y.argmax(axis=1),y_hat_tpr.argmax(axis=1))
  plt.figure(figsize = (15,15)) 
  plt.imshow(tpr_cnf, cmap = 'jet')
  plt.title('TPR confusion matrix')
  plt.xlabel('predicted')
  plt.ylabel('true_labels')
  plt.xticks(range(classes))
  plt.yticks(range(classes))
  plt.show() 
  
  #ppv confusion matrix
  y_hat_ppv = y_hat > best_ppv_threshold

  ppv_cnf = confusion_matrix(y.argmax(axis=1),y_hat_ppv.argmax(axis=1)) 
  plt.figure(figsize = (15,15)) 
  plt.imshow(ppv_cnf,cmap = 'jet')
  plt.title('PPV confusion matrix')
  plt.xlabel('predicted')
  plt.ylabel('true_labels')
  plt.xticks(range(classes))
  plt.yticks(range(classes))
  plt.show()

  #acc confusion matrix
  y_hat_acc = y_hat > best_acc_threshold

  acc_cnf = confusion_matrix(y.argmax(axis=1),y_hat_acc.argmax(axis=1)) 
  plt.figure(figsize = (15,15)) 
  plt.imshow(acc_cnf,cmap = 'jet')
  plt.title('ACCURACY confusion matrix')
  plt.xlabel('predicted')
  plt.ylabel('true_labels')
  plt.xticks(range(classes))
  plt.yticks(range(classes))
  plt.colorbar()
  plt.show()

  return 

In [None]:
acc_mat = confusion_matrix(y_test.argmax(axis = 1),y_hat.argmax(axis =1) )
plt.figure(figsize = (15,15)) 
plt.imshow(acc_mat,cmap = 'jet')
plt.title('accuracy confusion matrix')
plt.xlabel('predicted')
plt.ylabel('true_labels')
plt.xticks(range(15))
plt.yticks(range(15))
plt.colorbar()
plt.show()
y_test.argmax(axis=1)

FP = acc_mat.sum(axis=0) - np.diag(acc_mat) 
FN = acc_mat.sum(axis=1) - np.diag(acc_mat)
TP = np.diag(acc_mat)
TN = acc_mat.sum() - (FP + FN + TP)

FP = FP.astype(float)
FN = FN.astype(float)
TP = TP.astype(float)
TN = TN.astype(float)
# Sensitivity, hit rate, recall, or true positive rate
TPR = TP/(TP+FN)
print(np.sum(TPR))

In [None]:
fpr, tpr, roc_thresholds, classes = ROC_curve(y_test,y_hat)

In [None]:
precision, recall, prc_thresholds = PRC_curve(y_test,y_hat)

In [None]:
recall.values()

In [None]:
operating_points(y_test,y_hat,classes)