# Intialization

In [0]:
# Import libraries
import tensorflow as tf
import pandas as pd
import io
import numpy as np
import csv
import random
from google.colab import files 
from google.colab import drive
from zipfile import ZipFile
import os
import matplotlib.pyplot as plt
import glob
import re
import math
from tabulate import tabulate
import time
from scipy import signal
import itertools
import statistics
import warnings

In [0]:
drive.mount('/content/gdrive/') 

TARGET_FOLDER_NAME = "Thumb Swipe Data Collection"
PARENT_FOLDER_LOCATION = "/content/" + TARGET_FOLDER_NAME

MODEL_TO_TRAIN = 'periodogram' # 'periodogram', 'spectrogram', 'simple'

NUMBER_OF_EPOCHS = 25
BATCH_SIZE = 16
WINDOW_LENGTH = 64 #128

TIME_RUN = True
SHUFFLE_WITHIN_GROUPS = True
FORCE_BINARY = False

if (MODEL_TO_TRAIN == 'periodogram') or (MODEL_TO_TRAIN == 'spectrogram'):
    DATA_CHANNELS_TO_USE = 'channels' #'channels', 'all'
else:
    DATA_CHANNELS_TO_USE = 'all'

if FORCE_BINARY:
    NUMBER_OF_UNIQUE_GESTURES = 2
    UNIQUE_GESTURE_NAMES = ['other', 'thumb swipe']
else:
    NUMBER_OF_UNIQUE_GESTURES = 4
    UNIQUE_GESTURE_NAMES = ['none', 'thumb out', 'index up', 'thumb swipe']
    
FOLD_SPLIT_METHOD = 'one rep out'
PRESUMED_SAMPLING_FREQUENCY = 600
DATA_COLUMN_NAMES = ['timestamp', 'c0', 'c1', 'x_quat', 'y_quat', 'z_quat', 'w_quat', 'label', 'rep_number']

RANDOM_SEED_INITIALIZATION = 777

random.seed(RANDOM_SEED_INITIALIZATION)
np.random.seed(RANDOM_SEED_INITIALIZATION)

try: data_unzipped
except NameError: 
    !unzip -uq "/content/gdrive/My Drive/Thumb Swipe Data Collection.zip" -d "/content"
    data_unzipped = None

# Main Definition

In [0]:
def main(model_choice = None, straight_parameters = None, DATA_CHANNELS_TO_USE = DATA_CHANNELS_TO_USE):
    if TIME_RUN:
        total_run_start = time.time()
    
    df = form_total_df()
    idx_array, fold_array, unique_subject_list, unique_repetition_list, unique_gesture_list = extract_task_indices(df)
    
    window_length = WINDOW_LENGTH
    window_overlap = .5
    
    if DATA_CHANNELS_TO_USE == 'channels':
        data_channels = ['c0', 'c1']
    elif DATA_CHANNELS_TO_USE == 'quat':
        data_channels = ['x_quat', 'y_quat', 'z_quat', 'w_quat']
    elif DATA_CHANNELS_TO_USE == 'all':
        data_channels = ['c0', 'c1', 'x_quat', 'y_quat', 'z_quat', 'w_quat']
    else:
         raise Exception("improper DATA_CHANNELS_TO_USE entered")   
    
    input_dim = (window_length, len(data_channels))
    output_dim = NUMBER_OF_UNIQUE_GESTURES
    
    if FORCE_BINARY:
        planned_loss_function = "categorical_hinge"
    else:
        planned_loss_function = 'categorical_crossentropy'
    
    if FOLD_SPLIT_METHOD == 'one fold out':
        test_fold_idx = random.choice(range(0, len(fold_array)))
        test_fold_target = fold_array[test_fold_idx]
    elif FOLD_SPLIT_METHOD == 'one rep out':
        test_fold_target = random.choice(range(0, len(unique_repetition_list)))
    elif FOLD_SPLIT_METHOD == 'one user out':
        test_fold_target = random.choice(range(0, len(unique_subject_list)))
    else: 
        raise Exception("FOLD_SPLIT_METHOD chosen is incompatible with this setup")
    
    test_data_tensor = np.empty((0, window_length, len(data_channels)))
    test_info_array = np.empty((0, 3))
    test_fold, remaining_fold = split_fold(fold_array, test_fold_target, split_method = FOLD_SPLIT_METHOD)
        
    for fold in test_fold:
        for idx_pair in fold_to_idx(fold, idx_array):
            interim_data_tensor, interim_info = window_data(idx_to_df(idx_pair, df), data_channels = data_channels,
                                                            window_length = window_length, 
                                                            window_overlap = window_overlap)
            
            test_data_tensor = np.append(test_data_tensor, interim_data_tensor, axis = 0)
            test_info_array = np.append(test_info_array, interim_info, axis = 0) 
    test_labels = test_info_array[:, 0]
    test_tuple = (test_data_tensor, encode_labels(test_labels))

    def cross_test(parameter_loading = None, model_choice = None):

        if FOLD_SPLIT_METHOD == 'one fold out':
            cross_fold_iterations = remaining_fold
        elif FOLD_SPLIT_METHOD == 'one rep out':
            cross_fold_iterations = list(range(0, len(unique_repetition_list)))
            cross_fold_iterations.remove(test_fold_target)
        elif FOLD_SPLIT_METHOD == 'one user out':
            cross_fold_iterations = list(range(0, len(unique_subject_list)))
            cross_fold_iterations.remove(test_fold_target)

        cross_fold_loss = []  

        for leave_out_fold in cross_fold_iterations:
            if TIME_RUN:
                leave_out_fold_start = time.time()

            train_data_tensor = validation_data_tensor = np.empty((0, window_length, len(data_channels)))
            train_info_array = validation_info_array = np.empty((0, 3))
            validation_fold, train_fold = split_fold(remaining_fold, leave_out_fold, split_method = FOLD_SPLIT_METHOD)

            for fold in train_fold:
                for idx_pair in fold_to_idx(fold, idx_array):
                    interim_data_tensor, interim_info = window_data(idx_to_df(idx_pair, df), 
                                                                    data_channels = data_channels, 
                                                                    window_length = window_length,
                                                                    window_overlap = window_overlap)
                    train_data_tensor = np.append(train_data_tensor, interim_data_tensor, axis = 0)
                    train_info_array = np.append(train_info_array, interim_info, axis = 0)

            train_labels = train_info_array[:, 0]

            if SHUFFLE_WITHIN_GROUPS:
                train_data_tensor_ordered = train_data_tensor
                train_labels_ordered = train_labels
                train_data_tensor, train_labels = concurrent_shuffle(train_data_tensor, train_labels)

            train_tuple = (train_data_tensor, encode_labels(train_labels))

            for fold in validation_fold:
                for idx_pair in fold_to_idx(fold, idx_array):
                    interim_data_tensor, interim_info = window_data(idx_to_df(idx_pair, df), 
                                                                    data_channels = data_channels,
                                                                    window_length = window_length,
                                                                    window_overlap = window_overlap)
                    validation_data_tensor = np.append(validation_data_tensor, interim_data_tensor, axis = 0)
                    validation_info_array = np.append(validation_info_array, interim_info, axis = 0) 
            validation_labels = validation_info_array[:, 0]

            if SHUFFLE_WITHIN_GROUPS:
                validation_data_tensor_ordered = validation_data_tensor
                validation_labels_ordered = validation_labels
                validation_data_tensor, validation_labels = concurrent_shuffle(validation_data_tensor, 
                                                                               validation_labels)

            validation_tuple = (validation_data_tensor, encode_labels(validation_labels))

            if parameter_loading is None:
                if model_choice is None:            
                    ml = pure_periodogram_net() 
                elif model_choice == 'secret model':
                    ml = secret_net(input_shape = input_dim)
                elif model_choice == 'periodogram':
                    ml = pure_periodogram_net()
                elif model_choice == 'spectrogram':
                    ml = spectrogram_2d_conv_net()
                elif model_choice == 'simple':
                    ml = simple_net(input_shape = input_dim)
                else:
                    raise Exception("improper model_choice input")
            else:
                if model_choice is None:            
                    ml = pure_periodogram_net(parameters = parameter_loading) 
                elif model_choice == 'secret model':
                    ml = secret_net(input_shape = input_dim, parameters = parameter_loading)
                elif model_choice == 'periodogram':
                    ml = pure_periodogram_net(parameters = parameter_loading)
                elif model_choice == 'spectrogram':
                    ml = spectrogram_2d_conv_net(parameters = parameter_loading)
                elif model_choice == 'simple':
                    ml = simple_net(input_shape = input_dim, parameters = parameter_loading)
                else:
                    raise Exception("improper model_choice input")

            train_results = ml.train(train_tuple = train_tuple, optimizer = tf.train.AdamOptimizer(),
                                     show_progress = 0, verbose = 0, loss = planned_loss_function)
            validation_results = ml.evaluate(test_tuple = validation_tuple, verbose = 0)

            cross_fold_loss.append(validation_results[0])
            
        return np.mean(cross_fold_loss)
    
    if straight_parameters is None:
        min_loss = float("inf")
        if TIME_RUN:
            hyperparameter_tuning_start = time.time()

        if model_choice == 'secret model':
            parameter_names = ['int_conv_filt', 'ext_conv_filt', 'dense_nodes']
            parameter_options = ((8, 16), (4, 8), (16, 32))
            assert len(parameter_names) == len(parameter_options)
            num_loops = 1
            for i, option_name in enumerate(parameter_names):
                num_loops = num_loops * len(parameter_options[i])
            j = 0
            for int_conv_filt in parameter_options[0]:
                for ext_conv_filt in parameter_options[1]:
                    for dense_nodes in parameter_options[2]:
                        interim_parameters = (int_conv_filt, ext_conv_filt, dense_nodes)
                        interim_loss = cross_test(parameter_loading = interim_parameters,
                                                  model_choice = model_choice)
                        j = j + 1
                        print("Completed loop {} of {}\nElapsed time = {} minutes".format(j, num_loops, 
                                                                                          round((time.time() 
                                                                                                - total_run_start) 
                                                                                                / 60, 
                                                                                                2)))
                        print("Loss = {}\nParameters = {}".format(round(interim_loss, 4), interim_parameters))
                        if interim_loss < min_loss:
                            min_loss = interim_loss
                            best_parameters = interim_parameters
                            print("\n\tNew best model for {}:\n\tLoss = {}\n\tParameters = {}\n".format(model_choice,
                                                                                                    round(min_loss, 4),
                                                                                                    best_parameters))
        
        elif model_choice == 'periodogram':
            parameter_names = ['num_conv_filt_1', 'num_conv_filt_2', 'dense_nodes']
            parameter_options = ((8, 16), (8, 16), (16, 32))
            assert len(parameter_names) == len(parameter_options)
            num_loops = 1
            for i, option_name in enumerate(parameter_names):
                num_loops = num_loops * len(parameter_options[i])
            j = 0
            for int_conv_filt in parameter_options[0]:
                for ext_conv_filt in parameter_options[1]:
                    for dense_nodes in parameter_options[2]:
                        interim_parameters = (int_conv_filt, ext_conv_filt, dense_nodes)
                        interim_loss = cross_test(parameter_loading = interim_parameters,
                                                  model_choice = model_choice)
                        j = j + 1
                        print("Completed loop {} of {}\nElapsed time = {} minutes".format(j, num_loops, 
                                                                                          round((time.time() 
                                                                                                - total_run_start) 
                                                                                                / 60, 
                                                                                                2)))
                        print("Loss = {}\nParameters = {}".format(round(interim_loss, 4), interim_parameters))
                        if interim_loss < min_loss:
                            min_loss = interim_loss
                            best_parameters = interim_parameters
                            print("\n\tNew best model for {}:\n\tLoss = {}\n\tParameters = {}\n".format(model_choice,
                                                                                                    round(min_loss, 4),
                                                                                                    best_parameters))
        elif model_choice == 'spectrogram':
            parameter_names = ['num_conv_filt_1', 'num_conv_filt_2', 'dense_nodes']
            parameter_options = ((8, 16), (4, 8), (16, 32))
            assert len(parameter_names) == len(parameter_options)
            num_loops = 1
            for i, option_name in enumerate(parameter_names):
                num_loops = num_loops * len(parameter_options[i])
            j = 0
            for int_conv_filt in parameter_options[0]:
                for ext_conv_filt in parameter_options[1]:
                    for dense_nodes in parameter_options[2]:
                        interim_parameters = (int_conv_filt, ext_conv_filt, dense_nodes)
                        interim_loss = cross_test(parameter_loading = interim_parameters,
                                                  model_choice = model_choice)
                        j = j + 1
                        print("Completed loop {} of {}\nElapsed time = {} minutes".format(j, num_loops, 
                                                                                          round((time.time() 
                                                                                                - total_run_start) 
                                                                                                / 60, 
                                                                                                2)))
                        print("Loss = {}\nParameters = {}".format(round(interim_loss, 4), interim_parameters))
                        if interim_loss < min_loss:
                            min_loss = interim_loss
                            best_parameters = interim_parameters
                            print("\n\tNew best model for {}:\n\tLoss = {}\n\tParameters = {}\n".format(model_choice,
                                                                                                    round(min_loss, 4),
                                                                                                    best_parameters))                   
        elif model_choice == 'simple':
            parameter_names = ['dense_nodes_1', 'dense_nodes_2']
            parameter_options = ((32, 64), (32, 64))
            assert len(parameter_names) == len(parameter_options)
            num_loops = 1
            for i, option_name in enumerate(parameter_names):
                num_loops = num_loops * len(parameter_options[i])
            j = 0
            for int_conv_filt in parameter_options[0]:
                for ext_conv_filt in parameter_options[1]:
                    interim_parameters = (int_conv_filt, ext_conv_filt)
                    interim_loss = cross_test(parameter_loading = interim_parameters, model_choice = model_choice)
                    j = j + 1
                    print("Completed loop {} of {}\nElapsed time = {} minutes".format(j, num_loops, 
                                                                                      round((time.time() 
                                                                                             - total_run_start) 
                                                                                            / 60, 
                                                                                            2)))
                    print("Loss = {}\nParameters = {}".format(round(interim_loss, 4), interim_parameters))
                    if interim_loss < min_loss:
                        min_loss = interim_loss
                        best_parameters = interim_parameters
                        print("\n\tNew best model for {}:\n\tLoss = {}\n\tParameters = {}\n".format(model_choice,
                                                                                                    round(min_loss, 4),
                                                                                                    best_parameters))
        
        if TIME_RUN:
            hyperparameter_tuning_time = round(time.time() - hyperparameter_tuning_start, 3)
    
    else:
        best_parameters = straight_parameters
            
    train_data_tensor = np.empty((0, window_length, len(data_channels)))
    train_info_array = np.empty((0, 3))
    for fold in remaining_fold:
        for idx_pair in fold_to_idx(fold, idx_array):
            interim_data_tensor, interim_info = window_data(idx_to_df(idx_pair, df), 
                                                                    data_channels = data_channels,
                                                                    window_length = window_length, 
                                                                    window_overlap = window_overlap)
            train_data_tensor = np.append(train_data_tensor, interim_data_tensor, axis = 0)
            train_info_array = np.append(train_info_array, interim_info, axis = 0)                                      
    train_labels = train_info_array[:, 0]
        
    if SHUFFLE_WITHIN_GROUPS:
        train_data_tensor_ordered = train_data_tensor
        train_labels_ordered = train_labels
        train_data_tensor, train_labels = concurrent_shuffle(train_data_tensor, train_labels)
            
    train_tuple = (train_data_tensor, encode_labels(train_labels))
        
    if model_choice == 'secret model':
        ml = secret_net(input_shape = input_dim, parameters = best_parameters)
    elif model_choice == 'periodogram':
        ml = pure_periodogram_net(parameters = best_parameters)
    elif model_choice == 'spectrogram':
        ml = spectrogram_2d_conv_net(parameters = best_parameters)
    elif model_choice == 'simple':
        ml = simple_net(input_shape = input_dim, parameters = best_parameters)
    else:
        raise Exception("improper model_choice input")
        
    if TIME_RUN:
        training_time_start = time.time()
            
    final_train_results = ml.train(train_tuple = train_tuple, optimizer = tf.train.AdamOptimizer(), 
                                   loss = planned_loss_function)
    final_train_f1 = calculate_f1(train_labels, ml.apply(train_data_tensor, verbose = 0))
    
    if TIME_RUN:
        training_time = round(time.time() - training_time_start, 3)
    
    test_results =  ml.evaluate(test_tuple = test_tuple, verbose = 0)
    predicted_test_labels = ml.apply(test_data_tensor, verbose = 0)
    test_f1 = calculate_f1(test_labels, predicted_test_labels)
    
    if TIME_RUN:
        final_run_time = time.time() - total_run_start
    
    plot_confusion_matrix(test_labels, predicted_test_labels, normalize = False, 
                          title = "Confusion Matrix for {} model".format(ml.model_name))
    
    print("\nTesting Confusion Matrix:\n{}\n".format(plain_confusion_matrix(test_labels, 
                                                                            predicted_test_labels, 
                                                                            normalize = False)))
    
    if TIME_RUN:
        print("total runtime = {} minutes".format(round(final_run_time / 60, 2)))
        if straight_parameters is None:
            print("tuning time = {} minutes".format(round(hyperparameter_tuning_time / 60, 2)))
        print("training time = {} minutes".format(round(training_time / 60 , 2)))
    num_loops = 1 
    if straight_parameters is None:
        for i, option_name in enumerate(parameter_names):
            print("Possible hyperparameters for {} : {}\nSelected {}\n".format(option_name, 
                                                                           parameter_options[i], best_parameters[i]))
            num_loops = num_loops * len(parameter_options[i])
            
    print("number of epochs = {}".format(NUMBER_OF_EPOCHS))
    print("batch size = {}".format(BATCH_SIZE))
    
    print("total number of hyperparameter combinations tested = {}".format(num_loops))

    print('\n',tabulate([['data_set','loss', 'accuracy', 'f1'],
                         np.concatenate((['training'], np.around(np.array(final_train_results), 3),
                                         [round(final_train_f1, 3)])),
                         np.concatenate((['testing'], np.around(np.array(test_results), 3),
                                         [round(test_f1, 3)]))],
                           headers = "firstrow"), '\n')

    print("Window duration = {} seconds".format(round(window_length / PRESUMED_SAMPLING_FREQUENCY, 3)))
    print("Window_length = {}".format(window_length))
    print("Window_overlap = {}".format(window_overlap))
    print("data_channels used = {}".format(data_channels))
    print("Number of model parameters = {}".format(ml.model.count_params()))  
    print("Model_name = {}".format(ml.model_name))
    print("Model_notes :  {}".format(ml.notes))
    
    print("best_parameter values : {}".format(ml.parameters))
    
    print("\nTraining set statistics:")
    generate_extra_accuracy_metrics(train_labels, ml.apply(train_data_tensor, verbose = 0))
    
    print("\nTesting set statistics:")
    generate_extra_accuracy_metrics(test_labels, predicted_test_labels)

# ML Functions

## ML flow definition

In [0]:
class machine_learning_flow:
    def __init__(self, model_assembly, input_shape, output_shape, model_name, parameters = None, 
                 preprocess = lambda x : x, verbose = False, notes = None):
        self.model_name = model_name
        self.preprocess = preprocess
        self.input_shape = input_shape
        self.output_shape = output_shape
        self.notes = notes
        self.parameters = parameters
        
        if not callable(input_shape):
            self.model = model_assembly(input_shape, output_shape, parameters = parameters)
            if verbose:
                self.model.summary()
            self.model_assembled = True
        else:
            self.model = model_assembly
            self.model_assembled = False
        
    def apply(self, x, verbose = 1, postprocessing = False):
        
        def predict_classes(input_model, X, verbose = 1):
            proba = input_model.predict(X, batch_size= BATCH_SIZE, verbose = min(1 ,verbose))
            return proba.argmax(axis=-1)
        
        if postprocessing:
            return predict_classes(self.model, x, verbose = verbose)
        else:
            return predict_classes(self.model, self.preprocess(x), verbose = verbose)
    
    def train(self, train_tuple, validation_tuple = None, verbose = 2, show_progress = True, 
              optimizer = tf.keras.optimizers.Adam(lr = 0.001), loss = 'categorical_crossentropy', 
              number_of_epochs = NUMBER_OF_EPOCHS, metrics = ['accuracy']):
        
        if self.model_assembled: #if input shape is not uniquely dependent on the input itself
            if verbose > 0:
                self.model.summary()
        else:
            interim_input_shape = self.input_shape(train_tuple[0])
            self.model = self.model(interim_input_shape, self.output_shape, parameters = self.parameters)
            if verbose > 0:
                self.model.summary()
            self.model_assembled = True
            
        if validation_tuple is not None:
            has_validation = True
        else:
            has_validation = False
        
        
        class_weight = None
        
        if show_progress:
            print("Model being trained:", self.model_name)
        
        
        self.model.compile(optimizer = optimizer, loss = loss, metrics = metrics) 

        if validation_tuple is not None:
            num_validation_steps = math.ceil(validation_tuple[0].shape[0] / BATCH_SIZE)
            
            history = self.model.fit(self.preprocess(train_tuple[0]), 
                                     train_tuple[1], 
                                     epochs = number_of_epochs, 
                                     batch_size = BATCH_SIZE, 
                                     verbose = verbose,
                                     validation_data = (self.preprocess(validation_tuple[0]), 
                                                        validation_tuple[1]), 
                                     validation_steps = num_validation_steps,
                                     class_weight = class_weight) 
            results = (self.model.evaluate(self.preprocess(train_tuple[0]), train_tuple[1], verbose = 0),
                       self.model.evaluate(self.preprocess(validation_tuple[0]), validation_tuple[1], verbose = 0))
        else:
            
            history = self.model.fit(self.preprocess(train_tuple[0]),
                                     train_tuple[1], 
                                     epochs = NUMBER_OF_EPOCHS, 
                                     batch_size = BATCH_SIZE,
                                     verbose = verbose,
                                     class_weight = class_weight)  
            
            results = self.model.evaluate(self.preprocess(train_tuple[0]), train_tuple[1], verbose = 0)
        if verbose:
            loss_plot(history, has_validation = has_validation)
        
            print('\nResults of model testing:')
        return results
    
    def evaluate(self, test_tuple, verbose = 1):
        return self.model.evaluate(self.preprocess(test_tuple[0]), test_tuple[1], verbose = verbose)
    
    def force_input_shape(self, new_input_shape):
        assert self.model_assembled == False, "Forcing input shape on an already assembled model"
        self.input_shape = new_input_shape
        self.model = model_assembly(self.input_shape, self.output_shape, parameters = self.parameters)
        self.model_assembled = True

## 2D_conv spectrogram nets

### Basic spectrogram_2d_conv_net

In [0]:
def spectrogram_2d_conv_net(input_shape = None, output_shape = NUMBER_OF_UNIQUE_GESTURES, nperseg = 16, 
                            parameters = (16, 8, 32), notes = ''):
    
    input_shape_lambda = lambda incoming_data: input_shape_for_conv_spectrogram(incoming_data, nperseg = nperseg)
    preprocess_lambda = lambda incoming_data: preprocess_input_for_spectrogram(incoming_data, nperseg = nperseg)
    
    return machine_learning_flow(spectrogram_2d_conv_net_architecture, 
                                 input_shape = input_shape_lambda, 
                                 output_shape = output_shape,
                                 model_name = 'spectrogram_2d_conv_net', 
                                 preprocess = preprocess_lambda,
                                 parameters = parameters,
                                 notes = ["nperseg = {}".format(nperseg)] + [notes])

def spectrogram_2d_conv_net_architecture(in_dim, out_dim, parameters):
    assert len(parameters) == 3, "improper number of parameters entered into spectrogram_2d_conv_net_architecture"
    
    (first_conv_layer_filters, second_conv_layer_filters, number_of_dense_nodes) = parameters
    
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.BatchNormalization(input_shape = in_dim, axis = -1))
    
    model.add(tf.keras.layers.Conv2D(first_conv_layer_filters, kernel_size = 2))
    model.add(tf.keras.layers.Conv2D(second_conv_layer_filters, kernel_size = 2))
    model.add(tf.keras.layers.MaxPooling2D())
    
    model.add(tf.keras.layers.Flatten())
    
    model.add(tf.keras.layers.Dense(number_of_dense_nodes, activation='relu'))
    
    model.add(tf.keras.layers.Dense(out_dim, activation = 'softmax'))
  
    return model

### Setup functions

In [0]:
def input_shape_for_conv_spectrogram(x, nperseg = 16):
    return preprocess_input_for_spectrogram(x[0:1], nperseg = nperseg).shape[1:]
 
def preprocess_input_for_spectrogram(x, nperseg = 16):
    def spectrogram_transform(y):
        y = signal.spectrogram(y, fs = PRESUMED_SAMPLING_FREQUENCY, nperseg = nperseg, window = 'hamming', axis = 0)[-1]
        return np.expand_dims(np.transpose(y, (0,2,1)), axis = 0)
    
    output_spectrogram = spectrogram_transform(x[0])
    
    for i in range(1, x.shape[0]):
        output_spectrogram = np.append(output_spectrogram, spectrogram_transform(x[i]), axis = 0)
    
    return output_spectrogram

## Periodogram nets

### Pure Periodogram net

In [0]:
def pure_periodogram_net(input_shape = None, output_shape = NUMBER_OF_UNIQUE_GESTURES, 
                         parameters = (16, 16, 16), notes = ''):
    
    return machine_learning_flow(pure_periodogram_net_architecture, 
                                 input_shape = input_shape_for_pure_periodogram, 
                                 output_shape = output_shape,
                                 model_name = 'pure_periodogram_net', 
                                 preprocess = transform_to_pure_periodogram,
                                 parameters = parameters,
                                 notes = [notes])

def pure_periodogram_net_architecture(in_dim, out_dim, parameters = (16, 16, 16)):
    assert len(parameters) == 3, "improper number of parameters entered into pure_periodogram_net_architecture"
    
    channel_setup = 'channels_last'
    
    number_of_internal_conv_filters = parameters[0]
    number_of_external_conv_filters = parameters[1]
    number_of_dense_nodes = parameters[2]
    
    incoming_data = tf.keras.layers.Input(shape = in_dim, name = 'main_input') 
    
    if in_dim[-1] == 2:
        has_quat = False
    else:
        has_quat = True
    transformed_data = tf.keras.layers.BatchNormalization(axis = -1, name = 'input_batch_normalization_layer')(incoming_data)
    #transformed_data = tf.keras.layers.Lambda(lambda z: transform_to_pure_periodogram(z[:, :, 0:2]))(incoming_data)
    
    def channel_metrics(x, channel, channel_label = None, channel_setup = "channels_last"):
        y = tf.keras.layers.Lambda(lambda z: tf.expand_dims(z[:, :, channel], -1),
                                   name = 'metric_lambda_extraction_' + channel_label)(x)
        y_mean = tf.keras.layers.Lambda(lambda z: tf.math.reduce_mean(z, axis = 1, keepdims = True), 
                                        name = 'metric_lambda_mean_' + channel_label)(y)
        y_std = tf.keras.layers.Lambda(lambda z: tf.math.reduce_std(z, axis = 1, keepdims = True), 
                                       name = 'metric_lambda_std_' + channel_label)(y)
        y_max = tf.keras.layers.Lambda(lambda z: tf.math.reduce_max(z, axis = 1, keepdims = True), 
                                       name = 'metric_lambda_max_' + channel_label)(y)
        y_min = tf.keras.layers.Lambda(lambda z: tf.math.reduce_min(z, axis = 1, keepdims = True), 
                                       name = 'metric_lambda_min_' + channel_label)(y)
        
        y = tf.keras.layers.concatenate([y_mean, y_std, y_max, y_min], axis = 1)
        
        return y
    
    if has_quat:
        x_quat_metric_isolated_model = tf.keras.models.Model(inputs = incoming_data, 
                                                    outputs = channel_metrics(transformed_data, 2,
                                                                              channel_label = 'x_quat'))
        y_quat_metric_isolated_model = tf.keras.models.Model(inputs = incoming_data, 
                                                    outputs = channel_metrics(transformed_data, 3, 
                                                                              channel_label = 'y_quat'))
        z_quat_metric_isolated_model = tf.keras.models.Model(inputs = incoming_data, 
                                                    outputs = channel_metrics(transformed_data, 4, 
                                                                              channel_label = 'z_quat'))
        w_quat_metric_isolated_model = tf.keras.models.Model(inputs = incoming_data, 
                                                    outputs = channel_metrics(transformed_data, 5, 
                                                                              channel_label = 'w_quat'))
        
        metric_output = tf.keras.layers.concatenate([x_quat_metric_isolated_model.output, 
                                                     y_quat_metric_isolated_model.output, 
                                                     z_quat_metric_isolated_model.output,
                                                     w_quat_metric_isolated_model.output], 
                                                    axis = -1)
        
    
    combined_output = tf.keras.layers.Conv1D(number_of_internal_conv_filters, kernel_size = 3, strides = 1, padding = 'same', 
                                   activation = 'relu', data_format = channel_setup)(transformed_data)
    
    combined_output = tf.keras.layers.MaxPooling1D(pool_size = 2, strides = None, padding = 'valid', 
                                           data_format = channel_setup)(combined_output)
    
    combined_output = tf.keras.layers.Conv1D(number_of_external_conv_filters, kernel_size = 2, strides = 1,
                                             activation = 'relu', padding = 'same', 
                                             data_format = channel_setup,)(combined_output)
    
    combined_output = tf.keras.layers.Conv1D(number_of_external_conv_filters, kernel_size = 2, strides = 1,
                                             activation = 'relu', padding = 'same', 
                                             data_format = channel_setup,)(combined_output)
        
    combined_output = tf.keras.layers.MaxPooling1D(pool_size = 2, strides = None, padding = 'valid', 
                                           data_format = channel_setup)(combined_output)
    
    combined_output = tf.keras.layers.Flatten()(combined_output)
    
    if has_quat:
        metric_output = tf.keras.layers.BatchNormalization(axis = -1, 
                                                           name = 'metric_batch_normalization_layer')(metric_output) 
        metric_output = tf.keras.layers.Flatten()(metric_output)
        combined_output = tf.keras.layers.concatenate([combined_output, metric_output], axis = -1)
    
    combined_output = tf.keras.layers.Dense(number_of_dense_nodes, activation = 'relu')(combined_output)
    combined_output = tf.keras.layers.BatchNormalization()(combined_output)
    
    combined_output = tf.keras.layers.Dense(out_dim, activation='softmax')(combined_output)
        
    model = tf.keras.models.Model(inputs = incoming_data, outputs = combined_output)
  
    return model

### Setup Functions

In [0]:
def transform_to_pure_periodogram(x, times = None, method = 'normal'):
    if method == 'normal':
        _, Pxx = signal.periodogram(x, fs = PRESUMED_SAMPLING_FREQUENCY, window = 'hamming', 
                                    scaling = 'density', axis = 1)
        return Pxx
    elif method == 'lombscargle':
        return None

def input_shape_for_pure_periodogram(x, times = None, method = 'normal'):
    y = transform_to_pure_periodogram(x[0:2, :, 0:2], times = times, method = method)
    return y.shape[1:]

## Simple nets 

In [0]:
def simple_net(input_shape, output_shape = NUMBER_OF_UNIQUE_GESTURES, parameters = (32, 32)):
    return machine_learning_flow(simple_net_architecture, 
                                 output_shape = output_shape,
                                 input_shape = input_shape, 
                                 model_name = 'super simple dense net',
                                 parameters = parameters)

def simple_net_architecture(in_dim, out_dim, parameters = (32, 32)):
    assert len(parameters) == 2, "improper number of parameters entered into simple_net_architecture"
    
    (number_of_dense_nodes_1, number_of_dense_nodes_2) = parameters
    model = tf.keras.models.Sequential()
    
    model.add(tf.keras.layers.BatchNormalization(input_shape = in_dim))
    model.add(tf.keras.layers.Flatten())
    
    model.add(tf.keras.layers.Dense(number_of_dense_nodes_1, activation='relu'))
    
    
    model.add(tf.keras.layers.Dense(number_of_dense_nodes_2, activation='relu'))
    
    
    
    model.add(tf.keras.layers.Dense(out_dim, activation = 'softmax'))
        
    return model

## Secret Net

# Other Functions

## Data Loading Functions

In [0]:
def form_total_df():
    file_path = glob.glob(PARENT_FOLDER_LOCATION + "/***.csv")
    file_path.sort()

    total_df = pd.DataFrame(columns = DATA_COLUMN_NAMES + ['subject_id'])
    for file in file_path:
        subject_id = int(file.split("/S")[-1].split("_")[0])
        holding_df = pd.read_csv(file, header = None)
        holding_df.columns = DATA_COLUMN_NAMES
        holding_df.sort_values(by = ['timestamp'], inplace = True)
        holding_df['subject_id'] = [subject_id] * len(holding_df)
        total_df = total_df.append(holding_df, ignore_index = True)
    
    if FORCE_BINARY:
        total_df.replace({'label': [1, 2]}, 0, inplace = True)
        total_df.replace({'label': 3}, 1, inplace = True)
        
    return total_df

def form_dict_df():
    file_path = glob.glob(PARENT_FOLDER_LOCATION + "/***.csv")
    file_path.sort()
    
    data_dict = {}
    for file in file_path:
        subject_id = int(file.split("/S")[-1].split("_")[0])
        data_dict[subject_id] = pd.read_csv(file, header = None)
        data_dict[subject_id].columns = DATA_COLUMN_NAMES  
        data_dict[subject_id].sort_values(by = ['timestamp'], inplace = True)
        
    return data_dict

## Data Extraction Functions

In [50]:
def extract_task_indices(df):
    unique_subject_list = df['subject_id'].unique()
    unique_gesture_list = df['label'].unique()
    unique_repetition_list = df['rep_number'].unique()
    
    idx_array = np.empty((len(unique_subject_list) 
                                     * len(unique_repetition_list) 
                                     * len(unique_gesture_list), 
                                     2 + 3),
                                   dtype = np.uint32)
    fold_array = []
    
    m = 0
    for i, subject in enumerate(unique_subject_list):
        subject_holding_df =  df.loc[df['subject_id'] == subject]
        for j, repetition in enumerate(unique_repetition_list):
            repetition_holding_df = subject_holding_df.loc[subject_holding_df['rep_number'] == repetition]
            fold_array.append((i,j))     
            for k, gesture in enumerate(unique_gesture_list):
                transient_idx_array = repetition_holding_df.loc[repetition_holding_df['label'] == gesture].index
                
                idx_array[m] = np.array([transient_idx_array[0], 
                                         transient_idx_array[-1] + 1, # add 1 for the purposes of slicing
                                         i,j,k])
                m = m + 1
                
    return idx_array, fold_array, unique_subject_list, unique_repetition_list, unique_gesture_list

def split_fold(fold_array, extract_fold, split_method = 'one fold out'): 
    if split_method == 'one fold out':
        if (type(extract_fold) is not tuple):
            raise Exception
        extract_fold = [extract_fold]
        pulled_fold = extract_fold
        trimmed_fold = [x for x in fold_array if x not in extract_fold]
        
    elif split_method == 'one user out':    
        if (type(extract_fold) is not int):
            raise Exception 
        
        extract_fold = [x for x in fold_array if x[0] == extract_fold]
        pulled_fold = [x for x in fold_array if x in extract_fold]
        trimmed_fold = [x for x in fold_array if x not in extract_fold]
        
    elif split_method == 'one rep out':    
        if (type(extract_fold) is not int):
            raise Exception 
        
        extract_fold = [x for x in fold_array if x[1] == extract_fold]
        pulled_fold = [x for x in fold_array if x in extract_fold]
        trimmed_fold = [x for x in fold_array if x not in extract_fold]
        
    elif split_method == 'specific':
        assert (type(extract_fold) is list) and (type(extract_fold[0]) is tuple)
            
        pulled_fold = [x for x in fold_array if x in extract_fold]
        trimmed_fold = [x for x in fold_array if x not in extract_fold]
        
    else:
        raise Exception("improper split_method name")
    
    return pulled_fold, trimmed_fold


def fold_to_idx(fold, idx_array):
    assert type(fold) is tuple
    sub_idx = idx_array[np.all(idx_array[:,2:4] == fold, axis = -1)][:, 0:2]
    return sub_idx

def idx_to_df(idx_pair, df):
    sub_df = df.iloc[idx_pair[0]: idx_pair[1]]
    return sub_df
    
def window_data(df, data_channels = ['c0', 'c1', 'x_quat', 'y_quat', 'z_quat', 'w_quat'], window_length = 512, 
                window_overlap = .5, remainder_cutoff = .25, keep_remainder = True):
       
    window_shift = int(window_length * (1 - window_overlap))
    number_of_perfect_windows = int(np.floor((len(df) - window_length) / window_shift + 1))
    remaining_samples = int((len(df) - window_length) % window_shift)
    
    assert (number_of_perfect_windows > 0, 
            "dataframe is {} samples, window is {} samples".format(len(df), window_length))
    
    if remaining_samples < remainder_cutoff * window_length:
        keep_remainder = False
    
    relevant_info = np.tile(df[['label', 'rep_number', 'subject_id']].values[0], 
                            (number_of_perfect_windows + keep_remainder, 1))
    
    data_tensor = np.empty((number_of_perfect_windows + keep_remainder,
                            window_length,
                            len(data_channels)),
                           dtype = np.float64)
    
    df_values = df[data_channels].values
        
    for i in range(0, number_of_perfect_windows):
        data_tensor[i] = df_values[i * window_shift: i * window_shift + window_length]
        
    if keep_remainder:
        data_tensor[-1] = df_values[-window_length:]
    
    return data_tensor, relevant_info
    
def concurrent_shuffle(data_tensor, labels):
    assert (np.shape(labels)[0] == np.shape(data_tensor)[0], 
            "received {} labels for {} samples".format(len(labels), np.shape(data_tensor)[0]))
    random_permutation = np.random.permutation(np.shape(labels)[0])
    shuffled_labels = labels[random_permutation]
    shuffled_data_tensor = data_tensor[random_permutation]
    
    return shuffled_data_tensor, shuffled_labels


def encode_labels(incoming_labels):
    return tf.keras.utils.to_categorical(incoming_labels)

  assert (number_of_perfect_windows > 0,
  assert (np.shape(labels)[0] == np.shape(data_tensor)[0],


## Data Analysis Functions

In [0]:
def check_sampling_frequency(diff_timestamps, cutoff_diff = 1.9):
    mean_delta = np.mean(diff_timestamps)
    std_delta = np.std(diff_timestamps)
    max_delta = np.max(diff_timestamps)
    min_delta = np.min(diff_timestamps)
    frac_cutoff = np.sum(diff_timestamps > cutoff_diff)/len(diff_timestamps)
    
    return mean_delta, std_delta, max_delta, min_delta, frac_cutoff

def calculate_f1(true_labels, predicted_labels, num_of_classes = NUMBER_OF_UNIQUE_GESTURES):
    f1 = 0.0
    for x in range(0,num_of_classes):
        class_count = np.sum(true_labels == x)
        
        if class_count == 0:
            continue
        precision =  calculate_precision(true_labels, predicted_labels, x)
        recall = calculate_recall(true_labels, predicted_labels, x)
        f1 = f1 + 2 * class_count / len(true_labels) * precision * recall / (precision + recall)
    return f1

def calculate_precision(true_labels, predicted_labels, i):
    num_true_positives = np.sum((predicted_labels == i) & (true_labels == i))
    num_true_negatives = np.sum((predicted_labels != i) & (true_labels != i))
    num_false_positives = np.sum((predicted_labels == i) & (true_labels != i))
    num_false_negatives = np.sum((predicted_labels != i) & (true_labels == i)) 
    
    precision = num_true_positives / (num_false_positives + num_true_positives)
    return precision
    
def calculate_recall(true_labels, predicted_labels, i):
    num_true_positives = np.sum((predicted_labels == i) & (true_labels == i))
    num_true_negatives = np.sum((predicted_labels != i) & (true_labels != i))
    num_false_positives = np.sum((predicted_labels == i) & (true_labels != i))
    num_false_negatives = np.sum((predicted_labels != i) & (true_labels == i))
    
    recall = num_true_positives / (num_true_positives + num_false_negatives)
    
    return recall

def generate_extra_accuracy_metrics(test_set_labels, predicted_task_labels, task_vector = None):
    if task_vector is None:
        task_vector = np.sort(np.unique(test_set_labels).astype(dtype=np.int32))
    
    label_acc = np.zeros(task_vector.shape, dtype=np.float32)
    test_set_labels = np.squeeze(test_set_labels)
    
    sensitivity = np.zeros(task_vector.shape, dtype=np.float32)
    precision = np.zeros(task_vector.shape, dtype=np.float32)
    balanced_accuracy = np.zeros(task_vector.shape, dtype=np.float32)
    f1_score = np.zeros(task_vector.shape, dtype=np.float32)
    for i in task_vector: 
        num_true_positives = np.sum((predicted_task_labels == i) & (test_set_labels == i))
        num_true_negatives = np.sum((predicted_task_labels != i) & (test_set_labels != i))
        num_false_positives = np.sum((predicted_task_labels == i) & (test_set_labels != i))
        num_false_negatives = np.sum((predicted_task_labels != i) & (test_set_labels == i))
    
        if num_true_positives + num_false_negatives == 0:
            sensitivity[i] = 0.
            balanced_accuracy[i] = round(num_true_negatives / (num_false_positives + num_true_negatives), 3)
        else:
            sensitivity[i] = np.round(num_true_positives / (num_true_positives + num_false_negatives), 3)
            balanced_accuracy[i] = round((sensitivity[i] 
                                      + num_true_negatives / (num_false_positives + num_true_negatives)) / 2, 3)
    
        if num_false_positives + num_true_positives == 0:
            precision[i] = 0.
        else:
            precision[i] = round(num_true_positives / (num_false_positives + num_true_positives), 3)
        
        f1_score[i] = round((2 * precision[i] * sensitivity[i]) / (precision[i] + sensitivity[i]), 3)
        
    print(tabulate([np.concatenate((['task_num'], task_vector)), 
                    np.concatenate((['sensitivity/recall'], sensitivity)),
                    np.concatenate((['precision'], precision)),
                    np.concatenate((['f1_score'], f1_score)),
                    np.concatenate((['balanced_accuracy'], balanced_accuracy))],
                   headers = "firstrow"), '\n')  
    return None
    
def loss_plot(history, has_validation = True):
    plt.plot(history.history['loss'], label = 'train')
    if has_validation:
        plt.plot(history.history['val_loss'], label = 'val')
    plt.legend()
    plt.show()
    return None

def plain_confusion_matrix(true_labels,
                          predicted_labels,
                          normalize = True):
    sess = tf.Session()
    with sess.as_default():
        cm = np.array(sess.run(tf.confusion_matrix(true_labels.astype(np.int32), 
                                                   predicted_labels.astype(np.int32))), dtype = np.int32)

    accuracy = np.trace(cm) / float(np.sum(cm))
    misclass = 1 - accuracy

    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        
    return cm    

def plot_confusion_matrix(true_labels,
                          predicted_labels,
                          target_names = UNIQUE_GESTURE_NAMES,
                          title = 'Confusion matrix',
                          cmap = None,
                          normalize = True):
    sess = tf.Session()
    with sess.as_default():
        cm = np.array(sess.run(tf.confusion_matrix(true_labels.astype(np.int32), 
                                                   predicted_labels.astype(np.int32))), dtype = np.int32)

    accuracy = np.trace(cm) / float(np.sum(cm))
    misclass = 1 - accuracy

    if cmap is None:
        cmap = plt.get_cmap('Blues')

    plt.figure(figsize=(8, 6))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    
    if target_names is not None:
        tick_marks = np.arange(len(target_names))
        plt.xticks(tick_marks, target_names, rotation=45)
        plt.yticks(tick_marks, target_names)

    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

    thresh = cm.max() / 1.5 if normalize else cm.max() / 2
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        if normalize:
            plt.text(j, i, "{:0.2f}".format(cm[i, j]),
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")
        else:
            plt.text(j, i, "{:,}".format(cm[i, j]),
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label\naccuracy={:0.4f}; misclass={:0.4f}'.format(accuracy, misclass))
    plt.show()
    return None

def gen_fast_plot(x, subject_name, true_time = False):
    """ Takes in a data_dict and a name ex: ['S01'] and plots it """
    if true_time:
        timestamp_vector = (x['timestamp'] - x['timestamp'][0]) / 1000
    else:
        timestamp_vector = range(0, len(x['timestamp']))
        
    title_str = ('Plot for subject', subject_name)
  
    fig = plt.figure(figsize = (25, 10), dpi = 80, facecolor = 'w', edgecolor = 'k')
    
    plt.subplot(311)
  
    x_handle = plt.plot(timestamp_vector, x['x_quat'], label ='x')
    y_handle = plt.plot(timestamp_vector, x['y_quat'], label ='y')
    z_handle = plt.plot(timestamp_vector, x['z_quat'], label ='z')
    w_handle = plt.plot(timestamp_vector, x['w_quat'], label ='w')
    plt.legend(loc = 'upper right')
    plt.title(title_str)
   
    plt.subplot(312)
    
    channel_0 = plt.plot(timestamp_vector, x['c0'], label = 'channel_0')
    channel_1 = plt.plot(timestamp_vector, x['c1'], label = 'channel_1')
    plt.legend(loc = 'upper right')
    
    plt.subplot(313)
    
    gesture_labels = plt.plot(timestamp_vector, x['label'], label = 'gesture_labels')
    plt.ylim([np.min(x['label']) - 0.5, np.max(x['label']) + 0.5])
    plt.legend(loc = 'upper right')
    plt.show()

# Main Execution

In [0]:
if __name__ == '__main__':   
    main(model_choice = MODEL_TO_TRAIN, straight_parameters = None)