# Pairwise combination: c-VEP + P300
## Overview of the code
The code performs classification on c-VEP together with P300, using rCCA.

1. Data Loading and Preprocessing:
The data for each participant and task is loaded, containing trials, EEG signals (X), labels (y), and the encoding matrix (V). It is then split into training and test sets using k-fold cross-validation.

2. Template Creation (Encoding Matrix Transformation):
The encoding matrix E is constructed, representing the expected response patterns for each stimulus. It is then transformed into a template matrix M. This transformation prepares the encoding matrix to be used for classification.

3. rCCA Classification
The rCCA classifier is trained and uses the encoding matrix V to correlate the data with the stimulus template.

4. Template Matching
A template (T) is generated by transforming the encoding matrix M using the trained rCCA model. For each trial in the test set, the test data is compared with the template T using correlation. The class label with the highest correlation score is selected as the predicted label for that trial.

5. Accuracy Calculation
The predicted labels are compared to the true labels. The accuracy is calculated by taking the mean of correct predictions across the test set.

6. Results
The accuracy for each fold, task, and subject is stored and averaged to evaluate the overall performance of the classifier.




In [1]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: Jordy Thielen (jordy.thielen@donders.ru.nl)
"""

import os
import numpy as np
import pyntbci

data_dir = '/Users/juliette/Desktop/thesis/preprocessing/hybrid_preprocessing'
save_dir = '/Users/juliette/Desktop/thesis/results/c-VEP+P300'

subjects = ["VPpdia", "VPpdib", "VPpdic", "VPpdid", "VPpdie", "VPpdif", "VPpdig", "VPpdih", "VPpdii", "VPpdij",
           "VPpdik", "VPpdil", "VPpdim", "VPpdin", "VPpdio", "VPpdip", "VPpdiq", "VPpdir", "VPpdis", "VPpdit",
           "VPpdiu", "VPpdiv", "VPpdiw", "VPpdix", "VPpdiy", "VPpdiz", "VPpdiza", "VPpdizb", "VPpdizc"]

tasks = ["overt", "covert"]

event = "refe"
onset_event = True
encoding_length = [0.3, 0.3, 0.3, 0.7, 0.7]
ensemble = False
n_folds = 4
method = "hybrid"  # cvep, p300, hybrid

# Loop participants
# Define accuracies
accuracy = np.zeros((len(subjects), len(tasks), n_folds))
accuracy_se = np.zeros((len(subjects), len(tasks)))
accuracy_mean = np.zeros((len(subjects), len(tasks)))

for i_subject, subject in enumerate(subjects):
    print(f"{subject}", end="\t")

    # Loop tasks
    for i_task, task in enumerate(tasks):
        print(f"{task}: ", end="")

        # Load data
        fn = os.path.join(data_dir, f"sub-{subject}_task-{task}_c-VEP+P300_ICA.npz")
        tmp = np.load(fn)
        fs = int(tmp["fs"])
        X = tmp["X"] # EEG data matrix (samples, channels, time points)
        y = tmp["y"] # Labels
        z = tmp["z"] # Target presence (trials x epochs x sides)
        V = tmp["V"] # One code cycle (classes x samples)
        
        # Align V with X
        V = np.tile(V, (1, int(np.ceil(X.shape[2] / V.shape[1]))))[:, :X.shape[2]]

        # Cross-validation
        folds = np.repeat(np.arange(n_folds), int(X.shape[0] / n_folds))
        for i_fold in range(n_folds):
            # Split data to train and test set
            X_trn, y_trn, z_trn = X[folds != i_fold, :, :], y[folds != i_fold], z[folds != i_fold, :, :]
            X_tst, y_tst, z_tst = X[folds == i_fold, :, :], y[folds == i_fold], z[folds == i_fold, :, :]

            # ----------------------------------------------------------------------------------------------------------
            # Train classifier

            # c-VEP event matrix per trial
            E_cvep = pyntbci.utilities.event_matrix(V, event, onset_event)[0]
            E_cvep = E_cvep[y_trn, :, :]

            # P300 event matrix per trial
            E_p300 = np.zeros((E_cvep.shape[0], 2, E_cvep.shape[2]))
            for i_trial in range(y_trn.size):
                for i_target in range(2):
                    idx = 0.250 * fs * np.where(z_trn[i_trial, :, y_trn[i_trial]] == i_target)[0]
                    E_p300[i_trial, i_target, idx.astype("int")] = 1

            
            # Combine event matrices of c-VEP and P300
            if method == "hybrid":
                E = np.concatenate((E_cvep, E_p300), axis=1)
            elif method == "cvep":
                E = E_cvep
            elif method == "p300":
                E = E_p300
            else:
                raise Exception("Unknown method:", method)

            # Structure matrix
            if np.isscalar(encoding_length):
                lengths = [encoding_length] * E.shape[1]
            else:
                lengths = encoding_length
                assert len(lengths) == E.shape[1], f"Expected {E.shape[1]} encoding lengths, got {len(lengths)}"
            lengths_samples = [int(fs * l) for l in lengths]
            M = pyntbci.utilities.encoding_matrix(E, lengths_samples)


            # Perform CCA
            S = X_trn.transpose((0, 2, 1)).reshape((y_trn.size * X.shape[2], X.shape[1])) # Reshape EEG, (trials x time, channels)
            D = M.transpose((0, 2, 1)).reshape((y_trn.size * X.shape[2], M.shape[1])) # Reshape structure matrix
            cca = pyntbci.transformers.CCA(n_components=1)
            cca.fit(S, D)

            # ----------------------------------------------------------------------------------------------------------
            # Apply classifier

            # Perform classification per trial because the P300 event structure changes per trial
            yh_tst = np.zeros(y_tst.size)
            for i_trial in range(y_tst.size):

                # c-VEP event matrix
                E_cvep = pyntbci.utilities.event_matrix(V, event, onset_event)[0]

                # P300 event matrix
                E_p300 = np.zeros((E_cvep.shape[0], 2, E_cvep.shape[2]))
                for i_side in range(2):
                    for i_target in range(2):
                        idx = 0.250 * fs * np.where(z_tst[i_trial, :, i_side] == i_target)[0]
                        E_p300[i_side, i_target, idx.astype("int")] = 1
                        

                # Combine event matrices of c-VEP and P300
                if method == "hybrid":
                    E = np.concatenate((E_cvep, E_p300), axis=1)
                elif method == "cvep":
                    E = E_cvep
                elif method == "p300":
                    E = E_p300
                else:
                    raise Exception("Unknown method:", method)

                # Structure matrix
                if np.isscalar(encoding_length):
                    lengths = [encoding_length] * E.shape[1]
                else:
                    lengths = encoding_length
                    assert len(lengths) == E.shape[1], f"Expected {E.shape[1]} encoding lengths, got {len(lengths)}"
                lengths_samples = [int(fs * l) for l in lengths]
                M = pyntbci.utilities.encoding_matrix(E, lengths_samples)


                # Apply template matching classifier
                T = cca.transform(None, M)[1][:, 0, :] # Prepare template (T)
                x = cca.transform(X_tst[[i_trial], :, :], None)[0][:, 0, :] # Transform test signal
                yh_tst[i_trial] = np.argmax(pyntbci.utilities.correlation(x, T), axis=1)[0]

            # Compute accuracy
            accuracy[i_subject, i_task, i_fold] = np.mean(yh_tst == y_tst)
            
        # Compute mean and standard error
        fold_accuracies = accuracy[i_subject, i_task, :]
        accuracy_mean[i_subject, i_task] = fold_accuracies.mean()
        accuracy_se[i_subject, i_task] = np.round(fold_accuracies.std() / np.sqrt(n_folds), 2)

        print(f"{accuracy_mean[i_subject, i_task]:.3f} ± {accuracy_se[i_subject, i_task]:.3f}", end="\t")
    print()

print(f"Average:\tovert: {accuracy[:, 0, :].mean():.3f}\tcovert: {accuracy[:, 1, :].mean():.3f}")

np.savez(os.path.join(save_dir, "cvep_rcca_p300_ICA.npz"), accuracy=accuracy, accuracy_mean=accuracy_mean, accuracy_se=accuracy_se)

VPpdia	overt: 1.000 ± 0.000	covert: 0.850 ± 0.060	
VPpdib	overt: 1.000 ± 0.000	covert: 0.787 ± 0.060	
VPpdic	overt: 1.000 ± 0.000	covert: 0.963 ± 0.020	
VPpdid	overt: 1.000 ± 0.000	covert: 0.938 ± 0.030	
VPpdie	overt: 1.000 ± 0.000	covert: 0.950 ± 0.020	
VPpdif	overt: 1.000 ± 0.000	covert: 1.000 ± 0.000	
VPpdig	overt: 1.000 ± 0.000	covert: 0.913 ± 0.040	
VPpdih	overt: 1.000 ± 0.000	covert: 0.938 ± 0.030	
VPpdii	overt: 1.000 ± 0.000	covert: 0.938 ± 0.020	
VPpdij	overt: 1.000 ± 0.000	covert: 1.000 ± 0.000	
VPpdik	overt: 1.000 ± 0.000	covert: 0.988 ± 0.010	
VPpdil	overt: 1.000 ± 0.000	covert: 0.950 ± 0.030	
VPpdim	overt: 1.000 ± 0.000	covert: 0.988 ± 0.010	
VPpdin	overt: 0.650 ± 0.110	covert: 0.463 ± 0.020	
VPpdio	overt: 1.000 ± 0.000	covert: 0.975 ± 0.010	
VPpdip	overt: 1.000 ± 0.000	covert: 0.975 ± 0.010	
VPpdiq	overt: 1.000 ± 0.000	covert: 1.000 ± 0.000	
VPpdir	overt: 1.000 ± 0.000	covert: 0.988 ± 0.010	
VPpdis	overt: 0.950 ± 0.040	covert: 0.912 ± 0.020	
VPpdit	overt: 1.000 ± 0.000	cov

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: Jordy Thielen (jordy.thielen@donders.ru.nl)
"""

import os

import numpy as np
import pyntbci

data_dir = '/Users/juliette/Desktop/thesis/preprocessing/hybrid_preprocessing'
save_dir = '/Users/juliette/Desktop/thesis/results/c-VEP+P300'

subjects = [
    "VPpdia", "VPpdib", "VPpdic", "VPpdid", "VPpdie", "VPpdif", "VPpdig", "VPpdih", "VPpdii", "VPpdij", "VPpdik",
    "VPpdil", "VPpdim", "VPpdin", "VPpdio", "VPpdip", "VPpdiq", "VPpdir", "VPpdis", "VPpdit", "VPpdiu", "VPpdiv",
    "VPpdiw", "VPpdix", "VPpdiy", "VPpdiz", "VPpdiza", "VPpdizb", "VPpdizc"
]
tasks = ["overt", "covert"]

event = "refe"
onset_event = True
encoding_stride = 1/120
ensemble = False
n_folds = 4
method = "hybrid"  # cvep, p300, hybrid

if method == "cvep":
    encoding_length = np.array([0.3])
elif method == "p300":
    encoding_length = np.array([0.7])
elif method == "hybrid":
    encoding_length = [0.3, 0.3, 0.3, 0.7]

# Loop participants
accuracy = np.zeros((len(subjects), len(tasks), n_folds))
for i_subject, subject in enumerate(subjects):
    print(f"{subject}", end="\t")

    # Loop tasks
    for i_task, task in enumerate(tasks):
        print(f"{task}: ", end="")

        # Load data
        fn = os.path.join(data_dir, f"sub-{subject}_task-{task}_c-VEP+P300_ICA.npz")
        tmp = np.load(fn)
        fs = int(tmp["fs"])
        X = tmp["X"]
        y = tmp["y"]
        z = tmp["z"]
        V = tmp["V"]
        V = np.tile(V, (1, int(np.ceil(X.shape[2] / V.shape[1]))))[:, :X.shape[2]]

        # Cross-validation
        folds = np.repeat(np.arange(n_folds), int(X.shape[0] / n_folds))
        for i_fold in range(n_folds):
            # Split data to train and test set
            X_trn, y_trn, z_trn = X[folds != i_fold, :, :], y[folds != i_fold], z[folds != i_fold, :, :]
            X_tst, y_tst, z_tst = X[folds == i_fold, :, :], y[folds == i_fold], z[folds == i_fold, :, :]

            # ----------------------------------------------------------------------------------------------------------
            # Train classifier

            # c-VEP event matrix per trial
            E_cvep = pyntbci.utilities.event_matrix(V, event, onset_event)[0]
            E_cvep = E_cvep[y_trn, :, :]

            # P300 event matrix per trial
            E_p300 = np.zeros((E_cvep.shape[0], 1, E_cvep.shape[2]))
            for i_trial in range(y_trn.size):
                idx = 0.250 * fs * np.where(z_trn[i_trial, :, y_trn[i_trial]] == 1)[0]
                E_p300[i_trial, 0, idx.astype("int")] = 1

            # Combine event matrices of c-VEP and P300
            if method == "hybrid":
                E = np.concatenate((E_cvep, E_p300), axis=1)
            elif method == "cvep":
                E = E_cvep
            elif method == "p300":
                E = E_p300
            else:
                raise Exception("Unknown method:", method)

            # Structure matrix
            M = pyntbci.utilities.encoding_matrix(E, (fs * encoding_length), int(fs * encoding_stride))

            # Perform CCA
            S = X_trn.transpose((0, 2, 1)).reshape((y_trn.size * X.shape[2], X.shape[1]))
            D = M.transpose((0, 2, 1)).reshape((y_trn.size * X.shape[2], M.shape[1]))
            cca = pyntbci.transformers.CCA(n_components=1)
            cca.fit(S, D)

            # ----------------------------------------------------------------------------------------------------------
            # Apply classifier

            # Perform classification per trial because the P300 event structure changes per trial
            yh_tst = np.zeros(y_tst.size)
            for i_trial in range(y_tst.size):

                # c-VEP event matrix
                E_cvep = pyntbci.utilities.event_matrix(V, event, onset_event)[0]

                # P300 event matrix
                E_p300 = np.zeros((E_cvep.shape[0], 2, E_cvep.shape[2]))
                for i_side in range(2):
                    for i_target in range(2):
                        idx = 0.250 * fs * np.where(z_tst[i_trial, :, i_side] == i_target)[0]
                        E_p300[i_side, i_target, idx.astype("int")] = 1

                # Combine event matrices of c-VEP and P300
                if method == "hybrid":
                    E = np.concatenate((E_cvep, E_p300), axis=1)
                elif method == "cvep":
                    E = E_cvep
                elif method == "p300":
                    E = E_p300
                else:
                    raise Exception("Unknown method:", method)

                # Structure matrix
                M = pyntbci.utilities.encoding_matrix(E, (fs * encoding_length), int(fs * encoding_stride))

                # Apply template matching classifier
                T = cca.transform(None, M)[1][:, 0, :]
                x = cca.transform(X_tst[[i_trial], :, :], None)[0][:, 0, :]
                yh_tst[i_trial] = np.argmax(pyntbci.utilities.correlation(x, T), axis=1)[0]

            # Compute accuracy
            accuracy[i_subject, i_task, i_fold] = np.mean(yh_tst == y_tst)

        print(f"{accuracy[i_subject, i_task, :].mean():.3f}", end="\t")
    print()

print(f"Average:\tovert: {accuracy[:, 0, :].mean():.3f}\tcovert: {accuracy[:, 1, :].mean():.3f}")

np.savez(os.path.join(data_dir, "derivatives", "cvep_rcca_p300.npz"), accuracy=accuracy)


In [4]:
print(accuracy)

[[[1.   1.   1.   1.  ]
  [0.95 0.85 0.95 0.65]]

 [[1.   1.   1.   1.  ]
  [0.65 1.   0.75 0.75]]

 [[1.   1.   1.   1.  ]
  [0.95 1.   0.9  1.  ]]

 [[1.   1.   1.   1.  ]
  [0.95 0.95 1.   0.85]]

 [[1.   1.   1.   1.  ]
  [1.   0.95 0.9  0.95]]

 [[1.   1.   1.   1.  ]
  [1.   1.   1.   1.  ]]

 [[1.   1.   1.   1.  ]
  [0.95 1.   0.9  0.8 ]]

 [[1.   1.   1.   1.  ]
  [0.95 0.85 0.95 1.  ]]

 [[1.   1.   1.   1.  ]
  [1.   0.95 0.9  0.9 ]]

 [[1.   1.   1.   1.  ]
  [1.   1.   1.   1.  ]]

 [[1.   1.   1.   1.  ]
  [1.   1.   1.   0.95]]

 [[1.   1.   1.   1.  ]
  [0.95 0.85 1.   1.  ]]

 [[1.   1.   1.   1.  ]
  [1.   1.   1.   0.95]]

 [[0.6  1.   0.4  0.6 ]
  [0.5  0.5  0.45 0.4 ]]

 [[1.   1.   1.   1.  ]
  [1.   0.95 1.   0.95]]

 [[1.   1.   1.   1.  ]
  [1.   1.   0.95 0.95]]

 [[1.   1.   1.   1.  ]
  [1.   1.   1.   1.  ]]

 [[1.   1.   1.   1.  ]
  [1.   1.   1.   0.95]]

 [[1.   0.8  1.   1.  ]
  [0.95 0.9  0.95 0.85]]

 [[1.   1.   1.   1.  ]
  [0.75 0.95 0.75 0.75]]

