# P300 Speller Dataset Processing

This notebook processes the P300 Speller dataset collected from ALS patients. The dataset contains EEG recordings of P300 evoked potentials, acquired with the BCI2000 system while the patients attended to characters in a 6x6 matrix.

## Dataset Overview
- 8 ALS patients focused on characters in a 6x6 matrix
- EEG recorded from 8 channels (Fz, Cz, Pz, Oz, P3, P4, PO7, PO8)
- Sampling rate: 256 Hz
- Data labeled as target (P300 present) and non-target (P300 absent) stimuli
- Each character selection involves multiple row/column intensifications

## Import Required Libraries

In [None]:
%pip install -q -r requirements.txt

In [None]:
# Import libraries for data processing
import scipy.io as sio
import numpy as np
import os
import matplotlib.pyplot as plt
import pandas as pd

## Constants Based on the P300 Dataset Documentation

These constants are defined based on the P300 dataset documentation.

In [None]:
# Recording parameters
N_CHANNELS = 8  # EEG channels per recording
SAMPLE_RATE = 256  # Sampling frequency in Hz
SAMPLE_DURATION = 64  # Samples spanned by one stimulus window

# Speller matrix and stimulation parameters
MATRIX_DIMENSIONS = 6  # The speller matrix is MATRIX_DIMENSIONS x MATRIX_DIMENSIONS
N_CHARACTERS = MATRIX_DIMENSIONS ** 2  # 36 characters in the 6x6 matrix
INTENSIFIED_N_TIMES = 20  # Intensifications per item (10 row + 10 column)

## Data Loading Functions

In [None]:
def load_matlab_file(file_path):
    """Read a MATLAB .mat file from disk with ``scipy.io.loadmat``.

    Args:
        file_path (str): Location of the .mat file to read

    Returns:
        dict: Whatever ``scipy.io.loadmat`` returns for that file
            (variable names mapped to the loaded values)
    """
    contents = sio.loadmat(file_path)
    return contents

In [None]:
def save_data(filename, data):
    """Write an array to disk in NumPy's binary ``.npy`` format.

    ``numpy.save`` appends the ``.npy`` extension when ``filename`` does not
    already end with it.

    Args:
        filename (str): Destination path for the array
        data (numpy.ndarray): Array to store
    """
    np.save(file=filename, arr=data)

## Data Transformation Functions

In [None]:
def _stack_epochs(epochs, epoch_length, n_channels):
    """Stack equally sized epochs into one float64 array.

    Args:
        epochs (list): Epoch arrays, each of shape (epoch_length, n_channels)
        epoch_length (int): Number of time samples per epoch
        n_channels (int): Number of EEG channels per epoch

    Returns:
        numpy.ndarray: Array of shape (epoch_length, len(epochs), n_channels)
    """
    if not epochs:
        # Keep the 3-D layout even when no epoch of this class was found
        return np.zeros((epoch_length, 0, n_channels))
    return np.stack(epochs, axis=1).astype(np.float64, copy=False)


def transform_data(data, subject_id, samples_per_target, sample_offset=0):
    """Transform raw MATLAB data into structured arrays for target and non-target stimuli.

    Every trial is split into ``INTENSIFIED_N_TIMES * MATRIX_DIMENSIONS``
    consecutive stimulus windows of ``SAMPLE_DURATION`` samples. For each
    window the stimulus labels decide the class, and the EEG epoch starting at
    the window onset and lasting ``samples_per_target`` samples is collected.
    Windows that fall (partly) outside the recording are skipped and reported.

    Args:
        data (dict): MATLAB data dictionary with the fields 'X', 'y' and 'trial'
        subject_id (str): Subject identifier (accepted for interface
            compatibility; not used by the transformation itself)
        samples_per_target (int): Number of EEG samples in each extracted epoch
        sample_offset (int, optional): Shift, in samples, applied to every
            trial start index. Defaults to 0.

    Returns:
        tuple: Two float64 arrays containing class 1 (non-target) and class 2
            (target) data, each of shape
            (samples_per_target, number of epochs, N_CHANNELS)

    Raises:
        ValueError: If the EEG data does not have N_CHANNELS channels
    """
    # Extract relevant data from the MATLAB structure
    eeg_data = data['X'][0]  # EEG data [samples × channels]
    stimulus_type = data['y'][0]  # Stimulus type (1=non-target, 2=target)
    trial_start_indices = data['trial'][0][0]  # Trial start indices
    # NOTE(review): if these indices are MATLAB 1-based, callers may need
    # sample_offset=-1 to align with the stimulus onset -- confirm.

    if eeg_data.shape[1] != N_CHANNELS:
        raise ValueError(
            f"Expected {N_CHANNELS} EEG channels, got {eeg_data.shape[1]}"
        )

    stimuli_per_trial = INTENSIFIED_N_TIMES * MATRIX_DIMENSIONS

    # Epochs are gathered in lists so that no fixed capacity can silently
    # drop data and no oversized buffer has to be allocated up front
    class1_epochs = []
    class2_epochs = []
    skipped_windows = 0

    for start_idx in trial_start_indices:
        # int() keeps the index arithmetic in Python integers regardless of
        # the (possibly narrow) integer dtype stored in the .mat file
        trial_start = int(start_idx) + sample_offset

        for j in range(stimuli_per_trial):
            window_start = trial_start + j * SAMPLE_DURATION
            if window_start < 0:
                # A negative start would wrap around to the end of the array
                skipped_windows += 1
                continue

            # NOTE(review): the slice end excludes the last sample of the
            # window (SAMPLE_DURATION - 1 labels are inspected); kept as in
            # the established behaviour -- confirm whether this is intended.
            current_stimulus_type = stimulus_type[
                window_start:window_start + SAMPLE_DURATION - 1
            ]

            # Non-target takes precedence when both labels occur in a window
            if 1 in current_stimulus_type:  # Non-target stimulus
                destination = class1_epochs
            elif 2 in current_stimulus_type:  # Target stimulus (P300 present)
                destination = class2_epochs
            else:
                # Skip segments with no stimulus type information
                continue

            # The epoch extends beyond the stimulus window so that the
            # post-stimulus response is captured
            epoch = eeg_data[window_start:window_start + samples_per_target, :]
            if epoch.shape[0] != samples_per_target:
                # Truncated by the end of the recording; cannot be stacked
                skipped_windows += 1
                continue
            destination.append(epoch)

    class_1_count = len(class1_epochs)
    class_2_count = len(class2_epochs)
    final_data_class1 = _stack_epochs(class1_epochs, samples_per_target, N_CHANNELS)
    final_data_class2 = _stack_epochs(class2_epochs, samples_per_target, N_CHANNELS)

    if skipped_windows:
        print(f"Skipped {skipped_windows} stimulus window(s) outside the recording")
    print(f"Class 1 (Non-target): {class_1_count} samples, Class 2 (Target): {class_2_count} samples")
    return final_data_class1, final_data_class2

## Data Balancing Function

In [None]:
def balance_classes(class1_data, class2_data, seed=None):
    """Balance the two classes by randomly downsampling the majority class.

    Samples are counted and selected along axis 1 of each array. The selection
    is drawn without replacement, so the kept samples are distinct but their
    order is random.

    Args:
        class1_data (numpy.ndarray): Data for class 1 (non-target), 3-D with
            the samples along axis 1
        class2_data (numpy.ndarray): Data for class 2 (target), 3-D with the
            samples along axis 1
        seed (int | numpy.random.Generator | None, optional): Seed (or
            generator) for a reproducible selection. When None, NumPy's global
            random state is used. Defaults to None.

    Returns:
        tuple: Balanced data for both classes; when the classes are already
            balanced the input arrays are returned unchanged
    """
    n_class1 = class1_data.shape[1]
    n_class2 = class2_data.shape[1]

    if n_class1 == n_class2:  # Classes already balanced
        print("Classes already balanced with", n_class1, "samples each")
        return class1_data, class2_data

    # Only build a dedicated generator when a seed is requested, so unseeded
    # calls keep drawing from the global NumPy random state
    if seed is None:
        choose = np.random.choice
    else:
        choose = np.random.default_rng(seed).choice

    if n_class1 > n_class2:
        # Randomly downsample class 1 to match class 2 size
        indices = choose(n_class1, n_class2, replace=False)
        balanced_class1 = class1_data[:, indices, :]
        balanced_class2 = class2_data
        print(f"Downsampled class 1 from {n_class1} to {n_class2} samples")
    else:
        # Randomly downsample class 2 to match class 1 size
        indices = choose(n_class2, n_class1, replace=False)
        balanced_class1 = class1_data
        balanced_class2 = class2_data[:, indices, :]
        print(f"Downsampled class 2 from {n_class2} to {n_class1} samples")

    return balanced_class1, balanced_class2

## Process and Save Data

Load each subject's data, transform it, balance the classes, and save the results.

In [None]:
# Raw recordings to process: A01.mat ... A08.mat, one file per subject
subjects = [f"A{number:02d}.mat" for number in range(1, 9)]

# Make sure every output directory exists before anything is written
for output_dir in (
    './data/partitioned/class_1',
    './data/partitioned/class_2',
    './data/balanced/class_1',
    './data/balanced/class_2',
):
    os.makedirs(output_dir, exist_ok=True)

# Run the load -> transform -> save -> balance -> save pipeline per subject
for file in subjects:
    print(f"\nProcessing subject: {file}")
    subject_id = file.replace(".mat", "")

    # The recording is stored under the 'data' field of the .mat file
    matlab_data = load_matlab_file(f"data/raw/{file}")['data'][0]

    # Split the recording into non-target and target epochs (256 samples each)
    print("Transforming data...")
    class1_data, class2_data = transform_data(matlab_data, subject_id, 256)

    # Sanity check: non-target epochs are expected to outnumber target epochs
    print(f"Original data shapes - Class 1: {class1_data.shape}, Class 2: {class2_data.shape}")
    assert class1_data.shape[1] > class2_data.shape[1], "Expected more non-target than target samples"

    # Persist the imbalanced split first
    save_data(f'./data/partitioned/class_1/{subject_id}', class1_data)
    save_data(f'./data/partitioned/class_2/{subject_id}', class2_data)
    print(f"Saved original data for {subject_id}")

    # Downsample the larger class so both classes have the same size
    print("Balancing classes...")
    balanced_class1, balanced_class2 = balance_classes(class1_data, class2_data)

    # Sanity check: both classes must now contain the same number of epochs
    print(f"Balanced data shapes - Class 1: {balanced_class1.shape}, Class 2: {balanced_class2.shape}")
    assert balanced_class1.shape[1] == balanced_class2.shape[1], "Classes should have equal number of samples"

    # Persist the balanced split
    save_data(f'./data/balanced/class_1/{subject_id}', balanced_class1)
    save_data(f'./data/balanced/class_2/{subject_id}', balanced_class2)
    print(f"Saved balanced data for {subject_id}")

## Visualization and Analysis

In [None]:
def plot_p300_average(subject_id):
    """Show the mean target and non-target waveforms of one subject.

    Reads the balanced arrays saved for the subject, averages each class over
    axis 1 and plots channel index 2 (Pz) of both averages against time in
    milliseconds, with a marker line at 300 ms.

    Args:
        subject_id (str): Subject identifier (e.g., 'A01')
    """
    pz_channel = 2  # Pz typically shows the clearest P300

    # Balanced per-class arrays written by the processing step
    non_target = np.load(f'./data/balanced/class_1/{subject_id}.npy')
    target = np.load(f'./data/balanced/class_2/{subject_id}.npy')

    # Collapse the epoch axis to obtain one mean waveform per channel
    non_target_mean = non_target.mean(axis=1)
    target_mean = target.mean(axis=1)

    # Sample index -> milliseconds at the dataset sampling rate
    time_ms = np.arange(non_target_mean.shape[0]) / SAMPLE_RATE * 1000

    plt.figure(figsize=(12, 6))
    for waveform, line_format, legend_text in (
        (non_target_mean, 'b-', 'Non-Target'),
        (target_mean, 'r-', 'Target (P300)'),
    ):
        plt.plot(time_ms, waveform[:, pz_channel], line_format, label=legend_text)

    # Reference line where the P300 component is expected
    plt.axvline(x=300, color='gray', linestyle='--', label='300ms (P300 expected)')

    plt.xlabel('Time (ms)')
    plt.ylabel('Amplitude (μV)')
    plt.title(f'Average P300 Response for Subject {subject_id} (Channel Pz)')
    plt.legend()
    plt.grid(True)
    plt.show()

In [None]:
# Example usage: show the averaged target vs. non-target waveform for subject A01
plot_p300_average(subject_id='A01')