In [76]:
import os
import sys
import re
import numpy as np
import pandas as pd
import scipy.io as sio
import torch
import matplotlib.pyplot as plt
from scipy.stats import kurtosis, skew
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn import svm
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.neural_network import MLPClassifier

In [77]:
# Root directory holding one sub-folder per task. Defaults to the original
# location; set the EEG_DATA_ROOT environment variable to run elsewhere.
DATA_ROOT = os.environ.get("EEG_DATA_ROOT", "/home/tseringj/final_project")

# Task name -> directory that is scanned for that task's per-subject .mat files.
folder_path = {
    "Long_words": f"{DATA_ROOT}/Long_Words",
    "Short_Long_words": f"{DATA_ROOT}/Short_Long_words",
    "Short_words": f"{DATA_ROOT}/Short_words",
    "Vowels": f"{DATA_ROOT}/Vowels",
}

# Task name -> prompts of that task. The position of a prompt in its list is
# its class index (see numeric_labels below).
words_dict = {
    "Long_words": ["cooperate", "independent"],
    "Short_Long_words": ["cooperate", "in"],
    "Short_words": ["out", "in", "up"],
    "Vowels": ["a", "i", "u"],
}

# Task name -> {prompt: class index}. Derived from words_dict so the two
# tables cannot drift apart when a prompt is added or reordered.
numeric_labels = {
    task: {word: index for index, word in enumerate(words)}
    for task, words in words_dict.items()
}

In [78]:

# Name of the variable read from every subject's .mat file.
matrix_to_load = "eeg_data_wrt_task_rep_no_eog_256Hz_last_beep"

def load_EEG(type, subject_no):
    """Cache one subject's entries to .npy files and return their paths and labels.

    Scans ``folder_path[type]`` for a ``.mat`` file whose first run of digits in
    the file name equals ``subject_no``, reads ``matrix_to_load`` from it, and
    saves every element of that matrix to
    ``<folder>/temp_files3/<subject_no>/<word>_<k>.npy`` (existing cache files
    are reused, not overwritten).

    Parameters
    ----------
    type : str
        Task name; a key of ``folder_path`` and ``words_dict``.
    subject_no : int
        Subject number to look for in the file names.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        Array of cache-file paths and the array of matching word labels.

    Raises
    ------
    FileNotFoundError
        If no ``.mat`` file for ``subject_no`` exists in the task folder
        (previously this surfaced as an ``UnboundLocalError``).
    """
    path = folder_path[type]
    words = words_dict[type]

    with os.scandir(path) as entries:
        for subject_file in entries:
            if not (subject_file.is_file() and subject_file.name.endswith('.mat')):
                continue
            # The first run of digits in the file name is taken as the subject
            # number; files without any digit are skipped instead of crashing.
            number = re.search("[0-9]+", subject_file.name)
            if number is None or int(number.group(0)) != subject_no:
                continue

            mat = sio.loadmat(subject_file.path)[matrix_to_load]

            # makedirs creates both cache levels and tolerates existing ones.
            temp = f"{path}/temp_files3/{subject_no}"
            os.makedirs(temp, exist_ok=True)

            X = []
            Y = []
            # assumes mat is indexed as (word, repetition) -- TODO confirm
            for index, eeg in np.ndenumerate(mat):
                temp2 = f"{temp}/{words[index[0]]}_{index[1] + 1}.npy"  # one file per entry
                X.append(temp2)
                Y.append(words[index[0]])
                if not os.path.exists(temp2):
                    np.save(temp2, eeg)
            # The first matching file is used. Cache files are never
            # overwritten, so a second match could not change them anyway.
            return np.array(X), np.array(Y)

    raise FileNotFoundError(
        f"no .mat file for subject {subject_no} found in {path}")

In [79]:
def compute_time_domain_features(eeg_data):
    """Summarise a signal with three time-domain statistics.

    Parameters
    ----------
    eeg_data : array-like
        Samples of the signal; ``extract_features`` passes one channel at a time.

    Returns
    -------
    dict
        Keys ``'mean'``, ``'std'`` and ``'kurtosis'``, inserted in that order.
        ``extract_features`` flattens the dict with ``.values()``, so the
        insertion order defines the feature layout and must stay stable.
    """
    # Energy, RMS, zero-crossing rate, Hjorth parameters, skewness, median,
    # range, inter-quartile range, variance and autocorrelation were prototyped
    # here but were disabled and never returned. The disabled Hjorth formulas
    # were not the standard definitions, so re-derive them before re-enabling.
    return {
        'mean': np.mean(eeg_data),
        'std': np.std(eeg_data),
        # scipy's default is Fisher (excess) kurtosis: 0 for a normal distribution.
        'kurtosis': kurtosis(eeg_data),
    }

In [80]:
def extract_features(eeg_data, excluded_channels=(0, 9, 32, 63), n_channels=64):
    """Build a flat feature list from the selected channels of one recording.

    Parameters
    ----------
    eeg_data : numpy.ndarray
        2-D array indexed as ``[channel, sample]`` with at least
        ``n_channels`` rows.
    excluded_channels : iterable of int, optional
        Row indices to leave out. Defaults to ``(0, 9, 32, 63)``.
    n_channels : int, optional
        Only rows ``0 .. n_channels - 1`` are considered. Defaults to 64.

    Returns
    -------
    list
        The values returned by ``compute_time_domain_features`` for every
        selected channel, concatenated in channel order (60 channels with the
        defaults).
    """
    excluded = set(excluded_channels)
    channels_to_select = [ch for ch in range(n_channels) if ch not in excluded]
    data = eeg_data[channels_to_select, :]

    features = []
    # Iterate over the rows that were actually selected instead of a
    # hard-coded count that has to be kept in sync with the exclusion list.
    for channel in data:
        features.extend(compute_time_domain_features(channel).values())
    return features

In [81]:
# Load the trial matrix of each subject's Long_Words recording (paths are
# relative to the notebook's working directory).
datasub2 = sio.loadmat('Long_Words/sub_2b_ch64_l_eog_removed_256Hz.mat')['eeg_data_wrt_task_rep_no_eog_256Hz_last_beep']
datasub3 = sio.loadmat('Long_Words/sub_3b_ch80_l_eog_removed_256Hz.mat')['eeg_data_wrt_task_rep_no_eog_256Hz_last_beep']
datasub6 = sio.loadmat('Long_Words/sub_6_ch64_l_eog_removed_256Hz.mat')['eeg_data_wrt_task_rep_no_eog_256Hz_last_beep']
datasub7 = sio.loadmat('Long_Words/sub_7_ch64_l_eog_removed_256Hz.mat')['eeg_data_wrt_task_rep_no_eog_256Hz_last_beep']
datasub9 = sio.loadmat('Long_Words/sub_9c_ch64_l_eog_removed_256Hz.mat')['eeg_data_wrt_task_rep_no_eog_256Hz_last_beep']
# Subject 11 gets its own name; it used to be assigned to `datasub2`, which
# silently replaced subject 2's data.
datasub11 = sio.loadmat('Long_Words/sub_11b_ch64_l_eog_removed_256Hz.mat')['eeg_data_wrt_task_rep_no_eog_256Hz_last_beep']

In [82]:
# Sanity check on the first entry of the matrix: 60 selected channels x 3
# statistics per channel should give 180 features.
feature = extract_features(eeg_data=datasub2[0][0])
len(feature)

180

In [83]:
# function for data augmentation

def train_augmentation(X, Y, label_map=None, window_size=256, stride=64,
                       n_samples=1280, binarize=True):
    """Turn cached trials into a sliding-window feature matrix with labels.

    Every trial file is cut into overlapping windows; each window becomes one
    row of ``extract_features`` output followed by the trial's label.

    Parameters
    ----------
    X : sequence of str
        Paths of ``.npy`` trial files.
    Y : sequence of str
        Word label of each trial, aligned with ``X``.
    label_map : dict, optional
        ``{word: class index}``. Defaults to ``numeric_labels[type]``, where
        ``type`` is the notebook-level task selection (the original behaviour).
    window_size, stride, n_samples : int, optional
        Window length, step between window starts, and the sample index at
        which windowing stops. The defaults (256, 64, 1280) give 17 windows
        per trial.
    binarize : bool, optional
        If True (default, original behaviour) the label is 0 for class index 0
        and 1 for every other class. Pass False to keep the real class index,
        which is required for the three-class tasks.

    Returns
    -------
    numpy.ndarray
        One row per window; the last column is the label. Shape ``(0, 181)``
        when ``X`` is empty.

    Raises
    ------
    ValueError
        If ``X`` and ``Y`` differ in length.
    """
    if len(X) != len(Y):
        raise ValueError("X and Y must have the same length")
    if label_map is None:
        # Fall back to the notebook-level `type` variable so existing calls
        # keep working; pass label_map explicitly to avoid the hidden state.
        label_map = numeric_labels[type]

    rows = []
    for trial_path, word in zip(X, Y):
        with open(trial_path, 'rb') as f:
            data = np.load(f)

        class_index = label_map[word]
        label = float(class_index != 0) if binarize else float(class_index)

        # Slide a window of `window_size` samples forward by `stride` samples.
        for start in range(0, n_samples, stride):
            window = data[:, start:start + window_size]
            rows.append(np.append(extract_features(window), label))

            # Stop once the window has reached sample `n_samples`.
            if start + window_size >= n_samples:
                break

    if not rows:
        # 180 features (60 channels x 3 statistics) plus the label column.
        return np.empty((0, 181))
    # Stack once at the end instead of re-copying the matrix for every window.
    return np.vstack(rows)


In [84]:
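# Earlier draft of test_augmentation (360-sample windows, 120 features), kept only as a disabled string; superseded by the definition in the next cell.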
'''
def test_augmentation(X,Y):
    final_data= np.empty((0,121))
    for i in range(len(X)):
        
        with open(X[i], 'rb') as f:
            data = np.load(f)

            result=np.empty((0,120))
        
            
            # Loop through the data with a stride of 64 samples
            for j in range(0, 1280, 360):
            # Select a window of 256 samples from the data, starting at index i
                window = data[:, j:j+360]
                features=extract_features(window)
                
                result=np.vstack((result, (np.array(features)).reshape((1,-1))))
                
                

                # Stop the loop if i+256 is greater than or equal to 1280
                if j+360 >= 1280:
                    break
            if numeric_labels[type][Y[i]]==0:
              label=np.zeros((4,1))
              result=np.hstack((result, label))
            else:
              label=np.ones((4,1))
              result=np.hstack((result, label))
            final_data=np.vstack((final_data, result))
            
    
    
        
    return final_data
'''

"\ndef test_augmentation(X,Y):\n    final_data= np.empty((0,121))\n    for i in range(len(X)):\n        \n        with open(X[i], 'rb') as f:\n            data = np.load(f)\n\n            result=np.empty((0,120))\n        \n            \n            # Loop through the data with a stride of 64 samples\n            for j in range(0, 1280, 360):\n            # Select a window of 256 samples from the data, starting at index i\n                window = data[:, j:j+360]\n                features=extract_features(window)\n                \n                result=np.vstack((result, (np.array(features)).reshape((1,-1))))\n                \n                \n\n                # Stop the loop if i+256 is greater than or equal to 1280\n                if j+360 >= 1280:\n                    break\n            if numeric_labels[type][Y[i]]==0:\n              label=np.zeros((4,1))\n              result=np.hstack((result, label))\n            else:\n              label=np.ones((4,1))\n              re

In [85]:
# Build the test matrix: one feature row per trial, with no windowing or augmentation

def test_augmentation(X, Y, label_map=None, binarize=True):
    """Turn cached trials into a feature matrix with one labelled row per trial.

    Unlike ``train_augmentation`` no windowing is applied: features are
    extracted from the whole trial.

    Parameters
    ----------
    X : sequence of str
        Paths of ``.npy`` trial files.
    Y : sequence of str
        Word label of each trial, aligned with ``X``.
    label_map : dict, optional
        ``{word: class index}``. Defaults to ``numeric_labels[type]``, where
        ``type`` is the notebook-level task selection (the original behaviour).
    binarize : bool, optional
        If True (default, original behaviour) the label is 0 for class index 0
        and 1 for every other class. Pass False to keep the real class index,
        which is required for the three-class tasks.

    Returns
    -------
    numpy.ndarray
        One row per trial; the last column is the label. Shape ``(0, 181)``
        when ``X`` is empty.

    Raises
    ------
    ValueError
        If ``X`` and ``Y`` differ in length.
    """
    if len(X) != len(Y):
        raise ValueError("X and Y must have the same length")
    if label_map is None:
        # Fall back to the notebook-level `type` variable so existing calls
        # keep working; pass label_map explicitly to avoid the hidden state.
        label_map = numeric_labels[type]

    rows = []
    for trial_path, word in zip(X, Y):
        with open(trial_path, 'rb') as f:
            data = np.load(f)

        class_index = label_map[word]
        label = float(class_index != 0) if binarize else float(class_index)
        rows.append(np.append(extract_features(data), label))

    if not rows:
        # 180 features (60 channels x 3 statistics) plus the label column.
        return np.empty((0, 181))
    # Stack once at the end instead of re-copying the matrix for every trial.
    return np.vstack(rows)


In [86]:
def calculate_performance(y_test, y_pred):
    """Print accuracy, precision, recall and F1 of ``y_pred`` against ``y_test``.

    The sklearn metric functions are called with their default settings.
    Nothing is returned; the scores are only printed on one line.
    """
    accuracy, precision, recall, f1 = (
        metric(y_test, y_pred)
        for metric in (accuracy_score, precision_score, recall_score, f1_score)
    )
    print(f'accuracy: {accuracy}, precision: {precision}, recall: {recall}, f1 {f1}')

In [87]:
def get_data(type, subject_no):
    """Load one subject's trials and return train/test features and labels.

    The trial files are split 80/20 (stratified by word, ``random_state=42``)
    before any features are computed, so windows cut from one trial never end
    up on both sides of the split.

    NOTE(review): the augmentation helpers look up labels through the
    notebook-level ``type`` variable, not through this function's ``type``
    argument -- keep the two in sync.

    Returns
    -------
    X_train, X_test, y_train, y_test : numpy.ndarray
        Feature matrices and label vectors (the last column of each
        augmented matrix).
    """
    trial_paths, trial_words = load_EEG(type, subject_no)

    split = train_test_split(
        trial_paths,
        trial_words,
        test_size=0.2,
        stratify=trial_words,
        random_state=42,
    )
    train_X, test_X, train_y, test_y = split

    train_data = train_augmentation(train_X, train_y)
    test_data = test_augmentation(test_X, test_y)

    # Last column is the label; everything before it is features.
    return train_data[:, :-1], test_data[:, :-1], train_data[:, -1], test_data[:, -1]


In [91]:
def train_model(X_train, X_test, y_train, y_test, n_components=100, random_state=42):
    """Standardise, reduce with PCA, then fit and score four classifiers.

    The scaler and PCA are fitted on the training data only and then applied
    to the test data. An RBF SVM, a linear SVM, a random forest and an MLP are
    trained on the PCA scores; each one's test metrics are printed through
    ``calculate_performance``.

    Parameters
    ----------
    X_train, X_test : numpy.ndarray
        Feature matrices, one row per sample.
    y_train, y_test : numpy.ndarray
        Label vectors; they are cast to ``int`` before training.
    n_components : int, optional
        Number of principal components to keep (default 100). It is capped at
        ``min(n_samples, n_features)`` of the training data, the most PCA can
        produce.
    random_state : int, optional
        Seed for PCA, the random forest and the MLP so that repeated runs
        print the same scores (default 42).

    Returns
    -------
    None
    """
    scaler = StandardScaler()
    train_data = scaler.fit_transform(X_train)
    test_data = scaler.transform(X_test)

    # PCA raises if asked for more components than the data can support.
    n_components = min(n_components, *train_data.shape)
    pca = PCA(n_components=n_components, random_state=random_state)
    train_pca = pca.fit_transform(train_data)
    test_pca = pca.transform(test_data)
    print(train_pca.shape, test_pca.shape)

    y_train = y_train.astype(int)
    y_test = y_test.astype(int)
    # For 0/1 labels these sums are the number of class-1 samples per split.
    print(sum(y_train), sum(y_test))

    # RBF-kernel SVM (its metrics are printed without a header line).
    model = SVC(kernel='rbf')
    model.fit(train_pca, y_train)
    y_pred = model.predict(test_pca)
    calculate_performance(y_test, y_pred)

    clf1 = SVC(kernel='linear')
    clf1.fit(train_pca, y_train)
    y_pred_pca = clf1.predict(test_pca)
    print("pca linear performance: ")
    calculate_performance(y_test, y_pred_pca)

    clf2 = RandomForestClassifier(random_state=random_state)
    clf2.fit(train_pca, y_train)
    y_pred_rfc = clf2.predict(test_pca)
    print("Random Forest performance: ")
    calculate_performance(y_test, y_pred_rfc)

    model = MLPClassifier(hidden_layer_sizes=(100,), max_iter=500, activation='relu',
                          solver='adam', random_state=random_state)
    model.fit(train_pca, y_train)
    # predict() already returns class labels, so no rounding is needed.
    y_pred_mlp = model.predict(test_pca)
    print('MLP performance: ')
    calculate_performance(y_test, y_pred_mlp)


In [92]:
# Task and subject for this run. NOTE: `type` shadows the Python builtin, but
# the augmentation functions read this exact global to look up labels.
type, subject_no = "Long_words", 2
X_train, X_test, y_train, y_test = get_data(type, subject_no)


In [93]:
# Fit and score all classifiers on the split prepared above.
train_model(X_train=X_train, X_test=X_test, y_train=y_train, y_test=y_test)

(2720, 100) (40, 100)
1360 20
accuracy: 0.55, precision: 0.5714285714285714, recall: 0.4, f1 0.47058823529411764
pca linear performance: 
accuracy: 0.5, precision: 0.5, recall: 0.3, f1 0.37499999999999994
Random Forest performance: 
accuracy: 0.525, precision: 0.5384615384615384, recall: 0.35, f1 0.4242424242424242
MLP performance: 
accuracy: 0.575, precision: 0.5652173913043478, recall: 0.65, f1 0.6046511627906976


(2720, 50) (40, 50)
1360 20
accuracy: 0.55, precision: 0.5625, recall: 0.45, f1 0.5