## Install and import

In [1]:
%pip install imblearn

Note: you may need to restart the kernel to use updated packages.


In [3]:
import os
import glob
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from imblearn.over_sampling import SMOTE
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score
import joblib

## Define the useful functions

In [4]:
def combine_csv(folder_path, tmp_path):
    """
    Combine all CSV files in a folder into a single DataFrame.
    :param folder_path: Path to the folder containing the CSV files
    :param seq_idx: Sequence index
    :param label: Label of the sequence (Normal - 0, Abnormal - 1)
    :return: A single DataFrame containing all the data from the CSV files
    """

    # Get a list of all CSV files in the folder
    csv_files = [file for file in os.listdir(folder_path) if file.endswith('.csv')]

    # Create an empty DataFrame to store the combined data
    combined_df = pd.DataFrame()

    # Iterate over the CSV files in the folder
    for file in csv_files:
        # Construct the full path to each CSV file
        file_path = os.path.join(folder_path, file)

        # Read each CSV file into a DataFrame
        df = pd.read_csv(file_path)
        # Drop the time. Will add later.
        df = df.drop(labels=df.columns[0], axis=1)

        # Extract the file name (excluding the extension) to use as a prefix
        file_name = os.path.splitext(file)[0]

        # Add a prefix to each column based on the file name
        df = df.add_prefix(f'{file_name}_')

        # Concatenate the current DataFrame with the combined DataFrame
        combined_df = pd.concat([combined_df, df], axis=1)

    df = pd.read_csv(file_path)
    combined_df = pd.concat([df['time'], combined_df], axis=1)
    combined_df.loc[:, 'test_condition'] = tmp_path

    return combined_df

def load_data(path_header, folder_path):
    df = pd.DataFrame()
    for tmp_path in folder_path:
        # path = path_header + tmp_path
        path = path_header + '/' + tmp_path
        tmp_df = combine_csv(path, tmp_path)
        df = pd.concat([df, tmp_df])
        df = df.reset_index(drop=True)
    return df

In [26]:
def separate_features_and_target_variables(df, training_features, target_variable):
    """
    Separate the features and target variables.
    :param df: The DataFrame containing the features and target variables
    :param training_features: The names of the training features
    :param target_variable: The name of the target variable
    :return: A tuple containing the features DataFrame and the target variable Series
    """
    X = df[training_features]
    y = df[target_variable]

    return X, y

def standard_scaler(X_train, X_test):
    """
    Standardize the data using StandardScaler.
    :param X_train: Training data
    :param X_test: Test data
    :return: Standardized training and test data
    """
    # Create a StandardScaler object
    scaler = StandardScaler()

    # Standardize the training and test data
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    return X_train, X_test

def delete_outliers(X, y, threshold=3):
    """
    Delete outliers from the DataFrame.
    :param df: The DataFrame containing the features and target variables
    :param threshold: The threshold used to identify outliers
    :return: The DataFrame with the outliers removed
    """
    # Merge X and y
    df = pd.concat([X, y], axis=1)

    # Calculate the z-scores for each column
    z_scores = np.abs((df - df.mean()) / df.std())

    # Find the rows whose z-scores are greater than the threshold
    outlier_indices = np.where(z_scores > threshold)[0]

    # Delete the rows whose z-scores are greater than the threshold
    df = df.drop(df.index[outlier_indices])

    # Separate the features and target variables
    X = df.drop(y.name, axis=1)
    y = df[y.name]
    
    return X, y

def add_1st_derivative_features(X):
    """
    Add derivative features to the training and test data.
    :param X: dataframe in pandas format
    :return: Training and test data with the derivative features added
    """
    # Create a new DataFrame to store the derivative features
    X_new = pd.DataFrame()

    # Iterate over the columns in the DataFrame
    for col in X.columns:
        # Calculate the first derivative of the column
        first_derivative = np.gradient(X[col])

        # Create a new column name for the first derivative
        first_derivative_name = col + '_1st_der'

        # Add the first derivative to the new DataFrame
        X_new[first_derivative_name] = first_derivative

    # Concatenate the new DataFrame with the original DataFrame
    X_new = pd.concat([X, X_new], axis=1)

    return X_new

def oversample_minority_class(X_train, y_train):
    """
    Oversample the minority class using SMOTE.
    :param X_train: Training data
    :param y_train: Training labels
    :return: Oversampled training data and training labels
    """
    # Create an SMOTE object
    sm = SMOTE(random_state=42)

    # Fit the SMOTE object to the training data and labels
    X_train, y_train = sm.fit_resample(X_train, y_train)

    return X_train, y_train

def smoothen_data(X, window_size):
    """
    Smoothen the data using a moving average.
    :param X: Training or test data
    :param window_size: Window size for the moving average
    :return: Smoothened training or test data
    """
    X_smoothen = X.rolling(window=window_size, min_periods=1).mean()
    return X_smoothen

## Preprocessing of the data

### Define the training and target features

In [11]:
training_features = dict()
training_features["temperature"] = list()
training_features["voltage"] = list()
training_features["position"] = list()
training_features["selected_motor"] = dict()
target_feature = dict()

for i in range(1, 7):
    training_features["temperature"].append("data_motor_{}_temperature".format(i))
    training_features["voltage"].append("data_motor_{}_voltage".format(i))
    training_features["position"].append("data_motor_{}_position".format(i))
    training_features["selected_motor"][i] = list()
    training_features["selected_motor"][i].append("data_motor_{}_temperature".format(i))
    training_features["selected_motor"][i].append("data_motor_{}_voltage".format(i))
    training_features["selected_motor"][i].append("data_motor_{}_position".format(i))
    target_feature[i] = "data_motor_{}_label".format(i)

training_features["all_numerical_features"] = training_features["temperature"] + training_features["voltage"] + training_features["position"]

In [12]:
print(training_features)

{'temperature': ['data_motor_1_temperature', 'data_motor_2_temperature', 'data_motor_3_temperature', 'data_motor_4_temperature', 'data_motor_5_temperature', 'data_motor_6_temperature'], 'voltage': ['data_motor_1_voltage', 'data_motor_2_voltage', 'data_motor_3_voltage', 'data_motor_4_voltage', 'data_motor_5_voltage', 'data_motor_6_voltage'], 'position': ['data_motor_1_position', 'data_motor_2_position', 'data_motor_3_position', 'data_motor_4_position', 'data_motor_5_position', 'data_motor_6_position'], 'selected_motor': {1: ['data_motor_1_temperature', 'data_motor_1_voltage', 'data_motor_1_position'], 2: ['data_motor_2_temperature', 'data_motor_2_voltage', 'data_motor_2_position'], 3: ['data_motor_3_temperature', 'data_motor_3_voltage', 'data_motor_3_position'], 4: ['data_motor_4_temperature', 'data_motor_4_voltage', 'data_motor_4_position'], 5: ['data_motor_5_temperature', 'data_motor_5_voltage', 'data_motor_5_position'], 6: ['data_motor_6_temperature', 'data_motor_6_voltage', 'data_mo

### Select, read and preprocess the data

In [41]:
# Select the data to use for training and testing
path_training = ['static_with_fault_1', 'static_with_fault_2', 'static_with_fault_3', 
'static_with_fault_4', 'static_with_fault_5', 'static_with_fault_6', 
'steady_state_after_movement', 'steady_state_not_moving'
]
path_test = ['task_fault']
path_header = os.path.abspath('../data_collection/collected_data/')

# Load the data
df = load_data(path_header, path_training)
df_test = load_data(path_header, path_test)

# Choose the training method and features
# chosen_training_method = "all_numerical_features"
# chosen_training_method = "selected_motor" # Not the same organisation of the data as the others !
chosen_training_method = "temperature"
# chosen_training_method = "voltage"
# chosen_training_method = "position"

X, y = dict(), dict()
X_test, y_test = dict(), dict()
X_train, X_val, y_train, y_val = dict(), dict(), dict(), dict()

for n_motor in range(1, 7):
    chosen_target_feature = target_feature[n_motor]
    if chosen_training_method == "selected_motor":
        chosen_training_features = training_features[chosen_training_method][n_motor]
    else:
        chosen_training_features = training_features[chosen_training_method]
    
    X[i], y[i] = separate_features_and_target_variables(df, chosen_training_features, chosen_target_feature)
    X_test[i], y_test[i] = separate_features_and_target_variables(df_test, chosen_training_features, chosen_target_feature)

    # Smoothen the data
    X[i] = smoothen_data(X[i], window_size=5)
    X_test[i] = smoothen_data(X_test[i], window_size=5)

    # Add derivative features
    X[i] = add_1st_derivative_features(X[i])
    X_test[i] = add_1st_derivative_features(X_test[i])

    # Oversample the minority class
    X[i], y[i] = oversample_minority_class(X[i], y[i])

    # Split the data into training and validating sets
    X_train[i], X_val[i], y_train[i], y_val[i] = train_test_split(X[i], y[i], test_size=0.2, random_state=42)


In [49]:

# Create a list of classifiers to evaluate
classifiers = [
    ('Logistic Regression', LogisticRegression(class_weight='balanced')),
    # ('SVM', SVC(class_weight='balanced')),
    # ('Decision Tree', DecisionTreeClassifier(class_weight='balanced')),
    # ('Random Forest', RandomForestClassifier(class_weight='balanced')),
    # Add more classifiers here
]

# Define hyperparameters for grid search for each classifier
param_grids = [
    {'C': np.logspace(-1, 1, 5), 'max_iter': [100, 200, 500, 1000, 2000]},  # Hyperparameters for Logistic Regression
    # {'C': np.logspace(-1, 1, 5), 'gamma': np.logspace(-1, 1, 5), 'kernel': ['poly'], 'degree': [2,3]}, # Hyperparameters for SVM
    # {'criterion': ['gini', 'entropy'], 'max_depth': [2, 3, 4]},  # Hyperparameters for Decision Tree
    # {'n_estimators': [10, 50, 100, 200], 'criterion': ['gini', 'entropy'], 'max_depth': [2, 3, 4]} # Hyperparameters for Random Forest
    # Add more hyperparameters for other classifiers here
]

n_motor = 1

# Create an empty list to store the results
results = []

# Iterate over the classifiers and perform grid search
for classifier, param_grid in zip(classifiers, param_grids):
    
    # Rename the param_grid keys with the classifier name
    param_grid = {f'{classifier[0]}__{key}': value for key, value in param_grid.items()}

    # Create a pipeline with Standardization and the current classifier
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        classifier
    ])

    # Use GridSearchCV to find the best hyperparameters and fit the pipeline
    grid_search = GridSearchCV(pipeline, param_grid, cv=5, scoring='f1', verbose = 3)
    grid_search.fit(X_train[n_motor], y_train[n_motor])

    # Use grid_search.predict to make predictions on the testing dataset
    y_pred = grid_search.predict(X_test[n_motor])

    # Compute evaluation metrics
    conf_matrix = confusion_matrix(y_test[n_motor], y_pred)
    accuracy = accuracy_score(y_test[n_motor], y_pred)
    precision = precision_score(y_test[n_motor], y_pred)
    recall = recall_score(y_test[n_motor], y_pred)
    f1 = f1_score(y_test[n_motor], y_pred)

    # Store the results in a dictionary
    result = {
        'Classifier': classifier[0],
        'Best Parameters': grid_search.best_params_,
        'Confusion Matrix': conf_matrix,
        'Accuracy': accuracy,
        'Precision': precision,
        'Recall': recall,
        'F1 Score': f1
    }

    # Append the result to the list of results
    results.append(result)

    # Save the best model from grid search
    best_model = grid_search.best_estimator_
    joblib.dump(best_model, f'best_model_{classifier[0]}_motor_{n_motor}_{chosen_training_method}.model')

Fitting 5 folds for each of 10 candidates, totalling 50 fits
[CV 1/5] END Logistic Regression__C=0.1, Logistic Regression__max_iter=1000;, score=0.934 total time=   0.1s
[CV 2/5] END Logistic Regression__C=0.1, Logistic Regression__max_iter=1000;, score=0.929 total time=   0.0s
[CV 3/5] END Logistic Regression__C=0.1, Logistic Regression__max_iter=1000;, score=0.927 total time=   0.0s
[CV 4/5] END Logistic Regression__C=0.1, Logistic Regression__max_iter=1000;, score=0.936 total time=   0.3s
[CV 5/5] END Logistic Regression__C=0.1, Logistic Regression__max_iter=1000;, score=0.935 total time=   0.1s
[CV 1/5] END Logistic Regression__C=0.1, Logistic Regression__max_iter=2000;, score=0.934 total time=   0.0s
[CV 2/5] END Logistic Regression__C=0.1, Logistic Regression__max_iter=2000;, score=0.929 total time=   0.0s
[CV 3/5] END Logistic Regression__C=0.1, Logistic Regression__max_iter=2000;, score=0.927 total time=   0.0s
[CV 4/5] END Logistic Regression__C=0.1, Logistic Regression__max_i

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, "true nor predicted", "F-score is", len(true_sum))


FileNotFoundError: [Errno 2] No such file or directory: 'best_model_Logistic Regression_motor_1_temperature.model'