In [1]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: Jordy Thielen (jordy.thielen@donders.ru.nl)
"""

import os

import numpy as np
import pyntbci

# ----------------------------------------------------------------------------------------------------------------------
# Configuration

# Working directory that contains the "data" folder. The default is the original, machine-specific location; set the
# CVEP_COVERT_WD environment variable to run the notebook elsewhere without editing this cell.
wd = os.path.expanduser(
    os.environ.get("CVEP_COVERT_WD") or r'C:\Users\Radovan\OneDrive\Radboud\First\Lab_rotatio\Thiele_BCI'
)
os.chdir(wd)
data_dir = os.path.join(wd, "data", "experiment")  # Data directory
repo_dir = os.path.join(wd, "data")  # Repo directory

# Participant codes; each one is expected to have a file
# <data_dir>/derivatives/sub-<subject>/sub-<subject>_task-<task>.npz
subjects = ["VPpdia", "VPpdib", "VPpdic", "VPpdid", "VPpdie", "VPpdif", "VPpdig", "VPpdih", "VPpdii", "VPpdij",
            "VPpdik", "VPpdil", "VPpdim", "VPpdin", "VPpdio", "VPpdip", "VPpdiq", "VPpdir", "VPpdis", "VPpdit",
            "VPpdiu", "VPpdiv", "VPpdiw", "VPpdix", "VPpdiy", "VPpdiz", "VPpdiza", "VPpdizb", "VPpdizc"]
session = "S001"  # NOTE(review): not referenced by the analysis code visible in this notebook
tasks = ["covert"]  # Task names, used to build the data file names

# Analysis settings
event = "refe"  # Event definition passed to pyntbci.utilities.event_matrix
onset_event = True  # Passed to pyntbci.utilities.event_matrix
encoding_length = 0.5  # Multiplied by fs and passed (as int) to pyntbci.utilities.encoding_matrix
ensemble = False  # NOTE(review): not referenced by the analysis code visible in this notebook
n_folds = 4  # Number of cross-validation folds
method = "hybrid"  # cvep, p300, hybrid

# ----------------------------------------------------------------------------------------------------------------------
# Cross-validated classification per participant and task

# Fail immediately on a mistyped method instead of after the first data file has been loaded
if method not in ("hybrid", "cvep", "p300"):
    raise ValueError(f"Unknown method: {method}")


def _combine_events(E_cvep, E_p300):
    """Return the event matrix selected by the configured `method`.

    "hybrid" concatenates the c-VEP and P300 event matrices along axis 1; "cvep" and "p300" return the
    corresponding input unchanged. Raises ValueError for any other method.
    """
    if method == "hybrid":
        return np.concatenate((E_cvep, E_p300), axis=1)
    if method == "cvep":
        return E_cvep
    if method == "p300":
        return E_p300
    raise ValueError(f"Unknown method: {method}")


def _mark_p300_events(E_row, z_seq, step):
    """Fill one (2, n_samples) P300 event matrix in place.

    For i_target in {0, 1}, every position k where z_seq[k] == i_target sets E_row[i_target, int(step * k)] to 1.
    """
    for i_target in range(2):
        idx = step * np.where(z_seq == i_target)[0]
        E_row[i_target, idx.astype("int")] = 1


# Loop participants
accuracy = np.zeros((len(subjects), len(tasks), n_folds))
for i_subject, subject in enumerate(subjects):
    print(f"{subject}", end="\t")

    # Loop tasks
    for i_task, task in enumerate(tasks):
        print(f"{task}: ", end="")

        # Load data; the context manager closes the npz file handle once the arrays have been read
        fn = os.path.join(data_dir, "derivatives", f"sub-{subject}", f"sub-{subject}_task-{task}.npz")
        with np.load(fn) as npz:
            fs = int(npz["fs"])
            X = npz["X"]  # Indexed below as (trials, channels, samples)
            y = npz["y"]  # One label per trial; used to index both the codes and the last axis of z
            z = npz["z"]  # Indexed below as z[trial, position, side] and compared against 0 and 1
            V = npz["V"]  # Codes, one per row; repeated/cropped below to the trial length
        n_trials, n_samples = X.shape[0], X.shape[2]
        V = np.tile(V, (1, int(np.ceil(n_samples / V.shape[1]))))[:, :n_samples]

        # c-VEP event matrix per code. It depends only on V, so it is computed once per task rather than once per
        # fold and once per test trial.
        E_codes = pyntbci.utilities.event_matrix(V, event, onset_event)[0]

        # Position k along axis 1 of z is mapped to sample int(0.250 * fs * k)
        # NOTE(review): presumably 0.250 is the interval in seconds between successive entries of z -- confirm
        p300_step = 0.250 * fs

        # Cross-validation: contiguous folds covering every trial. This equals np.repeat(np.arange(n_folds), n_trials
        # / n_folds) when n_trials is a multiple of n_folds, and stays valid (instead of raising on a boolean index
        # length mismatch) when it is not.
        folds = np.arange(n_trials) * n_folds // n_trials
        for i_fold in range(n_folds):
            # Split data to train and test set
            is_tst = folds == i_fold
            X_trn, y_trn, z_trn = X[~is_tst, :, :], y[~is_tst], z[~is_tst, :, :]
            X_tst, y_tst, z_tst = X[is_tst, :, :], y[is_tst], z[is_tst, :, :]

            # ----------------------------------------------------------------------------------------------------------
            # Train classifier

            # c-VEP event matrix per trial: the events of the code given by each training label
            E_cvep = E_codes[y_trn, :, :]

            # P300 event matrix per trial, built from the z sequence at the labelled side of each training trial
            E_p300 = np.zeros((E_cvep.shape[0], 2, E_cvep.shape[2]))
            for i_trial in range(y_trn.size):
                _mark_p300_events(E_p300[i_trial], z_trn[i_trial, :, y_trn[i_trial]], p300_step)

            # Combine event matrices of c-VEP and P300
            E = _combine_events(E_cvep, E_p300)

            # Structure matrix
            M = pyntbci.utilities.encoding_matrix(E, int(fs * encoding_length))

            # Perform CCA on the data and structure matrix, both flattened to (trials * samples, features)
            S = X_trn.transpose((0, 2, 1)).reshape((y_trn.size * X.shape[2], X.shape[1]))
            D = M.transpose((0, 2, 1)).reshape((y_trn.size * X.shape[2], M.shape[1]))
            cca = pyntbci.transformers.CCA(n_components=1)
            cca.fit(S, D)

            # ----------------------------------------------------------------------------------------------------------
            # Apply classifier

            # Perform classification per trial because the P300 event structure changes per trial
            yh_tst = np.zeros(y_tst.size)
            for i_trial in range(y_tst.size):

                # P300 event matrix: one candidate per side, built from that side's z sequence for this trial
                E_p300 = np.zeros((E_codes.shape[0], 2, E_codes.shape[2]))
                for i_side in range(2):
                    _mark_p300_events(E_p300[i_side], z_tst[i_trial, :, i_side], p300_step)

                # Combine event matrices of c-VEP and P300
                E = _combine_events(E_codes, E_p300)

                # Structure matrix
                M = pyntbci.utilities.encoding_matrix(E, int(fs * encoding_length))

                # Apply template matching classifier: pick the candidate whose projected structure matrix
                # correlates most with the projected trial
                T = cca.transform(None, M)[1][:, 0, :]
                x = cca.transform(X_tst[[i_trial], :, :], None)[0][:, 0, :]
                yh_tst[i_trial] = np.argmax(pyntbci.utilities.correlation(x, T), axis=1)[0]

            # Compute accuracy
            accuracy[i_subject, i_task, i_fold] = np.mean(yh_tst == y_tst)

        print(f"{accuracy[i_subject, i_task, :].mean():.3f}", end="\t")
    print()

# Grand average per task over all subjects and folds. The labels and indices are taken from `tasks`, so the summary
# stays correct if the task list changes.
summary = "\t".join(f"{task}: {accuracy[:, i_task, :].mean():.3f}" for i_task, task in enumerate(tasks))
print(f"Average:\t{summary}")

# Store the accuracies, indexed as (subject, task, fold)
np.savez(os.path.join(data_dir, "derivatives", "cvep_rcca_p300.npz"), accuracy=accuracy)


VPpdia	covert: 

KeyboardInterrupt: 

In [2]:
# Display the accuracy array filled by the cross-validation cell, indexed as (subject, task, fold).
# NOTE(review): the stored output below has only 2 entries on its first axis, whereas cell 1 allocates one per
# entry in `subjects` (29) and its stored run ended in KeyboardInterrupt -- the output appears to come from an
# earlier run; restart the kernel and run all cells to refresh it.
accuracy

array([[[0.95, 0.9 , 0.8 , 0.85]],

       [[0.75, 1.  , 0.9 , 0.95]]])