## Imports (common)

In [11]:
import os
import numpy as np
import gc
import h5py
import scipy.io
import scipy.signal as sgl
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score
from sklearn.feature_selection import VarianceThreshold
from sklearn.preprocessing import StandardScaler, label_binarize
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, roc_auc_score
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, ExtraTreesClassifier
from sklearn.decomposition import PCA
from sklearn.model_selection import GridSearchCV, cross_val_score
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, Flatten, Dense, Dropout, BatchNormalization, LSTM, GRU
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import to_categorical
from scipy.stats import skew, kurtosis
from scipy.signal import welch
import neurokit2 as nk
from imblearn.over_sampling import SMOTE

## Data Loading & Preprocessing

In [14]:

# --- Data Loading Function ---
def load_patient_preprocessed_data(patient_number):
    base_dir = r"C:\Users\ferri\Downloads\PoliTO\Tesi\DSs\Emotion-Stress\AMIGOS"
    file_path = os.path.join(
        base_dir, "Data preprocessed",
        f"Data_Preprocessed_P{patient_number:02d}",
        f"Data_Preprocessed_P{patient_number:02d}.mat"
    )
    data = scipy.io.loadmat(file_path)
    return data

# --- Preprocessing functions for pipeline ---
def process_trial_signal(signal, target_length=None, fs=512):
    """
    Convert a trial's raw signal into a 2D array [channels, time].
    If target_length is None, we keep the full length (then you'll pad later).
    Preprocessing steps (downsampling, filtering, baseline removal) are applied.
    """
    # Convert to float32.
    signal = np.array(signal, dtype=np.float32)
    
    # Check if signal is empty.
    if signal.size == 0:
        return np.empty((0,0), dtype=np.float32)
    
    # If the signal is 1D, reshape to (1, length).
    if signal.ndim == 1:
        signal = signal[None, :]
    
    # Downsampling parameters.
    N = 4
    lowcut, highcut = 1.0, 45.0
    desired_fs = 128
    down_factor = fs // desired_fs
    
    # 1) Downsample each channel.
    downsampled = []
    for ch_data in signal:
        # If a channel is empty, skip it.
        if ch_data.size == 0:
            continue
        ch_data_down = ch_data[::down_factor]
        downsampled.append(ch_data_down)
        
    # If no channels had data, return an empty array.
    if not downsampled:
        return np.empty((0,0), dtype=np.float32)
        
    # Stack downsampled channels.
    signal = np.vstack([ch[None, :] for ch in downsampled])
    
    # 2) Bandpass filter design.
    nyquist = 0.5 * desired_fs
    b, a = sgl.butter(N=4, Wn=[lowcut/nyquist, highcut/nyquist], btype='band')
    
    # Calculate the minimum length required by filtfilt.
    min_len = 3 * (max(len(a), len(b)) - 1)
    
    # Filter each channel; if too short, skip filtering.
    filtered = []
    for ch_data in signal:
        if len(ch_data) < min_len:
            ch_data_filt = ch_data  # Fallback: leave unfiltered.
        else:
            ch_data_filt = sgl.filtfilt(b, a, ch_data)
        filtered.append(ch_data_filt)
    signal = np.vstack([ch[None, :] for ch in filtered])
    
    # 3) Baseline removal (subtract mean from each channel).
    baseline_removed = []
    for ch_data in signal:
        ch_data_bs = ch_data - np.mean(ch_data)
        baseline_removed.append(ch_data_bs)
    signal = np.vstack([ch[None, :] for ch in baseline_removed])
    
    # 4) Padding/Truncation if target_length is provided.
    if target_length is not None:
        processed = []
        for ch_data in signal:
            ch_len = len(ch_data)
            if ch_len == 0:
                proc = np.zeros(target_length, dtype=np.float32)
            elif ch_len < target_length:
                pad_width = target_length - ch_len
                proc = np.pad(ch_data, (0, pad_width), mode='edge')
            else:
                proc = ch_data[:target_length]
            processed.append(proc.astype(np.float32))
        signal = np.vstack([p[None, :] for p in processed])
    
    return signal.astype(np.float32)

def split_into_modalities(signal):
    # If the signal is 1D, assume it represents a single modality (e.g., ECG).
    if signal.ndim == 1:
        return {"ecg": signal}
    else:
        # If multi-channel, split into ECG, GSR, and EEG as desired.
        ecg = signal[0, :]
        gsr = signal[1, :]
        eeg = signal[2, :] 
        return {"ecg": ecg, "gsr": gsr, "eeg": eeg}

def discretize_label(label):
    """
    Convert a label [valence, arousal] into a descriptive class.
    If the flattened label has 2 elements, use them directly.
    If it has 3 or more, use the second and third elements.
    """
    flat_label = np.array(label).flatten()  # Ensure label is 1D.
    if flat_label.size == 2:
        valence, arousal = flat_label
    elif flat_label.size >= 3:
        valence, arousal = flat_label[1], flat_label[2]
    else:
        return "Unknown"
    
    if valence < 0 and arousal < 0:
        return "Low valence, Low arousal"
    elif valence < 0 and arousal >= 0:
        return "Low valence, High arousal"
    elif valence >= 0 and arousal < 0:
        return "High valence, Low arousal"
    else:
        return "High valence, High arousal"

# --- Feature Extraction Functions ---
def extract_features(signals, fs=128):
    """
    Extract features for multiple signals (ECG, GSR, EEG) from a dictionary.
    If advanced processing (e.g., HRV from ECG) fails, falls back to basic statistics.
    """
    feat_list = []

    # ---------- ECG Features ----------
    if 'ecg' in signals:
        ecg_signal = np.array(signals['ecg']).flatten()
        if len(ecg_signal) <= 18:
            ecg_feats = [0.0] * 10  # Not enough data for advanced features.
        else:
            try:
                ecg_cleaned = nk.ecg_clean(ecg_signal, sampling_rate=fs)
                _, rpeaks = nk.ecg_peaks(ecg_cleaned, sampling_rate=fs)
                # Check if any R-peaks were detected
                if len(rpeaks.get("ECG_R_Peaks", [])) == 0:
                    raise ValueError("No R-peaks detected.")
                hrv = nk.hrv(rpeaks, sampling_rate=fs, show=False)
                feature_names = [
                    "RMSSD", "SDNN", "pNN50", "pNN20",
                    "LF/HF", "HF", "LF", "VLF", "HRV_TI", "SDSD"
                ]
                ecg_feats = []
                for name in feature_names:
                    if name in hrv.columns and not np.isnan(hrv[name].values[0]):
                        ecg_feats.append(hrv[name].values[0])
                    else:
                        ecg_feats.append(0.0)
            except Exception as e:
                # Fallback: Compute basic statistics if advanced features fail.
                basic_stats = [
                    np.mean(ecg_signal),
                    np.std(ecg_signal),
                    np.min(ecg_signal),
                    np.max(ecg_signal),
                    np.median(ecg_signal)
                ]
                # Pad to reach length 10.
                ecg_feats = basic_stats + [0.0] * (10 - len(basic_stats))
        feat_list.append(np.array(ecg_feats))

    # ---------- GSR Features ----------
    if 'gsr' in signals:
        gsr_signal = np.array(signals['gsr']).flatten()
        if len(gsr_signal) > 2:
            try:
                eda_cleaned = nk.eda_clean(gsr_signal, sampling_rate=fs)
                eda_peaks, _ = nk.eda_peaks(eda_cleaned, sampling_rate=fs)
                num_scr_peaks = eda_peaks.get("SCR_Peaks", np.array([0])).sum()
            except Exception:
                num_scr_peaks = 0.0
            gsr_feats = [
                np.mean(gsr_signal),
                np.std(gsr_signal),
                np.min(gsr_signal),
                np.max(gsr_signal),
                kurtosis(gsr_signal),
                skew(gsr_signal),
                num_scr_peaks
            ]
        else:
            gsr_feats = [0.0] * 7
        feat_list.append(np.array(gsr_feats))

    # ---------- EEG Features ----------
    if 'eeg' in signals:
        eeg_data = np.array(signals['eeg'])
        if eeg_data.ndim == 1:
            eeg_data = eeg_data[None, :]  # Ensure 2D shape.
        all_channels_feats = []
        for ch in range(eeg_data.shape[0]):
            channel_signal = eeg_data[ch, :]
            if len(channel_signal) < 2:
                ch_feats = [0.0] * 6
            else:
                activity = np.var(channel_signal)
                mobility = np.std(np.diff(channel_signal)) / (np.std(channel_signal) + 1e-8)
                diff_signal = np.diff(channel_signal)
                complexity = (np.std(np.diff(diff_signal)) / (np.std(diff_signal) + 1e-8)) / (mobility + 1e-8)
                freqs, psd = welch(channel_signal, fs=fs, nperseg=min(256, len(channel_signal)))
                def bandpower(f, pxx, fmin, fmax):
                    idx = np.logical_and(f >= fmin, f <= fmax)
                    # Use trapezoid integration as recommended.
                    return np.trapezoid(pxx[idx], x=f[idx])
                alpha = bandpower(freqs, psd, 8, 14)
                beta  = bandpower(freqs, psd, 14, 30)
                gamma = bandpower(freqs, psd, 30, 50)
                ch_feats = [activity, mobility, complexity, alpha, beta, gamma]
            all_channels_feats.append(ch_feats)
        # Average across channels.
        eeg_feats = np.mean(all_channels_feats, axis=0)
        feat_list.append(eeg_feats)

    # ---------- Combine all features ----------
    if len(feat_list) == 0:
        return np.zeros(10)
    return np.concatenate(feat_list)

def build_dataset(joined_data, labels_array, target_length=None):
    X_list = []
    y_list = []
    n_trials = joined_data.shape[1]

    for i in range(n_trials):
        trial_data = joined_data[0, i]
        signal = process_trial_signal(trial_data, target_length)
        if signal.size == 0 or signal.shape[0] == 0:
            print(f"Warning: Trial {i} has an empty signal. Skipping trial.")
            continue

        signals_dict = split_into_modalities(signal)
        feats = extract_features(signals_dict, fs=128)
        
        lbl = np.array(labels_array[0, i]).squeeze()
        if lbl.size < 3:
            print(f"Warning: Trial {i} does not have enough label data. Skipping trial.")
            continue
        selected_label = lbl[1:3]  # use only the second and third columns
        discrete_label = discretize_label(selected_label)

        X_list.append(feats)
        y_list.append(discrete_label)

    if len(X_list) == 0:
        return None, None

    X_array = np.vstack(X_list)
    y_array = np.array(y_list)
    return X_array, y_array

def build_patient_data(joined_data, label_array):
    X_list = []
    y_list = []
    n_trials = joined_data.shape[1]
    for i in range(n_trials):
        trial_data = joined_data[0, i]
        trial_data = np.array(trial_data, dtype=float).squeeze()
        features = extract_features(trial_data)
        
        lbl = np.array(label_array[0, i]).squeeze()
        if lbl.size < 3:
            print(f"Warning: Trial {i} does not have enough label data. Skipping trial.")
            continue
        # Select only the second and third columns (i.e. indices 1 and 2)
        selected_label = lbl[1:3]
        discrete_label = discretize_label(selected_label)

        
        X_list.append(features)
        y_list.append(discrete_label)
    if len(X_list) == 0:
        return None, None
    return np.vstack(X_list), np.array(y_list)

def load_all_patients_data(num_patients=40):
    X_list = []
    y_list = []
    for patient in range(1, num_patients+1):
        print(f"Loading patient {patient}")
        data = load_patient_preprocessed_data(patient)
        joined_data = data['joined_data']
        labels_array = data['labels_ext_annotation']
        X_patient, y_patient = build_patient_data(joined_data, labels_array)
        if X_patient is not None and y_patient is not None:
            X_list.append(X_patient)
            y_list.append(y_patient)
    if len(X_list) == 0:
        raise ValueError("No patient data loaded.")
    X_all = np.vstack(X_list)
    y_all = np.concatenate(y_list)
    return X_all, y_all

# Load raw signals for CNN/LSTM/GRU
def load_all_patients_raw_signal(num_patients=40, target_length=None):
    X_list, y_list = [], []
    for patient in range(1, num_patients + 1):
        data = load_patient_preprocessed_data(patient)
        joined_data = data['joined_data']
        labels_array = data['labels_ext_annotation']
        
        X_patient, y_patient = build_dataset(joined_data, labels_array, target_length=target_length)
        
        if X_patient is not None:
            X_list.append(X_patient)
            y_list.append(y_patient)

    if not X_list:
        raise ValueError("No data loaded!")

    X_all = np.vstack(X_list)
    y_all = np.concatenate(y_list)

    return X_all, y_all

def pad_trials(trials, pad_mode='constant', constant_values=0):
    """
    Given a list of 2D arrays (each with shape (channels, time)), pad them so that all have the same shape.
    Both channel and time dimensions are padded using constant values.
    """
    # Determine maximum dimensions among all trials.
    max_channels = max(trial.shape[0] for trial in trials)
    max_time = max(trial.shape[1] for trial in trials)
    
    padded_trials = []
    for trial in trials:
        ch, t = trial.shape
        # Pad channels if needed.
        if ch < max_channels:
            trial = np.pad(trial, ((0, max_channels - ch), (0, 0)), mode=pad_mode, constant_values=constant_values)
        # Pad time dimension if needed.
        if t < max_time:
            trial = np.pad(trial, ((0, 0), (0, max_time - t)), mode=pad_mode, constant_values=constant_values)
        elif t > max_time:
            trial = trial[:, :max_time]
        padded_trials.append(trial)
    return np.stack(padded_trials, axis=0)

def load_all_patients_raw_signal_deep_chunked(num_patients=40, target_length=None):
    all_X = []
    all_y = []
    for patient in range(1, num_patients+1):
        print(f"Processing patient {patient}...")
        data = load_patient_preprocessed_data(patient)
        joined_data = data['joined_data']
        labels_array = data['labels_ext_annotation']
        n_trials = joined_data.shape[1]
        for i in range(n_trials):
            # Process label first.
            lbl = np.array(labels_array[0, i]).squeeze()
            if lbl.size == 0:
                print(f"Warning: Patient {patient} Trial {i} has empty label. Skipping trial.")
                continue
            if lbl.ndim == 2:
                lbl_processed = np.mean(lbl, axis=0)
            elif lbl.ndim == 1:
                lbl_processed = lbl
            else:
                lbl_processed = lbl.flatten()[0]
            discrete_label = discretize_label(lbl_processed)
            
            trial_data = joined_data[0, i]
            # Process trial signal to a 2D array with full length.
            signal = process_trial_signal(trial_data, target_length).astype(np.float32)
            all_X.append(signal)
            all_y.append(discrete_label)
        
        del data, joined_data, labels_array
        gc.collect()
    
    if len(all_X) == 0:
        raise ValueError("No patient data loaded.")
    
    if target_length is None:
        X_all = pad_trials(all_X, pad_mode='constant', constant_values=0)
    else:
        X_all = np.stack(all_X, axis=0)
    
    y_all = np.array(all_y)
    return X_all, y_all

## Load and Prepare Data

In [15]:
# Load raw signals
X_signal, y_signal_desc = load_all_patients_raw_signal(num_patients=40, target_length=None)

# Option: Use global normalization instead of per-sample normalization
scaler_global = StandardScaler().fit(X_signal)
X_signal_norm = scaler_global.transform(X_signal)

# Continue with label encoding and train/test split as before
unique_labels = np.unique(y_signal_desc)
label_to_int = {label: idx for idx, label in enumerate(unique_labels)}
y_int_signal = np.array([label_to_int[label] for label in y_signal_desc])
y_cat_signal = to_categorical(y_int_signal)

X_train, X_test, y_train, y_test, y_int_train, y_int_test = train_test_split(
    X_signal_norm, y_cat_signal, y_int_signal,
    test_size=0.2, random_state=42, stratify=y_int_signal
)




  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))




  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))




  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))




  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))




  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))
  warn(
  eda_signal = pd.DataFrame.pad(pd.Series(eda_signal))


## Run Statistical Feature Extraction + ML classifiers

In [19]:
# Use the features and labels from the "Load and Prepare Data" cell
X_stat = X_signal       # Already computed features from load_all_patients_raw_signal
y_stat = y_signal_desc  # Corresponding labels

# Optionally encode labels if they are not already integers
unique_labels = np.unique(y_stat)
label_to_int = {label: idx for idx, label in enumerate(unique_labels)}
y_int_stat = np.array([label_to_int[label] for label in y_stat])

# Split the statistical features dataset into training and testing sets
X_train_stat, X_test_stat, y_train_stat, y_test_stat, y_int_train, y_int_test = train_test_split(
    X_stat, y_stat, y_int_stat,
    test_size=0.2, random_state=42, stratify=y_int_stat
)

# --- Preprocessing for Statistical Features ---
# Imputation
imputer = SimpleImputer(strategy='mean').fit(X_train_stat)
X_train_stat_imputed = imputer.transform(X_train_stat)
X_test_stat_imputed = imputer.transform(X_test_stat)

# Global normalization
scaler_global = StandardScaler().fit(X_train_stat_imputed)
X_train_global = scaler_global.transform(X_train_stat_imputed)
X_test_global = scaler_global.transform(X_test_stat_imputed)

# Apply PCA to reduce dimensionality while preserving 95% of variance
pca = PCA(n_components=0.95)
X_train_pca = pca.fit_transform(X_train_global)
X_test_pca = pca.transform(X_test_global)

# Apply SMOTE to balance the training data
sm = SMOTE(random_state=42)
X_train_bal, y_train_bal = sm.fit_resample(X_train_pca, y_int_train)

# --- Evaluate Multiple ML Classifiers ---
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, ExtraTreesClassifier
from sklearn.model_selection import cross_val_score
from sklearn.metrics import accuracy_score, confusion_matrix

classifiers = {
    "Logistic Regression": LogisticRegression(max_iter=1000, class_weight='balanced'),
    "SVM": SVC(probability=True, class_weight='balanced'),
    "KNN": KNeighborsClassifier(),
    "Decision Tree": DecisionTreeClassifier(class_weight='balanced'),
    "Random Forest": RandomForestClassifier(n_estimators=100, class_weight='balanced'),
    "Gradient Boosting": GradientBoostingClassifier(),
    "Extra Trees": ExtraTreesClassifier(n_estimators=100, class_weight='balanced')
}

results = {}
print("\nEvaluating classifiers with 5-fold cross-validation:")
for clf_name, clf in classifiers.items():
    try:
        cv_scores = cross_val_score(clf, X_train_bal, y_train_bal, cv=5, scoring='balanced_accuracy')
        results[clf_name] = np.mean(cv_scores)
        print(f"{clf_name}: Mean Balanced Accuracy = {np.mean(cv_scores):.3f}")
    except Exception as e:
        print(f"{clf_name} encountered an error during cross-validation: {e}")

if results:
    best_clf_name = max(results, key=results.get)
    print("\nBest classifier based on cross-validation:", best_clf_name)
    
    best_clf = classifiers[best_clf_name]
    best_clf.fit(X_train_bal, y_train_bal)
    predictions = best_clf.predict(X_test_pca)
    acc = accuracy_score(y_int_test, predictions)
    print(f"\nFinal {best_clf_name} Accuracy on Test Set: {acc:.3f}")
    print("Confusion Matrix:")
    print(confusion_matrix(y_int_test, predictions))
else:
    print("No classifier could be evaluated successfully.")



Evaluating classifiers with 5-fold cross-validation:
Logistic Regression: Mean Balanced Accuracy = 0.345
SVM: Mean Balanced Accuracy = 0.521
KNN: Mean Balanced Accuracy = 0.736
Decision Tree: Mean Balanced Accuracy = 0.724
Random Forest: Mean Balanced Accuracy = 0.786
Gradient Boosting: Mean Balanced Accuracy = 0.712
Extra Trees: Mean Balanced Accuracy = 0.813

Best classifier based on cross-validation: Extra Trees

Final Extra Trees Accuracy on Test Set: 0.480
Confusion Matrix:
[[ 2  4  1  9]
 [ 1  3  0 16]
 [ 0  0  1  2]
 [20 17  7 65]]
