In [3]:
#IMPORT LIBRARIES AND REFORMATTED FILES
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import openpyxl
!pip install spkit
import spkit as sp
from spkit.data import load_data
#sp.__version__
!pip install pywavelets
import pywt
from ast import literal_eval
from scipy.signal import butter, filtfilt

!pip install mne
import mne
from mne.decoding import CSP

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, normalize
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score

Collecting mne
  Downloading mne-1.7.0-py3-none-any.whl (7.4 MB)
                                              0.0/7.4 MB ? eta -:--:--
     -                                        0.2/7.4 MB 6.3 MB/s eta 0:00:02
     --                                       0.5/7.4 MB 5.9 MB/s eta 0:00:02
     ----                                     0.8/7.4 MB 6.3 MB/s eta 0:00:02
     ------                                   1.2/7.4 MB 6.6 MB/s eta 0:00:01
     --------                                 1.5/7.4 MB 6.4 MB/s eta 0:00:01
     ---------                                1.8/7.4 MB 6.3 MB/s eta 0:00:01
     ----------                               2.0/7.4 MB 6.0 MB/s eta 0:00:01
     ------------                             2.3/7.4 MB 6.5 MB/s eta 0:00:01
     --------------                           2.6/7.4 MB 6.2 MB/s eta 0:00:01
     ---------------                          2.9/7.4 MB 6.4 MB/s eta 0:00:01
     ----------------                         3.1/7.4 MB 6.2 MB/s eta 0:00:01
     -

In [4]:
def remove_training_artifact(X):
  #X should be of shape (n, ch)

  fs = 250
  t = np.arange(X.shape[0])/fs #Sets time interval for training data

  XR1 = sp.eeg.ATAR_mCh(X.copy()) #Applies ATAR algorithm to filtered sample data (i.e. artifact removed)

  return XR1

In [5]:
from sklearn.preprocessing import normalize

#CLASSIFICATION OF PRERECORDED SIGNALS
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score

def bandpass_filter(data, lowcut, highcut, fs, order=4):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')
    y = filtfilt(b, a, data, axis=-1)
    return y

# Define filter parameters
mu_lowcut = 7
mu_highcut = 13
beta_lowcut = 13
beta_highcut = 30
fs = 250  # Sampling frequency

In [6]:
#version of algorithm that directly takes the X and Y arrays rather than DF

def predict_results_recorded_nparray(X, y):
  """
  eeg_data = data.iloc[:, start_col:end_col].applymap(np.array).values  #isolate electrode data

  y = data.iloc[:, label_col]  #labels (left/right)

  #find minimum sequence length
  min_length = min(len(eeg_data[i, j]) for i in range(eeg_data.shape[0]) for j in range(eeg_data.shape[1]))

  n_samples = eeg_data.shape[0] #get number trials/samples and number channels in data
  n_channels = eeg_data.shape[1]

  scale_range = np.arange(1,126)  #frequencies to separate into - nyquist - Fs = 250 Hz

  X = np.empty((n_samples, n_channels, min_length)) #reshape to 3d array
  for i in range(n_samples):
    for j in range(n_channels):
      X[i, j, :] = eeg_data[i, j][:min_length]  #truncate so have equal length signals

  """

  n_samples = X.shape[0]
  n_channels = X.shape[1]
  min_length = X.shape[2]
  scale_range = np.arange(1,256)  #frequencies to separate into - nyquist - Fs = 250 Hz

  X_filtered = bandpass_filter(X, 7, 13, 512)
  X = X_filtered

  """
  X_artifact_removed = np.empty_like(X)   #empty dataframe to put artifact removed samples in

  for i in range(n_samples):
      X_this_trial = X[i, :, :].transpose()   #reshape to (n, ch) from (ch, n)
      X_this_trial_ATAR = remove_training_artifact(X_this_trial) #apply ATAR to properly shaped array
      X_ATAR_reshaped = X_this_trial_ATAR.transpose() #reshape to (ch, n) to put back in df

      X_artifact_removed[i, :, :] = X_ATAR_reshaped     #put in artifact removed df

  X = X_artifact_removed
  """

  X_normalized = np.empty_like(X) #empty df for normalized data

  for i in range(n_samples):
    X_to_norm = X[i, :, :].transpose()  #(n, ch)

    X_normed = normalize(X_to_norm).transpose()     #normalize and reshape to (ch, n)

    X_normalized[i, :, :] = X_normed  #put in normalized df

  X = X_normalized

  cwt_coeffs = [] #to be calculated, end result wanted for feature extraction

  #populate cwt_coeffs array for each trial and channel's signal
  for i in range(n_samples):
    sample_coeffs = []  #coeffs in this trial

    for j in range(n_channels):
      coeffs, freqs = pywt.cwt(X[i, j, :], scale_range, 'morl') #coeff for specific trial and channel
      sample_coeffs.append(coeffs)

    cwt_coeffs.append(sample_coeffs)

  cwt_coeffs = np.array(cwt_coeffs)   #the cwt coeffs with shape (trials, channels, scales, data points)

  n_samples, n_channels, n_scales, n_times = cwt_coeffs.shape
  cwt_reshaped = cwt_coeffs.reshape(n_samples, n_channels, n_scales * n_times) #reshape to 3d so can apply to csp

  csp = CSP(n_components = 4, reg='ledoit_wolf', log=True, norm_trace = False) #initialize csp
  csp.fit(cwt_reshaped, y)  #fit to cwt coeffs from eeg data
  cwt_transformed = csp.transform(cwt_reshaped)  #transform to maximize variance

  #train test split for LDA
  X_train, X_test, y_train, y_test = train_test_split(cwt_transformed, y, test_size=0.3, random_state=42, stratify = y)

  #LDA portion
  classifier = LDA()
  classifier.fit(X_train, y_train)
  predictions = classifier.predict(X_test)

  # Cross-validation for more robust evaluation
  skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=34)
  cv_scores = cross_val_score(classifier, cwt_transformed, y, cv=skf)
  cv_accuracy = np.mean(cv_scores)

  print(f"Cross-validated Accuracy: {cv_accuracy:.4f}")

  accuracy = accuracy_score(y_test, predictions)
  return accuracy

In [8]:
imagery_left = np.genfromtxt('imagery_left.csv', delimiter=',', max_rows=64)
imagery_right = np.genfromtxt('imagery_right.csv', delimiter=',', max_rows=64)
imagery_event = np.genfromtxt('imagery_event.csv', delimiter=',', max_rows=64)

In [9]:
print("shape of data: ", imagery_left.shape)
print("shape of labels: ", imagery_event.shape)

# generate the indices of motor imagery onset
indices = np.where(imagery_event == 1)[0]
print("indices of motor imagery: ", indices[:10])

# define the bandpass filter
def bandpass_filter(data, lowcut, highcut, fs, order=5):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')
    return filtfilt(b, a, data)

fs = 512
imagery_left_filt = bandpass_filter(imagery_left, 4, 40, fs)
imagery_right_filt = bandpass_filter(imagery_right, 4, 40, fs)

def normalize(data):
  return (data - np.mean(data, axis=1, keepdims=True))/np.std(data, axis=1, keepdims=True)

imagery_left_norm = normalize(imagery_left_filt)
imagery_right_norm = normalize(imagery_right_filt)

fs = 512

imagery_left_sliced = np.zeros((100, 64, 3*fs))
imagery_right_sliced = np.zeros((100, 64, 3*fs))

for i, index in enumerate(indices):
    #print(i, index)
    imagery_left_sliced[i] = imagery_left_norm[:, index:index+1536]
    imagery_right_sliced[i] = imagery_right_norm[:, index:index+1536]

imageries = np.concatenate((imagery_left_sliced, imagery_right_sliced), axis = 0)
labels = np.concatenate((np.zeros(100), np.ones(100)))

print(imageries.shape)
print(labels.shape)

shape of data:  (64, 358400)
shape of labels:  (358400,)
indices of motor imagery:  [ 1023  4607  8191 11775 15359 18943 22527 26111 29695 33279]
(200, 64, 1536)
(200,)


In [11]:
indices_to_keep = [12, 49, 47, 20, 30, 57]
filtered_imageries = imageries[:, indices_to_keep, :]

print(filtered_imageries.shape)

outside_data_accuracy = predict_results_recorded_nparray(filtered_imageries, labels)

(200, 6, 1536)
Computing rank from data with rank=None
    Using tolerance 37 (2.2e-16 eps * 6 dim * 2.8e+16  max singular value)
    Estimated rank (data): 6
    data: rank 6 computed from 6 data channels with 0 projectors
Reducing data rank from 6 -> 6
Estimating class=0.0 covariance using LEDOIT_WOLF
Done.
Estimating class=1.0 covariance using LEDOIT_WOLF
Done.
Cross-validated Accuracy: 0.7350


In [12]:
print("Outside data accuracy: " + str(outside_data_accuracy))

Outside data accuracy: 0.75
