In [1]:
import logging
import logging.handlers
import argparse
import os
import time
import shutil
import sys
import random
import numpy as np
import pandas as pd
import socket
from distutils.dir_util import copy_tree
import sklearn.metrics as metrics
from sklearn.metrics import confusion_matrix
import natsort
import datetime
import pickle
from pdb import set_trace as bp
import warnings
warnings.filterwarnings('ignore')

# Class names listed in index order: position 0 is the negative class and
# position 1 the positive class (the reports below print index 0 as "ND" and
# index 1 as "D"). An ordered tuple is used instead of a set because set
# iteration order for strings changes between interpreter runs, which made the
# name -> index mapping non-deterministic.
label = ('Not_Depressed', 'Depressed')
LABELS = {name: index for index, name in enumerate(label)}
num_of_classes = len(LABELS)
# Averaging mode forwarded to the F1 computation (None keeps per-class scores).
f_score = None

In [2]:
def calculate_accuracy(target, predict, classes_num, f_score_average):
    """
    Calculates accuracy, precision, recall, F1-Score, True Negative,
    False Positive, False Negative, and True Positive counts for the
    output of the model

    Inputs
      target: np.array() Integer class labels for the predicted outputs.
              The entries are used directly as indices into the per-class
              counters.
      predict: np.array() Predicted class for every entry of target
      classes_num: int How many classes are in the dataset
      f_score_average: str How to average the F1-Score: None, 'macro' or
                       'micro'

    Outputs:
      accuracy: Array of Floats of length classes_num. For None/'macro'
                this is the per-class accuracy. For 'micro' the overall
                accuracy is written to every class that has samples.
                Classes without samples are left at 0.
      p_r_f: The value returned by sklearn's
             precision_recall_fscore_support (precision, recall, F1-Score,
             support). When only one class is present and no averaging is
             requested, it is expanded to two-class arrays with the
             missing class set to 0.
      tn_fp_fn_tp: Array with True Negative, False Positive,
                   False Negative, and True Positive counts

    Raises:
      Exception: f_score_average is not None, 'macro' or 'micro'
      ValueError: the confusion matrix has neither 1 nor 4 entries
    """

    number_samples_labels = len(target)

    number_correct_predictions = np.zeros(classes_num)
    total = np.zeros(classes_num)

    for n in range(number_samples_labels):
        total[target[n]] += 1

        if target[n] == predict[n]:
            number_correct_predictions[target[n]] += 1

    con_matrix = confusion_matrix(target,
                                  predict)
    tn_fp_fn_tp = con_matrix.ravel()
    if tn_fp_fn_tp.shape != (4,):
        # sklearn builds a 1x1 matrix when labels and predictions contain
        # only one (identical) class; expand it to the 4-value layout.
        if tn_fp_fn_tp.size != 1:
            raise ValueError('Expected a binary confusion matrix, got '
                             '{} entries'.format(tn_fp_fn_tp.size))
        value = int(tn_fp_fn_tp[0])
        # np.ravel makes this work for both 1-D and (n, 1) shaped labels;
        # the previous target[0][0] failed on 1-D label vectors.
        only_label = int(np.ravel(target)[0])
        if only_label == 1:
            tn_fp_fn_tp = np.array([0, 0, 0, value])
        elif only_label == 0:
            tn_fp_fn_tp = np.array([value, 0, 0, 0])
        else:
            print('Error in the true_neg/false_pos value')
            sys.exit()

    if f_score_average is None or f_score_average == 'macro':
        correct, seen = number_correct_predictions, total
    elif f_score_average == 'micro':
        correct, seen = np.sum(number_correct_predictions), np.sum(total)
    else:
        raise Exception('Incorrect average!')

    # `where` skips classes without samples, which avoids the divide by
    # zero error; those entries keep the 0 from `out`.
    accuracy = np.divide(correct,
                         seen,
                         out=np.zeros_like(number_correct_predictions),
                         where=total != 0)

    if f_score_average is None:
        p_r_f = metrics.precision_recall_fscore_support(target,
                                                        predict)
    else:
        p_r_f = metrics.precision_recall_fscore_support(
            target, predict, average=f_score_average)

    # np.shape also copes with plain Python floats, which have no .shape.
    if np.shape(p_r_f[0]) == (1,):
        # Only one class was scored: place its values in a two-class
        # layout (binary problem assumed here) and leave the other at 0.
        temp = np.zeros((4, 2))
        position = int(np.ravel(target)[0])
        for val in range(len(p_r_f)):
            temp[val][position] = float(np.ravel(p_r_f[val])[0])

        p_r_f = (temp[0], temp[1], temp[2], temp[3])

    return accuracy, p_r_f, tn_fp_fn_tp

def prediction_and_accuracy(batch_output, batch_labels, initial_condition,
                            num_of_classes, complete_results, loss,
                            per_epoch_pred, f_score_average=None):
    """
    Calculates the accuracy (including F1-Score) of the predictions from a
    model. Also the True Negatives, False Negatives, True Positives, and False
    Positives are calculated. These results are added to the results
    accumulated so far.

    Input
        batch_output: The output from the model. Anything that is not a
                      np.ndarray is converted with .data.cpu().numpy()
                      (presumably a torch tensor - confirm with callers)
        batch_labels: The respective labels for the batched output
        initial_condition: Bool - True if this is the first instance to set
                           up the variables for logging accuracy
        num_of_classes: The number of classes in this dataset
        complete_results: np.array with at least 15 entries, updated in
                          place: [0:2] accuracy, [2:8] precision/recall/
                          F1-Score, [10] loss, [11:15] tn, fp, fn, tp
        loss: The value of the loss from the current epoch
        per_epoch_pred: Combined batch outputs and labels for record keeping
                        (ignored when initial_condition is True)
        f_score_average: The type of averaging to be used for the F1-Score
                         ('macro', 'micro', or None)

    Output
        complete_results: The same array object, after accumulation
        per_epoch_pred: Combined results of batch outputs and labels for
                        current epoch
    """
    # Convert each input on its own so a mixed pair (array + tensor) is
    # handled as well.
    if not isinstance(batch_output, np.ndarray):
        batch_output = batch_output.data.cpu().numpy()
    if not isinstance(batch_labels, np.ndarray):
        batch_labels = batch_labels.data.cpu().numpy()

    # Column vectors are needed so outputs and labels can sit side by side.
    if len(batch_output.shape) == 1:
        batch_output = batch_output.reshape(-1, 1)
    if len(batch_labels.shape) == 1:
        batch_labels = batch_labels.reshape(-1, 1)
    if initial_condition:
        per_epoch_pred = np.hstack((batch_output, batch_labels))
    else:
        temp_stack = np.hstack((batch_output, batch_labels))
        per_epoch_pred = np.vstack((per_epoch_pred, temp_stack))

    # Scores are turned into class predictions by rounding.
    prediction = np.round(batch_output)
    prediction = prediction.reshape(-1)

    if len(batch_labels.shape) > 1:
        batch_labels = batch_labels.reshape(-1)
    # Labels are used as array indices downstream, so every floating dtype
    # is cast to integers. np.int64 replaces np.long, which was removed in
    # NumPy 1.24.
    if np.issubdtype(batch_labels.dtype, np.floating):
        batch_labels = batch_labels.astype(np.int64)

    acc, fscore, tn_fp_fn_tp = calculate_accuracy(batch_labels,
                                                  prediction,
                                                  num_of_classes,
                                                  f_score_average)
    complete_results[0:2] += acc
    # NOTE(review): this fills six slots, which matches the per-class scores
    # returned for f_score_average=None (3 metrics x 2 classes). With
    # 'macro'/'micro' only three scalars come back - confirm that case is
    # really unused.
    complete_results[2:8] += np.array(fscore[0:3]).reshape(1, -1)[0]
    complete_results[10] += loss
    complete_results[11:15] += tn_fp_fn_tp

    return complete_results, per_epoch_pred

def evaluation_for_test(results_dict, num_class, f_score, prediction_metric,
                        hidden_test=False, verbose=False):
    """
    This function is only used for mode==test and data_type=='test'. Every
    prediction for each folder in the test set is accumulated to results_dict
    along with the number of instances of each folder. There will be multiple
    predictions for each folder depending on how many times the experiment
    was run during training (e.g. 5). If the user sets argument:
    prediction_metric=0 -> the accuracy will be determined for every experiment
    iteration (e.g. 5) and the best performing model (highest mean per-class
    F1-Score) will be selected. NOTE: This will not work if running in test
    mode without validation set and using the
    test_split_Depression_AVEC2017.csv file.
    prediction_metric=1 -> the average of the accumulated predictions for
    each folder will be taken and the final score will relate to these
    averaged results.
    prediction_metric=2 -> The majority vote of the accumulated predictions
    for each folder will be taken and the final score will relate to these
    results
    Input
        results_dict: dictionary with key 'prob' (folder -> sequence holding
                      one prediction per experiment run) and, unless
                      hidden_test is set, key 'target' (folder -> label)
        num_class: int - Number of classes in the dataset
        f_score: str - Type of F1 Score processing
        prediction_metric: int - 0, 1 or 2, see above
        hidden_test: bool - True when no labels are available; only the
                     rounded per-folder predictions are returned
        verbose: bool - print which prediction metric is being used
    Outputs:
        scores: List - [mean accuracy, accuracy class 0, accuracy class 1,
                mean F1, F1 class 0, F1 class 1, tn, fp, fn, tp]. With
                hidden_test=True, an array of rounded per-folder predictions
                instead.
    Raises:
        ValueError: unknown prediction_metric, prediction_metric=0 together
                    with hidden_test, or no predictions in results_dict
    """
    if prediction_metric not in (0, 1, 2):
        raise ValueError('prediction_metric must be 0, 1 or 2, got '
                         '{!r}'.format(prediction_metric))
    if hidden_test and prediction_metric == 0:
        raise ValueError('prediction_metric=0 needs labels and cannot be '
                         'used with hidden_test=True')

    probabilities = results_dict['prob']
    folders = list(probabilities.keys())
    if not folders:
        raise ValueError("results_dict['prob'] holds no predictions")

    if not hidden_test:
        # Look the labels up by folder so they line up with the predictions
        # even when the two dictionaries are ordered differently.
        temp_tar = np.array([results_dict['target'][f] for f in folders])

    # Number of experiment runs, read from the first folder instead of a
    # hard-coded folder id.
    exp_runthrough = len(probabilities[folders[0]])

    # Pick best performing model
    if prediction_metric == 0:
        if verbose: print("\nBest of predictions")
        f_score_avg = []
        temp_out = np.zeros((exp_runthrough, len(folders)))
        temp_scores = np.zeros((exp_runthrough, 15))
        for pos, f in enumerate(folders):
            temp_out[:, pos] = np.ravel(probabilities[f])
        for exp in range(exp_runthrough):
            temp_scores[exp, :], _ = prediction_and_accuracy(temp_out[exp, :],
                                                             temp_tar,
                                                             True,
                                                             num_class,
                                                             np.zeros(15),
                                                             0,
                                                             0,
                                                             f_score)
            # Slots 6:8 hold the per-class F1-Scores.
            f_score_avg.append(np.mean(temp_scores[exp, 6:8]))
        best_result_index = f_score_avg.index(max(f_score_avg))
        scores = temp_scores[best_result_index, :]
    else:
        if prediction_metric == 1:
            # Average the performance of all models
            if verbose: print("\nAverage of predictions")
            temp_out = np.array([np.average(probabilities[f])
                                 for f in folders])
        else:
            # Calculate majority vote from all models: the mean of the
            # rounded votes is rounded again further down.
            if verbose: print("\nMajority vote of predictions")
            temp_out = np.array([np.average(np.round(probabilities[f]))
                                 for f in folders])
        if hidden_test:
            scores = temp_out
        else:
            scores, _ = prediction_and_accuracy(temp_out,
                                                temp_tar,
                                                True,
                                                num_class,
                                                np.zeros(15),
                                                0,
                                                0,
                                                f_score)

    if hidden_test:
        return np.round(scores)

    scores[8] = np.mean(scores[0:2])
    scores[9] = np.mean(scores[6:8])
    return [scores[8], scores[0], scores[1], scores[9], scores[6], scores[7],
            scores[11], scores[12], scores[13], scores[14]]

def print_results(results_dict1, results_dict2, merged_results_dict, verbose=False):
    """
    Evaluate the three result dictionaries with every prediction metric
    (0, 1 and 2) and print the outcome.

    Input
        results_dict1: results reported under the heading "Speech-only"
        results_dict2: results reported under the heading "Text-only"
        merged_results_dict: results reported under "Speech+Text"
        verbose: bool - True prints a full accuracy / F1 / confusion report
                 per metric. False prints a single comma separated line
                 with the average F1 value (score index 3) of the three
                 sources for metric 0, then metric 1, then metric 2.
    """
    report_template = 'Acc(avg): {:.4f}, Acc(ND): {:.4f}, Acc(D): {:.4f}\n F1(avg): {:.4f}, F1(ND): {:.4f}, F1(D): {:.4f}\n tn: {}, fp: {}\n fn: {}, tp: {}'
    headings = ("Speech-only", "Text-only", "Speech+Text")
    sources = (results_dict1, results_dict2, merged_results_dict)

    # One row per prediction metric, holding the average F1 of each source.
    f1_rows = []

    for metric in range(3):
        # All three sources are evaluated before anything is printed.
        evaluated = [
            evaluation_for_test(source, num_of_classes, f_score,
                                prediction_metric=metric, hidden_test=False)
            for source in sources
        ]

        if not verbose:
            f1_rows.append([source_scores[3] for source_scores in evaluated])
            continue

        print("#"*50)
        for heading, source_scores in zip(headings, evaluated):
            print(heading)
            print(report_template.format(*source_scores[:10]))

    if not verbose:
        print(','.join('{:.4f}'.format(value)
                       for row in f1_rows
                       for value in row))

def get_results_prob(results_dict):
    '''
    Add a 'prob' entry to results_dict and return the same dictionary.

    For every key of results_dict['output'], the stored value is divided by
    the matching results_dict['accum'] value and wrapped in a numpy array.
    Any existing 'prob' entry is replaced.
    '''
    per_folder_prob = results_dict['prob'] = {}
    for folder, summed_output in results_dict['output'].items():
        ratio = summed_output / results_dict['accum'][folder]
        per_folder_prob[folder] = np.array(ratio)

    return results_dict

def merge_results(results_dict1, results_dict2):
    '''
    Combine the per-folder predictions of two experiments into a new
    dictionary.

    :param results_dict1: dictionary with the keys 'output', 'prob' and 'target', each mapping folder -> value. The folders are taken from 'output', and the labels from 'target'.
    :param results_dict2: dictionary with the key 'prob' mapping folder -> predictions; it must contain every folder of results_dict1.

    :return: new dictionary with two keys: 'prob' (folder -> predictions of results_dict1 followed by those of results_dict2) and 'target' (folder -> label copied from results_dict1)
    '''
    combined_prob = {}
    combined_target = {}
    for folder in results_dict1['output']:
        first_runs = results_dict1['prob'][folder]
        second_runs = results_dict2['prob'][folder]
        combined_prob[folder] = np.concatenate((first_runs, second_runs))
        combined_target[folder] = results_dict1['target'][folder]

    return {'prob': combined_prob, 'target': combined_target}

def get_results_dicts(exp_dir1, exp_dir2):
    '''
    Load the accumulated results of two experiment folders and merge them.

    :param exp_dir1: folder (str) containing accum_results_dict_total.pickle
    :param exp_dir2: folder (str) containing accum_results_dict_total.pickle

    :return: tuple (results of exp_dir1, results of exp_dir2, merged results); the first two carry the added 'prob' entry
    '''
    def _load_with_prob(exp_dir):
        # NOTE(security): pickle.load can execute arbitrary code; only
        # point this at result files from a trusted source.
        with open(exp_dir+'/accum_results_dict_total.pickle','rb') as handle:
            loaded = pickle.load(handle)
        return get_results_prob(loaded)

    first_results = _load_with_prob(exp_dir1)
    second_results = _load_with_prob(exp_dir2)
    combined_results = merge_results(first_results, second_results)

    return first_results, second_results, combined_results

        

In [3]:
# Download the experiment folders and change the paths here.
text_folder = 'text_cnn_lstm_feat_text_dim_9_batch_20_lr_0.0001_wd_0_lrf_2_alpha_0'
audio_folder = 'cnn_lstm_feature_compare16_delta_feat_dim_384_batch_20_lr_0.003_wdecay_0_lrf_2_alpha_4e-06'
# `verbose` was not defined anywhere in the notebook, so this cell raised a
# NameError on a fresh kernel. False prints the compact comma separated F1
# line; set it to True for the full per-metric report.
verbose = False
# The audio results are passed first, so they appear as "Speech-only" and the
# text results as "Text-only".
results_dict1, results_dict2, merged_results_dict = get_results_dicts(audio_folder, text_folder)
print_results(results_dict1, results_dict2, merged_results_dict, verbose=verbose)