In [3]:
import numpy as np
import pandas as pd
import math
import matplotlib.pyplot as plt
import mat73
from scipy.linalg import dft
from scipy import linalg
from scipy.signal import butter, iirnotch, lfilter, freqz, filtfilt
from scipy.fft import fft, fftfreq
import warnings
%matplotlib inline

In [4]:
with open('/kaggle/input/br41n-ssvep-data/ssvep/classInfo_4_5.m') as f:
    targets_str= f.read().split('\n')

targets=[]
for label in targets_str[:-1]:
    targets.append(list(map(int,label.split(' '))))

In [5]:
subject= 1
sub1_train1_data = '/kaggle/input/br41n-ssvep-data/ssvep/subject_1_fvep_led_training_1.mat'
sub1_train2_data = '/kaggle/input/br41n-ssvep-data/ssvep/subject_1_fvep_led_training_2.mat'
sub2_train1_data = '/kaggle/input/br41n-ssvep-data/ssvep/subject_2_fvep_led_training_1.mat'
sub2_train2_data = '/kaggle/input/br41n-ssvep-data/ssvep/subject_2_fvep_led_training_2.mat'

data_dict_sub1_train1 = mat73.loadmat(sub1_train1_data)
data_dict_sub1_train2 = mat73.loadmat(sub1_train2_data)
data_dict_sub2_train1 = mat73.loadmat(sub2_train1_data)
data_dict_sub2_train2 = mat73.loadmat(sub2_train2_data)

print(data_dict_sub1_train1.keys(), data_dict_sub1_train2.keys(), data_dict_sub2_train1.keys(), data_dict_sub2_train2.keys())

data_sub1_train1 = data_dict_sub1_train1['y']
data_sub1_train2 = data_dict_sub1_train2['y']
data_sub2_train1 = data_dict_sub2_train1['y']
data_sub2_train2 = data_dict_sub2_train2['y']

print(data_sub1_train1.shape, data_sub1_train2.shape, data_sub2_train1.shape, data_sub2_train2.shape)

dict_keys(['y']) dict_keys(['y']) dict_keys(['y']) dict_keys(['y'])
(11, 57728) (11, 58112) (11, 58757) (11, 57697)


In [6]:
print((data_sub1_train1[10]==0).sum(), (data_sub1_train1[10]==1).sum(), (data_sub1_train1[10]==2).sum(), (data_sub1_train1[10]==3).sum(), (data_sub1_train1[10]==4).sum())
print((data_sub1_train2[10]==0).sum(), (data_sub1_train2[10]==1).sum(), (data_sub1_train2[10]==2).sum(), (data_sub1_train2[10]==3).sum(), (data_sub1_train2[10]==4).sum())
print((data_sub2_train1[10]==0).sum(), (data_sub2_train1[10]==1).sum(), (data_sub2_train1[10]==2).sum(), (data_sub2_train1[10]==3).sum(), (data_sub2_train1[10]==4).sum())
print((data_sub2_train2[10]==0).sum(), (data_sub2_train2[10]==1).sum(), (data_sub2_train2[10]==2).sum(), (data_sub2_train2[10]==3).sum(), (data_sub2_train2[10]==4).sum())

51072 0 0 6656 0
50832 156 0 6344 780
32289 5408 4212 1768 15080
29097 3328 2912 2548 19812


In [7]:
def get_trigger_intervals(data):
    intervals_start= []
    intervals_end= []

    for i in range(0,len(data[9])-1):
        if data[9,i]==0 and data[9,i+1]==1:
            intervals_start.append(i+1)
        if data[9,i]==1 and data[9,i+1]==0:
            intervals_end.append(i+1)
    assert len(intervals_start)== len(intervals_end)

    import numpy as np
    intervals = np.array([intervals_start, intervals_end]).T

    return intervals

def get_label_array(data):
    intervals = get_trigger_intervals(data) 
    labels = np.zeros((data.shape[1]))
    class_=1
    for interval in intervals:
        labels[interval[0]:interval[1]]= class_
        class_+=1
        if class_==5:class_=1
    return labels

def get_segment(data_channel, intervals):
    output = []
    if len(data_channel.shape)==1:
        for interval in intervals:
            output.append(data_channel[interval[0]: interval[1]])
    
    elif len(data_channel.shape)==2:
        for interval in intervals:
            output.append(data_channel[:,interval[0]: interval[1]])
    return np.array(output)

def create_dataset(sample_eeg, sample_labels, window_size=10):
    eegs=[]
    labels=[]
    for idx in range(len(sample_eeg)):
        eeg_chunk = sample_eeg[idx]
        label_chunk= sample_labels[idx]
        for i in range(0, eeg_chunk.shape[1]- window_size, window_size):
            eegs.append(eeg_chunk[:,i:i+window_size])
            labels.append(label_chunk[i])
            for k in range(len(label_chunk)-1):
                assert label_chunk[k]==label_chunk[k+1]
    return np.array(eegs), np.array(labels)


In [8]:
name1= "data_sub1_train1"
data1 = data_sub1_train1

name2= "data_sub1_train2"
data2 = data_sub1_train2

In [9]:
def filter_eeg(eeg_signal):
    b1, a1 = butter(N=6, Wn=[0.5, 30], btype='bandpass', fs=256)
    b2, a2 = iirnotch(w0= 50, Q=30, fs=256)
    
    filtered_eeg = filtfilt(b1, a1, eeg_signal)
    filtered_eeg2 = filtfilt(b2, a2, filtered_eeg)
    return filtered_eeg2

In [10]:
def prep_data(data):
    intervals = get_trigger_intervals(data)
    labels = get_label_array(data)

    samples_labels= get_segment(labels, intervals)
    samples_lda_preds= get_segment(data[10], intervals)
    samples_eeg= get_segment(data[1:9], intervals)

    eegs,labels = create_dataset(samples_eeg, samples_labels, window_size=768) # window_size -> timestamps
    eegs.shape, eegs[0].shape, labels.shape, labels[0]
    
    filtered_eegs= []
    for eeg in eegs:
        filtered_eegs.append(filter_eeg(eeg))
    filtered_eegs= np.array(filtered_eegs)
    
    return filtered_eegs,labels

In [11]:
filtered_eegs1,labels1 = prep_data(data1)
filtered_eegs2,labels2 = prep_data(data2)

filtered_eegs3 = np.append(filtered_eegs1,filtered_eegs2, axis=0)
labels3 = np.append(labels1, labels2, axis=0)

print(filtered_eegs1.shape,labels1.shape)
print(filtered_eegs2.shape,labels2.shape)
print(filtered_eegs3.shape,labels3.shape)

(40, 8, 768) (40,)
(40, 8, 768) (40,)
(80, 8, 768) (80,)


# Feature Extraction

In [17]:
# for tsfresh
from tsfresh import extract_relevant_features
from tsfresh import extract_features, select_features

In [18]:
# dfx = pd.DataFrame(columns = ['eeg1','eeg2','eeg3','eeg4','eeg5','eeg6','eeg7','eeg8','id'])
# for i in range(filtered_eegs.shape[0]):
#     if i == 0:
#         temp = filtered_eegs[i].T
#         ids = np.pad(np.array([i]),(0,filtered_eegs[i].shape[1]-1),'constant',constant_values = i)
#         print(temp.shape, ids.shape)
#         continue
    
#     temp = np.append(temp, filtered_eegs[i].T, axis=0)
#     ids = np.append(ids,np.pad(np.array([i]),(0,filtered_eegs[i].shape[1]-1),'constant',constant_values = i))
#     print(temp.shape, ids.shape)
# dfx_arr = np.append(temp,ids.reshape((ids.shape[0],1)), axis=1)
# dfx_arr.shape

In [19]:
def ready_for_tsf(eegs, labels):
    # generatig X dataframe
    dfx = pd.DataFrame(columns = ['eeg1','eeg2','eeg3','eeg4','eeg5','eeg6','eeg7','eeg8','id'])
    for i in range(eegs.shape[0]):
        if i == 0:
            temp = eegs[i].T
            ids = np.pad(np.array([i]),(0,eegs[i].shape[1]-1),'constant',constant_values = i)
            print(temp.shape, ids.shape)
            continue
    
        temp = np.append(temp, eegs[i].T, axis=0)
        ids = np.append(ids,np.pad(np.array([i]),(0,eegs[i].shape[1]-1),'constant',constant_values = i))
#     print(temp.shape, ids.shape)
    for j in range(8):
        dfx['eeg'+str(j+1)] = temp[:,j]
    print(dfx)
    dfx['id'] = ids

    # generate a unique id for each movement based on gesture and iteration values
    dfy = pd.DataFrame()
    dfy['id']  = ids
    # dropping ALL duplicte values
    dfy = dfy.drop_duplicates()
    dfy['y'] 	= labels

    # save files
#     print(dfy)
#     save_file(df, configs(save_folder,'file_tsf_input_x'))
#     save_file(dfy, configs(save_folder,'file_tsf_input_y'))
    return dfx,dfy

In [20]:
def run_tsfresh(eegs,labels):

    dfx,dfy = ready_for_tsf(eegs, labels)
    y =  pd.Series(data=dfy["y"].values, index=dfy["id"].values)
    
    #   # Removing mean and scaling - if the following section is needed uncomment it
#     grouped = dfx.groupby(dfx.id)
#     ids = dfx['id']
#     ids = list(set(ids))
#     ids.sort()
#     for i in ids:
#         if i == ids[0]:
#             X = scaled_gesture(grouped,i)
#             continue

#         scaled_g = scaled_gesture(grouped,i)
#         X = X.append(scaled_g, ignore_index=True)


#     print(X.head())
#     print(y.head())

    fc_parameters = {
    "spkt_welch_density":  [{"coeff": 9}, {"coeff": 10}, {"coeff": 12}, {"coeff": 15}]
    }

    warnings.simplefilter("ignore")
#     features_filtered = extract_relevant_features(dfx, y, column_id='id',n_jobs=0)
    features_filtered = extract_features(dfx, default_fc_parameters=fc_parameters, column_id='id')
    print(features_filtered.head())
    print('TSF finished')
    return features_filtered

## Test to model

In [21]:
import pandas as pd
import numpy as np
from sklearn.feature_selection import SelectKBest, mutual_info_classif
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import StratifiedKFold
from sklearn.naive_bayes import GaussianNB
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier

from tsfresh import extract_features
from tsfresh.utilities.dataframe_functions import make_forecasting_frame


In [22]:
import pandas as pd
import numpy as np
from tsfresh import extract_features
from tsfresh.utilities.dataframe_functions import impute
from tqdm import tqdm

def run_tsfresh(eegs, labels):
    # eegs: list of shape (n_samples, n_channels, n_times)
    df_list = []
    
    for i in tqdm(range(len(eegs)), desc="Feature Extraction"):
        eeg = eegs[i]
        n_channels, n_times = eeg.shape
        for ch in range(n_channels):
            for t in range(n_times):
                df_list.append({
                    "id": i,
                    "time": t,
                    "value": eeg[ch, t],
                    "kind": f"ch{ch}"
                })

    df_all = pd.DataFrame(df_list)
    
    features = extract_features(
        df_all,
        column_id="id",
        column_sort="time",
        column_kind="kind",
        column_value="value",
        n_jobs=0,  # set to 0 to disable multiprocessing (simpler debugging)
    )

    impute(features)
    return features


In [23]:
def ready_for_sklearn(eegs, labels, name):
    dfx = run_tsfresh(eegs, labels)
    dfx.columns = dfx.columns.str.replace('"', '#')
    y = ['L' + str(int(l)) for l in labels]
    dfx['y'] = y
    dfx.to_csv(f'ssvep/{name}_input.csv', index=False)


In [24]:
def select_features(name, k=30):
    df = pd.read_csv(f'ssvep/{name}_input.csv')
    X = df.drop(columns=['y'])
    y = df['y']
    
    selector = SelectKBest(score_func=mutual_info_classif, k=k)
    X_new = selector.fit_transform(X, y)
    selected_columns = X.columns[selector.get_support()]
    
    X_selected = pd.DataFrame(X_new, columns=selected_columns)
    X_selected['y'] = y
    X_selected.to_csv(f'ssvep/{name}_selected.csv', index=False)
    
    print(f'Selected top {k} features: {list(selected_columns)}')
    print('Filtered dataset shape:', X_selected.shape)


In [25]:
def run_model(X, y, clf, model_name):
    skf = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)
    scores = cross_val_score(clf, X, y, cv=skf, scoring='accuracy')
    print(f"{model_name}: Accuracy = {scores.mean():.4f} ± {scores.std():.4f}")
    return scores


In [28]:
import os

# Create the folder if it doesn't exist
os.makedirs('kaggle/working/ssvep', exist_ok=True)

def classify_models(name, use_selected=False):
    if use_selected:
        df = pd.read_csv(f'ssvep/{name}_selected.csv')
    else:
        df = pd.read_csv(f'ssvep/{name}_input.csv')
    
    X = df.drop(columns=['y'])
    y = df['y']
    
    print(f'Using {"selected" if use_selected else "all"} features - shape: {X.shape}')
    
    run_model(X, y, SVC(kernel='linear'), 'SVM')
    run_model(X, y, GaussianNB(), 'Naive Bayes')
    run_model(X, y, KNeighborsClassifier(n_neighbors=3), 'KNN')
    run_model(X, y, LogisticRegression(max_iter=1000), 'Logistic Regression')
    run_model(X, y, RandomForestClassifier(n_estimators=100), 'Random Forest')


In [None]:
# Example usage
name1 = 'sample_subject'

# filtered_eegs1 and labels1 should be defined previously
ready_for_sklearn(filtered_eegs1, labels1, name1)
select_features(name1, k=30)
classify_models(name1, use_selected=False)
classify_models(name1, use_selected=True)


Feature Extraction: 100%|██████████| 40/40 [00:00<00:00, 349.93it/s]
Feature Extraction:  81%|████████  | 259/320 [04:42<01:06,  1.09s/it]

# Classification

# LDA

In [None]:
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import GridSearchCV

In [None]:
# train = s1 run1
# test = s2 run2
X_train = run_tsfresh(filtered_eegs1,labels1)
y_train = labels1
X_test = run_tsfresh(filtered_eegs2,labels2)
y_test = labels2
ocolumns = X_train.columns

In [None]:
X = run_tsfresh(filtered_eegs1,labels1)
ocolumns = X.columns
y = labels1

In [None]:
# create the lda model
model = LinearDiscriminantAnalysis()

# define model evaluation method
cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
# evaluate model
scores = cross_val_score(model, X, y, scoring='accuracy', cv=cv, n_jobs=-1)
# summarize result
print('Mean Accuracy: %.3f (%.3f)' % (np.mean(scores), np.std(scores)))

In [None]:
# define model evaluation method
cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
# define grid
grid = dict()
grid['solver'] = ['svd', 'lsqr', 'eigen']
# define search
search = GridSearchCV(model, grid, scoring='accuracy', cv=cv, n_jobs=-1)
# perform the search
results = search.fit(X, y)
# summarize

print('Mean Accuracy: %.3f' % results.best_score_)
print('Config: %s' % results.best_params_)

# SVM

In [None]:
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import cross_val_predict
from sklearn.decomposition import PCA                 # for dimensionality reduction using PCA
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC
from sklearn.metrics import classification_report,confusion_matrix
import matplotlib.pyplot as plt 
import seaborn as sns            # visualization tool
import matplotlib.cm as cm       # for colour mapping to use for the pca plots

In [None]:
scaler = preprocessing.MinMaxScaler()   # since the data set is not gaussian
scaled_df = scaler.fit_transform(X_train)
X_train = pd.DataFrame(scaled_df,columns = ocolumns)
X_test = pd.DataFrame(scaler.transform(X_test), columns = ocolumns)


In [None]:
svclassifier = SVC(kernel = 'linear',C = 1, gamma = 1)
# C = 1.0, gamma = 1.0 for linear kernel - selected using GridSearchCV - for dataset 40
# C = 1, gamma = 1 for rbf kernel - selected using GridSearchCV - for dataset 40
# C = 0.01, gamma = 10 for rbf kernel - selected using GridSearchCV - for dataset 80
# C = 10, gamma = 1 for linear kernel - selected using GridSearchCV - for dataset 80

svclassifier.fit(X_train,y_train)

In [None]:
y_pred = svclassifier.predict(X_test)
#result = [y_pred[i] == y_test[i] for i in range(len(y_test))]
f,ax = plt.subplots(figsize=(10, 10))
sns.heatmap(confusion_matrix(y_test,y_pred), annot=True, linewidths=.1, fmt= '.0f',ax=ax).set_ylim(4,0)
plt.xlabel("Predicted Class")
plt.ylabel("Actual Class")
plt.show()
print(list(y_test),list(y_pred))
print(classification_report(y_test,y_pred))

In [None]:
param_grid = {'C': [0.01,0.1, 1, 10, 100,1000], 'gamma': [1, 0.1, 0.01, 0.001, 0.00001, 10,100]}
clf_grid = GridSearchCV(SVC(kernel = 'linear'), param_grid)
clf_grid.fit(X_train, y_train)
print("Best Parameters:\n", clf_grid.best_params_)

In [None]:
scaler = preprocessing.MinMaxScaler()   # since the data set is not gaussian
scaled_df = scaler.fit_transform(X)
X = pd.DataFrame(scaled_df,columns = ocolumns)
X

In [None]:
X_train,X_test,y_train,y_test = train_test_split(X,y,test_size = 0.20, stratify = y)

svclassifier = SVC(kernel = 'linear',C = 1, gamma = 1)
# C = 1.0, gamma = 1.0 for linear kernel - selected using GridSearchCV - for dataset 40
# C = 1, gamma = 1 for rbf kernel - selected using GridSearchCV - for dataset 40
# C = 0.01, gamma = 10 for rbf kernel - selected using GridSearchCV - for dataset 80
# C = 10, gamma = 1 for linear kernel - selected using GridSearchCV - for dataset 80

svclassifier.fit(X_train,y_train)

In [None]:
y_pred = svclassifier.predict(X_test)
#result = [y_pred[i] == y_test[i] for i in range(len(y_test))]
f,ax = plt.subplots(figsize=(10, 10))
sns.heatmap(confusion_matrix(y_test,y_pred), annot=True, linewidths=.1, fmt= '.0f',ax=ax).set_ylim(4,0)
plt.xlabel("Predicted Class")
plt.ylabel("Actual Class")
plt.show()
print(list(y_test),list(y_pred))
print(classification_report(y_test,y_pred))

In [None]:
param_grid = {'C': [0.01,0.1, 1, 10, 100,1000], 'gamma': [1, 0.1, 0.01, 0.001, 0.00001, 10,100]}
clf_grid = GridSearchCV(SVC(kernel = 'linear'), param_grid)
clf_grid.fit(X, y)
print("Best Parameters:\n", clf_grid.best_params_)

In [None]:
################  ALL_MODELS ######################

import sklearn
from sklearn.ensemble import *
from sklearn.neighbors import *
from sklearn.linear_model import *
from sklearn.svm import *
from sklearn.tree import *


# common models
model1=KNeighborsClassifier()
model2=RadiusNeighborsClassifier(radius=10)
model3=PassiveAggressiveClassifier()
model4=RidgeClassifier()
model5=RidgeClassifierCV()
model6=SGDClassifier()
model7=LinearSVC()
model8=NuSVC()
model9=SVC()
model10=DecisionTreeClassifier()
model11=ExtraTreeClassifier()

#models_from_ensemble
model12=AdaBoostClassifier()
model13=BaggingClassifier()
model14=ExtraTreesClassifier(n_estimators=1000)
model15=GradientBoostingClassifier()
model16=RandomForestClassifier()

# Using multiple models

estimators=[('model2',model2),('model12',model12),('model14',model14) ]

model17=StackingClassifier(estimators = estimators)
model18=VotingClassifier(estimators=estimators)

####################### all_models ############################
models=[model1,model2,model3,model4,model5,model6,model7,model8,model9,model10,model11,
        model12,model13,model14,model15,model16,model17,model18]

In [None]:
# TRAINING ALL THE MODELS

from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt

i=0
importances=[]
for model in models:
    i+=1
    try:
        model.fit(X_train,y_train)
    except Exception as e: print('Error in model.fit >>>',e)
    try:
        pred=model.predict(X_test)
    except Exception as e: print('Error in model.predict >>>',e)
    print(str(i)+'___'+str(model.__class__)+(100-len(str(model.__class__)))*'>'+'__',accuracy_score(y_test,pred))
    try:
        importances.append([i-1,model.feature_importances_])
    except AttributeError:
        pass

In [None]:
X_train = run_tsfresh(filtered_eegs1,labels1)
y_train = labels1
X_test = run_tsfresh(filtered_eegs2,labels2)
y_test = labels2
ocolumns = X_train.columns

scaler = preprocessing.MinMaxScaler()   # since the data set is not gaussian
scaled_df = scaler.fit_transform(X_train)
X_train = pd.DataFrame(scaled_df,columns = ocolumns)
X_test = pd.DataFrame(scaler.transform(X_test), columns = ocolumns)




In [None]:
# TRAINING ALL THE MODELS

from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt

i=0
importances=[]
for model in models:
    i+=1
    try:
        model.fit(X_train,y_train)
    except Exception as e: print('Error in model.fit >>>',e)
    try:
        pred=model.predict(X_test)
    except Exception as e: print('Error in model.predict >>>',e)
    print(str(i)+'___'+str(model.__class__)+(100-len(str(model.__class__)))*'>'+'__',accuracy_score(y_test,pred))
    try:
        importances.append([i-1,model.feature_importances_])
    except AttributeError:
        pass

In [None]:
X_train,X_test,y_train,y_test = train_test_split(X,y,test_size = 0.20, stratify = y)

svclassifier = SVC(gamma = 'auto')
# C = 1.0, gamma = 1.0 for linear kernel - selected using GridSearchCV
# C = 0.01, gamma = 100 for rbf kernel - selected using GridSearchCV

svclassifier.fit(X_train,y_train)

y_pred = svclassifier.predict(X_test)
#result = [y_pred[i] == y_test[i] for i in range(len(y_test))]
f,ax = plt.subplots(figsize=(10, 10))
sns.heatmap(confusion_matrix(y_test,y_pred), annot=True, linewidths=.1, fmt= '.0f',ax=ax).set_ylim(4,0)
plt.xlabel("Predicted Class")
plt.ylabel("Actual Class")
plt.show()
print(list(y_test),list(y_pred))
print(classification_report(y_test,y_pred))

In [None]:
X_train,X_test,y_train,y_test = train_test_split(X,y,test_size = 0.20, stratify = y)

svclassifier = SVC(kernel = 'rbf',C = 1, gamma = 1)
# C = 1.0, gamma = 1.0 for linear kernel - selected using GridSearchCV
# C = 0.01, gamma = 100 for rbf kernel - selected using GridSearchCV

svclassifier.fit(X_train,y_train)

y_pred = svclassifier.predict(X_test)
#result = [y_pred[i] == y_test[i] for i in range(len(y_test))]
f,ax = plt.subplots(figsize=(10, 10))
sns.heatmap(confusion_matrix(y_test,y_pred), annot=True, linewidths=.1, fmt= '.0f',ax=ax).set_ylim(4,0)
plt.xlabel("Predicted Class")
plt.ylabel("Actual Class")
plt.show()
print(list(y_test),list(y_pred))
print(classification_report(y_test,y_pred))