In [1]:
import numpy as np
import os
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.svm import SVC
from sklearn.pipeline import make_pipeline
from tqdm import tqdm
import matplotlib.pyplot as plt
from h5py import File
from scripts.dataset import Dataset
from scripts.model import STRCA, FBTRCA
from scipy.linalg import eigh
from scipy.io import loadmat
from joblib import Parallel, delayed
from sklearn.pipeline import Pipeline
from scipy.io import savemat


In [6]:
import numpy as np
import os
from sklearn.svm import SVC
from sklearn.pipeline import Pipeline
from tqdm import tqdm
from scipy.io import loadmat, savemat
from joblib import Parallel, delayed
from scripts.dataset import Dataset
from scripts.model import STRCA, FBTRCA

if __name__ == "__main__":
    Fs = 256
    Co_range = np.arange(1, 12)  
    num_fold = 5
    num_subj = 15
    max_freq = 10
    step_bank = 2
    frange = np.arange(step_bank, max_freq + 1, step_bank)
    channels = (np.int64([1, 3, 5, 15, 29, 30, 31, 45, 55, 57, 59]) - 1).tolist()
    num_bank = int(max_freq / step_bank)
    t = [0, 2]
    gridcv_Co = np.zeros([num_subj, num_fold, len(frange), 2], dtype=np.int64)
    print(frange)

    if os.path.exists(os.path.join(os.getcwd(), 'gridcv_co_trca_11.mat')):
        gridcv_Co = loadmat('gridcv_co_trca_11.mat')['gridcv_Co']
        gridcv_Co = np.array(gridcv_Co, dtype=np.int64)
    else:
        for sub in range(num_subj):
            for fold in tqdm(range(num_fold), desc=f'sub{sub}:'):
                dataset = Dataset(num_class=7, path='Dataset/OData/Dataset I/', channel=channels)
                dataset.load_data(sub, 'zscore')
                dataset.divide_data(num_fold)
                dataset.filter_data(fs=Fs, frange=frange, order=8, btype='lowpass')

                def lambda_x(f, alg):
                    best_score = -np.inf
                    best_Co = Co_range[0]
                    train_set, train_label, _, _ = dataset.retreive_data(
                        class_index=[0, 1, 2, 3, 4, 5, 6], fold_index=fold, filter_index=f+1)
                    train_set = train_set[:, :, int(t[0] * Fs):int(t[1] * Fs)]
                    
                    for Co in Co_range:
                        estimators = [('strca', STRCA(Co, optimizer=alg, ifG=True)), ('clf', SVC())]
                        model = Pipeline(estimators)
                        model.fit(train_set, train_label)
                        score = model.score(train_set, train_label)  
                        
                        if score > best_score:
                            best_score = score
                            best_Co = Co
                    
                    return best_Co  

                gridcv_Co[sub, fold, :, 0] = Parallel(n_jobs=len(frange))(delayed(lambda_x)(f, 'trca') for f in range(len(frange)))
                gridcv_Co[sub, fold, :, 1] = Parallel(n_jobs=len(frange))(delayed(lambda_x)(f, 'tdlda') for f in range(len(frange)))
        savemat('gridcv_co_trca_11.mat', {'gridcv_Co': gridcv_Co})


[ 2  4  6  8 10]


sub0:: 100%|██████████| 5/5 [01:23<00:00, 16.71s/it]
sub1:: 100%|██████████| 5/5 [01:20<00:00, 16.05s/it]
sub2:: 100%|██████████| 5/5 [01:16<00:00, 15.37s/it]
sub3:: 100%|██████████| 5/5 [01:22<00:00, 16.42s/it]
sub4:: 100%|██████████| 5/5 [01:20<00:00, 16.10s/it]
sub5:: 100%|██████████| 5/5 [01:17<00:00, 15.50s/it]
sub6:: 100%|██████████| 5/5 [01:22<00:00, 16.47s/it]
sub7:: 100%|██████████| 5/5 [01:20<00:00, 16.04s/it]
sub8:: 100%|██████████| 5/5 [01:24<00:00, 16.88s/it]
sub9:: 100%|██████████| 5/5 [01:23<00:00, 16.74s/it]
sub10:: 100%|██████████| 5/5 [01:16<00:00, 15.26s/it]
sub11:: 100%|██████████| 5/5 [01:16<00:00, 15.32s/it]
sub12:: 100%|██████████| 5/5 [01:23<00:00, 16.61s/it]
sub13:: 100%|██████████| 5/5 [01:17<00:00, 15.60s/it]
sub14:: 100%|██████████| 5/5 [01:20<00:00, 16.08s/it]


In [13]:
import numpy as np
import os
from sklearn.svm import SVC
from tqdm import tqdm
from scipy.io import loadmat
from scripts.dataset import Dataset
from scripts.model import FBTRCA

if __name__ == "__main__":
    Fs = 256
    num_fold = 5
    num_subj = 15
    max_freq = 10
    step_bank = 2
    frange = np.arange(step_bank, max_freq + 1, step_bank)
    channels = (np.int64([1, 3, 5, 15, 29, 30, 31, 45, 55, 57, 59]) - 1).tolist()
    t = [0, 2]
    result = np.zeros([num_subj, num_fold, 4])

    # 加载之前计算出的最佳Co值
    gridcv_Co = loadmat('gridcv_co_trca_11.mat')['gridcv_Co']
    gridcv_Co = np.array(gridcv_Co, dtype=np.int64)

    for sub in range(num_subj):
        dataset = Dataset(num_class=7, path='Dataset/OData/Dataset I/', channel=channels)
        dataset.load_data(sub, 'zscore')
        dataset.divide_data(num_fold)
        dataset.filter_data(fs=Fs, frange=frange, order=8, btype='lowpass')

        for fold in tqdm(range(num_fold), desc=f'Sub{sub}: Final Evaluation'):
            train_set, train_label, test_set, test_label = dataset.retreive_data(
                class_index=[0, 1, 2, 3, 4, 5, 6], fold_index=fold, filter_index=np.arange(len(frange)) + 1)
            train_set = train_set[:, :, int(t[0] * Fs):int(t[1] * Fs), :]
            test_set = test_set[:, :, int(t[0] * Fs):int(t[1] * Fs), :]

            # 从gridcv_Co加载最佳Co值，并确保其为整数类型
            best_Co_trca = int(gridcv_Co[sub, fold, 0, 0])  # 使用trca算法时的最佳Co值
            best_Co_tdlda = int(gridcv_Co[sub, fold, 0, 1])  # 使用tdlda算法时的最佳Co值

            # 使用这些Co值直接进行评估

            # ifG=False
            fbtrca = FBTRCA(best_Co_tdlda, frange, optimizer='tdlda', ifG=False)
            fbtrca.fit(train_set, train_label)
            train_data = fbtrca.transform(train_set).reshape((len(train_label), fbtrca.K, 8, len(frange)))
            test_data = fbtrca.transform(test_set).reshape((len(test_label), fbtrca.K, 8, len(frange)))
            svm = SVC(kernel='rbf')
            svm.fit(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
            result[sub, fold, 2] = svm.score(test_data.reshape((len(test_label), fbtrca.K * 8 * len(frange))), test_label)

            # ifG=True
            fbtrca = FBTRCA(best_Co_tdlda, frange, optimizer='tdlda', ifG=True)
            fbtrca.fit(train_set, train_label)
            train_data = fbtrca.transform(train_set).reshape((len(train_label), fbtrca.K, 8, len(frange)))
            test_data = fbtrca.transform(test_set).reshape((len(test_label), fbtrca.K, 8, len(frange)))
            svm.fit(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
            result[sub, fold, 3] = svm.score(test_data.reshape((len(test_label), fbtrca.K * 8 * len(frange))), test_label)

    # 打印结果
    print("最佳平均准确率 ifG=False:", np.mean(result[:, :, 2], axis=(0, 1)))
    print("最佳平均准确率 ifG=True:", np.mean(result[:, :, 3], axis=(0, 1)))



Sub0: Final Evaluation: 100%|██████████| 5/5 [00:19<00:00,  3.93s/it]
Sub1: Final Evaluation: 100%|██████████| 5/5 [00:19<00:00,  3.91s/it]
Sub2: Final Evaluation: 100%|██████████| 5/5 [00:15<00:00,  3.13s/it]
Sub3: Final Evaluation: 100%|██████████| 5/5 [00:20<00:00,  4.15s/it]
Sub4: Final Evaluation: 100%|██████████| 5/5 [00:20<00:00,  4.16s/it]
Sub5: Final Evaluation: 100%|██████████| 5/5 [00:20<00:00,  4.16s/it]
Sub6: Final Evaluation: 100%|██████████| 5/5 [00:20<00:00,  4.13s/it]
Sub7: Final Evaluation: 100%|██████████| 5/5 [00:20<00:00,  4.08s/it]
Sub8: Final Evaluation: 100%|██████████| 5/5 [00:22<00:00,  4.46s/it]
Sub9: Final Evaluation: 100%|██████████| 5/5 [00:24<00:00,  4.83s/it]
Sub10: Final Evaluation: 100%|██████████| 5/5 [00:20<00:00,  4.20s/it]
Sub11: Final Evaluation: 100%|██████████| 5/5 [00:20<00:00,  4.11s/it]
Sub12: Final Evaluation: 100%|██████████| 5/5 [00:20<00:00,  4.06s/it]
Sub13: Final Evaluation: 100%|██████████| 5/5 [00:23<00:00,  4.70s/it]
Sub14: Final Eva

最佳平均准确率 ifG=False: 0.43771437874934643
最佳平均准确率 ifG=True: 0.44524568113551527





In [14]:
import numpy as np
import os
from sklearn.svm import SVC
from sklearn.pipeline import Pipeline
from tqdm import tqdm
from scipy.io import loadmat, savemat
from joblib import Parallel, delayed
from scripts.dataset import Dataset
from scripts.model import STRCA, FBTRCA

if __name__ == "__main__":
    Fs = 256
    Co_range = np.arange(1, 32)  
    num_fold = 5
    num_subj = 15
    max_freq = 10
    step_bank = 2
    frange = np.arange(step_bank, max_freq + 1, step_bank)
    channels = (np.int64([1,3,5,6,11,12,13,15,17,18,20,21,24,25,27,29,30,31,33,35,36,39,40,42,43,45,47,48,55,57,59]) - 1).tolist()
    num_bank = int(max_freq / step_bank)
    t = [0, 2]
    gridcv_Co = np.zeros([num_subj, num_fold, len(frange), 2], dtype=np.int64)
    print(frange)

    if os.path.exists(os.path.join(os.getcwd(), 'gridcv_co_trca_31.mat')):
        gridcv_Co = loadmat('gridcv_co_trca_31.mat')['gridcv_Co']
        gridcv_Co = np.array(gridcv_Co, dtype=np.int64)
    else:
        for sub in range(num_subj):
            for fold in tqdm(range(num_fold), desc=f'sub{sub}:'):
                dataset = Dataset(num_class=7, path='Dataset/OData/Dataset I/', channel=channels)
                dataset.load_data(sub, 'zscore')
                dataset.divide_data(num_fold)
                dataset.filter_data(fs=Fs, frange=frange, order=8, btype='lowpass')

                def lambda_x(f, alg):
                    best_score = -np.inf
                    best_Co = Co_range[0]
                    train_set, train_label, _, _ = dataset.retreive_data(
                        class_index=[0, 1, 2, 3, 4, 5, 6], fold_index=fold, filter_index=f+1)
                    train_set = train_set[:, :, int(t[0] * Fs):int(t[1] * Fs)]
                    
                    for Co in Co_range:
                        estimators = [('strca', STRCA(Co, optimizer=alg, ifG=True)), ('clf', SVC())]
                        model = Pipeline(estimators)
                        model.fit(train_set, train_label)
                        score = model.score(train_set, train_label)  
                        
                        if score > best_score:
                            best_score = score
                            best_Co = Co
                    
                    return best_Co  

                gridcv_Co[sub, fold, :, 0] = Parallel(n_jobs=len(frange))(delayed(lambda_x)(f, 'trca') for f in range(len(frange)))
                gridcv_Co[sub, fold, :, 1] = Parallel(n_jobs=len(frange))(delayed(lambda_x)(f, 'tdlda') for f in range(len(frange)))
        savemat('gridcv_co_trca_31.mat', {'gridcv_Co': gridcv_Co})


[ 2  4  6  8 10]


sub0:: 100%|██████████| 5/5 [10:39<00:00, 127.91s/it]
sub1:: 100%|██████████| 5/5 [10:22<00:00, 124.53s/it]
sub2:: 100%|██████████| 5/5 [09:57<00:00, 119.44s/it]
sub3:: 100%|██████████| 5/5 [10:40<00:00, 128.05s/it]
sub4:: 100%|██████████| 5/5 [10:23<00:00, 124.80s/it]
sub5:: 100%|██████████| 5/5 [09:56<00:00, 119.25s/it]
sub6:: 100%|██████████| 5/5 [10:41<00:00, 128.25s/it]
sub7:: 100%|██████████| 5/5 [10:22<00:00, 124.57s/it]
sub8:: 100%|██████████| 5/5 [10:56<00:00, 131.24s/it]
sub9:: 100%|██████████| 5/5 [10:50<00:00, 130.16s/it]
sub10:: 100%|██████████| 5/5 [09:51<00:00, 118.24s/it]
sub11:: 100%|██████████| 5/5 [09:52<00:00, 118.56s/it]
sub12:: 100%|██████████| 5/5 [10:45<00:00, 129.05s/it]
sub13:: 100%|██████████| 5/5 [10:04<00:00, 120.97s/it]
sub14:: 100%|██████████| 5/5 [10:24<00:00, 124.83s/it]


In [15]:
import numpy as np
import os
from sklearn.svm import SVC
from tqdm import tqdm
from scipy.io import loadmat
from scripts.dataset import Dataset
from scripts.model import FBTRCA

if __name__ == "__main__":
    Fs = 256
    num_fold = 5
    num_subj = 15
    max_freq = 10
    step_bank = 2
    frange = np.arange(step_bank, max_freq + 1, step_bank)
    channels = (np.int64([1,3,5,6,11,12,13,15,17,18,20,21,24,25,27,29,30,31,33,35,36,39,40,42,43,45,47,48,55,57,59]) - 1).tolist()
    t = [0, 2]
    result = np.zeros([num_subj, num_fold, 4])

    # 加载之前计算出的最佳Co值
    gridcv_Co = loadmat('gridcv_co_trca_31.mat')['gridcv_Co']
    gridcv_Co = np.array(gridcv_Co, dtype=np.int64)

    for sub in range(num_subj):
        dataset = Dataset(num_class=7, path='Dataset/OData/Dataset I/', channel=channels)
        dataset.load_data(sub, 'zscore')
        dataset.divide_data(num_fold)
        dataset.filter_data(fs=Fs, frange=frange, order=8, btype='lowpass')

        for fold in tqdm(range(num_fold), desc=f'Sub{sub}: Final Evaluation'):
            train_set, train_label, test_set, test_label = dataset.retreive_data(
                class_index=[0, 1, 2, 3, 4, 5, 6], fold_index=fold, filter_index=np.arange(len(frange)) + 1)
            train_set = train_set[:, :, int(t[0] * Fs):int(t[1] * Fs), :]
            test_set = test_set[:, :, int(t[0] * Fs):int(t[1] * Fs), :]

            # 从gridcv_Co加载最佳Co值，并确保其为整数类型
            best_Co_trca = int(gridcv_Co[sub, fold, 0, 0])  # 使用trca算法时的最佳Co值
            best_Co_tdlda = int(gridcv_Co[sub, fold, 0, 1])  # 使用tdlda算法时的最佳Co值

            # 使用这些Co值直接进行评估

            # ifG=False
            fbtrca = FBTRCA(best_Co_tdlda, frange, optimizer='tdlda', ifG=False)
            fbtrca.fit(train_set, train_label)
            train_data = fbtrca.transform(train_set).reshape((len(train_label), fbtrca.K, 8, len(frange)))
            test_data = fbtrca.transform(test_set).reshape((len(test_label), fbtrca.K, 8, len(frange)))
            svm = SVC(kernel='rbf')
            svm.fit(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
            result[sub, fold, 2] = svm.score(test_data.reshape((len(test_label), fbtrca.K * 8 * len(frange))), test_label)

            # ifG=True
            fbtrca = FBTRCA(best_Co_tdlda, frange, optimizer='tdlda', ifG=True)
            fbtrca.fit(train_set, train_label)
            train_data = fbtrca.transform(train_set).reshape((len(train_label), fbtrca.K, 8, len(frange)))
            test_data = fbtrca.transform(test_set).reshape((len(test_label), fbtrca.K, 8, len(frange)))
            svm.fit(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
            result[sub, fold, 3] = svm.score(test_data.reshape((len(test_label), fbtrca.K * 8 * len(frange))), test_label)

    # 打印结果
    print("最佳平均准确率 ifG=False:", np.mean(result[:, :, 2], axis=(0, 1)))
    print("最佳平均准确率 ifG=True:", np.mean(result[:, :, 3], axis=(0, 1)))


Sub0: Final Evaluation: 100%|██████████| 5/5 [01:07<00:00, 13.46s/it]
Sub1: Final Evaluation: 100%|██████████| 5/5 [01:05<00:00, 13.01s/it]
Sub2: Final Evaluation: 100%|██████████| 5/5 [01:05<00:00, 13.08s/it]
Sub3: Final Evaluation: 100%|██████████| 5/5 [01:12<00:00, 14.57s/it]
Sub4: Final Evaluation: 100%|██████████| 5/5 [01:17<00:00, 15.57s/it]
Sub5: Final Evaluation: 100%|██████████| 5/5 [01:06<00:00, 13.35s/it]
Sub6: Final Evaluation: 100%|██████████| 5/5 [01:12<00:00, 14.55s/it]
Sub7: Final Evaluation: 100%|██████████| 5/5 [01:12<00:00, 14.40s/it]
Sub8: Final Evaluation: 100%|██████████| 5/5 [01:01<00:00, 12.27s/it]
Sub9: Final Evaluation: 100%|██████████| 5/5 [01:17<00:00, 15.49s/it]
Sub10: Final Evaluation: 100%|██████████| 5/5 [01:02<00:00, 12.52s/it]
Sub11: Final Evaluation: 100%|██████████| 5/5 [01:02<00:00, 12.50s/it]
Sub12: Final Evaluation: 100%|██████████| 5/5 [01:23<00:00, 16.70s/it]
Sub13: Final Evaluation: 100%|██████████| 5/5 [01:14<00:00, 14.82s/it]
Sub14: Final Eva

最佳平均准确率 ifG=False: 0.45031526115527554
最佳平均准确率 ifG=True: 0.4489453610308693





In [4]:
import numpy as np
from sklearn.svm import SVC
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
from scripts.dataset import Dataset
from scripts.model import STRCA, FBTRCA
from tqdm import tqdm
from joblib import Parallel, delayed
from scipy.io import savemat, loadmat
import os

if __name__=="__main__":
    Fs = 256
    Co = 11
    num_fold = 5
    num_subj = 15
    max_freq = 10
    step_bank= 2
    frange = np.arange(step_bank,max_freq+1,step_bank)
    channels = (np.int64([1,3,5,15,29,30,31,45,55,57,59])-1).tolist()
    num_bank = int(max_freq/step_bank)
    t  = [0, 2]
    result = np.zeros([num_subj, num_fold, len(frange), 2])
    gridcv_Co = np.zeros([num_subj, num_fold, len(frange), 2], dtype=np.int64)
    print(frange)

    if os.path.exists(os.path.join(os.getcwd(), 'gridcv_co_trca_11.mat')):
        gridcv_Co = loadmat('gridcv_co_trca_11.mat', {'gridcv_Co': gridcv_Co})['gridcv_Co']
        gridcv_Co = np.array(gridcv_Co, dtype=np.int64)
    else:
        for sub in range(num_subj):
            for fold in tqdm(range(num_fold), desc=f'sub{sub}:'):
                dataset=Dataset(num_class=7,path='Dataset/OData/Dataset I/', channel=channels)
                dataset.load_data(sub, 'zscore')
                dataset.divide_data(num_fold)
                dataset.filter_data(fs=Fs, frange=frange, order=8, btype='lowpass')

                def lambda_x(f, alg):
                    parameters = {'strca__Co':np.arange(Co)+1}
                    train_set, train_label, _, _ = dataset.retreive_data(
                        class_index=[0,1,2,3,4,5,6], fold_index=fold, filter_index=f+1)
                    train_set = train_set[:,:,int(t[0]*Fs):int(t[1]*Fs)]
                    # test_set  = test_set[:,:,int(t[0]*Fs):int(t[1]*Fs)]
                    estimators = [('strca', STRCA(Co, optimizer=alg, ifG=True)), ('clf', SVC())]
                    model = Pipeline(estimators)
                    clf = GridSearchCV(model, parameters, scoring="accuracy", cv=4)
                    clf.fit(train_set, train_label)
                    return clf.best_params_['strca__Co']

                gridcv_Co[sub, fold, :, 0] = Parallel(n_jobs=len(frange))(delayed(lambda_x)(f, 'trca') for f in range(len(frange)))
                gridcv_Co[sub, fold, :, 1] = Parallel(n_jobs=len(frange))(delayed(lambda_x)(f, 'tdlda') for f in range(len(frange)))
        savemat('gridcv_co_trca_11.mat', {'gridcv_Co': gridcv_Co})



[ 2  4  6  8 10]


sub0:: 100%|██████████| 5/5 [02:31<00:00, 30.22s/it]
sub1:: 100%|██████████| 5/5 [02:24<00:00, 28.85s/it]
sub2:: 100%|██████████| 5/5 [02:18<00:00, 27.62s/it]
sub3:: 100%|██████████| 5/5 [02:28<00:00, 29.68s/it]
sub4:: 100%|██████████| 5/5 [02:25<00:00, 29.01s/it]
sub5:: 100%|██████████| 5/5 [02:38<00:00, 31.74s/it]
sub6:: 100%|██████████| 5/5 [02:51<00:00, 34.28s/it]
sub7:: 100%|██████████| 5/5 [02:43<00:00, 32.70s/it]
sub8:: 100%|██████████| 5/5 [02:51<00:00, 34.32s/it]
sub9:: 100%|██████████| 5/5 [02:52<00:00, 34.41s/it]
sub10:: 100%|██████████| 5/5 [02:29<00:00, 29.82s/it]
sub11:: 100%|██████████| 5/5 [02:19<00:00, 27.87s/it]
sub12:: 100%|██████████| 5/5 [02:31<00:00, 30.27s/it]
sub13:: 100%|██████████| 5/5 [02:21<00:00, 28.39s/it]
sub14:: 100%|██████████| 5/5 [02:25<00:00, 29.10s/it]


In [9]:
import numpy as np
import os
from sklearn.svm import SVC
from tqdm import tqdm
from scipy.io import loadmat
from scripts.dataset import Dataset
from scripts.model import FBTRCA

if __name__ == "__main__":
    Fs = 256
    num_fold = 5
    num_subj = 15
    max_freq = 10
    step_bank = 2
    frange = np.arange(step_bank, max_freq + 1, step_bank)
    channels = (np.int64([1, 3, 5, 15, 29, 30, 31, 45, 55, 57, 59]) - 1).tolist()
    t = [0, 2]
    result = np.zeros([num_subj, num_fold, 4])

    # 加载之前计算出的最佳Co值
    gridcv_Co = loadmat('gridcv_co_trca_11.mat')['gridcv_Co']
    gridcv_Co = np.array(gridcv_Co, dtype=np.int64)

    for sub in range(num_subj):
        dataset = Dataset(num_class=7, path='Dataset/OData/Dataset I/', channel=channels)
        dataset.load_data(sub, 'zscore')
        dataset.divide_data(num_fold)
        dataset.filter_data(fs=Fs, frange=frange, order=8, btype='lowpass')

        for fold in tqdm(range(num_fold), desc=f'Sub{sub}: Final Evaluation'):
            train_set, train_label, test_set, test_label = dataset.retreive_data(
                class_index=[0, 1, 2, 3, 4, 5, 6], fold_index=fold, filter_index=np.arange(len(frange)) + 1)
            train_set = train_set[:, :, int(t[0] * Fs):int(t[1] * Fs), :]
            test_set = test_set[:, :, int(t[0] * Fs):int(t[1] * Fs), :]

            # 从gridcv_Co加载最佳Co值，并确保其为整数类型
            best_Co_trca = int(gridcv_Co[sub, fold, 0, 0])  # 使用trca算法时的最佳Co值
            best_Co_tdlda = int(gridcv_Co[sub, fold, 0, 1])  # 使用tdlda算法时的最佳Co值

            # 使用这些Co值直接进行评估

            # ifG=False
            fbtrca = FBTRCA(best_Co_tdlda, frange, optimizer='tdlda', ifG=False)
            fbtrca.fit(train_set, train_label)
            train_data = fbtrca.transform(train_set).reshape((len(train_label), fbtrca.K, 8, len(frange)))
            test_data = fbtrca.transform(test_set).reshape((len(test_label), fbtrca.K, 8, len(frange)))
            svm = SVC(kernel='rbf')
            svm.fit(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
            result[sub, fold, 2] = svm.score(test_data.reshape((len(test_label), fbtrca.K * 8 * len(frange))), test_label)

            # ifG=True
            fbtrca = FBTRCA(best_Co_tdlda, frange, optimizer='tdlda', ifG=True)
            fbtrca.fit(train_set, train_label)
            train_data = fbtrca.transform(train_set).reshape((len(train_label), fbtrca.K, 8, len(frange)))
            test_data = fbtrca.transform(test_set).reshape((len(test_label), fbtrca.K, 8, len(frange)))
            svm.fit(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
            result[sub, fold, 3] = svm.score(test_data.reshape((len(test_label), fbtrca.K * 8 * len(frange))), test_label)

    # 打印结果
    print("最佳平均准确率 ifG=False:", np.mean(result[:, :, 2], axis=(0, 1)))
    print("最佳平均准确率 ifG=True:", np.mean(result[:, :, 3], axis=(0, 1)))


Sub0: Final Evaluation: 100%|██████████| 5/5 [00:18<00:00,  3.76s/it]
Sub1: Final Evaluation: 100%|██████████| 5/5 [00:16<00:00,  3.36s/it]
Sub2: Final Evaluation: 100%|██████████| 5/5 [00:13<00:00,  2.67s/it]
Sub3: Final Evaluation: 100%|██████████| 5/5 [00:19<00:00,  3.96s/it]
Sub4: Final Evaluation: 100%|██████████| 5/5 [00:19<00:00,  4.00s/it]
Sub5: Final Evaluation: 100%|██████████| 5/5 [00:20<00:00,  4.19s/it]
Sub6: Final Evaluation: 100%|██████████| 5/5 [00:19<00:00,  3.82s/it]
Sub7: Final Evaluation: 100%|██████████| 5/5 [00:16<00:00,  3.39s/it]
Sub8: Final Evaluation: 100%|██████████| 5/5 [00:20<00:00,  4.14s/it]
Sub9: Final Evaluation: 100%|██████████| 5/5 [00:22<00:00,  4.40s/it]
Sub10: Final Evaluation: 100%|██████████| 5/5 [00:18<00:00,  3.75s/it]
Sub11: Final Evaluation: 100%|██████████| 5/5 [00:20<00:00,  4.01s/it]
Sub12: Final Evaluation: 100%|██████████| 5/5 [00:18<00:00,  3.69s/it]
Sub13: Final Evaluation: 100%|██████████| 5/5 [00:21<00:00,  4.33s/it]
Sub14: Final Eva

最佳平均准确率 ifG=False: 0.4402106737760668
最佳平均准确率 ifG=True: 0.4467967569757045





In [10]:
import numpy as np
from sklearn.svm import SVC
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
from scripts.dataset import Dataset
from scripts.model import STRCA, FBTRCA
from tqdm import tqdm
from joblib import Parallel, delayed
from scipy.io import savemat, loadmat
import os

if __name__=="__main__":
    Fs = 256
    Co = 31
    num_fold = 5
    num_subj = 15
    max_freq = 10
    step_bank= 2
    frange = np.arange(step_bank,max_freq+1,step_bank)
    channels = (np.int64([1,3,5,6,11,12,13,15,17,18,20,21,24,25,27,29,30,31,33,35,36,39,40,42,43,45,47,48,55,57,59]) - 1).tolist()
    num_bank = int(max_freq/step_bank)
    t  = [0, 2]
    result = np.zeros([num_subj, num_fold, len(frange), 2])
    gridcv_Co = np.zeros([num_subj, num_fold, len(frange), 2], dtype=np.int64)
    print(frange)

    if os.path.exists(os.path.join(os.getcwd(), 'gridcv_co_trca_31.mat')):
        gridcv_Co = loadmat('gridcv_co_trca_31.mat', {'gridcv_Co': gridcv_Co})['gridcv_Co']
        gridcv_Co = np.array(gridcv_Co, dtype=np.int64)
    else:
        for sub in range(num_subj):
            for fold in tqdm(range(num_fold), desc=f'sub{sub}:'):
                dataset=Dataset(num_class=7,path='Dataset/OData/Dataset I/', channel=channels)
                dataset.load_data(sub, 'zscore')
                dataset.divide_data(num_fold)
                dataset.filter_data(fs=Fs, frange=frange, order=8, btype='lowpass')

                def lambda_x(f, alg):
                    parameters = {'strca__Co':np.arange(Co)+1}
                    train_set, train_label, _, _ = dataset.retreive_data(
                        class_index=[0,1,2,3,4,5,6], fold_index=fold, filter_index=f+1)
                    train_set = train_set[:,:,int(t[0]*Fs):int(t[1]*Fs)]
                    # test_set  = test_set[:,:,int(t[0]*Fs):int(t[1]*Fs)]
                    estimators = [('strca', STRCA(Co, optimizer=alg, ifG=True)), ('clf', SVC())]
                    model = Pipeline(estimators)
                    clf = GridSearchCV(model, parameters, scoring="accuracy", cv=4)
                    clf.fit(train_set, train_label)
                    return clf.best_params_['strca__Co']

                gridcv_Co[sub, fold, :, 0] = Parallel(n_jobs=len(frange))(delayed(lambda_x)(f, 'trca') for f in range(len(frange)))
                gridcv_Co[sub, fold, :, 1] = Parallel(n_jobs=len(frange))(delayed(lambda_x)(f, 'tdlda') for f in range(len(frange)))
        savemat('gridcv_co_trca_31.mat', {'gridcv_Co': gridcv_Co})


[ 2  4  6  8 10]


sub0:: 100%|██████████| 5/5 [22:06<00:00, 265.21s/it]
sub1:: 100%|██████████| 5/5 [21:34<00:00, 258.95s/it]
sub2:: 100%|██████████| 5/5 [20:39<00:00, 247.87s/it]
sub3:: 100%|██████████| 5/5 [22:10<00:00, 266.06s/it]
sub4:: 100%|██████████| 5/5 [21:45<00:00, 261.07s/it]
sub5:: 100%|██████████| 5/5 [20:44<00:00, 248.97s/it]
sub6:: 100%|██████████| 5/5 [22:20<00:00, 268.17s/it]
sub7:: 100%|██████████| 5/5 [21:44<00:00, 260.85s/it]
sub8:: 100%|██████████| 5/5 [22:47<00:00, 273.43s/it]
sub9:: 100%|██████████| 5/5 [22:40<00:00, 272.09s/it]
sub10:: 100%|██████████| 5/5 [20:35<00:00, 247.18s/it]
sub11:: 100%|██████████| 5/5 [20:42<00:00, 248.40s/it]
sub12:: 100%|██████████| 5/5 [22:27<00:00, 269.54s/it]
sub13:: 100%|██████████| 5/5 [21:08<00:00, 253.68s/it]
sub14:: 100%|██████████| 5/5 [21:46<00:00, 261.31s/it]


In [11]:
import numpy as np
import os
from sklearn.svm import SVC
from tqdm import tqdm
from scipy.io import loadmat
from scripts.dataset import Dataset
from scripts.model import FBTRCA

if __name__ == "__main__":
    Fs = 256
    num_fold = 5
    num_subj = 15
    max_freq = 10
    step_bank = 2
    frange = np.arange(step_bank, max_freq + 1, step_bank)
    channels = (np.int64([1,3,5,6,11,12,13,15,17,18,20,21,24,25,27,29,30,31,33,35,36,39,40,42,43,45,47,48,55,57,59]) - 1).tolist()
    t = [0, 2]
    result = np.zeros([num_subj, num_fold, 4])

    # 加载之前计算出的最佳Co值
    gridcv_Co = loadmat('gridcv_co_trca_31.mat')['gridcv_Co']
    gridcv_Co = np.array(gridcv_Co, dtype=np.int64)

    for sub in range(num_subj):
        dataset = Dataset(num_class=7, path='Dataset/OData/Dataset I/', channel=channels)
        dataset.load_data(sub, 'zscore')
        dataset.divide_data(num_fold)
        dataset.filter_data(fs=Fs, frange=frange, order=8, btype='lowpass')

        for fold in tqdm(range(num_fold), desc=f'Sub{sub}: Final Evaluation'):
            train_set, train_label, test_set, test_label = dataset.retreive_data(
                class_index=[0, 1, 2, 3, 4, 5, 6], fold_index=fold, filter_index=np.arange(len(frange)) + 1)
            train_set = train_set[:, :, int(t[0] * Fs):int(t[1] * Fs), :]
            test_set = test_set[:, :, int(t[0] * Fs):int(t[1] * Fs), :]

            # 从gridcv_Co加载最佳Co值，并确保其为整数类型
            best_Co_trca = int(gridcv_Co[sub, fold, 0, 0])  # 使用trca算法时的最佳Co值
            best_Co_tdlda = int(gridcv_Co[sub, fold, 0, 1])  # 使用tdlda算法时的最佳Co值

            # 使用这些Co值直接进行评估

            # ifG=False
            fbtrca = FBTRCA(best_Co_tdlda, frange, optimizer='tdlda', ifG=False)
            fbtrca.fit(train_set, train_label)
            train_data = fbtrca.transform(train_set).reshape((len(train_label), fbtrca.K, 8, len(frange)))
            test_data = fbtrca.transform(test_set).reshape((len(test_label), fbtrca.K, 8, len(frange)))
            svm = SVC(kernel='rbf')
            svm.fit(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
            result[sub, fold, 2] = svm.score(test_data.reshape((len(test_label), fbtrca.K * 8 * len(frange))), test_label)

            # ifG=True
            fbtrca = FBTRCA(best_Co_tdlda, frange, optimizer='tdlda', ifG=True)
            fbtrca.fit(train_set, train_label)
            train_data = fbtrca.transform(train_set).reshape((len(train_label), fbtrca.K, 8, len(frange)))
            test_data = fbtrca.transform(test_set).reshape((len(test_label), fbtrca.K, 8, len(frange)))
            svm.fit(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
            result[sub, fold, 3] = svm.score(test_data.reshape((len(test_label), fbtrca.K * 8 * len(frange))), test_label)

    # 打印结果
    print("最佳平均准确率 ifG=False:", np.mean(result[:, :, 2], axis=(0, 1)))
    print("最佳平均准确率 ifG=True:", np.mean(result[:, :, 3], axis=(0, 1)))


Sub0: Final Evaluation: 100%|██████████| 5/5 [00:51<00:00, 10.29s/it]
Sub1: Final Evaluation: 100%|██████████| 5/5 [00:48<00:00,  9.62s/it]
Sub2: Final Evaluation: 100%|██████████| 5/5 [00:32<00:00,  6.52s/it]
Sub3: Final Evaluation: 100%|██████████| 5/5 [00:34<00:00,  6.95s/it]
Sub4: Final Evaluation: 100%|██████████| 5/5 [01:06<00:00, 13.22s/it]
Sub5: Final Evaluation: 100%|██████████| 5/5 [00:48<00:00,  9.78s/it]
Sub6: Final Evaluation: 100%|██████████| 5/5 [00:51<00:00, 10.38s/it]
Sub7: Final Evaluation: 100%|██████████| 5/5 [00:39<00:00,  7.81s/it]
Sub8: Final Evaluation: 100%|██████████| 5/5 [00:49<00:00,  9.98s/it]
Sub9: Final Evaluation: 100%|██████████| 5/5 [00:39<00:00,  7.86s/it]
Sub10: Final Evaluation: 100%|██████████| 5/5 [00:52<00:00, 10.59s/it]
Sub11: Final Evaluation: 100%|██████████| 5/5 [00:46<00:00,  9.27s/it]
Sub12: Final Evaluation: 100%|██████████| 5/5 [00:44<00:00,  8.85s/it]
Sub13: Final Evaluation: 100%|██████████| 5/5 [01:04<00:00, 12.92s/it]
Sub14: Final Eva

最佳平均准确率 ifG=False: 0.4570198161684825
最佳平均准确率 ifG=True: 0.45638942197377685





In [8]:
import numpy as np
from sklearn.svm import SVC
from tqdm import tqdm
from joblib import Parallel, delayed
from scipy.io import savemat, loadmat
import os
from scripts.dataset import Dataset
from scripts.model import FBTRCA

if __name__ == "__main__":
    Fs = 256
    Co_range = np.arange(1, 12)  # Co 的范围
    num_fold = 5
    num_subj = 15
    max_freq = 10
    step_bank = 2
    frange = np.arange(step_bank, max_freq + 1, step_bank)
    channels = (np.int64([1, 3, 5, 15, 29, 30, 31, 45, 55, 57, 59]) - 1).tolist()
    t = [0, 2]
    gridcv_Co = np.zeros([num_subj, num_fold, len(frange)], dtype=np.int64)  # 存储最佳的 Co 参数

    if os.path.exists(os.path.join(os.getcwd(), 'gridcv_co_fbtrca_tdlda11_bestCo.mat')):
        gridcv_Co = loadmat('gridcv_co_fbtrca_tdlda11_bestCo.mat')['gridcv_Co']
    else:
        for sub in range(num_subj):
            # 加载数据集
            dataset = Dataset(num_class=7, path='Dataset/OData/Dataset I/', channel=channels)
            dataset.load_data(sub, None)
            dataset.divide_data(num_fold)
            dataset.filter_data(fs=Fs, frange=frange, order=8, btype='lowpass')
            
            for fold in tqdm(range(num_fold), desc=f'Sub{sub}:'):
                # 获取训练集和测试集的数据和标签
                train_set, train_label, test_set, test_label = dataset.retreive_data(
                    class_index=[0, 1, 2, 3, 4, 5, 6], fold_index=fold, filter_index=np.arange(len(frange)) + 1)
                
                # 对时间维度进行切片
                train_set = train_set[:, :, int(t[0] * Fs):int(t[1] * Fs), :]
                test_set = test_set[:, :, int(t[0] * Fs):int(t[1] * Fs), :]

                # 使用网格搜索来找到最佳的 Co 参数
                def find_best_Co():
                    best_score = -np.inf
                    best_Co = Co_range[0]
                    for Co in Co_range:
                        fbtrca = FBTRCA(Co, frange, optimizer='tdlda', ifG=False)
                        
                        fbtrca.fit(train_set, train_label)
                        train_data = fbtrca.transform(train_set).reshape((len(train_label), fbtrca.K, 8, len(frange)))
                        svm = SVC(kernel='rbf')
                        svm.fit(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
                        score = svm.score(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
                        
                        if score > best_score:
                            best_score = score
                            best_Co = Co
                    
                    return best_Co

                best_Co = find_best_Co()

                gridcv_Co[sub, fold, :] = best_Co

        savemat('gridcv_co_fbtrca_tdlda11_bestCo.mat', {'gridcv_Co': gridcv_Co})




Sub0:: 100%|██████████| 5/5 [01:34<00:00, 18.97s/it]
Sub1:: 100%|██████████| 5/5 [01:32<00:00, 18.53s/it]
Sub2:: 100%|██████████| 5/5 [01:28<00:00, 17.70s/it]
Sub3:: 100%|██████████| 5/5 [01:34<00:00, 18.99s/it]
Sub4:: 100%|██████████| 5/5 [01:32<00:00, 18.56s/it]
Sub5:: 100%|██████████| 5/5 [01:28<00:00, 17.69s/it]
Sub6:: 100%|██████████| 5/5 [01:35<00:00, 19.06s/it]
Sub7:: 100%|██████████| 5/5 [01:32<00:00, 18.53s/it]
Sub8:: 100%|██████████| 5/5 [01:37<00:00, 19.49s/it]
Sub9:: 100%|██████████| 5/5 [01:36<00:00, 19.33s/it]
Sub10:: 100%|██████████| 5/5 [01:27<00:00, 17.58s/it]
Sub11:: 100%|██████████| 5/5 [01:28<00:00, 17.66s/it]
Sub12:: 100%|██████████| 5/5 [01:35<00:00, 19.13s/it]
Sub13:: 100%|██████████| 5/5 [01:30<00:00, 18.01s/it]
Sub14:: 100%|██████████| 5/5 [01:32<00:00, 18.57s/it]


In [11]:
import numpy as np
import os
from sklearn.svm import SVC
from tqdm import tqdm
from scipy.io import loadmat
from scripts.dataset import Dataset
from scripts.model import FBTRCA

if __name__ == "__main__":
    Fs = 256
    num_fold = 5
    num_subj = 15
    max_freq = 10
    step_bank = 2
    frange = np.arange(step_bank, max_freq + 1, step_bank)
    channels = (np.int64([1, 3, 5, 15, 29, 30, 31, 45, 55, 57, 59]) - 1).tolist()
    t = [0, 2]
    result = np.zeros([num_subj, num_fold, 4])

    # 加载之前计算出的最佳Co值
    gridcv_Co = loadmat('gridcv_co_fbtrca_tdlda11_bestCo.mat')['gridcv_Co']
    gridcv_Co = np.array(gridcv_Co, dtype=np.int64)

    for sub in range(num_subj):
        dataset = Dataset(num_class=7, path='Dataset/OData/Dataset I/', channel=channels)
        dataset.load_data(sub, 'zscore')
        dataset.divide_data(num_fold)
        dataset.filter_data(fs=Fs, frange=frange, order=8, btype='lowpass')

        for fold in tqdm(range(num_fold), desc=f'Sub{sub}: Final Evaluation'):
            train_set, train_label, test_set, test_label = dataset.retreive_data(
                class_index=[0, 1, 2, 3, 4, 5, 6], fold_index=fold, filter_index=np.arange(len(frange)) + 1)
            train_set = train_set[:, :, int(t[0] * Fs):int(t[1] * Fs), :]
            test_set = test_set[:, :, int(t[0] * Fs):int(t[1] * Fs), :]

            best_Co_tdlda = int(gridcv_Co[sub, fold, 0])  # 适应三维数组

            # 使用这些Co值直接进行评估

            # ifG=False
            fbtrca = FBTRCA(best_Co_tdlda, frange, optimizer='tdlda', ifG=False)
            fbtrca.fit(train_set, train_label)
            train_data = fbtrca.transform(train_set).reshape((len(train_label), fbtrca.K, 8, len(frange)))
            test_data = fbtrca.transform(test_set).reshape((len(test_label), fbtrca.K, 8, len(frange)))
            svm = SVC(kernel='rbf')
            svm.fit(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
            result[sub, fold, 2] = svm.score(test_data.reshape((len(test_label), fbtrca.K * 8 * len(frange))), test_label)

            # ifG=True
            fbtrca = FBTRCA(best_Co_tdlda, frange, optimizer='tdlda', ifG=True)
            fbtrca.fit(train_set, train_label)
            train_data = fbtrca.transform(train_set).reshape((len(train_label), fbtrca.K, 8, len(frange)))
            test_data = fbtrca.transform(test_set).reshape((len(test_label), fbtrca.K, 8, len(frange)))
            svm.fit(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
            result[sub, fold, 3] = svm.score(test_data.reshape((len(test_label), fbtrca.K * 8 * len(frange))), test_label)

    # 打印结果
    print("最佳平均准确率 ifG=False:", np.mean(result[:, :, 2], axis=(0, 1)))
    print("最佳平均准确率 ifG=True:", np.mean(result[:, :, 3], axis=(0, 1)))


Sub0: Final Evaluation: 100%|██████████| 5/5 [00:23<00:00,  4.77s/it]
Sub1: Final Evaluation: 100%|██████████| 5/5 [00:25<00:00,  5.08s/it]
Sub2: Final Evaluation: 100%|██████████| 5/5 [00:20<00:00,  4.11s/it]
Sub3: Final Evaluation: 100%|██████████| 5/5 [00:26<00:00,  5.25s/it]
Sub4: Final Evaluation: 100%|██████████| 5/5 [00:25<00:00,  5.01s/it]
Sub5: Final Evaluation: 100%|██████████| 5/5 [00:23<00:00,  4.71s/it]
Sub6: Final Evaluation: 100%|██████████| 5/5 [00:26<00:00,  5.32s/it]
Sub7: Final Evaluation: 100%|██████████| 5/5 [00:24<00:00,  4.96s/it]
Sub8: Final Evaluation: 100%|██████████| 5/5 [00:23<00:00,  4.73s/it]
Sub9: Final Evaluation: 100%|██████████| 5/5 [00:25<00:00,  5.01s/it]
Sub10: Final Evaluation: 100%|██████████| 5/5 [00:24<00:00,  4.93s/it]
Sub11: Final Evaluation: 100%|██████████| 5/5 [00:22<00:00,  4.59s/it]
Sub12: Final Evaluation: 100%|██████████| 5/5 [00:24<00:00,  4.92s/it]
Sub13: Final Evaluation: 100%|██████████| 5/5 [00:24<00:00,  4.91s/it]
Sub14: Final Eva

最佳平均准确率 ifG=False: 0.435935838842616
最佳平均准确率 ifG=True: 0.4395485674410691





In [None]:
import numpy as np
from sklearn.svm import SVC
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
from scripts.dataset import Dataset
from scripts.model import STRCA, FBTRCA
from tqdm import tqdm
from joblib import Parallel, delayed
from scipy.io import savemat, loadmat
import os

if __name__=="__main__":
    Fs = 256
    Co = 31
    num_fold = 5
    num_subj = 15
    max_freq = 10
    step_bank= 2
    frange = np.arange(step_bank,max_freq+1,step_bank)
    channels = (np.int64([1,3,5,6,11,12,13,15,17,18,20,21,24,25,27,29,30,31,33,35,36,39,40,42,43,45,47,48,55,57,59]) - 1).tolist()
    num_bank = int(max_freq/step_bank)
    t  = [0, 2]
    result = np.zeros([num_subj, num_fold, len(frange), 2])
    gridcv_Co = np.zeros([num_subj, num_fold, len(frange), 2], dtype=np.int64)
    print(frange)

    if os.path.exists(os.path.join(os.getcwd(), 'gridcv_co_trca_31.mat')):
        gridcv_Co = loadmat('gridcv_co_trca_31.mat', {'gridcv_Co': gridcv_Co})['gridcv_Co']
        gridcv_Co = np.array(gridcv_Co, dtype=np.int64)
    else:
        for sub in range(num_subj):
            for fold in tqdm(range(num_fold), desc=f'sub{sub}:'):
                dataset=Dataset(num_class=7,path='Dataset/OData/Dataset I/', channel=channels)
                dataset.load_data(sub, 'zscore')
                dataset.divide_data(num_fold)
                dataset.filter_data(fs=Fs, frange=frange, order=8, btype='lowpass')

                def lambda_x(f, alg):
                    parameters = {'strca__Co':np.arange(Co)+1}
                    train_set, train_label, _, _ = dataset.retreive_data(
                        class_index=[0,1,2,3,4,5,6], fold_index=fold, filter_index=f+1)
                    train_set = train_set[:,:,int(t[0]*Fs):int(t[1]*Fs)]
                    estimators = [('strca', STRCA(Co, optimizer=alg, ifG=True)), ('clf', SVC())]
                    model = Pipeline(estimators)
                    clf = GridSearchCV(model, parameters, scoring="accuracy", cv=4)
                    clf.fit(train_set, train_label)
                    return clf.best_params_['strca__Co']

                gridcv_Co[sub, fold, :, 0] = Parallel(n_jobs=len(frange))(delayed(lambda_x)(f, 'trca') for f in range(len(frange)))
                gridcv_Co[sub, fold, :, 1] = Parallel(n_jobs=len(frange))(delayed(lambda_x)(f, 'tdlda') for f in range(len(frange)))
        savemat('gridcv_co_trca_31.mat', {'gridcv_Co': gridcv_Co})

In [None]:
import numpy as np
import os
from sklearn.svm import SVC
from tqdm import tqdm
from scipy.io import loadmat
from scripts.dataset import Dataset
from scripts.model import FBTRCA

if __name__ == "__main__":
    Fs = 256
    num_fold = 5
    num_subj = 15
    max_freq = 10
    step_bank = 2
    frange = np.arange(step_bank, max_freq + 1, step_bank)
    channels = (np.int64([1,3,5,6,11,12,13,15,17,18,20,21,24,25,27,29,30,31,33,35,36,39,40,42,43,45,47,48,55,57,59]) - 1).tolist()
    t = [0, 2]
    result = np.zeros([num_subj, num_fold, 4])

    gridcv_Co = loadmat('gridcv_co_trca_31.mat')['gridcv_Co']
    gridcv_Co = np.array(gridcv_Co, dtype=np.int64)

    for sub in range(num_subj):
        dataset = Dataset(num_class=7, path='Dataset/OData/Dataset I/', channel=channels)
        dataset.load_data(sub, 'zscore')
        dataset.divide_data(num_fold)
        dataset.filter_data(fs=Fs, frange=frange, order=8, btype='lowpass')

        for fold in tqdm(range(num_fold), desc=f'Sub{sub}: Final Evaluation'):
            train_set, train_label, test_set, test_label = dataset.retreive_data(
                class_index=[0, 1, 2, 3, 4, 5, 6], fold_index=fold, filter_index=np.arange(len(frange)) + 1)
            train_set = train_set[:, :, int(t[0] * Fs):int(t[1] * Fs), :]
            test_set = test_set[:, :, int(t[0] * Fs):int(t[1] * Fs), :]

            best_Co_trca = int(gridcv_Co[sub, fold, 0, 0])  
            best_Co_tdlda = int(gridcv_Co[sub, fold, 0, 1])  

            fbtrca = FBTRCA(best_Co_trca, frange, optimizer='trca', ifG=False)
            fbtrca.fit(train_set, train_label)
            train_data = fbtrca.transform(train_set).reshape((len(train_label), fbtrca.K, 8, len(frange)))
            test_data = fbtrca.transform(test_set).reshape((len(test_label), fbtrca.K, 8, len(frange)))
            svm = SVC(kernel='rbf')
            svm.fit(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
            result[sub, fold, 0] = svm.score(test_data.reshape((len(test_label), fbtrca.K * 8 * len(frange))), test_label)

            fbtrca = FBTRCA(best_Co_trca, frange, optimizer='trca', ifG=True)
            fbtrca.fit(train_set, train_label)
            train_data = fbtrca.transform(train_set).reshape((len(train_label), fbtrca.K, 8, len(frange)))
            test_data = fbtrca.transform(test_set).reshape((len(test_label), fbtrca.K, 8, len(frange)))
            svm.fit(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
            result[sub, fold, 1] = svm.score(test_data.reshape((len(test_label), fbtrca.K * 8 * len(frange))), test_label)

            fbtrca = FBTRCA(best_Co_tdlda, frange, optimizer='tdlda', ifG=False)
            fbtrca.fit(train_set, train_label)
            train_data = fbtrca.transform(train_set).reshape((len(train_label), fbtrca.K, 8, len(frange)))
            test_data = fbtrca.transform(test_set).reshape((len(test_label), fbtrca.K, 8, len(frange)))
            svm.fit(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
            result[sub, fold, 2] = svm.score(test_data.reshape((len(test_label), fbtrca.K * 8 * len(frange))), test_label)

            fbtrca = FBTRCA(best_Co_tdlda, frange, optimizer='tdlda', ifG=True)
            fbtrca.fit(train_set, train_label)
            train_data = fbtrca.transform(train_set).reshape((len(train_label), fbtrca.K, 8, len(frange)))
            test_data = fbtrca.transform(test_set).reshape((len(test_label), fbtrca.K, 8, len(frange)))
            svm.fit(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
            result[sub, fold, 3] = svm.score(test_data.reshape((len(test_label), fbtrca.K * 8 * len(frange))), test_label)

    print("trca优化器, ifG=False 的最佳平均准确率:", np.mean(result[:, :, 0], axis=(0, 1)))
    print("trca优化器, ifG=True 的最佳平均准确率:", np.mean(result[:, :, 1], axis=(0, 1)))
    print("tdlda优化器, ifG=False 的最佳平均准确率:", np.mean(result[:, :, 2], axis=(0, 1)))
    print("tdlda优化器, ifG=True 的最佳平均准确率:", np.mean(result[:, :, 3], axis=(0, 1)))


In [2]:
import numpy as np
from sklearn.svm import SVC
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
from scripts.dataset import Dataset
from scripts.model import STRCA, FBTRCA
from tqdm import tqdm
from joblib import Parallel, delayed
from scipy.io import savemat, loadmat
import os

if __name__ == "__main__":
    Fs = 256
    Co = 31
    num_fold = 5
    num_subj = 15
    max_freq = 10
    step_bank = 2
    frange = np.arange(step_bank, max_freq + 1, step_bank)
    channels = (np.int64([1,3,5,6,11,12,13,15,17,18,20,21,24,25,27,29,30,31,33,35,36,39,40,42,43,45,47,48,55,57,59]) - 1).tolist()
    t = [0, 2]
    result = np.zeros([num_subj, num_fold, 4])
    
    gridcv_Co_trca = np.zeros([num_subj, num_fold, len(frange)], dtype=np.int64)
    gridcv_Co_tdlda = np.zeros([num_subj, num_fold, len(frange)], dtype=np.int64)

    if not (os.path.exists('gridcv_co_trca_31.mat') and os.path.exists('gridcv_co_tdlda_31.mat')):
        for sub in range(num_subj):
            for fold in tqdm(range(num_fold), desc=f'sub{sub}:'):
                dataset = Dataset(num_class=7, path='Dataset/OData/Dataset I/', channel=channels)
                dataset.load_data(sub, 'zscore')
                dataset.divide_data(num_fold)
                dataset.filter_data(fs=Fs, frange=frange, order=8, btype='lowpass')
                
                def lambda_x(f, alg):
                    parameters = {'model__Co': np.arange(Co)+1}
                    train_set, train_label, _, _ = dataset.retreive_data(
                        class_index=[0,1,2,3,4,5,6], fold_index=fold, filter_index=f+1)
                    train_set = train_set[:,:,int(t[0]*Fs):int(t[1]*Fs)]
                    
                    if alg == 'trca':
                        estimators = [('model', STRCA(Co, optimizer='trca', ifG=True)), ('clf', SVC())]
                    else:  # tdlda
                        estimators = [('model', FBTRCA(Co, frange, optimizer='tdlda', ifG=True)), ('clf', SVC())]
                    
                    model = Pipeline(estimators)
                    clf = GridSearchCV(model, parameters, scoring="accuracy", cv=4)
                    clf.fit(train_set, train_label)
                    return clf.best_params_['model__Co']
                
                gridcv_Co_trca[sub, fold, :] = Parallel(n_jobs=len(frange))(delayed(lambda_x)(f, 'trca') for f in range(len(frange)))
                gridcv_Co_tdlda[sub, fold, :] = Parallel(n_jobs=len(frange))(delayed(lambda_x)(f, 'tdlda') for f in range(len(frange)))
        
        savemat('gridcv_co_trca_31.mat', {'gridcv_Co_trca': gridcv_Co_trca})
        savemat('gridcv_co_tdlda_31.mat', {'gridcv_Co_tdlda': gridcv_Co_tdlda})
    else:
        gridcv_Co_trca = loadmat('gridcv_co_trca_31.mat')['gridcv_Co_trca']
        gridcv_Co_tdlda = loadmat('gridcv_co_tdlda_31.mat')['gridcv_Co_tdlda']


sub0::   0%|          | 0/5 [00:03<?, ?it/s]


TypeError: STRCA.__init__() got multiple values for argument 'optimizer'

In [None]:
import numpy as np
import os
from sklearn.svm import SVC
from tqdm import tqdm
from scipy.io import loadmat
from scripts.dataset import Dataset
from scripts.model import FBTRCA

if __name__ == "__main__":
    Fs = 256
    num_fold = 5
    num_subj = 15
    max_freq = 10
    step_bank = 2
    frange = np.arange(step_bank, max_freq + 1, step_bank)
    channels = (np.int64([1,3,5,6,11,12,13,15,17,18,20,21,24,25,27,29,30,31,33,35,36,39,40,42,43,45,47,48,55,57,59]) - 1).tolist()
    t = [0, 2]
    result = np.zeros([num_subj, num_fold, 4])

    gridcv_Co_trca = loadmat('gridcv_co_trca_31.mat')['gridcv_Co_trca']
    gridcv_Co_tdlda = loadmat('gridcv_co_tdlda_31.mat')['gridcv_Co_tdlda']

    for sub in range(num_subj):
        dataset = Dataset(num_class=7, path='Dataset/OData/Dataset I/', channel=channels)
        dataset.load_data(sub, 'zscore')
        dataset.divide_data(num_fold)
        dataset.filter_data(fs=Fs, frange=frange, order=8, btype='lowpass')

        for fold in tqdm(range(num_fold), desc=f'Sub{sub}: Final Evaluation'):
            train_set, train_label, test_set, test_label = dataset.retreive_data(
                class_index=[0, 1, 2, 3, 4, 5, 6], fold_index=fold, filter_index=np.arange(len(frange)) + 1)
            train_set = train_set[:, :, int(t[0] * Fs):int(t[1] * Fs), :]
            test_set = test_set[:, :, int(t[0] * Fs):int(t[1] * Fs), :]

            best_Co_trca = int(gridcv_Co_trca[sub, fold, 0])
            best_Co_tdlda = int(gridcv_Co_tdlda[sub, fold, 0])

            fbtrca = FBTRCA(best_Co_trca, frange, optimizer='trca', ifG=False)
            fbtrca.fit(train_set, train_label)
            train_data = fbtrca.transform(train_set).reshape((len(train_label), fbtrca.K, 8, len(frange)))
            test_data = fbtrca.transform(test_set).reshape((len(test_label), fbtrca.K, 8, len(frange)))
            svm = SVC(kernel='rbf')
            svm.fit(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
            result[sub, fold, 0] = svm.score(test_data.reshape((len(test_label), fbtrca.K * 8 * len(frange))), test_label)

            fbtrca = FBTRCA(best_Co_trca, frange, optimizer='trca', ifG=True)
            fbtrca.fit(train_set, train_label)
            train_data = fbtrca.transform(train_set).reshape((len(train_label), fbtrca.K, 8, len(frange)))
            test_data = fbtrca.transform(test_set).reshape((len(test_label), fbtrca.K, 8, len(frange)))
            svm.fit(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
            result[sub, fold, 1] = svm.score(test_data.reshape((len(test_label), fbtrca.K * 8 * len(frange))), test_label)

            fbtrca = FBTRCA(best_Co_tdlda, frange, optimizer='tdlda', ifG=False)
            fbtrca.fit(train_set, train_label)
            train_data = fbtrca.transform(train_set).reshape((len(train_label), fbtrca.K, 8, len(frange)))
            test_data = fbtrca.transform(test_set).reshape((len(test_label), fbtrca.K, 8, len(frange)))
            svm.fit(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
            result[sub, fold, 2] = svm.score(test_data.reshape((len(test_label), fbtrca.K * 8 * len(frange))), test_label)

            fbtrca = FBTRCA(best_Co_tdlda, frange, optimizer='tdlda', ifG=True)
            fbtrca.fit(train_set, train_label)
            train_data = fbtrca.transform(train_set).reshape((len(train_label), fbtrca.K, 8, len(frange)))
            test_data = fbtrca.transform(test_set).reshape((len(test_label), fbtrca.K, 8, len(frange)))
            svm.fit(train_data.reshape((len(train_label), fbtrca.K * 8 * len(frange))), train_label)
            result[sub, fold, 3] = svm.score(test_data.reshape((len(test_label), fbtrca.K * 8 * len(frange))), test_label)

    print("trca优化器, ifG=False 的最佳平均准确率:", np.mean(result[:, :, 0], axis=(0, 1)))
    print("trca优化器, ifG=True 的最佳平均准确率:", np.mean(result[:, :, 1], axis=(0, 1)))
    print("tdlda优化器, ifG=False 的最佳平均准确率:", np.mean(result[:, :, 2], axis=(0, 1)))
    print("tdlda优化器, ifG=True 的最佳平均准确率:", np.mean(result[:, :, 3], axis=(0, 1)))