In [19]:
import numpy as np
import pandas as pd
import os
from sklearn.svm import SVC
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier
import xgboost as xgb
from sklearn.model_selection import LeaveOneOut, cross_val_score
from sklearn.metrics import accuracy_score
from sklearn.neighbors import KNeighborsClassifier
from tabulate import tabulate
from sklearn.metrics import confusion_matrix
from sklearn.feature_selection import SelectKBest, f_classif
from scipy.fft import fft
import time
from pyentrp import entropy as ent
from sklearn.model_selection import GroupKFold

# Function to load and preprocess data
def load_data(subject, repetition, experiment_type, minute):
    file_path = os.path.join(data_path, f'{subject}_{repetition}_{experiment_type}{minute}.csv')

    # Read data, treating potential mixed types as strings, and coercing errors to NaN
    data = pd.read_csv(file_path, dtype=str, low_memory=False, on_bad_lines='skip')    

    data = data.apply(pd.to_numeric, errors='coerce') 

    # Label assignment based on experiment type
    if experiment_type == 'm':
        data['label'] = 0
    elif experiment_type == 'l':
        data['label'] = 1
    elif experiment_type == 'c':
        data['label'] = 2
    elif experiment_type == 'e':
        data['label'] = 3
    else:
        raise ValueError("Invalid experiment_type.")

    return data

# Determinar el número óptimo de características usando SelectKBest con validación cruzada
def optimal_k(features, labels):
    scores = []
    for k in range(1, min(20, features.shape[1]+1)):  # Asumimos un máximo de 20 características o menos si hay menos disponibles
        selector = SelectKBest(f_classif, k=k)
        selected_features = selector.fit_transform(features, labels)
        score = np.mean(cross_val_score(LinearSVC(dual=False), selected_features, labels, cv=5))
        scores.append((k, score))
    best_k = sorted(scores, key=lambda x: x[1], reverse=True)[0]
    print(f"Optimal number of features: {best_k[0]} with cross-validation score: {best_k[1]:.2f}")
    return best_k[0]

# Function to reduce features using SelectKBest
def select_features(X_train, y_train, X_test, k=10):
    # Initialize and fit SelectKBest
    selector = SelectKBest(score_func=f_classif, k=k)
    selector.fit(X_train, y_train)
    
    # Transform both training and testing data
    X_train_reduced = selector.transform(X_train)
    X_test_reduced = selector.transform(X_test)
    
    return X_train_reduced, X_test_reduced

# Function to help calculate specifity and sensibility
def calc_metrics(cm):
    # Sum all confusion matrix entries to get total number of instances
    total = cm.sum()

    # Sum along the main diagonal to get all true positives
    TP = np.diag(cm)

    # Calculate False Positives, False Negatives, and True Negatives
    FP = cm.sum(axis=0) - TP
    FN = cm.sum(axis=1) - TP
    TN = total - (FP + FN + TP)

    # Calculate sensitivity (recall) and specificity for each class
    sensitivity = TP / (TP + FN)
    specificity = TN / (TN + FP)

    # Calculate average sensitivity and specificity if needed
    avg_sensitivity = np.mean(sensitivity)
    avg_specificity = np.mean(specificity)

    return avg_sensitivity, avg_specificity

In [20]:
import numpy as np
import pandas as pd

def calculate_total_band_energy(signal):
    """Calculates the total energy of the frequency band of a signal using FFT."""
    fft_values = np.fft.fft(signal)
    magnitude_squared = np.abs(fft_values) ** 2
    total_energy = np.sum(magnitude_squared) / len(signal)
    return total_energy

def max_power(signal):
    """Calculates the maximum power of a signal using FFT."""
    fft_values = np.fft.fft(signal)
    power_spectrum = np.abs(fft_values) ** 2 / len(signal)
    return np.max(power_spectrum)

def shannon_entropy(signal):
    """Calculates Shannon entropy of a signal."""
    value, counts = np.unique(signal, return_counts=True)
    probabilities = counts / counts.sum()
    entropy = -np.sum(probabilities * np.log2(probabilities + 1e-10))  # To avoid log(0)
    return entropy

def calculate_peak_to_peak(signal):
    """Calculates the peak-to-peak value of a signal."""
    peak_to_peak_value = signal.max() - signal.min()
    return peak_to_peak_value

def differential_entropy(signal):
    variance = np.var(signal)
    if variance == 0:
        return 0  # Avoid log(0), return 0 for a signal with no variability
    return 0.5 * np.log(2 * np.pi * np.e * variance)


def sample_entropy(signal, m=2, r=None):
    try:
        std_signal = np.std(signal)
        if std_signal == 0:
            return 0  # Return a default or placeholder value that maintains consistency
        if r is None:
            r = 0.2 * std_signal
        entropy_value = ent.sample_entropy(signal, m, r)
        if len(entropy_value) > 0:
            return entropy_value[0]  # Ensure this returns a single value
        else:
            return 0  # Return a default value if entropy array is empty
    except Exception as e:
        print(f"Error calculating sample entropy: {str(e)}")
        return 0  # Return a default value in case of other exceptions



def extract_features(data, num_windows):
    window_size = int((60*256) / num_windows)  # Adjusted to dataset specifics
    features, labels = [], []
    for i in range(num_windows):
        window_data = data.iloc[i*window_size : (i+1)*window_size]
        if len(window_data) < window_size:
            raise ValueError("NOT ENOUGH DATA FOR 1 MINUTE.")
        
        # Drop label from the window data if present
        if 'label' in window_data.columns:
            labels.append(window_data['label'].iloc[0])
            feature_data = window_data.drop(columns='label')
        else:
            feature_data = window_data

        # Calculate features for each channel
        means = feature_data.mean()
        max_powers = feature_data.apply(max_power)
        entropies = feature_data.apply(shannon_entropy)
        band_energies = feature_data.apply(calculate_total_band_energy)
        peak_to_peak_values = feature_data.apply(calculate_peak_to_peak)
        diff_entropies = feature_data.apply(differential_entropy)
        #sample_entropies = feature_data.apply(sample_entropy)
        #sample_entropies_array = sample_entropies.values.flatten() if sample_entropies.ndim > 1 else sample_entropies.values

        # Combine all features into a single array
        combined_features = np.concatenate([
            means.values,
            max_powers.values,
            entropies.values,
            band_energies.values,
            #sample_entropies_array,
            diff_entropies.values,
            peak_to_peak_values.values            
        ])

        # Append combined features and labels
        features.append(combined_features)

    features, labels = np.array(features), np.array(labels)
    return features, labels

def generate_feature_names(data):
    feature_names = []
    # Assuming 'data' is a DataFrame with the same structure as your actual feature data
    sample_data = data.iloc[:1]  # Take just one row to minimize processing
    if 'label' in sample_data.columns:
        sample_data = sample_data.drop(columns='label')

    for column in sample_data.columns:
        feature_names.extend([
            f"mean_{column}",
            f"max_power_{column}",
            f"entropy_{column}",
            f"band_energy_{column}",
            #f"sample_entropy_{column}", 
            f"differential_entropy_{column}",
            f"peak_to_peak_{column}"
        ])
    return feature_names


In [21]:
def save_features_labels(features, labels, subject_ids, num_windows, feature_names, folder_path):
    """
    Saves features, labels, and subject IDs to a CSV file, ensuring not to overwrite existing files.
    :param features: numpy array of features.
    :param labels: numpy array of labels.
    :param subject_ids: numpy array of subject identifiers.
    :param num_windows: number of windows, used for naming the file.
    :param folder_path: directory path where the files will be saved.
    """
    # Create the folder if it does not exist
    os.makedirs(folder_path, exist_ok=True)
    
    # Prepare the data for saving
    data = np.column_stack((subject_ids, features, labels))
    df = pd.DataFrame(data)
    df.columns = ["subject_id"] + feature_names + ["label"]
    
    # Generate the base file name
    base_file_name = os.path.join(folder_path, f"features_{num_windows}")
    extension = ".csv"
    file_name = base_file_name + extension
    counter = 1

    # Increment the file name if it already exists
    while os.path.exists(file_name):
        file_name = f"{base_file_name}({counter}){extension}"
        counter += 1

    # Save the DataFrame to a CSV file
    df.to_csv(file_name, index=False)
    print(f"File saved: {file_name}")

In [15]:
# RAPIDO Y MODULADO TODAVIA SE PODRÍA HACER LA EXTRACCIÓN DE CARACTERÍSTICAS A PARTE
# Path to data and other constants
data_path = '/home/ximo/Escritorio/ProyectoTFG/MusePreprocessed'
subjects = range(1,31)
repetitions = ['1', '2']
minutes = ['1', '2', '3']
experiment_types = ['m', 'l', 'c', 'e']
num_windows_options = [1,2]
folder_name = "/home/ximo/Escritorio/ProyectoTFG/features"

# Setting up leave-one-out cross-validation
loo = LeaveOneOut()

for num_windows in num_windows_options:

    # Preload all data and extract features once
    all_data = {}
    all_features = []
    all_labels = []
    all_subject_ids = []  

    # Load a small sample data to generate feature names
    sample_data = load_data(subjects[0], repetitions[0], experiment_types[0], minutes[0])
    feature_names = generate_feature_names(sample_data)


    for subject in subjects:
        subject_data = []
        for repetition in repetitions:
            for exp_type in experiment_types:
                for minute in minutes:
                    data = load_data(subject, repetition, exp_type, minute)
                    features, labels = extract_features(data, num_windows)
                    subject_data.append((features, labels))
                    all_features.extend(features)
                    all_labels.extend(labels)
                    all_subject_ids.extend([subject] * len(features))
        all_data[subject] = subject_data

    all_features = np.array(all_features)
    all_labels = np.array(all_labels)
    all_subject_ids = np.array(all_subject_ids)  # Convert list of subject IDs to an array

    #Guardamos las features en un ficheros
    save_features_labels(all_features, all_labels, all_subject_ids, num_windows, feature_names, folder_name)

File saved: /home/ximo/Escritorio/ProyectoTFG/features/features_1.csv
File saved: /home/ximo/Escritorio/ProyectoTFG/features/features_2.csv


In [22]:
def process_and_evaluate(file_path):
    df = pd.read_csv(file_path)
    features = df.drop(columns=['subject_id', 'label'])
    labels = df['label']
    subjects = df['subject_id']

    # Select optimal k features
    k = optimal_k(features, labels)  # Implement this function based on your criteria
    selector = SelectKBest(f_classif, k=k)
    selected_features = selector.fit_transform(features, labels)

    # Setup cross-validation
    gkf = GroupKFold(n_splits=len(np.unique(subjects)))  # Number of unique subjects

    # Model initialization
    models = {
        "SVM": LinearSVC(dual=True, max_iter=100),
        "Random Forest": RandomForestClassifier(n_estimators=100),
        "XGBoost": xgb.XGBClassifier(use_label_encoder=False, eval_metric='mlogloss'),
        "KNN": KNeighborsClassifier(n_neighbors=4)
    }

    results_text = ""

    # Cross-validation by subject
    for train_idx, test_idx in gkf.split(selected_features, labels, groups=subjects):
        X_train, X_test = selected_features[train_idx], selected_features[test_idx]
        y_train, y_test = labels.iloc[train_idx], labels.iloc[test_idx]

        # Store results for each model
        model_results = []
        for name, model in models.items():
            model.fit(X_train, y_train)
            predictions = model.predict(X_test)
            accuracy = accuracy_score(y_test, predictions)
            cm = confusion_matrix(y_test, predictions)
            sensitivity, specificity = calc_metrics(cm)

            model_results.append([name, accuracy, specificity, sensitivity])

        results_text += f"Results for file: {os.path.basename(file_path)}\n"
        results_text += tabulate(model_results, headers=["Model", "Accuracy", "Specificity", "Sensitivity"]) + "\n\n"

    return results_text

def main(folder_path, output_path):
    results_text = ""
    for file_name in os.listdir(folder_path):
        if file_name.startswith("features_") and file_name.endswith(".csv"):
            file_path = os.path.join(folder_path, file_name)
            results_text += process_and_evaluate(file_path)

    # Write results to the output file
    with open(output_path, 'w') as f:
        f.write(results_text)

# Paths setup
folder_path = '/home/ximo/Escritorio/ProyectoTFG/features'
output_path = '/home/ximo/Escritorio/ProyectoTFG/results.txt'
main(folder_path, output_path)

Optimal number of features: 8 with cross-validation score: 0.41




Optimal number of features: 17 with cross-validation score: 0.40




In [None]:
    # Select optimal k features
    k = optimal_k(all_features, all_labels)
    selector = SelectKBest(f_classif, k=k)
    all_features = selector.fit_transform(all_features, all_labels)

    # Aplicar la selección de características a los datos de cada sujeto
    for subject, data in all_data.items():
        new_data = []
        for features, labels in data:
            selected_features = selector.transform(features)
            new_data.append((selected_features, labels))
        all_data[subject] = new_data
            
    svm_accuracies, rf_accuracies, xgb_accuracies, knn_accuracies = [], [], [], []
    svm_sensitivities, rf_sensitivities, xgb_sensitivities, knn_sensitivities = [], [], [], []
    svm_specificities, rf_specificities, xgb_specificities, knn_specificities = [], [], [], []

    loop_start_time = time.time()

    for train_index, test_index in loo.split(subjects):
        
        train_subjects = [subjects[i] for i in train_index]
        test_subject = subjects[test_index[0]]

        # Combine training data
        X_train, y_train = [], []
        for subject in train_subjects:
            for feature_set, label_set in all_data[subject]:
                X_train.extend(feature_set)
                y_train.extend(label_set)

        X_train = np.array(X_train)
        y_train = np.array(y_train)

        # Use test data
        X_test, y_test = [], []
        for feature_set, label_set in all_data[test_subject]:
            X_test.extend(feature_set)
            y_test.extend(label_set)

        X_test = np.array(X_test)
        y_test = np.array(y_test)
        
        svm_model = LinearSVC(dual=True, max_iter=100, verbose=False)
        svm_model.fit(X_train, y_train)
        svm_predictions = svm_model.predict(X_test)
        svm_accuracy = accuracy_score(y_test, svm_predictions)
        svm_accuracies.append(svm_accuracy)

        print("SVM\n")
        
        # Train and Test Random Forest model
        rf_model = RandomForestClassifier(n_estimators=100)
        rf_model.fit(X_train, y_train)
        rf_predictions = rf_model.predict(X_test)
        rf_accuracy = accuracy_score(y_test, rf_predictions)
        rf_accuracies.append(rf_accuracy)

        # Train and Test XGBoost model
        xgb_model = xgb.XGBClassifier(use_label_encoder=False, eval_metric='mlogloss')
        xgb_model.fit(X_train, y_train)
        xgb_predictions = xgb_model.predict(X_test)
        xgb_accuracy = accuracy_score(y_test, xgb_predictions)
        xgb_accuracies.append(xgb_accuracy)

        # Train and Test KNN model
        knn_model = KNeighborsClassifier(n_neighbors=4)
        knn_model.fit(X_train, y_train)
        knn_predictions = knn_model.predict(X_test)
        knn_accuracy = accuracy_score(y_test, knn_predictions)
        knn_accuracies.append(knn_accuracy)

        # We calculate the confusion Matrix
        cm_svm = confusion_matrix(y_test, svm_predictions)
        cm_rf = confusion_matrix(y_test, rf_predictions)
        cm_xgb = confusion_matrix(y_test, xgb_predictions)
        cm_knn = confusion_matrix(y_test, knn_predictions)

        # After calculating the confusion matrix for each model we save specifity and sensibility
        sens_svm, spec_svm = calc_metrics(cm_svm)
        svm_sensitivities.append(sens_svm)
        svm_specificities.append(spec_svm)

        sens_rf, spec_rf = calc_metrics(cm_rf)
        rf_sensitivities.append(sens_rf)
        rf_specificities.append(spec_rf)

        sens_xgb, spec_xgb = calc_metrics(cm_xgb)
        xgb_sensitivities.append(sens_xgb)
        xgb_specificities.append(spec_xgb)

        sens_knn, spec_knn = calc_metrics(cm_knn)
        knn_sensitivities.append(sens_knn)
        knn_specificities.append(spec_knn)

        """
        # Print results after each iteration
        print(f"Results for test subject {test_subject}:")
        print(f"SVM - Accuracy: {svm_accuracy:.2f}%, Sensitivity: {sens_svm:.2f}%, Specificity: {spec_svm:.2f}%")
        print(f"Random Forest - Accuracy: {rf_accuracy:.2f}%, Sensitivity: {sens_rf:.2f}%, Specificity: {spec_rf:.2f}%")
        print(f"XGBoost - Accuracy: {xgb_accuracy:.2f}%, Sensitivity: {sens_xgb:.2f}%, Specificity: {spec_xgb:.2f}%")
        print(f"KNN - Accuracy: {knn_accuracy:.2f}%, Sensitivity: {sens_knn:.2f}%, Specificity: {spec_knn:.2f}%")
        print("\n")
        """
        
        #loop_end_time = time.time()
        #elapsed_time = loop_end_time - loop_start_time
        #print(f"Elapsed time {elapsed_time:.2f} seconds")


    seconds = 60/int(num_windows)
    print(f"Done with {num_windows} windows of {seconds} seconds:")
    results = [
        ["SVM", f"{np.mean(svm_accuracies):.2f}%", f"{np.mean(svm_specificities):.2f}%", f"{np.mean(svm_sensitivities):.2f}%"],
        ["Random Forest", f"{np.mean(rf_accuracies):.2f}%", f"{np.mean(rf_specificities):.2f}%", f"{np.mean(rf_sensitivities):.2f}%"],
        ["XGBoost", f"{np.mean(xgb_accuracies):.2f}%", f"{np.mean(xgb_specificities):.2f}%", f"{np.mean(xgb_sensitivities):.2f}%"],
        ["KNN", f"{np.mean(knn_accuracies):.2f}%", f"{np.mean(knn_specificities):.2f}%", f"{np.mean(knn_sensitivities):.2f}%"]
    ]

    print(tabulate(results, headers=["Model", "Accuracy", "Specificity", "Sensitivity"]))
    print("\n")

In [36]:
# MUY LENTO NO MODULADO
# Path to data and other constants
data_path = '/home/ximo/Escritorio/ProyectoTFG/MusePreprocessed'
subjects = range(1,31)
repetitions = ['1', '2']
minutes = ['1', '2', '3']
experiment_types = ['m', 'l', 'c', 'e']
num_windows_options = [1]

# Setting up leave-one-out cross-validation
loo = LeaveOneOut()

for num_windows in num_windows_options:
    svm_accuracies, rf_accuracies, xgb_accuracies, knn_accuracies = [], [], [], []
    svm_sensitivities, rf_sensitivities, xgb_sensitivities, knn_sensitivities = [], [], [], []
    svm_specificities, rf_specificities, xgb_specificities, knn_specificities = [], [], [], []

    loop_start_time = time.time()

    for train_index, test_index in loo.split(subjects):
        
        train_subjects = [subjects[i] for i in train_index]
        test_subject = subjects[test_index[0]]

        # Load and preprocess training data
        X_train, y_train = [], []
        for subject in train_subjects:
            for repetition in repetitions:
                for exp_type in experiment_types:
                    for minute in minutes:
                        data = load_data(subject, repetition, exp_type, minute)
                        features, labels = extract_features(data, num_windows)
                        X_train.extend(features)
                        y_train.extend(labels)

        X_train = np.array(X_train)
        y_train = np.array(y_train)

        # Load and preprocess test data
        X_test, y_test = [], []
        for repetition in repetitions:
            for exp_type in experiment_types:
                for minute in minutes:
                    data = load_data(test_subject, repetition, exp_type, minute)
                    features, labels = extract_features(data, num_windows)
                    X_test.extend(features)
                    y_test.extend(labels)

        X_test = np.array(X_test)
        y_test = np.array(y_test)

        print("Cargo los datos. \n")

        # Reduce features using SelectKBest
        X_train, X_test = select_features(X_train, y_train, X_test, k=5)

        print("Selecciono las features")

        # Train and Test SVM model
        svm_model = LinearSVC(dual=True, verbose=False)
        svm_model.fit(X_train, y_train)
        svm_predictions = svm_model.predict(X_test)
        svm_accuracy = accuracy_score(y_test, svm_predictions)
        svm_accuracies.append(svm_accuracy)

        print("Salgo SVM. \n")
        
        # Train and Test Random Forest model
        rf_model = RandomForestClassifier(n_estimators=100)
        rf_model.fit(X_train, y_train)
        rf_predictions = rf_model.predict(X_test)
        rf_accuracy = accuracy_score(y_test, rf_predictions)
        rf_accuracies.append(rf_accuracy)

        # Train and Test XGBoost model
        xgb_model = xgb.XGBClassifier(use_label_encoder=False, eval_metric='mlogloss')
        xgb_model.fit(X_train, y_train)
        xgb_predictions = xgb_model.predict(X_test)
        xgb_accuracy = accuracy_score(y_test, xgb_predictions)
        xgb_accuracies.append(xgb_accuracy)

        # Train and Test KNN model
        knn_model = KNeighborsClassifier(n_neighbors=4)
        knn_model.fit(X_train, y_train)
        knn_predictions = knn_model.predict(X_test)
        knn_accuracy = accuracy_score(y_test, knn_predictions)
        knn_accuracies.append(knn_accuracy)

        print("Salgo de todos los modelos. \n")

        # We calculate the confusion Matrix
        cm_svm = confusion_matrix(y_test, svm_predictions)
        cm_rf = confusion_matrix(y_test, rf_predictions)
        cm_xgb = confusion_matrix(y_test, xgb_predictions)
        cm_knn = confusion_matrix(y_test, knn_predictions)

        # After calculating the confusion matrix for each model we save specifity and sensibility
        sens_svm, spec_svm = calc_metrics(cm_svm)
        svm_sensitivities.append(sens_svm)
        svm_specificities.append(spec_svm)

        sens_rf, spec_rf = calc_metrics(cm_rf)
        rf_sensitivities.append(sens_rf)
        rf_specificities.append(spec_rf)

        sens_xgb, spec_xgb = calc_metrics(cm_xgb)
        xgb_sensitivities.append(sens_xgb)
        xgb_specificities.append(spec_xgb)

        sens_knn, spec_knn = calc_metrics(cm_knn)
        knn_sensitivities.append(sens_knn)
        knn_specificities.append(spec_knn)

        # Print results after each iteration
        print(f"Results for test subject {test_subject}:")
        print(f"SVM - Accuracy: {svm_accuracy:.2f}%, Sensitivity: {sens_svm:.2f}%, Specificity: {spec_svm:.2f}%")
        print(f"Random Forest - Accuracy: {rf_accuracy:.2f}%, Sensitivity: {sens_rf:.2f}%, Specificity: {spec_rf:.2f}%")
        print(f"XGBoost - Accuracy: {xgb_accuracy:.2f}%, Sensitivity: {sens_xgb:.2f}%, Specificity: {spec_xgb:.2f}%")
        print(f"KNN - Accuracy: {knn_accuracy:.2f}%, Sensitivity: {sens_knn:.2f}%, Specificity: {spec_knn:.2f}%")
        print("\n")

        loop_end_time = time.time()
        elapsed_time = loop_end_time - loop_start_time
        print(f"Elapsed time {elapsed_time:.2f} seconds")


    seconds = 60/int(num_windows)
    print(f"Done with {num_windows} windows of {seconds} seconds:")
    results = [
        ["SVM", f"{np.mean(svm_accuracies):.2f}%", f"{np.mean(svm_specificities):.2f}%", f"{np.mean(svm_sensitivities):.2f}%"],
        ["Random Forest", f"{np.mean(rf_accuracies):.2f}%", f"{np.mean(rf_specificities):.2f}%", f"{np.mean(rf_sensitivities):.2f}%"],
        ["XGBoost", f"{np.mean(xgb_accuracies):.2f}%", f"{np.mean(xgb_specificities):.2f}%", f"{np.mean(xgb_sensitivities):.2f}%"],
        ["KNN", f"{np.mean(knn_accuracies):.2f}%", f"{np.mean(knn_specificities):.2f}%", f"{np.mean(knn_sensitivities):.2f}%"]
    ]

    print(tabulate(results, headers=["Model", "Accuracy", "Specificity", "Sensitivity"]))
    print("\n")


Cargo los datos. 

Selecciono las features
Salgo SVM. 

Salgo de todos los modelos. 

Results for test subject 1:
SVM - Accuracy: 0.25%, Sensitivity: 0.25%, Specificity: 0.75%
Random Forest - Accuracy: 0.42%, Sensitivity: 0.42%, Specificity: 0.81%
XGBoost - Accuracy: 0.42%, Sensitivity: 0.42%, Specificity: 0.81%
KNN - Accuracy: 0.58%, Sensitivity: 0.58%, Specificity: 0.86%


Elapsed time 205.53 seconds
Cargo los datos. 

Selecciono las features
Salgo SVM. 

Salgo de todos los modelos. 

Results for test subject 2:
SVM - Accuracy: 0.21%, Sensitivity: 0.21%, Specificity: 0.74%
Random Forest - Accuracy: 0.25%, Sensitivity: 0.25%, Specificity: 0.75%
XGBoost - Accuracy: 0.25%, Sensitivity: 0.25%, Specificity: 0.75%
KNN - Accuracy: 0.25%, Sensitivity: 0.25%, Specificity: 0.75%


Elapsed time 419.32 seconds


ParserError: Error tokenizing data. C error: Calling read(nbytes) on source failed. Try engine='python'.

In [None]:
import numpy as np
import pandas as pd

def extract_and_save_features(subjects, num_windows, data_path):
    all_features = []
    all_labels = []
    for subject in subjects:
        for repetition in repetitions:
            for exp_type in experiment_types:
                for minute in minutes:
                    data = load_data(subject, repetition, exp_type, minute)
                    features, labels = extract_features(data, num_windows)
                    all_features.append(features)
                    all_labels.append(labels)
    all_features = np.array(all_features)
    all_labels = np.array(all_labels)
    filename = f"{data_path}/features_labels_{num_windows}w.npz"
    np.savez(filename, features=all_features, labels=all_labels)
    print(f"Features and labels saved to {filename}")

# Ejemplo de uso:
data_path = '/home/ximo/Escritorio/ProyectoTFG/MusePreprocessed'
subjects = range(1, 31)
num_windows_options = [1]
for num_windows in num_windows_options:
    extract_and_save_features(subjects, num_windows, data_path)


In [3]:
# Path to data and other constants
data_path = '/home/ximo/Escritorio/ProyectoTFG/Muse'
subjects = range(1,31)
repetitions = ['1', '2']
minutes = ['1', '2', '3']
experiment_types = ['m', 'l', 'c', 'e']
num_windows_options = [1]

# Calculate global minimum and maximum values for normalization
min_vals, max_vals = compute_global_min_max(data_path, subjects, repetitions, experiment_types, minutes, excluded_columns)

# Setting up leave-one-out cross-validation
loo = LeaveOneOut()

for num_windows in num_windows_options:
    svm_accuracies, rf_accuracies, xgb_accuracies, knn_accuracies = [], [], [], []
    svm_sensitivities, rf_sensitivities, xgb_sensitivities, knn_sensitivities = [], [], [], []
    svm_specificities, rf_specificities, xgb_specificities, knn_specificities = [], [], [], []

    loop_start_time = time.time()

    for train_index, test_index in loo.split(subjects):
        train_subjects = [subjects[i] for i in train_index]
        test_subject = subjects[test_index[0]]

        # Load and preprocess training data
        X_train, y_train = [], []
        for subject in train_subjects:
            for repetition in repetitions:
                for exp_type in experiment_types:
                    for minute in minutes:
                        data = load_data(subject, repetition, exp_type, minute, min_vals, max_vals)
                        features, labels = extract_features(data, num_windows)
                        X_train.extend(features)
                        y_train.extend(labels)

        X_train = np.array(X_train)
        y_train = np.array(y_train)

        # Load and preprocess test data
        X_test, y_test = [], []
        for repetition in repetitions:
            for exp_type in experiment_types:
                for minute in minutes:
                    data = load_data(test_subject, repetition, exp_type, minute, min_vals, max_vals)
                    features, labels = extract_features(data, num_windows)
                    X_test.extend(features)
                    y_test.extend(labels)

        X_test = np.array(X_test)
        y_test = np.array(y_test)

        # Reduce features using SelectKBest
        X_train, X_test = select_features(X_train, y_train, X_test, k=25)

        # Train and Test SVM model
        svm_model = SVC(kernel='linear', verbose=False)
        svm_model.fit(X_train, y_train)
        svm_predictions = svm_model.predict(X_test)
        svm_accuracy = accuracy_score(y_test, svm_predictions)
        svm_accuracies.append(svm_accuracy)
        
        # Train and Test Random Forest model
        rf_model = RandomForestClassifier(n_estimators=100)
        rf_model.fit(X_train, y_train)
        rf_predictions = rf_model.predict(X_test)
        rf_accuracy = accuracy_score(y_test, rf_predictions)
        rf_accuracies.append(rf_accuracy)

        # Train and Test XGBoost model
        xgb_model = xgb.XGBClassifier(use_label_encoder=False, eval_metric='mlogloss')
        xgb_model.fit(X_train, y_train)
        xgb_predictions = xgb_model.predict(X_test)
        xgb_accuracy = accuracy_score(y_test, xgb_predictions)
        xgb_accuracies.append(xgb_accuracy)

        # Train and Test KNN model
        knn_model = KNeighborsClassifier(n_neighbors=4)
        knn_model.fit(X_train, y_train)
        knn_predictions = knn_model.predict(X_test)
        knn_accuracy = accuracy_score(y_test, knn_predictions)
        knn_accuracies.append(knn_accuracy)

        # We calculate the confusion Matrix
        cm_svm = confusion_matrix(y_test, svm_predictions)
        cm_rf = confusion_matrix(y_test, rf_predictions)
        cm_xgb = confusion_matrix(y_test, xgb_predictions)
        cm_knn = confusion_matrix(y_test, knn_predictions)

        # After calculating the confusion matrix for each model we save specifity and sensibility
        sens_svm, spec_svm = calc_metrics(cm_svm)
        svm_sensitivities.append(sens_svm)
        svm_specificities.append(spec_svm)

        sens_rf, spec_rf = calc_metrics(cm_rf)
        rf_sensitivities.append(sens_rf)
        rf_specificities.append(spec_rf)

        sens_xgb, spec_xgb = calc_metrics(cm_xgb)
        xgb_sensitivities.append(sens_xgb)
        xgb_specificities.append(spec_xgb)

        sens_knn, spec_knn = calc_metrics(cm_knn)
        knn_sensitivities.append(sens_knn)
        knn_specificities.append(spec_knn)

        # Print results after each iteration
        print(f"Results for test subject {test_subject}:")
        print(f"SVM - Accuracy: {svm_accuracy:.2f}%, Sensitivity: {sens_svm:.2f}%, Specificity: {spec_svm:.2f}%")
        print(f"Random Forest - Accuracy: {rf_accuracy:.2f}%, Sensitivity: {sens_rf:.2f}%, Specificity: {spec_rf:.2f}%")
        print(f"XGBoost - Accuracy: {xgb_accuracy:.2f}%, Sensitivity: {sens_xgb:.2f}%, Specificity: {spec_xgb:.2f}%")
        print(f"KNN - Accuracy: {knn_accuracy:.2f}%, Sensitivity: {sens_knn:.2f}%, Specificity: {spec_knn:.2f}%")
        print("\n")

        loop_end_time = time.time()
        elapsed_time = loop_end_time - loop_start_time
        print(f"Elapsed time {elapsed_time:.2f} seconds")
        print("\n")
        print("\n")



    seconds = 60/int(num_windows)
    print(f"Done with {num_windows} windows of {seconds} seconds:")
    results = [
        ["SVM", f"{np.mean(svm_accuracies):.2f}%", f"{np.mean(svm_specificities):.2f}%", f"{np.mean(svm_sensitivities):.2f}%"],
        ["Random Forest", f"{np.mean(rf_accuracies):.2f}%", f"{np.mean(rf_specificities):.2f}%", f"{np.mean(rf_sensitivities):.2f}%"],
        ["XGBoost", f"{np.mean(xgb_accuracies):.2f}%", f"{np.mean(xgb_specificities):.2f}%", f"{np.mean(xgb_sensitivities):.2f}%"],
        ["KNN", f"{np.mean(knn_accuracies):.2f}%", f"{np.mean(knn_specificities):.2f}%", f"{np.mean(knn_sensitivities):.2f}%"]
    ]

    print(tabulate(results, headers=["Model", "Accuracy", "Specificity", "Sensitivity"]))
    print("\n")


KeyboardInterrupt: 

In [6]:
import numpy as np
import pandas as pd

def extract_features(data, num_windows):
    window_size = int((60*256) / num_windows)  # Adjusted to dataset specifics
    features, labels = [], []
    for i in range(num_windows):
        window_data = data.iloc[i*window_size : (i+1)*window_size]
        if len(window_data) < window_size:
            raise ValueError("NOT ENOUGH DATA FOR 1 MINUTE.")
        
        # Drop label from the window data if present
        if 'label' in window_data.columns:
            labels.append(window_data['label'].iloc[0])
            feature_data = window_data.drop(columns='label')
        else:
            feature_data = window_data

        # Calculate features for each channel
        means = feature_data.mean()

        # Combine all features into a single array
        combined_features = np.concatenate([
            means.values,
        ])

        # Append combined features and labels
        features.append(combined_features)

    features, labels = np.array(features), np.array(labels)
    return features, labels

3 FEATURES BIEN:

In [2]:
import numpy as np
import pandas as pd

def calculate_total_band_energy(signal):
    """Calculates the total energy of the frequency band of a signal using FFT."""
    fft_values = np.fft.fft(signal)
    magnitude_squared = np.abs(fft_values) ** 2
    total_energy = np.sum(magnitude_squared) / len(signal)
    return total_energy

def max_power(signal):
    """Calculates the maximum power of a signal using FFT."""
    fft_values = np.fft.fft(signal)
    power_spectrum = np.abs(fft_values) ** 2 / len(signal)
    return np.max(power_spectrum)

def shannon_entropy(signal):
    """Calculates Shannon entropy of a signal."""
    value, counts = np.unique(signal, return_counts=True)
    probabilities = counts / counts.sum()
    entropy = -np.sum(probabilities * np.log2(probabilities + 1e-10))  # To avoid log(0)
    return entropy

def calculate_peak_to_peak(signal):
    """Calculates the peak-to-peak value of a signal."""
    peak_to_peak_value = signal.max() - signal.min()
    return peak_to_peak_value

def extract_features(data, num_windows):
    window_size = int((60*256) / num_windows)  # Adjusted to dataset specifics
    features, labels = [], []
    for i in range(num_windows):
        window_data = data.iloc[i*window_size : (i+1)*window_size]
        if len(window_data) < window_size:
            raise ValueError("NOT ENOUGH DATA FOR 1 MINUTE.")
        
        # Drop label from the window data if present
        if 'label' in window_data.columns:
            labels.append(window_data['label'].iloc[0])
            feature_data = window_data.drop(columns='label')
        else:
            feature_data = window_data

        # Calculate features for each channel
        means = feature_data.mean()
        max_powers = feature_data.apply(max_power)
        entropies = feature_data.apply(shannon_entropy)
        band_energies = feature_data.apply(calculate_total_band_energy)
        peak_to_peak_values = feature_data.apply(calculate_peak_to_peak)

        # Combine all features into a single array
        combined_features = np.concatenate([
            means.values,
            max_powers.values,
            entropies.values,
            band_energies.values,
            peak_to_peak_values.values
        ])

        # Append combined features and labels
        features.append(combined_features)

    features, labels = np.array(features), np.array(labels)
    return features, labels

In [None]:
import numpy as np
import pandas as pd
from scipy.stats import gaussian_kde
from scipy.integrate import quad

def differential_entropy(signal):
    """Calcula la entropía diferencial de una señal utilizando la estimación de densidad kernel."""
    kde = gaussian_kde(signal)
    # Definimos los límites para la integración basados en los datos
    lower_limit, upper_limit = signal.min(), signal.max()
    # Calculamos la entropía diferencial
    entropy = -quad(lambda x: kde(x) * np.log(kde(x) + 1e-10), lower_limit, upper_limit)[0]
    return entropy

def extract_features(data, num_windows):
    window_size = int((60*256) / num_windows)  # Ajustado a las especificaciones del conjunto de datos
    features, labels = [], []
    for i in range(num_windows):
        window_data = data.iloc[i*window_size : (i+1)*window_size]
        if len(window_data) < window_size:
            raise ValueError("NOT ENOUGH DATA FOR 1 MINUTE.")
        
        # Drop label from the window data if present
        if 'label' in window_data.columns:
            labels.append(window_data['label'].iloc[0])
            feature_data = window_data.drop(columns='label')
        else:
            feature_data = window_data
        
        # Calculate differential entropy for each channel
        diff_entropies = feature_data.apply(differential_entropy)

        # Store the calculated differential entropies
        features.append(diff_entropies.values)

    features, labels = np.array(features), np.array(labels)
    return features, labels
