This file is used for generating Spectral Power Vectors (SPV) from *Raw* features. The *Raw* EEG features are obtained using `matlab_files/eeg_csp_raw_run.m`. Refer Section III C(1) in the paper for feature extraction method. The highlighted part in the following image indicate the pipeline of the current file.

Note: The configurations, like hyperparameter values, present in this file are not the final configurations as reported in the paper. The present configurations are maybe mutated for additional experiments. However, results reported in the paper can be reproduced by plugging in the appropriate configurations as mentioned in the paper. In this code `NC` means Normal Controls (equivalent to `HC` as referred in the paper)

![Generating Spectral Power Vectors from EEG Signals](images/raw_SPV.png)

(Optional) Mount Gdrive on Google Colab

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Imports

In [None]:
from scipy.signal import butter
from scipy.signal import filtfilt
from scipy.signal import iirfilter, lfilter
from scipy.fft import fft
import os
import xlrd
import pandas as pd
import numpy as np
from scipy.io import loadmat, savemat
from math import floor
from tqdm.notebook import tqdm
from sklearn.decomposition import PCA
from sklearn.model_selection import RepeatedKFold
from sklearn.model_selection import cross_val_score, GridSearchCV
from sklearn.model_selection import train_test_split, cross_val_predict
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report, accuracy_score, make_scorer
from sklearn.model_selection import KFold
from sklearn.model_selection import GridSearchCV 
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
import os
import pandas as pd
from math import floor
import numpy as np
from scipy.stats import sem
from numpy import mean
from numpy import std
from sklearn.datasets import make_classification
from matplotlib import pyplot

# Signal Processing

In [None]:
def amplitude_reduction(data, threshold):
  data[data >= threshold] = 0
  data[data <= -threshold] = 0
  #normalized = (data-min(data))/(max(data)-min(data)) # normalize
  return data

# butter band_pass_filter
def butter_bandpass_filter(data, lowcut, highcut, fs, order=7):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    #b, a = iirfilter(N=order, Wn = [low, high], btype='bandpass', ftype='butter', fs=fs)
    out = lfilter(b, a, data)
    #data = amplitude_reduction(data, 85) # amplitude thresholding
    #out = filtfilt(b, a, data)
    return out

# alpha spectral band
def alpha_BP(data):
    return butter_bandpass_filter(data, 8, 13, 128)

# Beta spectral band
def beta_BP(data):
    return butter_bandpass_filter(data, 13, 30, 128)

# Gamma spectral band
def gamma_BP(data):
    return butter_bandpass_filter(data, 30, 49, 128)


# Band power AKA sum of squares of Fast Fourier Transform (FFT) coefficients

def average_intensity(band):
    return sum(np.square(abs(band)))


# Alpha Beta Gamma Calculation
def alpha_beta_gamma_calc(data_eeg,max_channel):
    alphas = []
    betas = []
    gammas = []

    for i in range(0,max_channel,1):
        data = data_eeg[i,:]
        alphas.append(average_intensity(fft(alpha_BP(data))))
        betas.append(average_intensity(fft(beta_BP(data))))
        gammas.append(average_intensity(fft(gamma_BP(data))))

    return np.concatenate((alphas,betas, gammas), axis=None)


# Feature Extraction Function
def feature_eeg_xl(dir, seconds, sampling_rate):
    final_feature_mat = []
    y_pd_nc = []
    y_emotions = []
    for root, dire, filename in tqdm(sorted(os.walk(dir))):
      for files in (filename):
        if files.endswith(".xls") or files.endswith(".xlsx"): 
            #print('Genrating Features for file = ',root, dire, files)
            M_data = pd.read_excel(os.path.join(root, files), header=None)
            eeg_data = np.transpose(M_data.to_numpy())
            epochs = floor(eeg_data.shape[1]/(seconds*sampling_rate))
            for i in range(epochs):
              eeg_split_data = eeg_data[:, (i)*seconds*sampling_rate:(i+1)*seconds*sampling_rate]
              final_feature_mat.append(alpha_beta_gamma_calc(eeg_split_data, 14))
        else:
            continue
    return np.array(final_feature_mat)

# Feature Extraction

In [None]:
dir1 = "/content/drive/MyDrive/PD_NC DATABASE_FINAL"
PD_NC = ['NC', 'PD']
Emotions = ['E1','E2','E3','E4','E5','E6']
#Emotions = ['E1']
seconds = 5
sampling_rate =128
feature_mat = np.empty((0,42))
y_vec_emot = np.array([])
y_vec_pdnc = np.array([])
subject_id = np.array([])

for pdnc in PD_NC:
  for emot in Emotions:
    dir2 = os.path.join(dir1, pdnc, emot)
    emot_mat = feature_eeg_xl(dir2, seconds, sampling_rate)
    y_vec_emot = np.concatenate((y_vec_emot, [emot]*emot_mat.shape[0]), axis = None)
    y_vec_pdnc = np.concatenate((y_vec_pdnc, [pdnc]*emot_mat.shape[0]), axis = None)
    feature_mat = np.concatenate((feature_mat, emot_mat), axis = 0)
    subject_id = np.concatenate((subject_id, emot_mat[1]), axis=None)

In [None]:
print(feature_mat.shape)
print(y_vec_emot.shape)
print(y_vec_pdnc.shape)
print(subject_id.shape)

In [None]:
full_feat = feature_mat

## Full PD-NC Labels

In [None]:
# Full PDNC labels
df_y_pdnc = pd.DataFrame(y_vec_pdnc, dtype='string')
labels_pdnc = {'PD':1, 'NC':0}
df_y_pdnc = df_y_pdnc.replace(labels_pdnc)
full_pdnc_labels = df_y_pdnc[0].to_numpy()
full_pdnc_labels

## Full Multiclass labels

In [None]:
# full multi labels
df_y_multi = pd.DataFrame(y_vec_emot, dtype='string')
labels_multi = {'E1': 1, 'E2':2, 'E3':3, 'E4':4, 'E5':5, 'E6':6}
df_y_multi = df_y_multi.replace(labels_multi)
full_multi_labels = df_y_multi[0].to_numpy()
full_multi_labels

## Full HV-LV Labels

In [None]:
#full hvlv labels
df_y_hvlv = pd.DataFrame(y_vec_emot, dtype='string')
labels_hvlv = {'E1':0, 'E2':1, 'E3':0, 'E4':0, 'E5':1, 'E6':0}
df_y_hvlv = df_y_hvlv.replace(labels_hvlv)
full_hvlv_labels = df_y_hvlv[0].to_numpy()
full_hvlv_labels

## Fl HA-LA Labels

In [None]:
# full hala labels

df_y_hala = pd.DataFrame(y_vec_emot, dtype='string')
labels_hala = {'E1':0, 'E2':1, 'E3':1, 'E4':1, 'E5':1, 'E6':1}
df_y_hala = df_y_hala.replace(labels_hala)
full_hala_labels = df_y_hala[0].to_numpy()
full_hala_labels

## Sad data and Labels

In [None]:
# E1 data and labels

df_E = pd.DataFrame(feature_mat)
df_E[len(df_E.columns)] = df_y_pdnc
df_E[len(df_E.columns)] = df_y_multi

df_E1 = df_E.loc[df_E[43] == 1]

E1_feat = df_E1.iloc[:, :42].to_numpy()
E1_pdnc_labels = df_E1.iloc[:, 42].to_numpy()
E1_subject_id = df_E1.iloc[:, 44].to_numpy()
print(E1_feat)
print(E1_pdnc_labels)

## Happy data labels

In [None]:
# E2 data and labels
df_E2 = df_E.loc[df_E[43] == 2]

E2_feat = df_E2.iloc[:, :42].to_numpy()
E2_pdnc_labels = df_E2.iloc[:, 42].to_numpy()
E2_subject_id = df_E2.iloc[:, 44].to_numpy()
print(E2_feat)
print(E2_pdnc_labels)

## Fear data and labels

In [None]:
# E3 data and labels
df_E3 = df_E.loc[df_E[43] == 3]

E3_feat = df_E3.iloc[:, :42].to_numpy()
E3_pdnc_labels = df_E3.iloc[:, 42].to_numpy()
E3_subject_id = df_E3.iloc[:, 44].to_numpy()
print(E3_feat)
print(E3_pdnc_labels)

## Disgust data and Labels

In [None]:
# E4 data and labels
df_E4 = df_E.loc[df_E[43] == 4]

E4_feat = df_E4.iloc[:, :42].to_numpy()
E4_pdnc_labels = df_E4.iloc[:, 42].to_numpy()
E4_subject_id = df_E4.iloc[:, 44].to_numpy()
print(E4_feat)
print(E4_pdnc_labels)

## Surprise data and Labels

In [None]:
# E5 data and labels
df_E5 = df_E.loc[df_E[43] == 5]

E5_feat = df_E5.iloc[:, :42].to_numpy()
E5_pdnc_labels = df_E5.iloc[:, 42].to_numpy()
E5_subject_id = df_E5.iloc[:, 44].to_numpy()
print(E5_feat)
print(E5_pdnc_labels)

## Anger data and labels

In [None]:
# E6 data and labels
df_E6 = df_E.loc[df_E[43] == 6]

E6_feat = df_E6.iloc[:, :42].to_numpy()
E6_pdnc_labels = df_E6.iloc[:, 42].to_numpy()
E6_subject_id = df_E6.iloc[:, 44].to_numpy()
print(E6_feat)
print(E6_pdnc_labels)

## PD data and Labels

In [None]:
# PD features and labels

df_E = pd.DataFrame(feature_mat)
df_E[len(df_E.columns)] = df_y_pdnc
df_E[len(df_E.columns)] = df_y_multi
df_E[len(df_E.columns)] = df_y_hvlv
df_E[len(df_E.columns)] = df_y_hala
df_E[len(df_E.columns)] = subject_id
df_E

In [None]:
df_PD = df_E.loc[df_E[42] == 1] #rows with PD entry

pd_feat = df_PD.iloc[:, :42].to_numpy()
pd_multi_labels = df_PD.iloc[:, 43].to_numpy() #multi labels column
pd_hvlv_labels = df_PD.iloc[:, 44].to_numpy() #hvlv labels
pd_hala_labels = df_PD.iloc[:, 45].to_numpy() #hala labels
pd_subject_id = df_PD.iloc[:, 46].to_numpy()

## HC data and labels

In [None]:
# NC feat and labels

df_NC = df_E.loc[df_E[42] == 0] #rows with NC entry

nc_feat = df_NC.iloc[:, :42].to_numpy()
nc_multi_labels = df_NC.iloc[:, 43].to_numpy() #multi labels column
nc_hvlv_labels = df_NC.iloc[:, 44].to_numpy() #hvlv labels
nc_hala_labels = df_NC.iloc[:, 45].to_numpy() #hala labels
nc_subject_id = df_NC.iloc[:, 46].to_numpy()

In [None]:
# save all these to a .mat file

savemat('/content/drive/MyDrive/EEG Analysis/Features/Spectral/spectral_feat_tensor_full_with_full_labels.mat',
        {'full_feat': full_feat, 'full_pdnc_labels': full_pdnc_labels,
         'full_hvlv_labels' : full_hvlv_labels, 'full_hala_labels':full_hala_labels,
         'full_multi_labels':full_multi_labels, 'E1_feat':E1_feat, 'E1_pdnc_labels':E1_pdnc_labels,
         'E2_feat':E2_feat, 'E2_pdnc_labels':E2_pdnc_labels, 'E3_feat':E3_feat,
         'E3_pdnc_labels':E3_pdnc_labels, 'E4_feat':E4_feat, 'E4_pdnc_labels':E4_pdnc_labels,
         'E5_feat':E5_feat, 'E5_pdnc_labels':E5_pdnc_labels, 'E6_feat':E6_feat,
         'E6_pdnc_labels':E6_pdnc_labels, 'pd_feat':pd_feat, 'pd_hvlv_labels':pd_hvlv_labels,
         'pd_hala_labels':pd_hala_labels, 'pd_multi_labels':pd_multi_labels, 'nc_feat':nc_feat,
         'nc_hvlv_labels':nc_hvlv_labels, 'nc_hala_labels': nc_hala_labels,
         'nc_multi_labels':nc_multi_labels, 'seconds':seconds, 'sampling_rate':sampling_rate, 
         'order':7, 'subject_id': subject_id, 'E1_subject_id': E1_subject_id, 'E2_subject_id':E2_subject_id,
         'E3_subject_id': E3_subject_id, 'E4_subject_id': E4_subject_id, 'E5_subject_id':E5_subject_id,
        'E6_subject_id': E6_subject_id, 'pd_subject_id': pd_subject_id, 'nc_subject_id': nc_subject_id})

# Archive (Blackboard)

In [None]:
labels_class = {43: {'E1': 1, 'E2':2, 'E3':3, 'E4':4, 'E5':5, 'E6':6},42: {'PD': 1, 'NC':0}}
df_y_pdnc = pd.DataFrame(y_vec_pdnc, dtype='string')
df_y_emot = pd.DataFrame(y_vec_emot, dtype='string')

In [None]:
#Write to csv
df = pd.DataFrame(feature_mat)
labels_class = {43: {'E1': 1, 'E2':2, 'E3':3, 'E4':4, 'E5':5, 'E6':6},42: {'PD': 0, 'NC':1}}
df_y_pdnc = pd.DataFrame(y_vec_pdnc, dtype='string')
df_y_emot = pd.DataFrame(y_vec_emot, dtype='string')

df[len(df.columns)]=df_y_pdnc #add pdnc labels
df[len(df.columns)]=df_y_emot #add emotions labels
df_new = df.replace(labels_class)
print(df_new.head())

df_new.to_csv('/content/drive/MyDrive/EEG Analysis/Features/spectral_O7_feat_with_labels_full.csv', index=False, header=False)

In [None]:
#E1 data
dfE1 = df_new.loc[df_new[43] == 1]
print(dfE1.head())
dfE1.to_csv('/content/drive/MyDrive/EEG Analysis/spectral_feat_with_labels_E1.csv', index=False, header=False)

In [None]:
#E2 data
dfE2 = df_new.loc[df_new[43] == 2]
print(dfE2.head())
dfE2.to_csv('/content/drive/MyDrive/EEG Analysis/spectral_feat_with_labels_E2.csv', index=False, header=False)

In [None]:
#E3 data
dfE3 = df_new.loc[df_new[43] == 3]
print(dfE3.head())
dfE3.to_csv('/content/drive/MyDrive/EEG Analysis/spectral_feat_with_labels_E3.csv', index=False, header=False)

In [None]:
#E4 data
dfE4 = df_new.loc[df_new[43] == 4]
print(dfE4.head())
dfE4.to_csv('/content/drive/MyDrive/EEG Analysis/spectral_feat_with_labels_E4.csv', index=False, header=False)

In [None]:
#E5 data
dfE5 = df_new.loc[df_new[43] == 5]
print(dfE5.head())
dfE5.to_csv('/content/drive/MyDrive/EEG Analysis/spectral_feat_with_labels_E5.csv', index=False, header=False)

In [None]:
#E6 data
dfE6 = df_new.loc[df_new[43] == 6]
print(dfE6.head())
dfE6.to_csv('/content/drive/MyDrive/EEG Analysis/spectral_feat_with_labels_E6.csv', index=False, header=False)

In [None]:
def reduce_dim(X, y, pca_thresh, Scaler):
  y = np.ravel(y)
  if Scaler=="MinMax":
    from sklearn.preprocessing import MinMaxScaler
    sc = MinMaxScaler()
    X = sc.fit_transform(X)
  elif Scaler=="MaxAbs":
    from sklearn.preprocessing import MaxAbsScaler
    sc = MaxAbsScaler()
    X = sc.fit_transform(X)
  elif Scaler == "Standard":
    from sklearn.preprocessing import StandardScaler
    sc = StandardScaler()
    X = sc.fit_transform(X)
  elif Scaler == "Normalizer":
    from sklearn.preprocessing import Normalizer
    sc = Normalizer()
    X = sc.fit_transform(X)
  else:
    pass
  if pca_thresh<1:
    pca = PCA(pca_thresh)
    X = pca.fit_transform(X)
  return X, y

def NB_classifier(X, y, cv, grid_search):
  if grid_search:
    nb_classifier = GaussianNB()
    params_NB = {'var_smoothing': np.logspace(0,-9, num=100)}
    gs_NB = GridSearchCV(estimator=nb_classifier, 
                         param_grid=params_NB, cv=cv,   # use any cross validation technique 
                         verbose=1,
                         n_jobs=2,
                         scoring='accuracy') 
    gs_NB.fit(X, y)
    print("Best parameters:",gs_NB.best_params_)
    var_smoothing = gs_NB.best_params_.get('var_smoothing')
  else:
    var_smoothing = 1e-9
  # your classifier, assume GaussianNB here for non-integer data X
  estimator = GaussianNB(var_smoothing=var_smoothing)
  # generate your cross-validation prediction with 10 fold Stratified sampling
  y_pred = cross_val_predict(estimator, X, y, cv=cv)

  # generate report
  clf_report = classification_report(y, y_pred)
  conf_mat = confusion_matrix(y, y_pred, labels = [1,0])
  return clf_report, conf_mat

def SVM_classifier(X, y, cv, kernel, grid_search):
  if grid_search:
    Cs = [0.001, 0.01, 0.1, 1, 10, 100]
    gammas = [0.001, 0.01, 0.1, 1, 10]
    p_grid = {'C': Cs, 'gamma' : gammas}
    svm_classifier = SVC()
    params_svm = GridSearchCV(estimator=svm_classifier, param_grid=p_grid,
                              cv=cv, refit=True,verbose=1, scoring='accuracy', n_jobs = 8)
    params_svm.fit(X, y)
    print('Best parameters:',params_svm.best_params_)
    C = params_svm.best_params_.get('C')
    gamma = params_svm.best_params_.get('gamma')
    estimator = SVC(kernel=kernel, C=C, gamma = gamma)
    y_pred = cross_val_predict(estimator, X, y, cv=cv)
    # generate report
    clf_report = classification_report(y, y_pred)
    conf_mat = confusion_matrix(y, y_pred, labels = [1,0])
    return clf_report, conf_mat

In [None]:
df = pd.read_csv('/content/drive/MyDrive/EEG Analysis/spectral_feat_with_labels_full.csv', header = None)
dfx = df.iloc[:, :-2]
dfy = df.iloc[:, -2]
X_full,y_full = reduce_dim(dfx, dfy, 1, 'Normalizer')

In [None]:
clf_report, conf_mat = SVM_classifier(X_full, y_full, cv=10, kernel='linear', grid_search=1)
print(clf_report)
print(conf_mat)

## Generate Tensor

In [None]:
# butter band_pass_filter
def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    out = lfilter(b, a, data)
    return out

def theta_BP(data):
    return butter_bandpass_filter(data, 4, 8, 128)

def alpha_BP(data):
    return butter_bandpass_filter(data, 8, 12, 128)

def beta_BP(data):
    return butter_bandpass_filter(data, 12, 30, 128)

#band power AKA sum of squares of fft coeff

def average_intensity(band):
    return sum(np.square(abs(band)))


# theta alpha beta calc
def theta_alpha_beta_calc(data_eeg,max_channel,instance_no):
    thetas = []
    alphas = []
    betas = []

    for i in range(0,max_channel,1):
        data = data_eeg[i,64:,instance_no]
        thetas.append(average_intensity(fft(theta_BP(data))))
        alphas.append(average_intensity(fft(alpha_BP(data))))
        betas.append(average_intensity(fft(beta_BP(data))))

    return np.concatenate((thetas,alphas,betas))


# making feature matrics
def feature_matrix_calc(eeg_data,max_instance):
    feature_matrix = []
    for j in range(0,max_instance,1):
        feature_matrix.append(theta_alpha_beta_calc(eeg_data,14,j))
    return np.array(feature_matrix)


def feature_eeg(dir):

    final_feature_mat = []
    for filename in sorted(os.listdir(dir)):
        if filename.endswith(".mat"): 
            print('Genrating Features for file = ',filename)
            M_data = loadmat(os.path.join(dir, filename))
            eeg_data = M_data[filename[:-4]]
            final_feature_mat.append(feature_matrix_calc(eeg_data,eeg_data.shape[2]))
        else:
            continue
    return np.array(final_feature_mat)

In [None]:
df = pd.read_csv('/content/drive/MyDrive/EEG Analysis/Features/Spectral/spectral_feat_with_labels_full.csv', header = None)

In [None]:
dfx = df.iloc[:, :-2]
dfpdnc = df.iloc[:, -2]
dfemot = df.iloc[:,-1]

In [None]:
df.head()