**<h1><center>CSAI 463 Computational Intelligence</center></h1>**
**<h1><center>Final Project - P300 Speller BCI System</center></h1>**

## **Team Members :**
## Zeina Ayman, ID: 202200351
## Mai Waheed, ID: 202200556

In [1]:
# Import Libraries
import os
import numpy as np
import tkinter as tk
from scipy.signal import butter, lfilter
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA

In [2]:
dataset_dir = "/Users/zeina/Downloads/CI_Project/"

# Load Training Data
train_signal = np.loadtxt(os.path.join(dataset_dir, "Subject_A_Train_Signal.txt"))  # EEG Signals for Training
train_flashing = np.loadtxt(os.path.join(dataset_dir, "Subject_A_Train_Flashing.txt"))  # Flashing Events (When Row/Column Flashed)
train_stimulus_code = np.loadtxt(os.path.join(dataset_dir, "Subject_A_Train_StimulusCode.txt"))  # Which Row/Column Flashed
train_stimulus_type = np.loadtxt(os.path.join(dataset_dir, "Subject_A_Train_StimulusType.txt"))  # Labels: 1 for Target, 0 for Non-Target
train_target_char = open(os.path.join(dataset_dir, "Subject_A_Train_TargetChar.txt")).read().strip()  # Actual Characters for Training

# Load Test Data
test_signal = np.loadtxt(os.path.join(dataset_dir, "Subject_A_Test_Signal.txt"))  # EEG Signals for Testing
test_flashing = np.loadtxt(os.path.join(dataset_dir, "Subject_A_Test_Flashing.txt"))  # Flashing Events for Test Data
test_stimulus_code = np.loadtxt(os.path.join(dataset_dir, "Subject_A_Test_StimulusCode.txt"))  # Which Row/Column Flashed in Test Data

In [3]:
# Bandpass Filter Parameters
lowcut = 1.0  # Lower Cutoff Frequency of the Bandpass Filter (Hz)
highcut = 15.0  # Upper Cutoff Frequency of the Bandpass Filter (Hz)
fs = 240  # Sampling Frequency of the EEG Signals (Hz)
order = 4  # Filter Order for the Butterworth Filter

# Function to Design a Butterworth Bandpass Filter
def butter_bandpass(lowcut, highcut, fs, order = 4):
    nyq = 0.5 * fs  # Nyquist Frequency (Half the Sampling Rate)
    low = lowcut / nyq  # Normalize Lower Cutoff Frequency
    high = highcut / nyq  # Normalize Upper Cutoff Frequency
    b, a = butter(order, [low, high], btype = 'band')  # Design Bandpass Filter
    return b, a

# Function to Apply the Bandpass Filter to EEG Data
def bandpass_filter(data, lowcut, highcut, fs, order = 4):
    b, a = butter_bandpass(lowcut, highcut, fs, order = order)  # Get Filter Coefficients
    filtered_data = lfilter(b, a, data, axis = 1)  # Apply Filter Along Time Axis 
    return filtered_data

# Apply the Bandpass Filter to Training and Test EEG Signals
train_signal_filtered = bandpass_filter(train_signal, lowcut, highcut, fs, order)  # Filtered Training Signal
test_signal_filtered = bandpass_filter(test_signal, lowcut, highcut, fs, order)  # Filtered Test Signal

print("Filtered EEG Signals (Train and Test) Ready.")


Filtered EEG Signals (Train and Test) Ready.


In [4]:
# Function to Extract EEG Epochs Around Flashing Events
def extract_epochs_debug(signal, flashing, stimulus_code, window = (-0.2, 0.8), fs = 240):
    """
    Extract epochs from EEG signal with reduced memory load for debugging.
    Args:
    - signal: EEG signal matrix (channels x samples)
    - flashing: Matrix indicating when each flash occurred
    - stimulus_code: Matrix indicating which row/column was flashed
    - window: Time window for epoch extraction in seconds (start, end)
    - fs: Sampling frequency of the EEG signal
    Returns:
    - epochs: Array of extracted EEG epochs (n_epochs x channels x timepoints)
    - labels: Array of corresponding stimulus codes for each epoch
    """
    n_channels, n_samples = signal.shape  # Get Signal Dimensions
    epoch_start = int(window[0] * fs)  # Convert Start Time to Samples
    epoch_end = int(window[1] * fs)  # Convert End Time to Samples
    epoch_length = epoch_end - epoch_start  # Epoch Length in Samples

    epochs = []
    labels = []

    # Limit to First 3000 Samples for Debugging 
    sample_limit = min(n_samples, 3000)
    flashing = flashing[:sample_limit]
    stimulus_code = stimulus_code[:sample_limit]
    signal = signal[:, :sample_limit]

    # Find Samples where Flashing Occurred
    flash_samples, flash_indices = np.where(flashing == 1)
    for sample, flash in zip(flash_samples, flash_indices):
        start = sample + epoch_start
        end = sample + epoch_end
        if start >= 0 and end < sample_limit:
            epoch = signal[:, start:end]  # Extract EEG Segment
            label = stimulus_code[sample, flash]  # Corresponding Stimulus Code
            epochs.append(epoch)
            labels.append(label)
    return np.array(epochs), np.array(labels)

# Extract Training Epochs 
train_epochs_debug, train_labels_debug = extract_epochs_debug(train_signal_filtered[:64, :], train_flashing, train_stimulus_code)
print(f"Extracted {len(train_epochs_debug)} Training Epochs with Shape {train_epochs_debug[0].shape}")

# Extract Test Epochs
test_epochs_debug, test_labels_debug = extract_epochs_debug(test_signal_filtered[:64, :], test_flashing, test_stimulus_code)
print(f"Extracted {len(test_epochs_debug)} Test Epochs with Shape {test_epochs_debug[0].shape}")


Extracted 134130 Training Epochs with Shape (64, 240)
Extracted 157800 Test Epochs with Shape (64, 240)


In [5]:
# Define a CSP Transformer for Feature Extraction
class CSP(BaseEstimator, TransformerMixin):
    def __init__(self, n_components = 4):
        self.n_components = n_components  # Number of CSP Filters to Extract
        
    def fit(self, X, y):
        covs = [np.cov(x) for x in X] # Compute Covariance Matrices for Each Epoch
        cov_avg = np.mean(covs, axis = 0) # Calculate the Average Covariance Matrix
        eigvals, eigvecs = np.linalg.eigh(cov_avg) # Perform Eigen Decomposition to Get Spatial Filters
        idx = np.argsort(eigvals)[::-1] # Sort Eigenvectors by Eigenvalues in Descending Order
        self.filters_ = eigvecs[:, idx[:self.n_components]] # Select the Top n_components Eigenvectors as CSP Filters
        return self

    def transform(self, X):
        X_transformed = [np.dot(self.filters_.T, x) for x in X] # Apply the CSP Filters to the Data
        return np.array([x.flatten() for x in X_transformed]) # Flatten Each Transformed Epoch into a 1D Feature Vector

# Instantiate CSP with 4 Components and LDA Classifier
csp = CSP(n_components = 4)
lda = LDA()

# Use a Subset of the Training Epochs for CSP Training 
subset_size = 1000
train_epochs_subset = train_epochs_debug[:subset_size]
train_labels_subset = train_labels_debug[:subset_size]
train_labels_binary_subset = (train_labels_subset == 1).astype(int)  # Convert Labels to Binary (1 for Target, 0 for Non-Target)

# Fit the CSP Transformer on the Subset and Extract Features
csp.fit(train_epochs_subset, train_labels_binary_subset)
X_train_csp = csp.transform(train_epochs_subset)

# Train the LDA Classifier Using the Extracted CSP Features
lda.fit(X_train_csp, train_labels_binary_subset)

print(f"CSP and LDA Trained on {subset_size} Epochs.") 

CSP and LDA Trained on 1000 Epochs.


In [6]:
# Batch Processing of Test Data 
batch_size = 10000 
n_test_epochs = len(test_epochs_debug)  # Total Number of Test Epochs
test_predictions = []  # List to Store the Predicted Probabilities for All Test Epochs

# Process Test Data in Batches
for start_idx in range(0, n_test_epochs, batch_size):
    end_idx = min(start_idx + batch_size, n_test_epochs)  # Define Batch End Index
    batch_epochs = test_epochs_debug[start_idx:end_idx]  # Slice Batch from Test Epochs

    # Extract CSP Features from the Current Batch of Test Epochs
    X_test_csp = csp.transform(batch_epochs)

    # Predict Probabilities Using the Trained LDA Model
    # [:, 1] Extracts the Probability of the Positive Class (P300 = target)
    probas = lda.predict_proba(X_test_csp)[:, 1]
    test_predictions.extend(probas)  # Store Probabilities for this Batch

    print(f"Processed {end_idx}/{n_test_epochs} Test Epochs.")

# Convert the Collected Predictions to a NumPy Array
test_predictions = np.array(test_predictions)
print("Test Predictions Completed.")  

Processed 10000/157800 Test Epochs.
Processed 20000/157800 Test Epochs.
Processed 30000/157800 Test Epochs.
Processed 40000/157800 Test Epochs.
Processed 50000/157800 Test Epochs.
Processed 60000/157800 Test Epochs.
Processed 70000/157800 Test Epochs.
Processed 80000/157800 Test Epochs.
Processed 90000/157800 Test Epochs.
Processed 100000/157800 Test Epochs.
Processed 110000/157800 Test Epochs.
Processed 120000/157800 Test Epochs.
Processed 130000/157800 Test Epochs.
Processed 140000/157800 Test Epochs.
Processed 150000/157800 Test Epochs.
Processed 157800/157800 Test Epochs.
Test Predictions Completed.


In [8]:
# Define the 6x6 Character Matrix 
speller_matrix = [
    ['A', 'B', 'C', 'D', 'E', 'F'],
    ['G', 'H', 'I', 'J', 'K', 'L'],
    ['M', 'N', 'O', 'P', 'Q', 'R'],
    ['S', 'T', 'U', 'V', 'W', 'X'],
    ['Y', 'Z', '1', '2', '3', '4'],
    ['5', '6', '7', '8', '9', '0']
]

# Function to Decode Predicted P300 Responses into Characters
def decode_letters(predictions, flashes_per_letter = 12):
    n_letters = len(predictions) // flashes_per_letter  # Calculate Number of Letters in the Message
    decoded_sentence = [] 

    for i in range(n_letters):
        # Extract Predictions for One Letter (12 flashes: 6 rows + 6 columns)
        letter_preds = predictions[i * flashes_per_letter:(i + 1) * flashes_per_letter]
        row_preds = letter_preds[:6]  # Predictions for Rows
        col_preds = letter_preds[6:]  # Predictions forcolumns

        # Identify the Row and Column with Highest Predicted Probability
        row_idx = np.argmax(row_preds)
        col_idx = np.argmax(col_preds)

        # Decode the Corresponding Character from the Speller Matrix
        decoded_char = speller_matrix[row_idx][col_idx]
        decoded_sentence.append(decoded_char)
    
    return ''.join(decoded_sentence)  # Join Characters to Form the Sentence

# Decode the Full Sentence Using the Test Predictions
decoded_sentence = decode_letters(test_predictions)
print("Decoded Sentence:", decoded_sentence)  # Print the Decoded Message

Decoded Sentence: AAAAAAAAYAAAAAAACAAAAAAAAAAAAAAAAYAAAAAAACAAAAAAAAAAAAAAAAYAAAAAAACAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAYAAAAAAACAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAYAAAAAAACAAAAAAAAAAAAAAAAYAAAAAAACAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAAAAAAAAAAAAAYAAAAAAACAAAAAAAAAAAAAAAAYAAAAAAACAAAAAAAAAAAAAAAAYAAAAAAACAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAYAAAAAAACAAAAAAAAAAAAAAAAYAAAAAAACAAAAAAAAAAAAAAAAYAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAAAAAAAAAAAAAYAAAAAAACAAAAAAAAAAAAAAAAYAAAAAAACAAAAAAAAAAAAAAAAYAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA

In [9]:
output_path = "/Users/zeina/Downloads/CI_Project/output_real.txt"

# Save Decoded Sentence
with open(output_path, 'w') as f:
    f.write(decoded_sentence)

print(f"Decoded Sentence Saved to {output_path}")

Decoded Sentence Saved to /Users/zeina/Downloads/CI_Project/output_real.txt


In [None]:
# GUI Setup
root = tk.Tk()
root.title("Final Project - P300 Speller BCI System")

# Grid Characters Layout
speller_matrix = [
    ['A', 'B', 'C', 'D', 'E', 'F'],
    ['G', 'H', 'I', 'J', 'K', 'L'],
    ['M', 'N', 'O', 'P', 'Q', 'R'],
    ['S', 'T', 'U', 'V', 'W', 'X'],
    ['Y', 'Z', '1', '2', '3', '4'],
    ['5', '6', '7', '8', '9', '0']
]

# Create grid of labels (6x6)
labels = []
for i in range(6):
    row = []
    for j in range(6):
        label = tk.Label(root, text = speller_matrix[i][j], width = 6, height = 3, borderwidth = 2, relief = "solid", font = ('Arial', 24))
        label.grid(row = i, column = j)
        row.append(label)
    labels.append(row)

# Display Decoded Sentence
sentence_label = tk.Label(root, text = "Decoded Sentence: ", font = ('Arial', 18))
sentence_label.grid(row = 6, column = 0, columnspan = 6)

# Initialize Tracking Variables
decoded_sentence_gui = []
current_letter_index = 0
flashes_per_letter = 12  # Each Letter Corresponds to Exactly 12 Flashes (6 rows + 6 columns)

def flash_simulation():
    global current_letter_index
    if current_letter_index * flashes_per_letter >= len(test_predictions):
        sentence_label.config(text = "Decoded Sentence: " + ''.join(decoded_sentence_gui) + " (End of Message)")
        return
    
    # Initialize Arrays to Accumulate Scores
    row_sums = np.zeros(6)
    col_sums = np.zeros(6)
    row_counts = np.zeros(6)
    col_counts = np.zeros(6)
    
    # Flash 6 Rows and 6 Columns 
    for flash_num in range(12):
        flash_idx = current_letter_index * flashes_per_letter + flash_num
        prob = test_predictions[flash_idx]
        if flash_num < 6:
            # Flash Row
            row_idx = flash_num  # 0 - 5
            for j in range(6):
                labels[row_idx][j].config(bg = 'yellow')
            root.update()
            root.after(200)
            for j in range(6):
                labels[row_idx][j].config(bg = 'SystemButtonFace')
            row_sums[row_idx] += prob
            row_counts[row_idx] += 1
        else:
            # Flash column
            col_idx = flash_num - 6  # 0 - 5
            for i in range(6):
                labels[i][col_idx].config(bg = 'yellow')
            root.update()
            root.after(200)
            for i in range(6):
                labels[i][col_idx].config(bg = 'SystemButtonFace')
            col_sums[col_idx] += prob
            col_counts[col_idx] += 1
        root.after(100)  # Short Delay Between Flashes
    
    # Calculate Average Probabilities for Rows and Columns
    row_avgs = row_sums / np.maximum(row_counts, 1)
    col_avgs = col_sums / np.maximum(col_counts, 1)
    
    # Select Row and Column with Highest Average Probability
    selected_row = int(np.argmax(row_avgs))
    selected_col = int(np.argmax(col_avgs))
    decoded_char = speller_matrix[selected_row][selected_col]
    decoded_sentence_gui.append(decoded_char)
    
    # Update Sentence Display
    sentence_label.config(text = f"Decoded Sentence: {''.join(decoded_sentence_gui)}")
    
    current_letter_index += 1

# Bind Spacebar to Trigger Flashing Simulation
root.bind('<space>', lambda event: flash_simulation())

root.mainloop()