<a href="https://colab.research.google.com/github/pechaj/ml-project/blob/main/ml_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#!/usr/bin/env python
# coding: utf-8

# # Cognitive Load Classification from Physiological Signals
# This notebook demonstrates processing and classification of cognitive load using physiological signals (ECG and EDA).

In [None]:
%pip install neurokit2 matplotlib numpy pandas scikit-learn torch tqdm ipywidgets ipympl plotly anywidget

In [None]:
# from google.colab import output
# output.enable_custom_widget_manager()

In [None]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, f1_score, balanced_accuracy_score

# Check for GPU availability
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

## Define Dataset and Data Processing Functions

In [None]:
# Custom dataset class
class CognitiveLoadDataset(Dataset):
    def __init__(self, X: np.ndarray, y: np.ndarray):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

def load_data_from_folders(base_dir: str, balance_classes=True):
    """
    Loads ECG and EDA data from specified directory structure.
    Implements window segmentation with 50% overlap.

    Args:
        base_dir: Directory containing High_load and Low_load folders
        balance_classes: Whether to compute sample weights for class balancing

    Returns:
        X: Feature array
        y: Labels array
        sample_weights: Optional weights for balanced sampling
    """
    X, y = [], []
    window_size = 256 * 20  # ~20 seconds at 256 Hz
    step_size = window_size // 2  # 50% overlap

    class_samples = {0: [], 1: []}  # For storing samples by class

    for label, class_name in enumerate(["Low_load", "High_load"]):
        class_dir = os.path.join(base_dir, class_name)
        recordings = {}

        for file in os.listdir(class_dir):
            parts = file.rsplit('_', 2)
            if len(parts) != 3:
                continue  # Skip unexpected filenames
            subject_id, rec_num, signal_type = parts
            subject_key = f"{subject_id}_{rec_num}"
            signal_type = signal_type.replace('.csv', '').lower()

            if subject_key not in recordings:
                recordings[subject_key] = {}

            file_path = os.path.join(class_dir, file)
            recordings[subject_key][signal_type] = pd.read_csv(file_path)

        for subject_key, signals in recordings.items():
            # if 'ecg' in signals: # Only ECG
            if 'ecg' in signals and 'eda' in signals: # ECG and EDA
                ecg = signals['ecg']
                min_len = len(ecg)
                ecg = ecg[:min_len]
                eda = signals['eda']

                min_len = min(len(ecg), len(eda))
                ecg, eda = ecg[:min_len], eda[:min_len]


                for start in range(0, min_len - window_size + 1, step_size):
                    ecg_window = ecg[start:start + window_size]
                    ## Uncomment if only ECG channel
                    # combined_signal = ecg_window

                    eda_window = eda[start:start + window_size]
                    combined_signal = pd.concat([ecg_window, eda_window], axis=1)
                    # Store samples by class for balancing
                    class_samples[label].append(combined_signal)

    # Combine data
    for label in [0, 1]:
        for sample in class_samples[label]:
            X.append(sample)
            y.append(label)

    # Print original class distribution
    class_0_size = y.count(0)
    class_1_size = y.count(1)
    print(f"Original class distribution - Class 0: {class_0_size}, Class 1: {class_1_size}")

    # Convert list of DataFrames to a 3D numpy array
    X = np.stack([sample.values for sample in X])
    y = np.array(y)

    # Optionally compute sample weights for balancing classes
    if balance_classes:
        class_counts = np.bincount(y)
        class_weights = 1. / class_counts
        sample_weights = class_weights[y]
        print(f"Class weights for balancing: Class 0: {class_weights[0]:.4f}, Class 1: {class_weights[1]:.4f}")
        return X, y, sample_weights

    return X, y

def create_data_loaders(X : np.ndarray, y: np.ndarray, sample_weights=None, batch_size=32, test_split=0.2):
    """
    Create balanced data loaders using sample weights
    """
    # Split data into train and test sets
    X_train, X_test, y_train, y_test, idx_train, idx_test = train_test_split(
        X, y, np.arange(len(X)), test_size=test_split, random_state=42, stratify=y
    )

    # Create datasets
    train_dataset = CognitiveLoadDataset(X_train, y_train)
    test_dataset = CognitiveLoadDataset(X_test, y_test)

    # Create data loaders
    if sample_weights is not None:
        # Get sample weights for training samples
        train_sample_weights = sample_weights[idx_train]

        # Create a sampler
        train_sampler = WeightedRandomSampler(
            weights=train_sample_weights,
            num_samples=len(train_sample_weights),
            replacement=True
        )

        train_loader = DataLoader(
            train_dataset,
            batch_size=batch_size,
            sampler=train_sampler
        )
    else:
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    return train_loader, test_loader

def plot_ecg_eda_window(X, y, index=0, sampling_rate=256):
    """
    Plot ECG and EDA signals from a specific window
    """
    sample = X[index]
    # window_size = sample.shape[0]
    window_size = sample.shape[0] // 2


    # Get ECG and EDA segments
    ecg = sample[:window_size, 0]
    eda = sample[:window_size, 1]


    time = np.linspace(0, window_size / sampling_rate, window_size)

    plt.figure(figsize=(12, 5))

    plt.subplot(2, 1, 1)
    plt.plot(time, ecg, label='ECG', color='red')
    plt.title(f'ECG Signal (label: {y[index]})')
    plt.ylabel('Amplitude')
    plt.grid(True)
    plt.legend()

    plt.subplot(2, 1, 2)
    plt.plot(time, eda, label='EDA', color='blue')
    plt.title(f'EDA Signal (label: {y[index]})')
    plt.xlabel('Time (s)')
    plt.ylabel('Amplitude')
    plt.grid(True)
    plt.legend()

    plt.tight_layout()
    plt.show()

def plot_signal_windows(X, labels=None, class_to_plot=None, num_samples=5, title="Signal Windows"):
    """
    Plot multiple signal windows for visualization

    Args:
        X: Input data array
        labels: Optional labels array
        class_to_plot: If provided, only plot windows from this class
        num_samples: Number of windows to plot
        title: Plot title
    """
    plt.figure(figsize=(15, 10))

    # Select indices to plot
    if labels is not None and class_to_plot is not None:
        # Get indices of samples from the specified class
        class_indices = np.where(labels == class_to_plot)[0]
        # Randomly select from those indices
        if len(class_indices) > num_samples:
            indices_to_plot = np.random.choice(class_indices, num_samples, replace=False)
        else:
            indices_to_plot = class_indices
    else:
        # Randomly select from all samples
        indices_to_plot = np.random.choice(len(X), min(num_samples, len(X)), replace=False)

    for i, idx in enumerate(indices_to_plot):
        sample = X[idx]
        # window_size = sample.shape[0]
        window_size = sample.shape[0] // 2


        # Extract ECG and EDA
        ecg = sample[:window_size, 0]
        eda = sample[:window_size, 1]

        # Plot ECG
        plt.subplot(num_samples, 2, 2*i+1)
        plt.plot(ecg, 'r-')
        label_text = f"Window {idx}"
        if labels is not None:
            label_text += f" (Class {labels[idx]})"
        plt.title(f"ECG {label_text}")

        # Plot EDA
        plt.subplot(num_samples, 2, 2*i+2)
        plt.plot(eda, 'b-')
        plt.title(f"EDA {label_text}")

    plt.suptitle(title)
    plt.tight_layout()
    plt.subplots_adjust(top=0.92)
    plt.show()

## Define Models

In [18]:
class GRUModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size=1, bidirectional=True):
        super(GRUModel, self).__init__()
        
        direction_factor = 2 if bidirectional else 1
        self.gru_dim = hidden_size * direction_factor
        
        self.gru = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=2,
            batch_first=True,
            bidirectional=bidirectional,
            dropout=0.2 # Přidán dropout mezi vrstvy GRU pro regulaci
        )

        self.layer_norm = nn.LayerNorm(self.gru_dim)
        
        # --- Zde přidáváme Attention vrstvu ---
        self.attention = Attention(self.gru_dim)
        
        self.fc1 = nn.Linear(self.gru_dim, 64)
        self.dropout = nn.Dropout(0.3)
        self.fc2 = nn.Linear(64, output_size)

    def forward(self, x):
        gru_out, _ = self.gru(x)
        gru_out = self.layer_norm(gru_out)
        
        context_vector, weights = self.attention(gru_out) 
        
        x = F.relu(self.fc1(context_vector))
        x = self.dropout(x)
        out = self.fc2(x)
        
        return out, weights

class Attention(nn.Module):
    def __init__(self, hidden_dim):
        super(Attention, self).__init__()
        self.attn = nn.Linear(hidden_dim, 1)

    def forward(self, gru_output):
        # gru_output tvar: (batch, seq_len, hidden_dim)
        
        # Spočítáme skóre pro každý časový krok
        attn_weights = self.attn(gru_output) # (batch, seq_len, 1)
        attn_weights = F.softmax(attn_weights, dim=1)
        
        # Vynásobíme výstupy GRU jejich vahami
        context_vector = torch.sum(attn_weights * gru_output, dim=1)
        
        return context_vector, attn_weights

class CNNStressClassifier(nn.Module):
    def __init__(self, input_channels=2, num_classes=2):
        super(CNNStressClassifier, self).__init__()

        self.conv1 = nn.Conv1d(input_channels, 32, kernel_size=7, padding=3)
        self.bn1 = nn.BatchNorm1d(32)

        self.conv2 = nn.Conv1d(32, 64, kernel_size=5, padding=2)
        self.bn2 = nn.BatchNorm1d(64)

        self.conv3 = nn.Conv1d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm1d(128)

        self.global_pool = nn.AdaptiveAvgPool1d(1)  # Výstup: (batch_size, 128, 1)

        self.fc1 = nn.Linear(128, 64)
        self.dropout = nn.Dropout(0.3)
        self.fc2 = nn.Linear(64, num_classes)

    def forward(self, x):
        gru_out, _ = self.gru(x)
        gru_out = self.layer_norm(gru_out)
        
        context_vector, weights = self.attention(gru_out) # weights jsou (batch, seq_len, 1)
        
        x = F.relu(self.fc1(context_vector))
        x = self.dropout(x)
        out = self.fc2(x)
        
        return out, weights # VRACÍME I VÁHY



## Training and Evaluation Functions

In [None]:
def train_model(model, train_loader, criterion, optimizer, epochs=10, device='cpu'):
    """
    Train the model

    Args:
        model: PyTorch model
        train_loader: Training data loader
        criterion: Loss function
        optimizer: Optimizer
        epochs: Number of training epochs
        device: Device to use for training

    Returns:
        Trained model
    """
    model = model.to(device)
    model.train()

    # Calculate class balance in training set
    all_labels = []
    for _, y_batch in train_loader:
        all_labels.extend(y_batch.cpu().numpy())

    unique_labels, label_counts = np.unique(np.array(all_labels), return_counts=True)
    print(f"Training data class distribution: {dict(zip(unique_labels, label_counts))}")

    for epoch in range(epochs):
        print(f"\nEpoch {epoch + 1}/{epochs}")
        epoch_iter = tqdm(train_loader, desc=f"Epoch {epoch + 1}", leave=False)

        total_loss = 0
        correct, total = 0, 0

        for X_batch, y_batch in epoch_iter:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            y_batch = y_batch.view(-1).float()

            optimizer.zero_grad()
            outputs = model(X_batch).squeeze()  # Ensures shape [batch_size]

            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

            predicted = (torch.sigmoid(outputs) > 0.6).float()
            correct += (predicted == y_batch).sum().item()
            total += y_batch.size(0)

        # Calculate and log training metrics
        epoch_loss = total_loss / len(train_loader)
        epoch_acc = correct / total * 100

        print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.2f}%")

        # Check prediction distribution periodically
        if epoch % 5 == 0 or epoch == epochs - 1:
            check_prediction_distribution(model, train_loader, device)

    return model

def check_prediction_distribution(model, data_loader, device='cpu'):
    """
    Check the distribution of predictions
    """
    model.eval()
    all_preds = []

    with torch.no_grad():
        for X_batch, _ in data_loader:
            X_batch = X_batch.to(device)
            outputs = model(X_batch).squeeze()
            preds = (torch.sigmoid(outputs) > 0.6).float().cpu().numpy()
            all_preds.extend(preds)

    unique_preds, pred_counts = np.unique(np.array(all_preds), return_counts=True)
    print(f"Prediction distribution: {dict(zip(unique_preds, pred_counts))}")
    model.train()

def evaluate_model(model, test_loader, threshold=0.6, device='cpu'):
    """
    Evaluate the model

    Args:
        model: PyTorch model
        test_loader: Test data loader
        threshold: Classification threshold
        device: Device to use for evaluation

    Returns:
        true_labels: Ground truth labels
        predicted_labels: Predicted labels
    """
    model = model.to(device)
    model.eval()

    all_outputs = []
    all_labels = []

    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch).squeeze()

            all_outputs.extend(outputs.cpu().numpy())
            all_labels.extend(y_batch.cpu().numpy())

    # Convert to numpy arrays
    all_outputs = np.array(all_outputs)
    all_labels = np.array(all_labels)

    # Apply threshold to get binary predictions
    predicted_labels = (sigmoid(all_outputs) > threshold).astype(int)

    # Calculate metrics
    accuracy = (predicted_labels == all_labels).mean()
    f1 = f1_score(all_labels, predicted_labels)
    balanced_acc = balanced_accuracy_score(all_labels, predicted_labels)

    print("\nEvaluation metrics:")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"Balanced Accuracy: {balanced_acc:.4f}")

    return all_labels, predicted_labels

def sigmoid(x):
    """
    Sigmoid function for numpy arrays
    """
    return 1 / (1 + np.exp(-x))

def plot_confusion_matrix(y_true, y_pred):
    """
    Plot confusion matrix
    """
    cm = confusion_matrix(y_true, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["Low Load", "High Load"])
    disp.plot(cmap=plt.cm.Blues)
    plt.title("Confusion Matrix")
    plt.show()

    # Print metrics
    tn, fp, fn, tp = cm.ravel()
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    specificity = tn / (tn + fp) if (tn + fp) > 0 else 0

    print(f"Precision: {precision:.4f}")
    print(f"Recall/Sensitivity: {recall:.4f}")
    print(f"Specificity: {specificity:.4f}")

def save_model(model, path):
    """
    Save model to disk
    """
    torch.save(model.state_dict(), path)
    print(f"Model saved to {path}")

def load_model(path, model):
    """
    Load model from disk
    """
    model.load_state_dict(torch.load(path, map_location=torch.device('cpu')))
    return model


## Google Drive Integration

In [None]:
# Uncomment to mount Google Drive
# from google.colab import drive
# drive.mount('/content/drive')

## Example Usage

In [None]:
# Helper functions for loading pre-processed data from Drive
def download_data():
    """
    Download example data or use existing data
    """
    # Check if data exists
    if not os.path.exists("datasets"):
        os.makedirs("datasets", exist_ok=True)
        os.makedirs("./datasets/Low_load", exist_ok=True)
        os.makedirs("./datasets/High_load", exist_ok=True)

        print("Please upload your ECG/EDA data files to the datasets directory")
        print("The structure should be:")
        print("- datasets/Low_load/")
        print("- datasets/High_load/")
        print("With CSV files in the format: subject_block_signal.csv")
        print("Example: 1_4_ecg.csv, 1_4_eda.csv")
        return False

    return True

# Main execution
if __name__ == "__main__":
    input_size = 2  # ECG and EDA channels
    hidden_size = 128
    output_size = 1
    learning_rate = 1e-3
    batch_size = 64
    # Check for data
    os.getcwd()
    if download_data():
        # Load dataset
        
        base_dir = "./datasets"
        print("Loading data from folders...")
        result = load_data_from_folders(base_dir, balance_classes=True)
        if len(result) == 3:
            X, y, sample_weights = result
        else:
            X, y = result
            sample_weights = None

        # Plot some examples
        # plot_ecg_eda_window(X, y, index=0, sampling_rate=256)
        plot_signal_windows(X, labels=y, class_to_plot=0, num_samples=2, title="Low Cognitive Load Samples")
        plot_signal_windows(X, labels=y, class_to_plot=1, num_samples=2, title="High Cognitive Load Samples")


        # Define model hyperparameters
        # input_size = 1  # ECG channel only
        

        # Create balanced data loaders
        train_loader, test_loader = create_data_loaders(X, y, sample_weights, batch_size)

        # Initialize model
        print("Initializing model...")
        model = GRUModel(input_size, hidden_size, output_size, bidirectional=True).to(device)
        # model = CNNStressClassifier(input_size, output_size).to(device)
        criterion = nn.BCEWithLogitsLoss()
        optimizer = optim.Adam(model.parameters(), learning_rate)

        # Define model path
        model_path = "models/cognitive_load_model.pth"

In [None]:
# Train the model
# model = train_model(model, train_loader, criterion, optimizer, epochs=50, device=device)
model = load_model(model_path, model)

# Save the model
save_model(model, model_path)

# Evaluate the model
true_labels, predicted_labels = evaluate_model(model, test_loader, device=device)

# Plot confusion matrix
plot_confusion_matrix(true_labels, predicted_labels)

In [None]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
import torch
from IPython.display import display, HTML

def visualize_from_loader(model, test_loader, predicted_labels, start_idx=0, num_windows=8, device='cpu'):
    """
    Vykreslí spojitý signál přímo z dat v test_loaderu a porovná je s předpřipravenými predikcemi.
    """
    model.eval()
    all_x = []
    all_y = []

    # 1. Extrakce dat z loaderu (převedeme batche na souvislá pole)
    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            all_x.append(batch_x.cpu().numpy())
            all_y.append(batch_y.cpu().numpy())

    X_test = np.concatenate(all_x, axis=0) # Tvar (N, samples, channels)
    y_test = np.concatenate(all_y, axis=0) # Tvar (N,)

    # 2. Výřez oken pro zobrazení
    end_idx = min(start_idx + num_windows, len(X_test))
    X_slice = X_test[start_idx:end_idx]
    y_t_slice = y_test[start_idx:end_idx]
    y_p_slice = predicted_labels[start_idx:end_idx]
    
    samples_per_win = X_test.shape[1]
    combined_data = np.concatenate(X_slice, axis=0)
    time_axis = np.arange(len(combined_data))

    # 3. Plotly Figura (2 řady)
    fig = make_subplots(
        rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.05,
        subplot_titles=("EKG Signál", "EDA Signál")
    )

    fig.add_trace(go.Scatter(x=time_axis, y=combined_data[:, 0], name="EKG", line_color='red'), row=1, col=1)
    fig.add_trace(go.Scatter(x=time_axis, y=combined_data[:, 1], name="EDA", line_color='blue'), row=2, col=1)

    # 4. Barevné zóny a popisky
    for i in range(len(X_slice)):
        is_correct = (y_t_slice[i] == y_p_slice[i])
        bg_color = "rgba(40, 167, 69, 0.2)" if is_correct else "rgba(220, 53, 69, 0.2)"
        
        x0, x1 = i * samples_per_win, (i + 1) * samples_per_win
        fig.add_vrect(x0=x0, x1=x1, fillcolor=bg_color, line_width=1, layer="below")

        status = "✅ OK" if is_correct else "❌ ERROR"
        pred_text = "HIGH" if y_p_slice[i] == 1 else "LOW"
        gt_text = "HIGH" if y_t_slice[i] == 1 else "LOW"
        
        fig.add_annotation(
            x=(x0 + x1) / 2, y=1.1, yref="paper",
            text=f"<b>{status}</b><br>P: {pred_text} | GT: {gt_text}",
            showarrow=False, font=dict(size=10)
        )

    fig.update_layout(height=600, margin=dict(l=10, r=10, t=80, b=40), template="plotly_white")

    # Zobrazení
    html_str = fig.to_html(include_plotlyjs='cdn', full_html=False)
    display(HTML(f'<div style="width:100%;">{html_str}</div>'))

In [None]:
model_path = "./models/cognitive_load_model.pth" # "drive/MyDrive/DP/cognitive_load_model.pth"

model = load_model(model_path, model)

visualize_from_loader(model, test_loader, predicted_labels, start_idx=0, num_windows=8)