In [1]:
import os
import shutil
import sys
import h5py

import librosa

import numpy as np
from numpy import array

from sklearn.metrics import auc, confusion_matrix, accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split

import tensorflow as tf

In [2]:
def mil_squared_error(y_true, y_pred):
    """Squared gap between the largest predicted value and the largest target value.

    Both maxima are taken without an axis argument, i.e. over every element of
    the tensor that is passed in.
    """
    backend = tf.keras.backend
    peak_gap = backend.max(y_pred) - backend.max(y_true)
    return backend.square(peak_gap)

# Adam optimizer with a small learning rate; it is not referenced again in the cells shown here.
adam = tf.keras.optimizers.Adam(learning_rate=1e-5)

In [3]:
# Directories of the train / test / validation splits for the chosen condition.

condition = 'all'

new_train = '..//train//{}//'.format(condition)
new_test = '..//test//{}//'.format(condition)
new_val = '..//val//{}//'.format(condition)

def load_vectors(path):
    """Load every array file in ``path`` and derive one-hot labels from file order.

    Each directory entry is read with ``np.load`` and treated as one class:
    all rows of the i-th file (in sorted filename order) receive label ``i``.

    Args:
        path: Directory that contains the array files.

    Returns:
        Tuple ``(X, y)`` where ``X`` is every file concatenated along axis 0
        and ``y`` is the one-hot encoding of the per-row class indices.
    """
    # The empty float64 seed reproduces what the former zero-row seed provided:
    # float64 promotion, the (48, 272) sample-shape check, and a (0, 48, 272)
    # result for an empty directory.
    chunks = [np.zeros((0, 48, 272))]
    labels = []
    for class_index, npy in enumerate(sorted(os.listdir(path))):
        current = np.load(os.path.join(path, npy))
        chunks.append(current)
        labels.extend([class_index] * len(current))

    # Concatenate once; calling np.vstack inside the loop re-copied everything
    # loaded so far for every additional file.
    X = np.concatenate(chunks, axis=0)
    y = tf.keras.utils.to_categorical(labels)
    print(X.shape)
    print(y.shape)
    return X, y
    
# Load the training and test splits; load_vectors prints the resulting shapes.
X_train, y_train = load_vectors(new_train)
X_test, y_test = load_vectors(new_test)

(41068, 48, 272)
(41068, 6)
(9995, 48, 272)
(9995, 6)


In [4]:
# NOTE(review): this import sits mid-notebook; it would normally live in the first import cell.
from tensorflow import keras
# Load the saved model; the custom loss is passed by name via custom_objects.
# NOTE(review): absolute, machine-specific path — consider a configurable model directory.
model = keras.models.load_model('D://GitHub//module//five_class_ood//model.hdf5', custom_objects={'mil_squared_error': mil_squared_error})

In [None]:
# Class indices predicted by the full model for the training set.
y_preds = model.predict(X_train)
y_preds = [np.argmax(y) for y in y_preds]
# Ground truth comes from the one-hot training labels. The previous code read
# `y_trues` before it was ever assigned, which raises NameError on a fresh kernel.
y_trues = [np.argmax(y) for y in y_train]

In [None]:
# Micro-averaged F1 of the predictions computed in the previous cell.
print(f1_score(y_trues, y_preds, average='micro'))

In [8]:
def split_model(model):
    """Split ``model`` into "all layers but the last" and "the last layer".

    The layer objects themselves are added to two new ``keras.Sequential``
    containers (they are not copied), and the layer counts are printed.

    NOTE(review): the third message labels the second half "penultimate"
    although it holds the final layer; the text is kept unchanged here.

    Returns:
        Tuple ``(first_half_model, second_half_model)``.
    """
    layers = model.layers
    first_half_model = keras.Sequential()
    second_half_model = keras.Sequential()

    # Everything except the final layer goes to the first half ...
    for layer in layers[:-1]:
        first_half_model.add(layer)
    # ... and the final layer (if there is one) to the second half.
    for layer in layers[-1:]:
        second_half_model.add(layer)

    print('the original model has ' + str(len(model.layers)) + ' layers.')
    print('the penultimate (a.k.a. first half) model has ' + str(len(first_half_model.layers)) + ' layers.')
    print('the penultimate (a.k.a. second half) model has ' + str(len(second_half_model.layers)) + ' layers.')
    return first_half_model, second_half_model

In [9]:
# first_half_model yields the activations feeding the last layer; second_half_model is that last layer.
first_half_model, second_half_model = split_model(model)

the original model has 34 layers.
the penultimate (a.k.a. first half) model has 33 layers.
the penultimate (a.k.a. second half) model has 1 layers.


In [10]:
# Group the first-half-model outputs of the training set by their true class.

# Take the class count from the one-hot labels instead of hard-coding five lists.
n_classes = y_train.shape[1]
emp_vals = [[] for _ in range(n_classes)]

for X, y in zip(first_half_model.predict(X_train), y_train):
    emp_vals[np.argmax(y)].append(X)

# Keep one array per class in a plain list. Classes hold different numbers of
# samples, so np.asarray on the nested list would build a ragged array, which
# is deprecated and rejected by newer NumPy releases.
emp_vals = [np.asarray(class_vals) for class_vals in emp_vals]

  return array(a, dtype, copy=False, order=order)


In [46]:
def get_emp_mean(emp_val):
    """Return the empirical mean of one class's feature vectors as a column.

    Args:
        emp_val: Non-empty sequence (or 2-D array) of equally shaped feature
            vectors belonging to one class.

    Returns:
        Array with the element-wise mean over the samples and an extra axis
        inserted at position 1, i.e. shape ``(d, 1)`` for ``d``-dimensional
        vectors. The shape is also printed.

    Raises:
        ValueError: If ``emp_val`` contains no vectors.
    """
    if len(emp_val) == 0:
        raise ValueError('emp_val must contain at least one vector')

    vectors = np.asarray(list(emp_val), dtype=np.float64)

    # Average over the samples (axis 0). The previous version divided the summed
    # vector by its own length (the feature dimension) rather than by the number
    # of samples, so the result was not a mean.
    result = np.expand_dims(vectors.mean(axis=0), axis=1)

    print(result.shape)

    return result

In [47]:
# One mean column vector per class, in class-index order. Iterating over
# emp_vals avoids hard-coding the number of classes.
emp_means = [get_emp_mean(class_vals) for class_vals in emp_vals]

(4096, 1)
(4096, 1)
(4096, 1)
(4096, 1)
(4096, 1)


In [49]:
# get empirical covariance
def get_emp_covar():
    """Compute the class-centred empirical covariance of the training features.

    Every training sample's ``first_half_model`` output is centred on the mean
    of its own true class (``emp_means``); the outer products of the centred
    vectors are summed and divided by the number of training samples.

    Returns:
        Array of shape ``(d, d)``; the shape is also printed.

    Raises:
        ValueError: If ``y_train`` is empty.
    """
    if len(y_train) == 0:
        raise ValueError('y_train is empty; cannot estimate a covariance')

    # assumes one flat vector per sample, i.e. features is (n_samples, d),
    # matching the (d, 1) class means — TODO confirm for other architectures
    features = np.asarray(first_half_model.predict(X_train))
    labels = np.argmax(y_train, axis=1)

    n_features = features.shape[1]
    emp_covar = np.zeros((n_features, n_features))

    for class_index in np.unique(labels):
        # Rows are the centred samples of this class; the (d, 1) mean is
        # transposed to (1, d) so it broadcasts across the rows.
        centred = features[labels == class_index] - np.transpose(emp_means[class_index])
        # centred.T @ centred equals the sum of the per-sample outer products,
        # computed as one matrix product instead of one d x d product per sample.
        emp_covar += np.transpose(centred) @ centred

    emp_covar = emp_covar/len(y_train)
    print(emp_covar.shape)

    return emp_covar

emp_covar = get_emp_covar()

(4096, 4096)


In [51]:
# Pseudo-inverse of the empirical covariance; read by get_emp_mahalanobis.
inv_emp_covar = np.linalg.pinv(emp_covar)

def get_emp_mahalanobis(y_pred, c, means=None, inv_covar=None):
    """Return ``|d^T S d|`` where ``d = y_pred - mean_c`` and ``S`` is the inverse covariance.

    Args:
        y_pred: Feature vector of one sample. Any layout holding ``d`` values
            is accepted (``(d,)``, ``(d, 1)`` or ``(1, d)``).
        c: Index of the class whose mean is subtracted.
        means: Optional sequence of per-class means; defaults to the
            notebook-level ``emp_means``.
        inv_covar: Optional ``(d, d)`` inverse covariance; defaults to the
            notebook-level ``inv_emp_covar``.

    Returns:
        Non-negative scalar (the norm of the 1x1 quadratic form).

    Raises:
        ValueError: If the vector, mean and matrix sizes do not agree.
    """
    if means is None:
        means = emp_means
    if inv_covar is None:
        inv_covar = inv_emp_covar

    # Force both operands into (d, 1) columns. Previously a (d,) or (1, d)
    # input silently broadcast against the (d, 1) mean into a (d, d) matrix,
    # and a bare `except` replaced failures with an element-wise product.
    emp_mean = np.asarray(means[c]).reshape(-1, 1)
    diff = np.asarray(y_pred).reshape(-1, 1) - emp_mean

    # (1, d) @ (d, d) @ (d, 1) -> (1, 1); its norm is the absolute value, which
    # is what the earlier code returned for correctly shaped input.
    emp_mahalanobis = np.linalg.norm(np.transpose(diff) @ inv_covar @ diff)

    return emp_mahalanobis

In [None]:
# Distance of every training feature vector to the mean of its own class.
emp_mahalanobis_all_classes = [[] for _ in range(len(emp_vals))]

for index in range(0, len(emp_vals)):
    for y_pred in emp_vals[index]:
        # Add a trailing axis, as the test-time loops do, so the vector lines
        # up with the class mean instead of broadcasting against it.
        emp_m = get_emp_mahalanobis(np.expand_dims(y_pred, axis=1), index)
        emp_mahalanobis_all_classes[index].append(emp_m)

# Show the number of distances per class. Classes differ in size, so the nested
# list cannot be converted to a rectangular array just to read its shape.
[len(class_dists) for class_dists in emp_mahalanobis_all_classes]

In [None]:
# Per-class spread and centre of the training distances, in class-index order.
emp_mahalanobis_all_classes_stds = [
    np.std(class_dists) for class_dists in emp_mahalanobis_all_classes
]
emp_mahalanobis_all_classes_means = [
    np.mean(class_dists) for class_dists in emp_mahalanobis_all_classes
]

In [None]:
def get_mahalanobis_coeff(m_mean, m_std, m_dists, threshold, max_coeff=5, steps=500):
    """Find the smallest band width (in standard deviations) covering enough distances.

    Candidates ``i`` are taken from ``np.linspace(0, max_coeff, steps)``. The
    first ``i`` for which the fraction of distances strictly inside
    ``(m_mean - i*m_std, m_mean + i*m_std)`` exceeds ``threshold`` is returned.

    Args:
        m_mean: Centre of the band.
        m_std: Standard deviation that scales the band.
        m_dists: Non-empty sequence of distances; each entry is reduced with
            ``np.linalg.norm`` before it is compared.
        threshold: Required fraction of distances inside the band.
        max_coeff: Largest candidate coefficient (default 5, as before).
        steps: Number of candidates, at least 1 (default 500, as before).

    Returns:
        The first candidate that reaches the coverage, or the largest candidate
        when none does (previously ``None`` fell out of the function, which
        broke the later ``coeff * std`` arithmetic).

    Raises:
        ValueError: If ``m_dists`` is empty.
    """
    if len(m_dists) == 0:
        raise ValueError('m_dists must not be empty')

    # The norms do not depend on the candidate, so compute them once.
    dists = np.array([np.linalg.norm(m_dist) for m_dist in m_dists])
    candidates = np.linspace(0, max_coeff, steps)

    for i in candidates:
        inside = (m_mean - i*m_std < dists) & (m_mean + i*m_std > dists)
        if np.count_nonzero(inside)/len(dists) > threshold:
            return i

    # Coverage never reached (e.g. zero spread): fall back to the widest band.
    return candidates[-1]

In [None]:
# One band coefficient per class: the smallest multiple of the class std whose
# band around the class mean holds more than 75% of that class's training distances.
emp_mahalanobis_all_classes_coeffs = []

# NOTE(review): the loop variables (m_mean, m_std, m_dists) stay defined at
# notebook level after this loop; check_if_in_distribution reads `m_std`.
for m_mean, m_std, m_dists in zip(emp_mahalanobis_all_classes_means, \
                                 emp_mahalanobis_all_classes_stds, emp_mahalanobis_all_classes):
    coeff = get_mahalanobis_coeff(m_mean, m_std, m_dists, threshold=0.75)
    emp_mahalanobis_all_classes_coeffs.append(coeff)
    
print(emp_mahalanobis_all_classes_coeffs)

# Disabled override that would fix every coefficient at 5:
#emp_mahalanobis_all_classes_coeffs = [5, 5, 5, 5, 5]

In [None]:
def check_if_in_distribution(y_pred, c):
    """Tell whether a feature vector lies inside class ``c``'s distance band.

    Args:
        y_pred: Feature vector of one sample, passed on to ``get_emp_mahalanobis``.
        c: Index of the class to test against.

    Returns:
        ``True`` when the distance lies strictly between
        ``mean - coeff*std`` and ``mean + coeff*std`` for class ``c``,
        otherwise ``False``.
    """
    m_dist = np.linalg.norm(get_emp_mahalanobis(y_pred, c))
    std = emp_mahalanobis_all_classes_stds[c]
    mean = emp_mahalanobis_all_classes_means[c]
    coeff = emp_mahalanobis_all_classes_coeffs[c]

    # Use this class's own std. The earlier code referenced `m_std`, a variable
    # leaked from the coefficient loop that holds the last class's std.
    return bool(mean - coeff*std < m_dist and mean + coeff*std > m_dist)

# load Calm samples

In [None]:
def load_vectors(path, include_only):
    """Load only the array files whose name contains ``include_only``.

    NOTE(review): this definition replaces the earlier one-argument
    ``load_vectors`` for every cell executed after it.

    Args:
        path: Directory that contains the array files.
        include_only: Substring a filename must contain to be loaded.

    Returns:
        Tuple ``(X, y)``: ``X`` is the selected files concatenated along
        axis 0 (shape ``(0, 48, 272)`` when nothing matches) and ``y`` is a
        plain list of class indices, not one-hot encoded. A file's index is its
        position among ALL sorted directory entries, not only the selected ones.
    """
    # The empty float64 seed keeps the float64 promotion and the (48, 272)
    # sample-shape check of the former zero-row seed.
    chunks = [np.zeros((0, 48, 272))]
    y = []
    for class_index, npy in enumerate(sorted(os.listdir(path))):
        if include_only not in npy:
            continue
        current = np.load(os.path.join(path, npy))
        chunks.append(current)
        y.extend([class_index] * len(current))

    # Concatenate once; calling np.vstack inside the loop re-copied everything
    # loaded so far for every additional file.
    X = np.concatenate(chunks, axis=0)
    return X, y

# Disabled: the same load for files whose name contains 'Other'.
#Other_X, _ = load_vectors(new_test, 'Other')
# Test-set samples from files whose name contains 'Calm'; the labels are discarded.
Calm_X, _ = load_vectors(new_test, 'Calm')

In [None]:
# Disabled: predictions for the 'Other' samples.
#preds_by_whole_model_Other = model.predict(Other_X)
#preds_by_first_half_model_Other = first_half_model(Other_X)

# Outputs of the whole model and of the first-half model for the Calm samples.
preds_by_whole_model_Calm = model.predict(Calm_X)
preds_by_first_half_model_Calm = first_half_model.predict(Calm_X)

In [None]:
#ind_samples_Other = 0
#ood_samples_Other = 0

# Counters for Calm samples judged in-distribution vs. out-of-distribution.
ind_samples_Calm = 0
ood_samples_Calm = 0
# The string literal below holds the disabled loop for the 'Other' samples; it is never executed.
'''
for y_pred, c in zip(preds_by_first_half_model_Other, preds_by_whole_model_Other):
    y_pred = np.expand_dims(y_pred, axis=0)
    c = np.argmax(c)
    
    if check_if_in_distribution(y_pred, c):
        ind_samples_Other += 1
    else:
        ood_samples_Other += 1
'''        
# Each first-half output gets a trailing axis and is tested against the class
# that the whole model predicts for the same sample.
for y_pred, c in zip(preds_by_first_half_model_Calm, preds_by_whole_model_Calm):
    y_pred = np.expand_dims(y_pred, axis=1)
    c = np.argmax(c)
    
    if check_if_in_distribution(y_pred, c):
        ind_samples_Calm += 1
    else:
        ood_samples_Calm += 1

In [None]:
#print(ind_samples_Other)
#print(ood_samples_Other)
# Calm samples counted as in-distribution, then as out-of-distribution.
print(ind_samples_Calm)
print(ood_samples_Calm)

## In-distribution (ind) and out-of-distribution (ood) rates of the test set with emotions in these 5 classes

In [None]:
# NOTE(review): by this point `load_vectors` has been redefined with two
# parameters, so the disabled call below would no longer work as written.
# X_test, y_test = load_vectors(new_test)

# Outputs of the whole model and of the first-half model for the test set.
preds_by_whole_model = model.predict(X_test)
preds_by_first_half_model = first_half_model.predict(X_test)

# Counters filled by the next cell.
ind_samples = 0
ood_samples = 0

In [None]:
# Start from zero inside this cell so that re-running it does not add to the
# totals left by a previous run.
ind_samples = 0
ood_samples = 0

# Each first-half output gets a trailing axis and is tested against the class
# that the whole model predicts for the same sample.
for y_pred, c in zip(preds_by_first_half_model, preds_by_whole_model):
    y_pred = np.expand_dims(y_pred, axis=1)
    c = np.argmax(c)

    if check_if_in_distribution(y_pred, c):
        ind_samples += 1
    else:
        ood_samples += 1

print(ind_samples)
print(ood_samples)

### Now, the evaluation

In [None]:
def load_vectors_with_calm(path):
    """Load every array file in ``path`` and derive one-hot labels from file order.

    Each directory entry is read with ``np.load`` and treated as one class:
    all rows of the i-th file (in sorted filename order) receive label ``i``.

    Args:
        path: Directory that contains the array files.

    Returns:
        Tuple ``(X, y)`` where ``X`` is every file concatenated along axis 0
        and ``y`` is the one-hot encoding of the per-row class indices.
    """
    # The empty float64 seed reproduces what the former zero-row seed provided:
    # float64 promotion, the (48, 272) sample-shape check, and a (0, 48, 272)
    # result for an empty directory.
    chunks = [np.zeros((0, 48, 272))]
    labels = []
    for class_index, npy in enumerate(sorted(os.listdir(path))):
        current = np.load(os.path.join(path, npy))
        chunks.append(current)
        labels.extend([class_index] * len(current))

    # Concatenate once; calling np.vstack inside the loop re-copied everything
    # loaded so far for every additional file.
    X = np.concatenate(chunks, axis=0)
    y = tf.keras.utils.to_categorical(labels)
    print(X.shape)
    print(y.shape)
    return X, y
    
#X_train, y_train = load_vectors(new_train)
# Reload the test split, then reduce predictions and one-hot labels to class indices.
X_test, y_test = load_vectors_with_calm(new_test)
y_pred = [np.argmax(y) for y in model.predict(X_test)]
y_true = [np.argmax(y) for y in y_test]

#### without out of distribution technique

In [None]:
# f1_score comes from the sklearn.metrics import in the first cell.
# from sklearn.metrics import auc, confusion_matrix, accuracy_score, precision_score, recall_score, f1_score
# Micro-averaged F1 over the whole test set, without any out-of-distribution filtering.
print(f1_score(y_true, y_pred, average='micro'))

#### with out of distribution technique

In [None]:
X_test, y_test = load_vectors_with_calm(new_test)

first_half_pred = first_half_model.predict(X_test)
whole_model_pred = model.predict(X_test)

# Samples that pass the in-distribution check, with their true one-hot labels
# and the class the whole model predicted for them.
usable_X = []
usable_true_labels = []
usable_pred_labels = []

for X, y_pred, c, true_label in zip(X_test, first_half_pred, whole_model_pred, y_test):
    # Add a trailing axis, as the other detection loops do. axis=0 produced a
    # row vector that broadcast against the class mean into a square matrix.
    y_pred = np.expand_dims(y_pred, axis=1)
    c = np.argmax(c)
    if check_if_in_distribution(y_pred, c):
        usable_X.append(X)
        usable_true_labels.append(true_label)
        usable_pred_labels.append(c)

usable_X = np.asarray(usable_X)

y_true = [np.argmax(y) for y in usable_true_labels]
# Reuse the predictions made above rather than running the model a second time
# on the kept samples; this also works when no sample passes the check.
y_pred = usable_pred_labels

In [None]:
# Micro-averaged F1 restricted to the samples kept by the in-distribution check.
print(f1_score(y_true, y_pred, average='micro'))