In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import pickle
import os
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, roc_curve, auc, log_loss
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from xgboost import XGBClassifier

In [None]:
#show keys of the first subject to explore dataset shape
data_path = "/kaggle/input/wesad-full-dataset/WESAD"
wesad_files = []
for root, dirs, files in os.walk(data_path):
    for file in files:
        if file.endswith(".pkl"):
            wesad_files.append(os.path.join(root, file))
if wesad_files:
    sample_file = wesad_files[0]
    with open(sample_file, 'rb') as f:
        data = pickle.load(f, encoding='latin1')
    print("Keys in the dataset:", data.keys())
else:
    print("No .pkl files found in the directory.")

In [None]:
#show more about shape
print("Signal Data Keys:", data['signal'].keys())
for key in data['signal']:
    print(f"{key}: {np.array(data['signal'][key]).shape}") 
print("Unique Labels:", np.unique(data['label']))

In [None]:
#which from chest and which from wrist
print("Chest Data Keys:", data['signal']['chest'].keys())
print("Wrist Data Keys:", data['signal']['wrist'].keys())
# show shape of data inside 
for key in data['signal']['chest']:
    print(f"Chest - {key}: {np.array(data['signal']['chest'][key]).shape}")

for key in data['signal']['wrist']:
    print(f"Wrist - {key}: {np.array(data['signal']['wrist'][key]).shape}")
#we will be dealing with ECG signal

In [None]:
#show shape of 3000 points of the ECG signal
plt.figure(figsize=(15, 5))
plt.plot(data['signal']['chest']['ECG'][:3000])
plt.title("Chest ECG Signal")
plt.xlabel("Time")
plt.ylabel("Amplitude")

In [None]:
#functions to remove outliers and normalize the signal (MIN-MAX)
def remove_outliers(data):
    Q1 = np.percentile(data, 25, axis=0)
    Q3 = np.percentile(data, 75, axis=0)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    mask = (data >= lower_bound) & (data <= upper_bound)
    data_clean = np.where(mask, data, np.mean(data, axis=0))
    return data_clean
def normalize_signal(signal):
    return (signal - np.min(signal)) / (np.max(signal) - np.min(signal))
data_path = "/kaggle/input/wesad-full-dataset/WESAD"
#collect all files
wesad_files = []
for root, dirs, files in os.walk(data_path):
    for file in files:
        if file.endswith(".pkl"):
            wesad_files.append(os.path.join(root, file))

ecg_all = []
labels_all = []
valid_labels = [1, 2, 3, 4]  # since 0 undefined , 5/6/7 will not be considered

# walk through subject to collect pkl files and process them
for file in wesad_files:
    with open(file, 'rb') as f:
        data = pickle.load(f, encoding='latin1')
    if 'chest' not in data['signal'] or 'ECG' not in data['signal']['chest']:
       continue 
    labels = data['label']
    ecg = data['signal']['chest']['ECG']
    # filter unimportant labels and ecg signals
    valid_indices = np.isin(labels, valid_labels)
    filtered_labels = labels[valid_indices]
    filtered_ecg = ecg[valid_indices]
    # Signal processing pipeline
    filtered_ecg = remove_outliers(filtered_ecg)
    filtered_ecg = normalize_signal(filtered_ecg)
    # binary conversion of labels
    binary_labels = np.where(filtered_labels == 2, 1, 0)

    # add at one loop and repeat 
    ecg_all.append(filtered_ecg)
    labels_all.append(binary_labels)

#  after loop collect all
ecg_all = np.concatenate(ecg_all, axis=0)
labels_all = np.concatenate(labels_all, axis=0)
print("Final ECG shape:", ecg_all.shape)
print("Final Labels shape:", labels_all.shape)

In [None]:
#comare between original labels and binary distributions
all_original_labels = []
print("Collecting original labels from all subjects...")
for file in tqdm(wesad_files, desc="Processing Files (Original Labels)"):
    with open(file, 'rb') as f:
        data = pickle.load(f, encoding='latin1')
    
    all_original_labels.append(data['label'])

all_original_labels = np.concatenate(all_original_labels)
unique_labels_orig, counts_orig = np.unique(all_original_labels, return_counts=True)

unique_labels_bin, counts_bin = np.unique(labels_all, return_counts=True)

# draw
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# original labels distribution as first graph
sns.barplot(x=unique_labels_orig, y=counts_orig, palette="coolwarm", ax=axes[0])
axes[0].set_title("Original Label Distribution (0 to 7)")
axes[0].set_xlabel("Original Labels")
axes[0].set_ylabel("Count")

# binary labels distribution as second graph
sns.barplot(x=unique_labels_bin, y=counts_bin, palette="coolwarm", ax=axes[1])
axes[1].set_title("Binary Label Distribution (0 = No Stress, 1 = Stress)")
axes[1].set_xlabel("Binary Labels")
axes[1].set_ylabel("Count")
axes[1].set_xticks([0, 1])
axes[1].set_xticklabels(["No Stress", "Stress"])

plt.tight_layout()
plt.show()

In [None]:
ecg_data = ecg_all
binary_labels = labels_all
# confirm
print("Binary Labels (1 = Stressed, 0 = Not Stressed):")
print(binary_labels)
print("Filtered ECG Shape:", ecg_data.shape)
print("Filtered Labels Shape:", binary_labels.shape)
#after
unique_labels, counts = np.unique(binary_labels, return_counts=True)
print("Count of each class after filtering:", dict(zip(unique_labels, counts)))

In [None]:
window_size = 7000  # 700 (samiling rate) x 10 (s)
stride = window_size  # no overlapping 
# Divide ECG data into windows without overlap using last timestamp method
#def create_windows(ecg_data, labels, window_size, stride):
  #  windows = []
  #  window_labels = []
   # for i in range(0, len(ecg_data) - window_size + 1, stride):
   #     windows.append(ecg_data[i:i+window_size])
   #     window_labels.append(labels[i + window_size - 1])
  #  return np.array(windows), np.array(window_labels)

# Divide ECG data into windows without overlap using majority voting method
def create_windows_majority(ecg_data, labels, window_size, stride):
    windows = []
    window_labels = []

    for i in range(0, len(ecg_data) - window_size + 1, stride):
        window = ecg_data[i:i+window_size]
        label_window = labels[i:i+window_size]

        majority_label = int(np.round(np.mean(label_window))) 

        windows.append(window)
        window_labels.append(majority_label)

    return np.array(windows), np.array(window_labels)
ecg_windows, window_labels = create_windows_majority(ecg_data, binary_labels, window_size, stride)   
#ecg_windows, window_labels = create_windows(ecg_data, binary_labels, window_size, stride)
# visualize first 3 windows
plt.figure(figsize=(15, 5))
for i in range(3):
    plt.subplot(3, 1, i+1)
    plt.plot(ecg_windows[i])
    #plt.plot(emg_windows[i])
    #plt.plot(eda_windows[i])
    plt.title(f"ECG Window {i+1}")
    #plt.title(f"EMG Window {i+1}")
    #plt.title(f"EDA Window {i+1}")
plt.show()

print("Shape of windows:", ecg_windows.shape)
print("Shape of window labels:", window_labels.shape)

In [None]:
# Function to extract features from each window
def extract_features(windows):
    features = []
    for window in windows:
        # Extracting statistical features
        mean = np.mean(window)
        std = np.std(window)
        max_val = np.max(window)
        min_val = np.min(window)
        # Adding features to the list
        features.append([mean, std, max_val, min_val])
    
    return np.array(features)

# Extract features from the windows
features = extract_features(ecg_windows)

# Standardize the features
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features)

# Visualizing some of the features
plt.figure(figsize=(10, 5))
plt.plot(features_scaled[:50, 0], label="Mean")
plt.plot(features_scaled[:50, 1], label="Std")
plt.title("Feature Extraction Visualization (Mean and Std) for first 50 windows")
plt.xlabel("Window Index")
plt.ylabel("Feature Value")
plt.legend()
plt.show()

print("Shape of the feature matrix:", features_scaled.shape)
#وريني عدد الفيتشرز ف اول ويندو
num_features = features_scaled.shape[1]
print("num of features taken in first window:", num_features)
total_features = features_scaled.shape[0] * features_scaled.shape[1]
print("total num of features in all:", total_features)

In [None]:
# split features_scaled and window_labels
X_train, X_test, y_train, y_test = train_test_split(
    features_scaled,
    window_labels,
    test_size=0.2,
    random_state=42,
    shuffle=True
)
# balance training data only
smote = SMOTE(random_state=42)
X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)

# 3. visualize new training distribution
unique_labels_resampled, counts_resampled = np.unique(y_train_resampled, return_counts=True)
plt.figure(figsize=(8, 5))
sns.barplot(x=unique_labels_resampled, y=counts_resampled, palette="coolwarm")
plt.title("Balanced Training Data (SMOTE)")
plt.xlabel("Label (0 = No Stress, 1 = Stress)")
plt.ylabel("Count")
plt.xticks(ticks=[0, 1], labels=["No Stress", "Stress"])
plt.show()
# models
models = {
    "RandomForest": RandomForestClassifier(n_estimators=100, max_depth=10, min_samples_split=5, min_samples_leaf=2, random_state=42),
    "KNN": KNeighborsClassifier(n_neighbors=50),
    "XGBoost": XGBClassifier(n_estimators=100, learning_rate=0.05, max_depth=5, use_label_encoder=False, eval_metric='logloss', random_state=42)
}

results = {}

#model tain/test
for name, model in models.items():
    print(f"\n===== {name} =====")
    
    model.fit(X_train_resampled, y_train_resampled)
    
    y_pred = model.predict(X_test)
    y_score = model.predict_proba(X_test)[:, 1]
    
    # evaluations
    acc_train = model.score(X_train_resampled, y_train_resampled)
    acc_test = accuracy_score(y_test, y_pred)
    auc_score = auc(*roc_curve(y_test, y_score)[:2])
    train_loss = log_loss(y_train_resampled, model.predict_proba(X_train_resampled))
    test_loss = log_loss(y_test, model.predict_proba(X_test))
    
    print("Classification Report:")
    print(classification_report(y_test, y_pred))
    print(f"Training Accuracy: {acc_train:.2f}")
    print(f"Test Accuracy: {acc_test:.2f}")
    print(f"AUC: {auc_score:.2f}")
    print(f"Log Loss (Train): {train_loss:.4f}")
    print(f"Log Loss (Test): {test_loss:.4f}")
    
    results[name] = {
        "train_acc": acc_train,
        "test_acc": acc_test,
        "auc": auc_score,
        "log_loss_train": train_loss,
        "log_loss_test": test_loss,
        "conf_matrix": confusion_matrix(y_test, y_pred)
    }
    
    #ROC Curve
    fpr, tpr, _ = roc_curve(y_test, y_score)
    plt.plot(fpr, tpr, lw=2, label=f'{name} (AUC = {auc_score:.2f})')

plt.plot([0, 1], [0, 1], color='gray', linestyle='--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve Comparison')
plt.legend(loc='lower right')
plt.grid(True)
plt.tight_layout()
plt.show()

#Confusion Matrices
for name in models:
    plt.figure(figsize=(6, 4))
    sns.heatmap(results[name]["conf_matrix"], annot=True, fmt='d', cmap="Blues",
                xticklabels=["No Stress", "Stress"], yticklabels=["No Stress", "Stress"])
    plt.title(f"{name} Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.tight_layout()
    plt.show()