<h1>Classical Methods - Processing</h1>

<h5>Importing Packages</h5>

In [None]:
import os
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import mode
from joblib import dump, load
from tqdm.notebook import tqdm
from sklearn.neighbors import KNeighborsClassifier
from sklearn.cluster import KMeans
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, plot_confusion_matrix, confusion_matrix

<h5>Data Parameters</h5>

In [None]:
# Notebook display configuration.
# Active: static figures rendered inline in the cell output.
%matplotlib inline
# Disabled alternatives kept for reference (interactive backend, pandas display options):
# %matplotlib notebook
# pd.set_option("float_format", '{:0.10f}'.format)
# pd.set_option('display.max_columns', 30)

In [None]:
# Choose the data / work folders depending on where the notebook is running.
# Importing `google.colab` is used as the probe: if the import succeeds the
# Colab paths are used, otherwise the local paths are used.
try:
    import google.colab  # noqa: F401 -- imported only to detect the environment
    datasets_folder = '/drive/My Drive/Colab Notebooks/DataSets/'
    work_folder = '/drive/My Drive/Colab Notebooks/Experiments/Classical Machine Learning/'
    IN_COLAB = True
except ImportError:
    # Only a missing module means "not in Colab". A bare `except:` would also
    # swallow KeyboardInterrupt / SystemExit and hide unrelated failures.
    datasets_folder = '/Google Drive/Colab Notebooks/DataSets/'
    work_folder = '/Google Drive/Colab Notebooks/Experiments/Classical Machine Learning/'
    IN_COLAB = False

print("In Colab:", IN_COLAB)
print("Work Folder:", work_folder)

In Colab: True
Work Folder: /drive/My Drive/Colab Notebooks/Experimentos/Classical Machine Learning/


<h5>Data Functions</h5>

In [None]:
def getDataSets(folder=datasets_folder):
    """Load the raw datasets from disk.

    Reads the sub-folders "PVS 1" .. "PVS 9" of `folder`; each one must contain
    `dataset_gps_mpu_left.csv`, `dataset_gps_mpu_right.csv` and
    `dataset_labels.csv`.

    Input: folder that contains the "PVS <n>" dataset folders.
    Output: dict -> { "pvs_x": { "left": df, "right": df, "labels": df } }
    """
    # (key in the returned dict, CSV file name) in the order the files are read
    csv_by_role = (
        ("left", 'dataset_gps_mpu_left.csv'),
        ("right", 'dataset_gps_mpu_right.csv'),
        ("labels", 'dataset_labels.csv'),
    )

    loaded = {}

    for number in range(1, 10):
        source_dir = os.path.join(folder, "PVS " + str(number))

        loaded["pvs_" + str(number)] = {
            role: pd.read_csv(os.path.join(source_dir, csv_name), float_precision="high")
            for role, csv_name in csv_by_role
        }

    return loaded

def getFields(acc=False, gyro=False, mag=False, temp=False, speed=False, location=False, below_suspension=False, above_suspension=False, dashboard=False):
    """Select dataset column names by data type and sensor placement.

    A sensor column is returned only when BOTH its data type flag (acc, gyro,
    mag, temp) and its placement flag (below_suspension, above_suspension,
    dashboard) are enabled. "speed" and "latitude"/"longitude" carry no
    placement, so the `speed` / `location` flags select them on their own.

    Input: data types and placements (booleans).
    Output: string[] in the order the columns are listed below.
    """
    all_fields = [
        'timestamp',
        'acc_x_dashboard', 'acc_y_dashboard', 'acc_z_dashboard',
        'acc_x_above_suspension', 'acc_y_above_suspension', 'acc_z_above_suspension',
        'acc_x_below_suspension', 'acc_y_below_suspension', 'acc_z_below_suspension',
        'gyro_x_dashboard', 'gyro_y_dashboard', 'gyro_z_dashboard',
        'gyro_x_above_suspension', 'gyro_y_above_suspension', 'gyro_z_above_suspension',
        'gyro_x_below_suspension', 'gyro_y_below_suspension', 'gyro_z_below_suspension',
        'mag_x_dashboard', 'mag_y_dashboard', 'mag_z_dashboard',
        'mag_x_above_suspension', 'mag_y_above_suspension', 'mag_z_above_suspension',
        'temp_dashboard', 'temp_above_suspension', 'temp_below_suspension',
        'timestamp_gps', 'latitude', 'longitude', 'speed'
    ]

    # Prefixes / suffixes switched on by the caller. str.startswith and
    # str.endswith accept a tuple and return False when the tuple is empty.
    wanted_prefixes = tuple(
        prefix
        for enabled, prefix in ((acc, "acc_"), (gyro, "gyro_"), (mag, "mag_"), (temp, "temp_"))
        if enabled
    )
    wanted_suffixes = tuple(
        suffix
        for enabled, suffix in (
            (below_suspension, "_below_suspension"),
            (above_suspension, "_above_suspension"),
            (dashboard, "_dashboard"),
        )
        if enabled
    )

    def selected_without_placement(name):
        # speed / location columns bypass the type-and-placement rule
        if speed and name == "speed":
            return True
        if location and name in ("latitude", "longitude"):
            return True
        return False

    return [
        name
        for name in all_fields
        if selected_without_placement(name)
        or (name.startswith(wanted_prefixes) and name.endswith(wanted_suffixes))
    ]

def getSubSets(datasets, fields, labels):
    """Project every raw dataset onto the requested columns.

    The "left" and "right" frames are indexed with `fields`, the "labels"
    frame is indexed with `labels`.

    Input: raw datasets (dict), fields (string[]) and labels (string[]).
    Output: dict -> { "pvs_x": { "left": df, "right": df, "labels": df } }
    """
    return {
        name: {
            "left": group["left"][fields],
            "right": group["right"][fields],
            "labels": group["labels"][labels],
        }
        for name, group in datasets.items()
    }

def getAggFunctions(fields, acc=None, gyro=None, mag=None, speed=None):
    """Map every field to the aggregation functions of its data type.

    Fields starting with "acc_", "gyro_" or "mag_" receive the `acc`, `gyro`
    or `mag` argument respectively, the field "speed" receives `speed`, and
    any other field is mapped to None.

    Input: fields and the agg functions to use for each data type.
    Output: dict -> { field: aggFn }
    """
    by_prefix = (("acc_", acc), ("gyro_", gyro), ("mag_", mag))

    def functions_for(name):
        for prefix, functions in by_prefix:
            if name.startswith(prefix):
                return functions
        return speed if name == "speed" else None

    return {name: functions_for(name) for name in fields}

def getExtractedFeatures(subsets, window_fn, window, agg_fn, sides=['left', 'right']):
    """Run the window function over every requested side of every subset.

    `window_fn` is called as
    `window_fn(subsets[key][side], subsets[key]["labels"], window, agg_fn)`
    and must return an `(inputs, outputs)` pair.

    Input: subsets, window function, window size, aggregation functions and sides.
    Output: dict -> { "pvs_x": { "left": { "inputs": df, "outputs": df }, "right": { "inputs": df, "outputs": df } } }
    """
    def extract(group, side):
        inputs, outputs = window_fn(group[side], group["labels"], window, agg_fn)
        return {"inputs": inputs, "outputs": outputs}

    return {
        name: {side: extract(group, side) for side in sides}
        for name, group in subsets.items()
    }

def getTrainTestSets(feature_sets, sets_train, sets_test, sides=['left', 'right']):
    """Build the train and test frames from the extracted feature sets.

    Every dataset whose name is in `sets_train` contributes the "inputs" /
    "outputs" frames of each requested side to the train frames; datasets
    named in `sets_test` (and not in `sets_train`) contribute to the test
    frames. Datasets named in neither list are ignored. Rows are stacked in
    dataset order, then side order, and the resulting index is renumbered.

    Inputs: feature sets (dict), sets_train (string[]) and sets_test (string[])
            with dataset names ("pvs_x"), and sides.
    Outputs: df -> input train, input test, output train, output test.
             A group that received no dataset is an empty DataFrame.
    """
    train_inputs, train_outputs = [], []
    test_inputs, test_outputs = [], []

    for key, by_side in feature_sets.items():

        # Train membership wins when a name appears in both lists.
        if key in sets_train:
            inputs_bucket, outputs_bucket = train_inputs, train_outputs
        elif key in sets_test:
            inputs_bucket, outputs_bucket = test_inputs, test_outputs
        else:
            continue

        for side in sides:
            inputs_bucket.append(by_side[side]["inputs"])
            outputs_bucket.append(by_side[side]["outputs"])

    def stack(frames):
        # DataFrame.append no longer exists in pandas 2.x; collecting the frames
        # and concatenating once also avoids re-copying all rows per iteration.
        # pd.concat rejects an empty list, hence the explicit empty frame.
        if not frames:
            return pd.DataFrame()
        return pd.concat(frames, ignore_index=True)

    return stack(train_inputs), stack(test_inputs), stack(train_outputs), stack(test_outputs)

<h5> Feature Extraction </h5>

In [None]:
def extractFeatureMovingWindow(data, labels, window, agg_fn):
    """Moving window feature extraction.

    Aggregates `data` over a rolling window of `window` rows using `agg_fn`
    and reduces `labels` over the same window to its rounded mean. The first
    `window - 1` rows, for which the window is still incomplete, are dropped
    and both results are re-indexed from 0.

    The aggregated column labels are joined with "_" (e.g. ("acc_x", "mean")
    becomes "acc_x_mean"), so `agg_fn` is expected to yield two-level column
    labels, as a dict of field -> list of function names does.

    With `window == 1` the inputs are returned untouched.

    Output: (inputs, outputs)
    """
    if window == 1:
        return data, labels

    incomplete_rows = window - 1

    rolled = data.rolling(window).agg(agg_fn)
    rolled.columns = ['_'.join(parts).strip() for parts in rolled.columns.values]
    features = rolled[incomplete_rows:].reset_index(drop=True)

    targets = labels.rolling(window).mean().round(0)[incomplete_rows:].reset_index(drop=True)

    return features, targets

In [None]:
def extractFeatureFixedWindow(data, labels, window, agg_fn):
    """Fixed window feature extraction.

    Computes the moving-window features with `extractFeatureMovingWindow` and
    keeps only every `window`-th row of the result (rows 0, window,
    2 * window, ...), then re-indexes both frames from 0.

    With `window == 1` the inputs are returned untouched.

    Output: (inputs, outputs)
    """
    if window == 1:
        return data, labels

    moving_inputs, moving_outputs = extractFeatureMovingWindow(data, labels, window, agg_fn)

    # positions of the moving-window rows to keep
    keep_rows = np.arange(0, len(moving_inputs), window)

    fixed_inputs = moving_inputs.iloc[keep_rows, :].reset_index(drop=True)
    fixed_outputs = moving_outputs.iloc[keep_rows, :].reset_index(drop=True)

    return fixed_inputs, fixed_outputs

<h5>Dump and Load Models</h5>

In [None]:
def compareBestModel(best_model, new_model):
    """Return whichever of the two models has the higher validation accuracy.

    A model is a tuple of (model, params, train_accuracy, validation_accuracy).
    `best_model` may be None, in which case `new_model` is returned. The new
    model replaces the current best only when it is strictly better, so ties
    keep `best_model`.
    """
    # `not (a < b)` rather than `a >= b`: the two differ when an accuracy is NaN.
    if best_model is not None and not (best_model[3] < new_model[3]):
        return best_model

    return new_model

def saveBestModel(path, file_prefix, best_model):
    """Dump the best model to `path`, creating the folder when needed.

    `best_model` is a tuple whose items 2 and 3 are the train and validation
    accuracy; both are rounded to 10 decimals and embedded in the file name:
    `<file_prefix>-train-acc-<train>-val-acc-<validation>.joblib`.
    """
    # exist_ok=True creates the folder without a separate existence check, so a
    # folder created in between (e.g. by a parallel run) no longer raises.
    os.makedirs(path, exist_ok=True)

    train_acc = str(round(best_model[2], 10))
    val_acc = str(round(best_model[3], 10))

    file = file_prefix + "-train-acc-" + train_acc + "-val-acc-" + val_acc + ".joblib"
    dump(best_model, os.path.join(path, file))

def loadBestModel(path, file):
    """Load and return a previously dumped model from `path`/`file`.

    NOTE(review): joblib's `load` is pickle based, so only files from a
    trusted source should be loaded.
    """
    model_file = os.path.join(path, file)
    return load(model_file)

In [None]:
def saveExecutionLog(path, file_prefix, data, columns):
    """Save a log for each experiment execution (params for each execution).

    Writes `data` under the headers `columns` to
    `<path>/<file_prefix>-execution-log.csv`, without the index column.
    """
    log_file = os.path.join(path, file_prefix + "-execution-log.csv")
    pd.DataFrame(data=data, columns=columns).to_csv(log_file, index=False)

<h5>Parameter Variations</h5>

In [None]:
# Train / test splits by dataset name: each entry trains on six of the
# "pvs_x" datasets and tests on the other three.
experiment_by_dataset = [
    {
        "train": ["pvs_1", "pvs_3", "pvs_4", "pvs_6", "pvs_7", "pvs_9"],
        "test": ["pvs_2", "pvs_5", "pvs_8"],
    },
    {
        "train": ["pvs_1", "pvs_2", "pvs_3", "pvs_7", "pvs_8", "pvs_9"],
        "test": ["pvs_4", "pvs_5", "pvs_6"],
    },
    {
        "train": ["pvs_1", "pvs_2", "pvs_4", "pvs_6", "pvs_8", "pvs_9"],
        "test": ["pvs_3", "pvs_5", "pvs_7"],
    },
]

# Field sets to evaluate, speed always included. For each placement, in the
# order below_suspension, above_suspension, dashboard, three sets are built:
#   acc_<placement>, gyro_<placement>, acc_gyro_<placement>
experiment_by_fields = [
    getFields(acc=with_acc, gyro=with_gyro, speed=True, **{placement: True})
    for placement in ("below_suspension", "above_suspension", "dashboard")
    for with_acc, with_gyro in ((True, False), (False, True), (True, True))
]

# Window functions to evaluate: fixed first, then moving.
experiment_by_window_fn = [extractFeatureFixedWindow, extractFeatureMovingWindow]

# Aggregation functions to evaluate. "acc" and "gyro" always share the same
# statistics; "speed" is always aggregated with the mean only.
experiment_by_agg_fn = [
    {"acc": list(statistics), "gyro": list(statistics), "speed": ["mean"]}
    for statistics in (
        ("mean",),
        ("std",),
        ("var",),
        ("mean", "std"),
        ("mean", "var"),
        ("mean", "std", "var"),
    )
]

<h5>Labels Fields</h5>

In [None]:
# Surface type label names (presumably columns of the labels data - TODO confirm).
surface_type_labels = [
    "land",
    "cobblestone",
    "asphalt",
]

In [None]:
# Human readable surface names for plot axes; the "\n" breaks each name over two lines.
surface_type_labels_plot = [
    "Dirt \n Road",
    "Cobblestone \n Road",
    "Asphalt \n Road",
]

<h5> Util Functions </h5>

In [None]:
def createPathIfNotExists(path):
    """Create the folder `path`, including missing parents, if it does not exist yet."""
    # exist_ok=True replaces the separate exists() check: the folder can no
    # longer appear between the check and the creation and make makedirs raise.
    os.makedirs(path, exist_ok=True)

In [None]:
def plot(data, labels, gyro_field=None, acc_field=None, speed_field=None):
    """Plot selected signal columns of `data` together with every label column.

    The speed column is drawn multiplied by 3.6 (presumably m/s -> km/h,
    TODO confirm), the gyro column in green and the acc column in yellow.
    Each column of `labels` is drawn scaled by 20 times its 1-based position,
    which puts every label at its own height in the figure.
    """
    plt.figure(figsize=(16,6))

    if speed_field:
        scaled_speed = data[speed_field] * 3.6
        scaled_speed.plot()

    # same drawing order as the arguments suggest: gyro first, then acc
    for signal_field, signal_color in ((gyro_field, "g"), (acc_field, "y")):
        if signal_field:
            data[signal_field].plot(color=signal_color)

    for position, col in enumerate(labels.columns, start=1):
        (labels[col] * position * 20).plot(linewidth=2)

    plt.legend()

In [None]:
def plotConfusionMatrix(values, title, labels=surface_type_labels_plot, figsize=(4, 4), filename='confusion_matrix.png'):
    """Draw a confusion matrix as an annotated heatmap, save it and show it.

    values:   square matrix of counts/ratios; rows are labelled as true
              labels and columns as predicted labels.
    title:    figure title.
    labels:   names used for both the rows and the columns.
    figsize:  matplotlib figure size.
    filename: where the image is written. Defaults to the file name that was
              previously hard-coded; pass a different path per experiment to
              avoid overwriting earlier images, or None to skip saving.

    Note: `sns.set(font_scale=1.2)` changes the seaborn settings globally, not
    only for this figure.
    """
    con_mat_df = pd.DataFrame(values, index=labels, columns=labels)
    figure = plt.figure(figsize=figsize)
    sns.set(font_scale=1.2)
    sns.heatmap(con_mat_df, annot=True, cmap=plt.cm.Blues, annot_kws={"size": 14})
    plt.tight_layout()
    plt.title(title)
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

    # Save before showing: plt.show() may close the figure, depending on the backend.
    if filename:
        figure.savefig(filename, bbox_inches="tight")

    plt.show()