# Testing Accuracy with Batches

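For each user that we test, group their samples into batches containing one sample per direction.

Test each sample individually, then average the thresholded per-sample predictions across the entire batch and round the result to obtain a single prediction for the batch.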

In [1]:
import os
from random import randint
import numpy as np
import pandas as pd
from keras.layers import Dense, Activation, Input, CuDNNLSTM, Bidirectional, Dropout
from keras.models import Model, Sequential, load_model
from keras.preprocessing.sequence import pad_sequences
from keras import initializers, regularizers, constraints, optimizers, layers
from keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.preprocessing import OneHotEncoder, StandardScaler

Using TensorFlow backend.


### Data Preparation

In [2]:
def get_usernames(train_files):
    """Return the username encoded in each CSV feature file name.

    Only names ending in '.csv' are considered. The username is everything
    before the first '-' in the file name, and the result keeps the order of
    `train_files`. A '.csv' name that contains no '-' raises ValueError.
    """
    return [name[:name.index('-')] for name in train_files if name.endswith('.csv')]


def getdfs(path, files):
    """Load every '.csv' entry of `files` (resolved under `path`) as a DataFrame.

    Entries without a '.csv' suffix are skipped and input order is kept, so the
    result lines up index-for-index with get_usernames() when both are given
    the same file list.
    """
    csv_names = (name for name in files if name.endswith('.csv'))
    return [pd.read_csv(os.path.join(path, name)) for name in csv_names]


def create_sequences(df):
    """Split a feature dataframe into one sequence per (direction, sample) pair.

    Directions and sample numbers are visited in order of first appearance in
    `df`. For every pair that has at least one row, the rows are collected in
    dataframe order as a list of feature vectors. The 'Sample' column is
    dropped because the sample number is not important to the classifier;
    every other column, including 'Direction', is kept.

    Goal: [Sample 1: [[10 features], [10 features], [10 features]],
           Sample 2: [[10 features], [10 features], [10 features]]]

    Returns:
        A list of sequences, each a list of per-row feature lists.
    """
    sample_ids = df['Sample'].unique()
    directions = df['Direction'].unique()

    sequences = []
    for direction in directions:
        direction_rows = df.loc[df['Direction'] == direction]
        for sample_id in sample_ids:
            sample_rows = direction_rows.loc[direction_rows['Sample'] == sample_id]
            # Remove the sample number before adding to the list.
            features = sample_rows.drop(columns=['Sample']).values.tolist()
            # Not every sample number exists for every direction; skip the gaps.
            if features:
                sequences.append(features)
    return sequences


def encode(x):
    """One-hot encode the direction variable of every feature vector in `x`.

    `x` is a collection of sequences, each a collection of feature vectors.
    The nesting is preserved and each vector is replaced by the result of
    one_hot_encode().
    """
    return [[one_hot_encode(feature) for feature in sequence] for sequence in x]


def one_hot_encode(feature_vec, n_labels=8):
    """One-hot encode the direction variable of a single feature vector.

    The direction is the first entry of `feature_vec`. It is replaced by
    `n_labels` entries, and the remaining features follow unchanged.

    Args:
        feature_vec: iterable whose first element is the direction
            (1..n_labels) or a value below 1 for padding rows.
        n_labels: number of distinct directions, i.e. width of the encoding.

    Returns:
        A new list of length ``n_labels + len(feature_vec) - 1``. The input is
        not modified.

    Raises:
        IndexError: if `feature_vec` is empty or the direction is larger than
            `n_labels`.
    """
    values = list(feature_vec)
    direction, rest = values[0], values[1:]

    # A direction below 1 (0 is used for padding only) gets all zeros, not an encoding.
    if direction < 1:
        return [0.0] * n_labels + rest

    encoding = [0.0] * n_labels
    # Directions are 1-based, list positions are 0-based.
    encoding[int(direction) - 1] = 1.0
    return encoding + rest


def generate_batches(username, usernames, dfs):
    """Unimplemented placeholder: ignores its arguments and returns None.

    NOTE(review): nothing in the visible cells calls this function; batching is
    performed by batchify(). Confirm it is unused before removing it.
    """
    pass
    
def _build_test_set(username, usernames, dfs, subsample_negatives):
    """Shared implementation behind create_test_set() and create_full_test().

    Args:
        username: the user whose batches are the positive examples (label 1).
        usernames: all user names, index-aligned with `dfs`.
        dfs: one feature dataframe per entry of `usernames`.
        subsample_negatives: when True, every other user contributes only the
            randomly sampled batches produced by ``batchify(df, is_pos=False)``;
            when False, every other user contributes all of their batches.

    Returns:
        (X, Y): numpy arrays of the padded, one-hot encoded batches and their
        0/1 labels, positives first.

    Raises:
        ValueError: if `username` is not in `usernames`.
    """
    # Positive examples: every batch of the target user, labelled 1.
    user_df = dfs[usernames.index(username)]
    pos_batches = batchify(user_df, is_pos=True)

    # Negative examples: batches from every other user, labelled 0.
    # Pairing names with frames directly avoids a list search per user.
    neg_batches = []
    for user, df in zip(usernames, dfs):
        if user != username:
            # batchify's `is_pos=True` means "use every sample", which is what
            # the full test wants for the other users as well.
            neg_batches += batchify(df, is_pos=not subsample_negatives)

    batches = pos_batches + neg_batches
    ys = [1] * len(pos_batches) + [0] * len(neg_batches)

    # Pad/truncate every sequence to 50 steps with zeros, then one-hot encode
    # the direction. A scalar fill value works for any number of features.
    x = [encode(pad_sequences(batch, maxlen=50, dtype='float32', value=0.0))
         for batch in batches]
    return np.asarray(x), np.asarray(ys)


# Create a test set similar to the create_test_set method
# This version uses ALL test data for every user, not just a subset.
def create_full_test(username, usernames, dfs):
    """Build (X, Y) for `username` using all batches of every user.

    The target user's batches are labelled 1 and all batches of every other
    user are labelled 0.
    """
    return _build_test_set(username, usernames, dfs, subsample_negatives=False)


def create_test_set(username, usernames, dfs):
    """Build (X, Y) for `username` using a random subset of the other users' data.

    The target user's batches are labelled 1. Each other user contributes the
    randomly sampled batches from ``batchify(df, is_pos=False)``, labelled 0.
    """
    return _build_test_set(username, usernames, dfs, subsample_negatives=True)
 


def batchify(dataframe, is_pos=True, n_random=4, directions=(1, 2, 3, 4, 5, 6, 7, 8)):
    """Group a user's sequences into batches holding one sequence per direction.

    Args:
        dataframe: feature dataframe with a 'Direction' column, in the form
            accepted by create_sequences().
        is_pos: when True, use every available sequence: the number of batches
            is the smallest per-direction sequence count. When False, draw
            `n_random` sequences per direction at random (with replacement)
            and return exactly `n_random` batches.
        n_random: number of random batches built when `is_pos` is False.
        directions: direction labels that make up one batch, in batch order.

    Returns:
        A list of batches; each batch is a list with one sequence per
        direction. Returns an empty list when any direction has no sequences,
        because a complete batch cannot be formed in that case.
    """
    per_direction = [
        create_sequences(dataframe.loc[dataframe['Direction'] == direction])
        for direction in directions
    ]

    # A batch needs one sequence from every direction, so the scarcest
    # direction limits how many complete batches exist.
    available = find_num_batches(per_direction) if per_direction else 0
    if available == 0:
        return []

    if is_pos:
        num_batches = available
    else:
        # Randomly choose n_random samples for each direction for a user.
        per_direction = [
            [seqs[randint(0, len(seqs) - 1)] for _ in range(n_random)]
            for seqs in per_direction
        ]
        num_batches = n_random

    # Batch 1 sample per direction into a batch.
    return [[seqs[count] for seqs in per_direction] for count in range(num_batches)]

        
def find_num_batches(seqs):
    """Return how many complete batches can be built from `seqs`.

    `seqs` holds one list of sequences per direction. A batch takes one
    sequence from each, so the answer is the length of the shortest list.
    Returns 0 when `seqs` itself is empty.
    """
    return min((len(s) for s in seqs), default=0)




### Model Testing

In [3]:
def convert(p, threshold):
    """Turn a probability into a 0/1 prediction: 1 when `p` reaches `threshold`."""
    if p >= threshold:
        return 1
    return 0


def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or the sentinel -1 when the denominator is 0."""
    return numerator / denominator if denominator else -1


def calc_accuracy_metrics(y_preds, y_actual):
    """Compare 0/1 predictions with the true labels and return tracked metrics.

    Args:
        y_preds: predicted labels; a value equal to 1 is a positive prediction,
            anything else counts as negative.
        y_actual: true labels, paired position-by-position with `y_preds`.

    Returns:
        Dict with keys 'far', 'frr', 'precision', 'recall', 'f1' and
        'accuracy'. A metric that is undefined for the given data (its
        denominator is zero) is reported as -1. 'f1' is also -1 whenever
        precision or recall is undefined.
    """
    tp = tn = fp = fn = 0
    for predicted, actual in zip(y_preds, y_actual):
        if predicted == 1:
            if actual == 1:
                tp += 1
            else:
                fp += 1
        elif actual == 0:
            tn += 1
        else:
            fn += 1

    far = _safe_ratio(fp, fp + tn)
    frr = _safe_ratio(fn, fn + tp)
    precision = _safe_ratio(tp, tp + fp)
    recall = _safe_ratio(tp, tp + fn)

    # Never feed the -1 sentinel into the F1 formula: it is not a real score.
    if precision == -1 or recall == -1:
        f1_score = -1
    else:
        f1_score = _safe_ratio(2 * precision * recall, precision + recall)

    # Guarded like the other ratios so that empty input does not raise.
    accuracy = _safe_ratio(tp + tn, tp + tn + fp + fn)

    return {'far': far, 'frr': frr, 'precision': precision, 'recall': recall,
            'f1': f1_score, 'accuracy': accuracy}


def get_predictions(clf, test_x):
    """Get the predictions from the classifier.

    Calls ``clf.predict`` on `test_x` with a batch size of 10 and verbose
    output disabled, and returns its result unchanged.
    """
    return clf.predict(test_x, batch_size=10, verbose=False)


def test_batches(clf, test_x, thresh):
    batch_preds = []
    for batch in test_x:
        y_preds = get_predictions(clf, batch)
        yhats = [convert(y, thresh) for y in y_preds]
        prediction = round(sum(yhats)/len(yhats))
        batch_preds.append(prediction)
    return batch_preds


def test_overall_acc(accuracy_dicts):
    far = 0
    frr = 0
    precision = 0
    recall = 0
    f1_score = 0
    accuracy = 0

    for d in accuracy_dicts:
        far += d.get('far')
        frr += d.get('frr')
        precision += d.get('precision')
        recall += d.get('recall')
        f1_score += d.get('f1')
        accuracy += d.get('accuracy')

    print('Overall Accuracy - Average of Individual Models:\n')
    print('FAR : {}'.format(far / len(usernames)))
    print('FRR : {}'.format(frr / len(usernames)))
    print('Precision : {}'.format(precision / len(usernames)))
    print('Recall : {}'.format(recall / len(usernames)))
    print('F1-Score : {}'.format(f1_score / len(usernames)))
    print('Accuracy : {}'.format(accuracy / len(usernames)))

### Create Test Sets for each user

In [4]:
# Where the aligned test features live, and where the trained per-user models are saved.
test_path = os.path.join(os.getcwd(), 'RNN-Test-Aligned')
model_save_path = os.path.join(os.getcwd(), 'RNNv4-saved_models')
test_files = os.listdir(test_path)

# Both helpers walk the same file list, so usernames[i] belongs to test_dfs[i].
usernames = get_usernames(test_files)
test_dfs = getdfs(test_path, test_files)

# One (X, Y) test set per user: that user's batches are the positives and a
# random subset of every other user's data provides the negatives.
test_sets = [create_test_set(user, usernames, test_dfs) for user in usernames]
testing_xs = [xtest for xtest, _ in test_sets]
testing_ys = [ytest for _, ytest in test_sets]

## Testing Overall Accuracy

In [5]:
# Per-user decision thresholds. For these five users the EER and F-score
# thresholds are identical, so both evaluations below give the same numbers.
eer_thresholds = { 
    'Conner1': 0.1, 
    'Cormac1': 0.4, 
    'David1': 0.2,
    'Ian2': 0.05,  
    'Jamison1': 0.28  
}

f_thresholds = { 
    'Conner1': 0.1, 
    'Cormac1': 0.4, 
    'David1': 0.2,
    'Ian2': 0.05,  
    'Jamison1': 0.28  
}

eer_dicts = []
f_dicts = []

# usernames, testing_xs and testing_ys are index-aligned, so walk them together.
for username, test_x, test_y in zip(usernames, testing_xs, testing_ys):
    # Every user has their own saved model in the directory defined above.
    clf = load_model(os.path.join(model_save_path, username + '_model.h5'))

    eer_preds = test_batches(clf, test_x, eer_thresholds[username])
    eer_dicts.append(calc_accuracy_metrics(eer_preds, test_y))

    f_preds = test_batches(clf, test_x, f_thresholds[username])
    f_dicts.append(calc_accuracy_metrics(f_preds, test_y))

print('EER Performance')
test_overall_acc(eer_dicts)
print()
print('F-Score Performance')
test_overall_acc(f_dicts)

EER Performance
Overall Accuracy - Average of Individual Models:

FAR : 0.0125
FRR : 0.0
Precision : 0.9800000000000001
Recall : 1.0
F1-Score : 0.9894736842105264
Accuracy : 0.992

F-Score Performance
Overall Accuracy - Average of Individual Models:

FAR : 0.0125
FRR : 0.0
Precision : 0.9800000000000001
Recall : 1.0
F1-Score : 0.9894736842105264
Accuracy : 0.992


## Test on all data, not just a subset

In [6]:
# Where the aligned test features live, and where the trained per-user models are saved.
test_path = os.path.join(os.getcwd(), 'RNN-Test-Aligned')
model_save_path = os.path.join(os.getcwd(), 'RNNv4-saved_models')
test_files = os.listdir(test_path)

# Both helpers walk the same file list, so usernames[i] belongs to test_dfs[i].
usernames = get_usernames(test_files)
test_dfs = getdfs(test_path, test_files)

# This time every user's test set contains ALL batches of every other user
# as negatives, not just a random subset.
testing_xs = []
testing_ys = []
for user in usernames:
    xtest, ytest = create_full_test(user, usernames, test_dfs)
    testing_xs.append(xtest)
    testing_ys.append(ytest)

# Per-user decision thresholds. For these five users the EER and F-score
# thresholds are identical, so both evaluations below give the same numbers.
eer_thresholds = { 
    'Conner1': 0.1, 
    'Cormac1': 0.4, 
    'David1': 0.2,
    'Ian2': 0.05,  
    'Jamison1': 0.28  
}

f_thresholds = { 
    'Conner1': 0.1, 
    'Cormac1': 0.4, 
    'David1': 0.2,
    'Ian2': 0.05,  
    'Jamison1': 0.28  
}

eer_dicts = []
f_dicts = []

# usernames, testing_xs and testing_ys are index-aligned, so walk them together.
for username, test_x, test_y in zip(usernames, testing_xs, testing_ys):
    # Every user has their own saved model in model_save_path.
    clf = load_model(os.path.join(model_save_path, username + '_model.h5'))

    eer_preds = test_batches(clf, test_x, eer_thresholds[username])
    eer_dicts.append(calc_accuracy_metrics(eer_preds, test_y))

    f_preds = test_batches(clf, test_x, f_thresholds[username])
    f_dicts.append(calc_accuracy_metrics(f_preds, test_y))

print('EER Performance')
test_overall_acc(eer_dicts)
print()
print('F-Score Performance')
test_overall_acc(f_dicts)

EER Performance
Overall Accuracy - Average of Individual Models:

FAR : 0.0
FRR : 0.0
Precision : 1.0
Recall : 1.0
F1-Score : 1.0
Accuracy : 1.0

F-Score Performance
Overall Accuracy - Average of Individual Models:

FAR : 0.0
FRR : 0.0
Precision : 1.0
Recall : 1.0
F1-Score : 1.0
Accuracy : 1.0


## Findings

With 5 users we were able to produce very good results. The F-Score and EER thresholds used here are the same ones found to be optimal in the trials with 15 users, because this test reuses the models that were trained with those 15 users.
For these 5 users, however, the two sets of thresholds are identical, so the distinction between the EER and F-Score results can be ignored.

After testing on all the available test data, we achieved 100% accuracy with no errors being made.  