# Scratch experiments

In [31]:
import mne
import numpy as np
from mne.datasets import eegbci
import matplotlib.pyplot as plt
from os import listdir
from mne.channels import make_standard_montage
from scipy import signal
from scipy.linalg import sqrtm, inv 

def GetRawEDF(target_subjects= "all", condition="offline",
              base_dir=r"C:\git\Senior_Thesis\DataSet"):
    """Load and concatenate the EDF sessions of each requested subject.

    For every subject, all ``.edf`` files found in
    ``<base_dir>/<condition folder>/<subject>/notch_EDF`` are opened lazily
    (``preload=False``), concatenated into one recording, passed through
    ``eegbci.standardize`` and given the ``standard_1005`` montage.

    Parameters
    ----------
    target_subjects : "all", str or iterable of str
        ``"all"`` selects the built-in subject list; any other single string
        is treated as one subject name.
    condition : str
        ``"offline"`` / ``"online"`` are mapped to ``"Offline_Experiment"`` /
        ``"Online_Experiment"``; any other value is used verbatim as the
        folder name.
    base_dir : str
        Root folder of the data set. Defaults to the location that was
        previously hard-coded.

    Returns
    -------
    dict
        ``{subject: {"Raw_data": <concatenated raw recording>}}``.

    Raises
    ------
    FileNotFoundError
        If a subject folder is missing (raised by ``listdir``) or contains
        no ``.edf`` file.
    """
    # Local imports: only `listdir` is imported from `os` at file level.
    import os
    import re

    def _natural_key(name):
        # Sort "sess2" before "sess10": digit runs compare as integers.
        return [int(tok) if tok.isdigit() else tok.lower()
                for tok in re.split(r"(\d+)", name)]

    condition_dirs = {"offline": "Offline_Experiment", "online": "Online_Experiment"}
    if isinstance(condition, str):
        condition = condition_dirs.get(condition, condition)

    if isinstance(target_subjects, str):
        if target_subjects == "all":
            target_subjects = ["pipo","NutF8","AJpang","Aoomim","voen"]
        else:
            # A bare name would otherwise be iterated character by character.
            target_subjects = [target_subjects]

    EEG_data = {}

    for subject in target_subjects:
        session_dir = os.path.join(base_dir, condition, subject, "notch_EDF")

        # listdir() returns entries in arbitrary order; sort them so the
        # sessions are always concatenated in the same (session) order.
        edf_files = sorted(
            (name for name in listdir(session_dir) if name.lower().endswith(".edf")),
            key=_natural_key,
        )
        if not edf_files:
            raise FileNotFoundError(f"No EDF files found in {session_dir}")

        raw_each = [
            mne.io.read_raw_edf(os.path.join(session_dir, name), preload = False)
            for name in edf_files
        ]
        raw_edf = mne.concatenate_raws(raw_each)

        eegbci.standardize(raw_edf)  # set channel names
        montage = make_standard_montage("standard_1005")
        raw_edf.set_montage(montage)

        EEG_data[subject] = {"Raw_data": raw_edf.copy()}

    print(f"Successful to create Data of {target_subjects}")

    return EEG_data

# Load the offline-experiment recordings of every built-in subject into a
# {subject: {"Raw_data": ...}} dict.
EEG_data = GetRawEDF(target_subjects = "all")

Extracting EDF parameters from C:\git\Senior_Thesis\DataSet\Offline_Experiment\pipo\notch_EDF\sess1.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Extracting EDF parameters from C:\git\Senior_Thesis\DataSet\Offline_Experiment\pipo\notch_EDF\sess2.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Extracting EDF parameters from C:\git\Senior_Thesis\DataSet\Offline_Experiment\pipo\notch_EDF\sess3.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Extracting EDF parameters from C:\git\Senior_Thesis\DataSet\Offline_Experiment\NutF8\notch_EDF\sess1.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Extracting EDF parameters from C:\git\Senior_Thesis\DataSet\Offline_Experiment\NutF8\notch_EDF\sess2.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Extracting EDF parameters from C:\git\Senior_The

In [22]:
# Toy experiment with the nested {subject: {entry: value}} layout.
# NOTE: this rebinds EEG_data, replacing whatever an earlier cell stored
# under that name.
target_sub = ["pipo","nutF8","AJpang","Aoomim"]

a = [0,0,0,0]    # placeholder stored under "Raw_data"
nor = [5,5,5]    # placeholder stored under "EA_data"

EEG_data = {}

for i in target_sub:
    EEG_data[i] = {"Raw_data": a.copy()}

for key in EEG_data:
    print(key)
    # Copy, as for "Raw_data": otherwise every subject would hold the very
    # same list object and editing one entry would change all of them.
    EEG_data[key]["EA_data"] = nor.copy()


# for sub_key in EEG_data:
#     for data_key in EEG_data[sub_key]:
#         print(sub_key,data_key)
    


pipo
nutF8
AJpang
Aoomim


In [19]:
# Display the nested dict.
# NOTE(review): the saved output below lists a 'data' key, while the cell
# that builds this dict writes "Raw_data", and this cell's execution count
# (19) is lower than that cell's (22) -- the output is presumably stale;
# re-run to refresh it.
EEG_data

{'pipo': {'data': [0, 0, 0, 0], 'EA_data': [5, 5, 5]},
 'nutF8': {'data': [0, 0, 0, 0], 'EA_data': [5, 5, 5]},
 'AJpang': {'data': [0, 0, 0, 0], 'EA_data': [5, 5, 5]},
 'Aoomim': {'data': [0, 0, 0, 0], 'EA_data': [5, 5, 5]}}

# Try CSP with multiple subjects

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from mne.decoding import CSP
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.model_selection import ShuffleSplit,StratifiedKFold ,cross_val_score, cross_val_predict, KFold

def computeCSPFeatures(data, target_subjects,condition):
    """Pool the epochs of several subjects, fit CSP + LDA and print reports.

    Parameters
    ----------
    data : dict
        ``data[subject]`` must provide ``'label'`` and the epoch entry
        selected by ``condition`` (``'Raw_Epoch'`` or ``'EA_Epoch'``).
        Entries are concatenated along axis 0.
    target_subjects : iterable
        Subjects whose epochs are pooled.
    condition : str
        ``"noEA"`` selects ``'Raw_Epoch'``; anything else selects
        ``'EA_Epoch'``.

    Raises
    ------
    ValueError
        If ``target_subjects`` is empty.

    Notes
    -----
    Nothing is returned; the cross-validation score, classification reports
    and confusion matrices are printed.
    """
    target_subjects = list(target_subjects)
    if not target_subjects:
        raise ValueError("target_subjects must contain at least one subject")

    query = "Raw_Epoch" if condition == "noEA" else "EA_Epoch"

    # Every subject -- including the first one -- is read from `query`, so
    # the EA condition no longer mixes raw epochs with EA epochs.
    all_data = np.concatenate([data[sub][query] for sub in target_subjects], axis=0)
    label = np.concatenate([data[sub]['label'] for sub in target_subjects], axis=0)

    print(all_data.shape, label.shape)

    csp = CSP(n_components=8, reg=None, log=None, rank= 'info')

    epochs_shuffled, label = shuffle(all_data, label, random_state = 0)
    X_train, X_test, y_train, y_test = train_test_split(
        epochs_shuffled, label, test_size= 0.3, random_state = 0, stratify=label)

    # The spatial filters are learned on the training split only.
    csp.fit(X_train, y_train)
    X_train = csp.transform(X_train)
    X_test  = csp.transform(X_test)

    lda = LinearDiscriminantAnalysis()
    # NOTE(review): CSP was already fitted on the whole training split, so
    # these folds only cross-validate the LDA stage.
    score = cross_val_score(lda, X_train, y_train, cv= 10)
    print("LDA only Cross-validation scores:", np.mean(score))
    lda.fit(X_train, y_train)

    from sklearn.metrics import classification_report,confusion_matrix

    def _report(title, y_true, y_pred):
        # Print the per-class report followed by the confusion matrix.
        print(title)
        print(classification_report(y_true=y_true, y_pred=y_pred))
        print("Confusion matrix \n=======================")
        print(confusion_matrix(y_true=y_true, y_pred=y_pred))

    _report("Classification TRAIN DATA \n=======================",
            y_train, lda.predict(X_train))
    _report("Classification TEST DATA \n=======================",
            y_test, lda.predict(X_test))

# Run the pooled CSP + LDA evaluation on three subjects with the "noEA" condition.
# NOTE(review): EEG_Epochs is not defined in any cell visible here; it must
# already exist in the kernel for this call to run.
computeCSPFeatures(EEG_Epochs, target_subjects = ["pipo","voen","AJpang"] ,condition = "noEA")

# Backup: earlier LogReg_TL implementations

In [None]:
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin

class LogReg_TL(BaseEstimator):
    """Softmax (multi-class) logistic regression fitted by batch gradient descent.

    The regularisation mode is selected with ``penalty``:

    * ``'L1'`` -- despite the name, this shrinks the weights with the
      gradient of a squared norm (``lambd * w``).
    * ``'L2'`` -- shrinks the weights towards ``mu`` through
      ``inv(Sigma_TL)``. ``Sigma_TL`` must be ``(n_classes, n_classes)`` for
      the matrix product to be valid; the default all-zero matrix is
      singular, so ``'L2'`` needs an explicit ``Sigma_TL``.
    * any other value (including the default ``None``) performs no update,
      so ``weights`` stays all-zero and ``costs`` stays empty.

    Parameters
    ----------
    learningRate : float
        Gradient-descent step size.
    num_iter : int
        Number of full-batch iterations.
    penalty : {'L1', 'L2', None}
        See above.
    intercept : bool
        When true, a column of ones is prepended to ``X`` and the
        corresponding weight row is excluded from regularisation.
    lambd : float
        Regularisation strength.
    Sigma_TL, mu
        Matrix and offset used by the ``'L2'`` mode.

    Attributes (set by ``fit``)
    ---------------------------
    weights : ndarray of shape (n_features [+ 1], n_classes)
    costs : list of float, one entry per iteration
    classes_ : ndarray, sorted unique labels; column ``k`` of ``weights``
        belongs to ``classes_[k]``
    """

    def __init__(self, learningRate=1e-5, num_iter=100, penalty=None, intercept = True,\
                 lambd=1, Sigma_TL=np.array([[0, 0],[0, 0]]), mu=0):

        self.learningRate = learningRate
        self.num_iter = num_iter
        self.penalty = penalty
        self.intercept = intercept
        self.Sigma_TL = Sigma_TL
        self.lambd = lambd
        self.mu = mu

    def __softmax(self, z):
        """Row-wise softmax; the row maximum is subtracted for numerical stability."""
        exp_z = np.exp(z - np.max(z, axis=1, keepdims=True))
        return exp_z / np.sum(exp_z, axis=1, keepdims=True)

    def __data_term(self, z, y):
        """Negative log10-likelihood term shared by both cost functions."""
        # Keep probabilities strictly inside (0, 1): log10(0) is -inf and
        # 0 * -inf is nan, which would otherwise poison the recorded cost.
        tiny = 1e-12
        probs = np.clip(self.__softmax(z), tiny, 1 - tiny)
        return -1 * np.sum((y * np.log10(probs)) + ((1 - y) * np.log10(1 - probs)))

    def __reg_logLL1(self, z, y, weights):
        """Cost for ``penalty='L1'``: data term plus ``lambd * ||weights||**2``."""
        reg = self.lambd * np.linalg.norm(weights)**2
        return self.__data_term(z, y) + reg

    def __reg_logLL2(self, z, y, weights):
        """Cost for ``penalty='L2'``: data term plus the ``Sigma_TL`` / ``mu`` term."""
        Sigma_TL_det = np.log10(np.linalg.det(self.Sigma_TL))
        reg = 0.5 * self.lambd * np.sum( ((weights-self.mu)**2)@self.Sigma_TL) + Sigma_TL_det

        return self.__data_term(z, y) + reg

    def fit(self, X_train, y_train):
        """Fit the weights on ``X_train`` / ``y_train`` and return ``self``."""
        self.costs = []

        # Map arbitrary labels onto 0..K-1. Indexing np.eye(K) with the raw
        # labels is only correct when they already are exactly 0..K-1.
        self.classes_, y_index = np.unique(np.asarray(y_train).ravel(), return_inverse=True)
        n_classes = self.classes_.size

        if self.intercept:
            X_train = np.c_[np.ones([np.shape(X_train)[0], 1]), X_train]

        self.weights = np.zeros((np.shape(X_train)[1], n_classes))

        y_train_onehot = np.eye(n_classes)[y_index]

        if self.penalty not in ('L1', 'L2'):
            # No update rule is defined for other penalties; skip the loop.
            return self

        # Row 0 holds the intercept only when one was added; without an
        # intercept every row is a feature weight and must be regularised.
        first_reg_row = 1 if self.intercept else 0

        # Loop-invariant: invert once instead of once per iteration.
        Sigma_TL_inv = None
        if self.penalty == 'L2' and self.num_iter > 0:
            Sigma_TL_inv = np.linalg.inv(self.Sigma_TL)

        for _ in range(self.num_iter):
            z = np.dot(X_train, self.weights)
            err = self.__softmax(z) - y_train_onehot

            # Data-term step.
            self.weights += -self.learningRate * np.dot(X_train.T, err)

            if self.penalty == 'L1':
                self.weights[first_reg_row:] += \
                    -self.learningRate * self.lambd * self.weights[first_reg_row:]
                # The cost uses the logits computed before this update.
                self.costs.append(self.__reg_logLL1(z, y_train_onehot, self.weights))
            else:
                pull = (self.weights - self.mu) @ Sigma_TL_inv
                self.weights[first_reg_row:] += \
                    -self.learningRate * self.lambd * pull[first_reg_row:]
                self.costs.append(self.__reg_logLL2(z, y_train_onehot, self.weights))

        return self

def build_clf_params(data, target_subjects ,condition,
                     learningRate=0.001, num_iter=30000, lambd=1):
    """Fit one source model per non-target subject and store its weights.

    Parameters
    ----------
    data : dict
        ``data[subject]`` must provide ``'Raw_csp'`` / ``'Raw_csp_label'``
        (``condition == "noEA"``) or ``'EA_csp'`` / ``'EA_csp_label'``
        (any other condition). It is modified in place: the fitted weights
        are stored under ``'ws_Raw'`` or ``'ws_EA'``.
    target_subjects : str, iterable of str or None
        Subject(s) to skip. A single name and a collection of names are
        both accepted.
    condition : str
        Selects the feature set, see ``data``.
    learningRate, num_iter, lambd
        Passed to ``LogReg_TL``; the defaults are the values that were
        previously hard-coded.
    """
    # Normalise to a tuple so that `sub in targets` works whether one name
    # or several were passed (comparing a name with a list never matches).
    if target_subjects is None:
        targets = ()
    elif isinstance(target_subjects, str):
        targets = (target_subjects,)
    else:
        try:
            targets = tuple(target_subjects)
        except TypeError:
            targets = (target_subjects,)

    for sub in data.keys():

        if sub in targets:  # Don't apply weight to target subject
            continue

        # Where the training data is stored
        if condition == "noEA":
            X = data[sub]['Raw_csp']
            y = data[sub]['Raw_csp_label']
            store_ws = 'ws_Raw'
        else:
            X = data[sub]['EA_csp']
            y = data[sub]['EA_csp_label']
            store_ws = 'ws_EA'

        # Use this model when training subject as source
        model_L1 = LogReg_TL(learningRate=learningRate, num_iter=num_iter,
                             penalty='L1', lambd=lambd)

        # Fit model and store weight
        model_L1.fit(X, y)
        print("weights of ", str(sub), ": ", model_L1.weights)
        print("costs of ", str(sub), ": ", model_L1.costs[-1])
        data[sub][store_ws] = model_L1.weights

# X_train = np.random.rand(120, 5)
# y_train = np.random.choice([0, 1, 2, 3], size=120)

# Fit the per-subject source models for the "EA" condition.
# NOTE(review): CSP2D_Epoch and target_data are not defined in any cell
# visible here; they must already exist in the kernel for this call to run.
build_clf_params(CSP2D_Epoch, target_subjects= target_data ,condition = "EA")

In [None]:
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin

class LogReg_TL(BaseEstimator):
    """Softmax (multi-class) logistic regression fitted by batch gradient descent.

    Only ``penalty='L1'`` triggers training; despite the name it adds a
    squared-norm term ``(lambd / 2) * sum(w**2)`` to the mean cross-entropy.
    With any other ``penalty`` (including the default ``None``) ``fit``
    leaves ``weights`` all-zero and ``costs`` at ``0.0``.

    ``intercept``, ``Sigma_TL`` and ``mu`` are stored but not used by this
    version: no bias column is added to ``X``.

    Attributes (set by ``fit``)
    ---------------------------
    weights : ndarray of shape (n_features, n_classes)
    costs : float, cost evaluated at the start of the last iteration
    classes_ : ndarray, sorted unique labels; column ``k`` of ``weights``
        belongs to ``classes_[k]``
    """

    def __init__(self, learningRate=1e-5, num_iter=100, penalty=None, intercept = True,\
                 lambd=1, Sigma_TL=np.array([[0, 0],[0, 0]]), mu=0):

        self.learningRate = learningRate
        self.num_iter = num_iter
        self.penalty = penalty
        self.intercept = intercept
        self.Sigma_TL = Sigma_TL
        self.lambd = lambd
        self.mu = mu

    def softmax(self, z):
        """Row-wise softmax; the row maximum is subtracted for numerical stability."""
        exp_z = np.exp(z - np.max(z, axis=1, keepdims=True))
        return exp_z / np.sum(exp_z, axis=1, keepdims=True)

    def reg_logLL1(self, ws, X, y):
        """Return ``(cost, gradient)`` of the regularised cross-entropy at ``ws``.

        ``y`` is the one-hot label matrix, ``X`` the sample matrix.
        """
        m = X.shape[0]
        predictions = self.softmax(np.dot(X, ws))

        # Floor the probabilities inside the log only: log(0) is -inf and
        # 0 * -inf is nan, which would otherwise poison the reported cost.
        error = -np.sum(y * np.log(np.clip(predictions, 1e-12, None)))
        cost = error / m + (self.lambd / 2) * np.sum(ws**2)  # Regularization term

        # The gradient uses the unclipped predictions.
        gradient = np.dot(X.T, (predictions - y)) / m + self.lambd * ws
        return cost, gradient

    def fit(self, X_train, y_train):
        """Fit the weights on ``X_train`` / ``y_train`` and return ``self``."""
        self.costs = 0.0
        X_train = np.asarray(X_train)

        # Map arbitrary labels onto 0..K-1. Indexing np.eye(K) with the raw
        # labels is only correct when they already are exactly 0..K-1.
        self.classes_, y_index = np.unique(np.asarray(y_train).ravel(), return_inverse=True)
        n_classes = self.classes_.size

        self.weights = np.zeros((np.shape(X_train)[1], n_classes))
        y_train_onehot = np.eye(n_classes)[y_index]

        if self.penalty != 'L1':
            # No update rule is defined for other penalties; skip the loop.
            return self

        for _ in range(self.num_iter):
            self.costs, gradient = self.reg_logLL1(self.weights, X_train, y_train_onehot)
            self.weights -= self.learningRate * gradient

        return self

def build_clf_params(data, target_subjects ,condition,
                     learningRate=0.001, num_iter=30000, lambd=1):
    """Fit one source model per non-target subject and store its weights.

    Parameters
    ----------
    data : dict
        ``data[subject]`` must provide ``'Raw_csp'`` / ``'Raw_csp_label'``
        (``condition == "noEA"``) or ``'EA_csp'`` / ``'EA_csp_label'``
        (any other condition). It is modified in place: the fitted weights
        are stored under ``'ws_Raw'`` or ``'ws_EA'``.
    target_subjects : str, iterable of str or None
        Subject(s) to skip. A single name and a collection of names are
        both accepted.
    condition : str
        Selects the feature set, see ``data``.
    learningRate, num_iter, lambd
        Passed to ``LogReg_TL``; the defaults are the values that were
        previously hard-coded.
    """
    # Normalise to a tuple so that `sub in targets` works whether one name
    # or several were passed (comparing a name with a list never matches).
    if target_subjects is None:
        targets = ()
    elif isinstance(target_subjects, str):
        targets = (target_subjects,)
    else:
        try:
            targets = tuple(target_subjects)
        except TypeError:
            targets = (target_subjects,)

    for sub in data.keys():

        if sub in targets:  # Don't apply weight to target subject
            continue

        # Where the training data is stored
        if condition == "noEA":
            X = data[sub]['Raw_csp']
            y = data[sub]['Raw_csp_label']
            store_ws = 'ws_Raw'
        else:
            X = data[sub]['EA_csp']
            y = data[sub]['EA_csp_label']
            store_ws = 'ws_EA'

        # Use this model when training subject as source
        model_L1 = LogReg_TL(learningRate=learningRate, num_iter=num_iter,
                             penalty='L1', lambd=lambd)

        # Fit model and store weight
        model_L1.fit(X, y)
        print("weights of ", str(sub), ": ", model_L1.weights)
        print("costs of ", str(sub), ": ", model_L1.costs)
        data[sub][store_ws] = model_L1.weights

# Fit the per-subject source models for the "EA" condition.
# NOTE(review): CSP2D_Epoch and target_data are not defined in any cell
# visible here; they must already exist in the kernel for this call to run.
build_clf_params(CSP2D_Epoch, target_subjects= target_data ,condition = "EA")

# Try: Conv1D + GRU classifier (Keras)

In [9]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, GRU, Dense, GlobalAveragePooling1D, TimeDistributed, Dropout

# Number of output classes: one softmax unit per class.
n_classes = 4

# Assemble the network one layer at a time.
model = Sequential()

# Block 1: convolution over inputs of 513 steps x 5 features, then
# overlapping max-pooling and dropout.
model.add(Conv1D(32, kernel_size=3, strides=1, activation='relu', input_shape=(513, 5)))
model.add(MaxPooling1D(pool_size=3, strides=2))
model.add(Dropout(0.5))

# Block 2: same pattern with twice as many filters.
model.add(Conv1D(64, kernel_size=3, strides=1, activation='relu'))
model.add(MaxPooling1D(pool_size=3, strides=2))
model.add(Dropout(0.5))

# Recurrent stage: the GRU returns its full output sequence so the next
# layer can be applied at every time step.
model.add(GRU(5, return_sequences=True))
model.add(TimeDistributed(Dense(5, activation='relu')))

# Average over the time axis, then classify into n_classes.
model.add(GlobalAveragePooling1D())
model.add(Dense(n_classes, activation='softmax'))

# categorical_crossentropy expects one-hot encoded targets.
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Print the layer-by-layer summary.
model.summary()


Model: "sequential_7"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_14 (Conv1D)          (None, 511, 32)           512       
                                                                 
 max_pooling1d_14 (MaxPooli  (None, 255, 32)           0         
 ng1D)                                                           
                                                                 
 dropout_14 (Dropout)        (None, 255, 32)           0         
                                                                 
 conv1d_15 (Conv1D)          (None, 253, 64)           6208      
                                                                 
 max_pooling1d_15 (MaxPooli  (None, 126, 64)           0         
 ng1D)                                                           
                                                                 
 dropout_15 (Dropout)        (None, 126, 64)          