<a href="https://colab.research.google.com/github/ljmcpherson/ISP_BCI/blob/main/Another_copy_of_load_ECoG_motor_imagery.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Loading of Miller ECoG data of motor imagery

includes some visualizations

In [None]:
# @title Data retrieval
import os, requests

fname = 'motor_imagery.npz'
url = "https://osf.io/ksqv8/download"

if not os.path.isfile(fname):
  try:
    r = requests.get(url)
  except requests.ConnectionError:
    print("!!! Failed to download data !!!")
  else:
    if r.status_code != requests.codes.ok:
      print("!!! Failed to download data !!!")
    else:
      with open(fname, "wb") as fid:
        fid.write(r.content)

In [None]:
# @title Install packages (`nilearn`, `nimare`. `duecredit`), import `matplotlib` and set defaults
# install packages to visualize brains and electrode locations
!pip install nilearn --quiet
!pip install nimare --quiet
!pip install duecredit --quiet

from matplotlib import rcParams
from matplotlib import pyplot as plt

rcParams['figure.figsize'] = [20, 4]
rcParams['font.size'] = 15
rcParams['axes.spines.top'] = False
rcParams['axes.spines.right'] = False
rcParams['figure.autolayout'] = True

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.4/10.4 MB[0m [31m36.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.3/13.3 MB[0m [31m70.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m237.3/237.3 kB[0m [31m13.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.0/49.0 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m90.6/90.6 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m179.9/179.9 kB[0m [31m7.6 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
# @title Data loading
import numpy as np

alldat = np.load(fname, allow_pickle=True)['dat']

# select just one of the recordings here. 11 is nice because it has some neurons in vis ctx.
dat1 = alldat[1][0]
dat2 = alldat[1][1]

print(dat1.keys())
print(dat2.keys())

dict_keys(['t_off', 'stim_id', 't_on', 'srate', 'V', 'scale_uv', 'locs', 'hemisphere', 'lobe', 'gyrus', 'Brodmann_Area'])
dict_keys(['t_off', 'stim_id', 't_on', 'srate', 'V', 'scale_uv', 'locs', 'hemisphere', 'lobe', 'gyrus', 'Brodmann_Area'])


In [None]:
dat1["V"].shape

(390680, 64)

In [None]:
# a = np.arange(46)
a = [1,2,3,4]
dat1["V"][1000:2000][:,a].shape

(1000, 4)

In [None]:
import numpy as np

alldat = np.load(fname, allow_pickle=True)['dat']
channels = np.arange(46) # channels from 0 to 46 are picked / you can change this to pick only some channels  - numpy.array

def split_classes(data, stim_id_1, stim_id_2, sub = 0, binary_stim_id = True , channels = channels):
    """
    Parameters:
    - data: Dictionary containing the data.
    - stim_id_1: Stimulus ID for class 1.
    - stim_id_2: Stimulus ID for class 2.
    - sub : constraint on the t_off --> 3000 - sub

    Returns:
    - class_1_data: Data dictionary for class 1.
    - class_2_data: Data dictionary for class 2.
    """
    # Initialize dictionaries for the two classes
    class_1_data = {'V': [], 't_on': [], 't_off': [], 'stim_id': []}
    class_2_data = {'V': [], 't_on': [], 't_off': [], 'stim_id': []}
    # class_1_data = {'V': [], 't_on': []}
    # class_2_data = {'V': [], 't_on': []}

    # Iterate through the stimulus IDs and split the data
    for i, stim_id in enumerate(data['stim_id']):
        t_on = data['t_on'][i]
        t_off = data['t_off'][i] - sub
        if stim_id == stim_id_1:
            class_1_data['V'].append(data['V'][t_on:t_off][:,channels])
            class_1_data['t_on'].append(t_on)
            class_1_data['t_off'].append(t_off)
            class_1_data['stim_id'].append(stim_id)
        elif stim_id == stim_id_2:
            class_2_data['V'].append(data['V'][t_on:t_off][:,channels])
            class_2_data['t_on'].append(t_on)
            class_2_data['t_off'].append(t_off)
            class_2_data['stim_id'].append(stim_id)

    # Convert lists to numpy arrays
    class_1_data['V'] = np.array(class_1_data['V'])
    class_1_data['t_on'] = np.array(class_1_data['t_on'])
    class_1_data['t_off'] = np.array(class_1_data['t_off'])
    class_1_data['stim_id'] = np.array(class_1_data['stim_id'])

    class_2_data['V'] = np.array(class_2_data['V'])
    class_2_data['t_on'] = np.array(class_2_data['t_on'])
    class_2_data['t_off'] = np.array(class_2_data['t_off'])
    class_2_data['stim_id'] = np.array(class_2_data['stim_id'])


    if binary_stim_id:
        class_1_data['stim_id'] = 1 - (class_1_data['stim_id']==11).astype(int)
        class_2_data['stim_id'] = (class_2_data['stim_id']==12).astype(int)

    return class_1_data, class_2_data


def get_all(data, stim_id_1 = 11, stim_id_2 = 12, sub = 0, binary_stim_id = True, channels = channels):
  """
  //TODO
  """
  real = {"tongue" : [], "hand" : []}
  imagine = {"tongue" : [], "hand" : []}
  # Iterate over the datasets
  for i in range(7):
      # Split classes for the current dataset
      real_classes_0, real_classes_1 = split_classes(alldat[i][0], stim_id_1, stim_id_2, sub = sub, binary_stim_id = binary_stim_id, channels = channels)
      imagine_classes_0,  imagine_classes_1 = split_classes(alldat[i][1], stim_id_1, stim_id_2, sub = sub, binary_stim_id = binary_stim_id, channels = channels)

      # Append the results to the lists
      real["tongue"].append(real_classes_0)
      imagine["tongue"].append(imagine_classes_0)
      real["hand"].append(real_classes_1)
      imagine["hand"].append(imagine_classes_1)


  real["tongue"] = np.array(real["tongue"])
  imagine["tongue"] = np.array(imagine["tongue"])
  real["hand"] = np.array(real["hand"])
  imagine["hand"] = np.array(imagine["hand"])

  return real, imagine
# split '4' = rt, rh, it, ih
# split '2' = rt & it, rh & ih
# split 'r' = rt, rh
# split 'i' = it, ih
def get_X_Y(real, imagine, channels = channels, flatten = True, shuffle = True, split = 'r'):
    """
    //TODO

    Didnt need to use the stim_id from dictionaries ... Might use them later ...
    """
    X = []
    Y = []

    for i in range(real['tongue'].shape[0]):
        if split == '4':
          X.append(real['tongue'][i]['V'])
          X.append(imagine['tongue'][i]['V'])
          Y.append(np.zeros(real['tongue'][i]['V'].shape[0]))
          Y.append(np.zeros(imagine['tongue'][i]['V'].shape[0]) + 1)
        elif split == '2':
          X.append(real['tongue'][i]['V'])
          X.append(imagine['tongue'][i]['V'])
          Y.append(np.zeros(real['tongue'][i]['V'].shape[0]))
          Y.append(np.zeros(imagine['tongue'][i]['V'].shape[0]))
        elif split == 'r':
          X.append(real['tongue'][i]['V'])
          Y.append(np.zeros(real['tongue'][i]['V'].shape[0]))
        elif split == 'i':
          X.append(imagine['tongue'][i]['V'])
          Y.append(np.zeros(imagine['tongue'][i]['V'].shape[0]))

    for i in range(real['hand'].shape[0]):
        if split == '4':
          X.append(real['hand'][i]['V'])
          X.append(imagine['hand'][i]['V'])
          Y.append(np.ones(real['hand'][i]['V'].shape[0]) + 1)
          Y.append(np.ones(imagine['hand'][i]['V'].shape[0]) + 2)
        elif split == '2':
          X.append(real['hand'][i]['V'])
          X.append(imagine['hand'][i]['V'])
          Y.append(np.ones(real['hand'][i]['V'].shape[0]))
          Y.append(np.ones(imagine['hand'][i]['V'].shape[0]))
        elif split == 'r':
          X.append(real['hand'][i]['V'])
          Y.append(np.ones(real['hand'][i]['V'].shape[0]))
        elif split == 'i':
          X.append(imagine['hand'][i]['V'])
          Y.append(np.ones(imagine['hand'][i]['V'].shape[0]))

    X = np.array(X)
    X = X.reshape(-1, real['tongue'][i]['V'].shape[1], channels.shape[0]).transpose(0, 2, 1)

    Y = np.array(Y)
    Y = Y.reshape(-1)


    if flatten:
      X = X.reshape(-1, X.shape[1]*X.shape[2])

    if shuffle:
      permutation = np.random.permutation(X.shape[0])
      X = X[permutation]
      Y = Y[permutation]

    return X,Y


real, imagine = get_all(data=alldat, sub = 2000)
X,Y = get_X_Y(real, imagine)
print(Y)


[0. 0. 1. 1. 1. 0. 1. 0. 0. 1. 1. 0. 0. 1. 1. 1. 1. 0. 1. 1. 0. 0. 1. 0.
 1. 0. 0. 0. 0. 1. 0. 1. 1. 0. 1. 1. 1. 0. 1. 1. 0. 1. 0. 1. 1. 1. 0. 0.
 0. 0. 0. 0. 1. 1. 0. 0. 1. 0. 1. 0. 0. 1. 0. 1. 1. 1. 1. 1. 0. 1. 0. 1.
 1. 0. 1. 1. 1. 1. 1. 1. 0. 1. 1. 1. 0. 1. 0. 0. 0. 1. 0. 0. 1. 1. 0. 0.
 0. 0. 1. 1. 1. 0. 1. 1. 0. 1. 1. 0. 0. 1. 0. 0. 1. 0. 0. 1. 0. 1. 1. 0.
 0. 0. 0. 1. 0. 1. 1. 1. 1. 1. 0. 0. 1. 0. 1. 1. 1. 1. 1. 1. 0. 1. 1. 1.
 0. 1. 1. 0. 1. 1. 1. 0. 1. 1. 0. 0. 1. 1. 0. 1. 1. 1. 1. 1. 1. 1. 0. 1.
 0. 1. 0. 1. 0. 1. 1. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 1. 1. 0. 0. 0. 1. 0.
 1. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 1. 1. 0. 1. 1. 0. 0. 0. 1. 0. 1. 1. 0.
 1. 0. 1. 0. 0. 0. 0. 0. 0. 1. 0. 1. 0. 0. 1. 1. 0. 0. 0. 0. 0. 1. 1. 0.
 1. 0. 1. 0. 0. 1. 1. 1. 1. 0. 0. 1. 1. 1. 1. 0. 0. 1. 0. 0. 1. 1. 0. 0.
 1. 1. 0. 1. 0. 0. 1. 0. 1. 1. 0. 1. 0. 1. 1. 0. 1. 1. 0. 1. 0. 0. 0. 0.
 1. 0. 0. 0. 1. 0. 1. 1. 1. 0. 0. 0. 0. 0. 1. 0. 1. 1. 1. 1. 1. 0. 0. 1.
 1. 1. 1. 0. 0. 0. 1. 1. 1. 0. 0. 1. 1. 1. 0. 1. 0.

# possible issue ? different number of channels ? Temp solution by setting 46 hard constraint...


In [None]:
real, imagine = get_all(data=alldat, sub = 1000)

subject_id = 0
exp = real
action = "tongue"

print("Subject 1 - V - tongue - real movement:", exp[action][subject_id]['V'].shape)
print("Subject 1 - t_on - tongue - real movement:", exp[action][subject_id]['t_on'])
print("Subject 1 - t_off - tongue - real movement:", exp[action][subject_id]['t_off'])
print("Subject 1 - stim_id - tongue - real movement:", exp[action][subject_id]['stim_id'])

Subject 1 - V - tongue - real movement: (30, 2000, 46)
Subject 1 - t_on - tongue - real movement: [ 28240  52440  58520  76680  82680  88720 100760 106800 112840 131040
 137080 149160 155160 179320 185320 197400 227560 239640 257720 263760
 275840 281840 287880 293880 299880 330080 342200 348240 360360 366440]
Subject 1 - t_off - tongue - real movement: [ 30240  54440  60520  78680  84680  90720 102760 108800 114840 133040
 139080 151160 157160 181320 187320 199400 229560 241640 259720 265760
 277840 283840 289880 295880 301880 332080 344200 350240 362360 368440]
Subject 1 - stim_id - tongue - real movement: [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]


In [None]:
X,Y = get_X_Y(real, imagine, split = 'r')

print(X.shape)
print(Y.shape)

print(np.unique(Y))
# print(X[:5,:3]) # first 5 samples and their first 3 features
# print(Y[:5]) # first 5 target values

X,Y = get_X_Y(real, imagine, split = 'r')

print(X.shape)
print(Y.shape)

print(np.unique(Y))
print(X[:5,:3]) # first 5 samples and their first 3 features
print(Y[:5]) # first 5 target values

(420, 92000)
(420,)
[0. 1.]
(420, 92000)
(420,)
[0. 1.]
[[-0.2158  -0.1781  -0.1556 ]
 [-2.186   -2.273   -2.35   ]
 [ 0.3286  -0.428   -0.129  ]
 [ 0.3833   0.332    0.36   ]
 [-0.06976  0.0889   0.08984]]
[1. 0. 1. 0. 1.]


In [None]:
!pip install lazypredict

Collecting lazypredict
  Downloading lazypredict-0.2.12-py2.py3-none-any.whl.metadata (12 kB)
Downloading lazypredict-0.2.12-py2.py3-none-any.whl (12 kB)
Installing collected packages: lazypredict
Successfully installed lazypredict-0.2.12


In [None]:
#Feature extraction function
def get_features_single(signal, sample_rate=1000, plot=False, log_scale=True):

    f_orig, psd_orig = compute_psd(signal, sample_rate, log_scale=log_scale)

    f_filt_LFB, psd_filt_LFB = filter_psd(f_orig, psd_orig, low_cut=8, high_cut=32)
    f_filt_HFB, psd_filt_HFB = filter_psd(f_orig, psd_orig, low_cut=76, high_cut=100)

    def extract_features(psd_values):
        return {
            'mean': np.mean(psd_values),
            'max': np.max(psd_values),
            'std': np.std(psd_values),
            'min': np.min(psd_values),
            'raw_values': psd_values
        }

    features_LFB = extract_features(psd_filt_LFB)
    features_HFB = extract_features(psd_filt_HFB)

    # Plotting
    if plot:
        t = np.arange(len(signal)) / sample_rate

        plt.figure(figsize=(16, 9))

        # Plot the original signal
        plt.subplot(2, 1, 1)
        plt.plot(t, signal, label='Original Signal')
        plt.title(f'Original Signal')
        plt.xlabel('Time [s]')
        plt.ylabel('Amplitude')
        plt.grid()
        plt.legend()

        # Plot the PSD with vertical lines for the filtered frequency bands
        plt.subplot(2, 1, 2)
        plt.plot(f_orig, psd_orig, label='PSD of Original Signal')
        filt_values = [8, 32, 76, 100]
        for x in filt_values:
            plt.vlines(x, ymin=min(psd_orig), ymax=max(psd_orig), colors='r', linestyles='--')
        plt.title('Power Spectral Density of Original Signal')
        plt.xlabel('Frequency [Hz]')
        plt.ylabel('Log Power')
        plt.grid()
        plt.legend()
        plt.tight_layout()
        plt.show()

    return features_LFB, features_HFB

def get_features_all(X, sample_rate=1000, type_feature='statistical', out_features='low+high'):
    feature_list = []

    for sample in X:
        S = []
        for signal in sample:
            features_LFB, features_HFB = get_features_single(
                signal,
                sample_rate=sample_rate,
                plot=False,
                log_scale=True
            )

            # Select feature type based on 'type_feature'
            if type_feature == 'statistical':
                selected_LFB = [features_LFB['mean'], features_LFB['std']]
                selected_HFB = [features_HFB['mean'], features_HFB['std']]
            elif type_feature == 'raw':
                selected_LFB = features_LFB['raw_values']
                selected_HFB = features_HFB['raw_values']
            elif type_feature == 'all':
                selected_LFB = [features_LFB['mean'], features_LFB['std'], features_LFB['max'], features_LFB['min']]
                selected_HFB = [features_HFB['mean'], features_HFB['std'], features_HFB['max'], features_HFB['min']]


            # Combine features based on 'out_features'
            if out_features == 'low':
                S.append(selected_LFB)
            elif out_features == 'high':
                S.append(selected_HFB)
            elif out_features == 'low+high':
                combined_features = np.array([selected_LFB, selected_HFB]).flatten()
                S.append(combined_features)

        feature_list.append(S)

    return np.array(feature_list)

Phase and magnitude

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
from scipy.fft import fft, fftfreq
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.optimizers import Adam

# Define the compute_psd function
def compute_psd(signal, sample_rate, log_scale=True):
    freqs = np.fft.fftfreq(len(signal), 1/sample_rate)
    psd = np.abs(np.fft.fft(signal))**2
    if log_scale:
        psd = np.log(psd + 1e-8)
    return freqs, psd

# Define the get_features_single function
def get_features_single(signal, sample_rate=1000, log_scale=True):
    f_orig, psd_orig = compute_psd(signal, sample_rate, log_scale=log_scale)

    # Filter and get PSD for low frequency band (example)
    low_cut, high_cut = 8, 32
    psd_filt_LFB = psd_orig[(f_orig >= low_cut) & (f_orig <= high_cut)]

    # Filter and get PSD for high frequency band (example)
    low_cut, high_cut = 32, 100
    psd_filt_HFB = psd_orig[(f_orig >= low_cut) & (f_orig <= high_cut)]

    # Combine features from different bands (example)
    features = np.concatenate([psd_filt_LFB, psd_filt_HFB])
    return features

# Define the get_features_all function
def get_features_all(data, sample_rate=1000):
    num_samples, num_points = data.shape
    features_list = []

    for sample in data:
        features = get_features_single(sample, sample_rate=sample_rate)
        features_list.append(features)

    return np.array(features_list)

# Function to split data into relevant frequency bands and compute PSD and phase features
def compute_band_features(data, sampling_rate, bands):
    num_samples, num_features = data.shape
    num_bands = len(bands)
    band_features = []

    for i in range(num_samples):
        sample_features = []
        for (low_freq, high_freq) in bands:
            yf = fft(data[i])
            xf = fftfreq(num_features, 1 / sampling_rate)
            band_indices = np.where((xf >= low_freq) & (xf <= high_freq))[0]

            psd_band = np.abs(yf[band_indices]) ** 2
            phase_band = np.angle(yf[band_indices])

            sample_features.extend(np.concatenate((psd_band, phase_band)))
        band_features.append(sample_features)

    return np.array(band_features)

# Define frequency bands (example bands: Delta, Theta, Alpha, Beta, Gamma)
bands = [(0.5, 4), (4, 8), (8, 12), (12, 30), (30, 50)]

# Separate both X and Y as test and train data
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=2024)

# Get features for train and test
X_train_features = get_features_all(X_train, sample_rate=1000)
X_test_features = get_features_all(X_test, sample_rate=1000)

# Compute band-specific features
sampling_rate = 1000  # Adjust according to your data's sampling rate
band_features_train = compute_band_features(X_train_features, sampling_rate, bands)
band_features_test = compute_band_features(X_test_features, sampling_rate, bands)

# Standardize band features
scaler_band = StandardScaler()
band_features_train = scaler_band.fit_transform(band_features_train)
band_features_test = scaler_band.transform(band_features_test)

# Combine original and band-specific features
X_train_combined = np.hstack((X_train_features, band_features_train))
X_test_combined = np.hstack((X_test_features, band_features_test))

In [None]:
# @title Random Forest Classifier
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
import warnings

warnings.filterwarnings('ignore')

In [None]:
#Random Forest
# Import Random Forest classifier
from sklearn.ensemble import RandomForestClassifier

# instantiate the classifier
rfc = RandomForestClassifier(n_estimators= 1000, random_state=0)

# fit the model
rfc.fit(X_train_combined, Y_train)

# Predict the Test set results
y_pred = rfc.predict(X_test_combined)

# Check accuracy score
from sklearn.metrics import accuracy_score

print('Model accuracy score with 1000 decision-trees : {0:0.4f}'. format(accuracy_score(Y_test, y_pred)))

Model accuracy score with 1000 decision-trees : 0.4762


Change for Random Forest

In [None]:
from sklearn.model_selection import train_test_split
from lazypredict.Supervised import LazyClassifier

clf = LazyClassifier(predictions=True)
models, predictions = clf.fit(X_train_combined, X_test_combined, y_train, y_test)
print(models)

 97%|█████████▋| 28/29 [07:05<00:39, 39.78s/it]

[LightGBM] [Info] Number of positive: 145, number of negative: 149
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.860149 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 4450123
[LightGBM] [Info] Number of data points in the train set: 294, number of used features: 45000
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.493197 -> initscore=-0.027213
[LightGBM] [Info] Start training from score -0.027213


100%|██████████| 29/29 [08:45<00:00, 18.10s/it]

                               Accuracy  Balanced Accuracy  ROC AUC  F1 Score  \
Model                                                                           
KNeighborsClassifier               0.57               0.57     0.57      0.57   
BaggingClassifier                  0.56               0.57     0.57      0.56   
AdaBoostClassifier                 0.56               0.55     0.55      0.55   
LogisticRegression                 0.53               0.53     0.53      0.53   
DecisionTreeClassifier             0.52               0.52     0.52      0.52   
CalibratedClassifierCV             0.52               0.52     0.52      0.51   
PassiveAggressiveClassifier        0.52               0.52     0.52      0.52   
RidgeClassifierCV                  0.51               0.51     0.51      0.51   
RidgeClassifier                    0.51               0.51     0.51      0.51   
NuSVC                              0.51               0.51     0.51      0.51   
DummyClassifier             




# Dimension reduction ---> need more analysis to find the best number of components. etc .. other methods maybe ....


This might take a while

In [None]:
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score

n_components = 30
pca = PCA(n_components=n_components)
new_X = pca.fit_transform(X)


# train test split
X_train, X_test, y_train, y_test = train_test_split(new_X, Y, test_size=0.2, random_state=1234) # 80 percent of data as train 20 percent test

# make a model and train and get the accuracy... You can see it overfits. ... maybe another method . randomforset ???? better feature selection/extraction ? run pca then t-sne ?

In [None]:
svc = SVC()
svc.fit(X_train, y_train)

y_train_pred = svc.predict(X_train)
y_test_pred = svc.predict(X_test)

train_accuracy = accuracy_score(y_train, y_train_pred)
test_accuracy = accuracy_score(y_test, y_test_pred)

print(f'Train Accuracy: {train_accuracy}')
print(f'Test Accuracy: {test_accuracy}')

Train Accuracy: 0.8363095238095238
Test Accuracy: 0.4880952380952381


# :)

Random Forest classifier

In [None]:
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.decomposition import PCA
from imblearn.over_sampling import SMOTE
from sklearn.metrics import accuracy_score
from keras.models import Sequential
from keras.layers import Dense

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(new_X, Y, test_size=0.2, random_state=1234)

# Feature scaling
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# PCA
pca = PCA(n_components=20)
X_train = pca.fit_transform(X_train)
X_test = pca.transform(X_test)

# SMOTE for data augmentation
sm = SMOTE(random_state=42)
X_train_res, y_train_res = sm.fit_resample(X_train, y_train)

# Model training with hyperparameter tuning
param_grid = {'n_estimators': [50, 100, 200], 'max_depth': [None, 10, 20, 30]}
grid = GridSearchCV(RandomForestClassifier(), param_grid, refit=True, cv=5)
grid.fit(X_train_res, y_train_res)

# Predictions and evaluation
y_pred = grid.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f'Test Accuracy: {accuracy}')




Test Accuracy: 0.5119047619047619


Neural Network

In [None]:
real, imagine = get_all(data=alldat, sub = 0)
X,Y = get_X_Y(real, imagine)

In [None]:
X.shape

(420, 138000)

In [None]:
type(Y[0])

numpy.float64

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, LSTM, Bidirectional, BatchNormalization, GRU
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.utils import shuffle
import matplotlib.pyplot as plt

# Assuming X and Y are already defined
X, Y = shuffle(X, Y, random_state=1234)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=1234)

# Feature scaling
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Dimensionality reduction with PCA
pca = PCA(n_components=100)  # Adjust n_components as needed
X_train = pca.fit_transform(X_train)
X_test = pca.transform(X_test)

# Reshape data for LSTM [samples, time steps, features]
time_steps = 10  # Adjust the time steps as per your requirement
n_features = X_train.shape[1] // time_steps
X_train = X_train.reshape((X_train.shape[0], time_steps, n_features))
X_test = X_test.reshape((X_test.shape[0], time_steps, n_features))

# Build the Bidirectional GRU model with Batch Normalization
model = Sequential()
model.add(Bidirectional(GRU(64, return_sequences=True, kernel_regularizer=l2(0.01)), input_shape=(time_steps, n_features)))
model.add(BatchNormalization())
model.add(Dropout(0.6))
model.add(Bidirectional(GRU(64, return_sequences=False, kernel_regularizer=l2(0.01))))
model.add(BatchNormalization())
model.add(Dropout(0.6))
model.add(Dense(32, activation='relu', kernel_regularizer=l2(0.01)))
model.add(Dense(1, activation='sigmoid'))

# Compile the model with RMSprop optimizer
model.compile(loss='binary_crossentropy',
              optimizer=RMSprop(learning_rate=0.0001),
              metrics=['accuracy'])

# Early stopping and model checkpointing
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
checkpoint = ModelCheckpoint('best_bidirectional_gru_model.h5', save_best_only=True)

# Train the Model
history = model.fit(X_train, y_train, epochs=50, batch_size=32, validation_split=0.2, callbacks=[early_stopping, checkpoint])

# Evaluate the Model
loss, accuracy = model.evaluate(X_test, y_test)
print(f'Test Accuracy: {accuracy}')

# Make Predictions
y_pred = model.predict(X_test)
y_pred_classes = (y_pred > 0.5).astype("int32")

# Calculate accuracy
final_accuracy = accuracy_score(y_test, y_pred_classes)
print(f'Final Test Accuracy: {final_accuracy}')

# Plot the learning curves
plt.figure(figsize=(12, 4))

# Plot training & validation accuracy values
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend(['Train', 'Validation'], loc='upper left')

# Plot training & validation loss values
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(['Train', 'Validation'], loc='upper left')

plt.tight_layout()
plt.show()


NameError: name 'l2' is not defined

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, LSTM
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.regularizers import l2
from sklearn.utils import shuffle
import matplotlib.pyplot as plt

# Assuming X and Y are already defined
X, Y = shuffle(X, Y, random_state=1234)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=1234)

# Feature scaling
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Dimensionality reduction with PCA
pca = PCA(n_components=100)  # Adjust n_components as needed
X_train = pca.fit_transform(X_train)
X_test = pca.transform(X_test)

# Reshape data for LSTM [samples, time steps, features]
time_steps = 10  # Adjust the time steps as per your requirement
n_features = X_train.shape[1] // time_steps
X_train = X_train.reshape((X_train.shape[0], time_steps, n_features))
X_test = X_test.reshape((X_test.shape[0], time_steps, n_features))

# Build the LSTM model with more regularization
model = Sequential()
model.add(LSTM(64, input_shape=(time_steps, n_features), return_sequences=True, kernel_regularizer=l2(0.01)))
model.add(Dropout(0.6))
model.add(LSTM(64, return_sequences=False, kernel_regularizer=l2(0.01)))
model.add(Dropout(0.6))
model.add(Dense(32, activation='relu', kernel_regularizer=l2(0.01)))
model.add(Dense(1, activation='sigmoid'))

# Compile the model
model.compile(loss='binary_crossentropy',
              optimizer=Adam(learning_rate=0.001),
              metrics=['accuracy'])

# Early stopping and model checkpointing
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
checkpoint = ModelCheckpoint('best_lstm_model.h5', save_best_only=True)

# Train the Model
history = model.fit(X_train, y_train, epochs=50, batch_size=32, validation_split=0.2, callbacks=[early_stopping, checkpoint])

# Evaluate the Model
loss, accuracy = model.evaluate(X_test, y_test)
print(f'Test Accuracy: {accuracy}')

# Make Predictions
y_pred = model.predict(X_test)
y_pred_classes = (y_pred > 0.5).astype("int32")

# Calculate accuracy
final_accuracy = accuracy_score(y_test, y_pred_classes)
print(f'Final Test Accuracy: {final_accuracy}')

# Plot the learning curves
plt.figure(figsize=(12, 4))

# Plot training & validation accuracy values
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend(['Train', 'Validation'], loc='upper left')

# Plot training & validation loss values
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(['Train', 'Validation'], loc='upper left')

plt.tight_layout()
plt.show()


In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, LSTM
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.utils import shuffle
import matplotlib.pyplot as plt

# Assuming X and Y are already defined
X, Y = shuffle(X, Y, random_state=1234)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=1234)

# Feature scaling
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Dimensionality reduction with PCA
pca = PCA(n_components=100)  # Adjust n_components as needed
X_train = pca.fit_transform(X_train)
X_test = pca.transform(X_test)

# Reshape data for LSTM [samples, time steps, features]
time_steps = 10  # Adjust the time steps as per your requirement
n_features = X_train.shape[1] // time_steps
X_train = X_train.reshape((X_train.shape[0], time_steps, n_features))
X_test = X_test.reshape((X_test.shape[0], time_steps, n_features))

# Build the LSTM model
model = Sequential()
model.add(LSTM(128, input_shape=(time_steps, n_features), return_sequences=True))
model.add(Dropout(0.5))
model.add(LSTM(128, return_sequences=False))
model.add(Dropout(0.5))
model.add(Dense(32, activation='relu'))
model.add(Dense(1, activation='sigmoid'))

# Compile the model
model.compile(loss='binary_crossentropy',
              optimizer=Adam(learning_rate=0.001),
              metrics=['accuracy'])

# Early stopping and model checkpointing
early_stopping = EarlyStopping(monitor='val_loss', patience=50, restore_best_weights=True)
checkpoint = ModelCheckpoint('best_lstm_model.h5', save_best_only=True)

# Train the Model
history = model.fit(X_train, y_train, epochs=50, batch_size=32, validation_split=0.2, callbacks=[early_stopping, checkpoint])

# Evaluate the Model
loss, accuracy = model.evaluate(X_test, y_test)
print(f'Test Accuracy: {accuracy}')

# Make Predictions
y_pred = model.predict(X_test)
y_pred_classes = (y_pred > 0.5).astype("int32")

# Calculate accuracy
final_accuracy = accuracy_score(y_test, y_pred_classes)
print(f'Final Test Accuracy: {final_accuracy}')

# Plot the learning curves
plt.figure(figsize=(12, 4))

# Plot training & validation accuracy values
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend(['Train', 'Validation'], loc='upper left')

# Plot training & validation loss values
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(['Train', 'Validation'], loc='upper left')

plt.tight_layout()
plt.show()


In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, LSTM, Bidirectional, BatchNormalization, GRU
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.regularizers import l2
from sklearn.utils import shuffle
import matplotlib.pyplot as plt

# Assuming X and Y are already defined
X, Y = shuffle(X, Y, random_state=1234)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=1234)

# Feature scaling
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Dimensionality reduction with PCA
# pca = PCA(n_components=1000)  # Adjust n_components as needed
# X_train = pca.fit_transform(X_train)
# X_test = pca.transform(X_test)

# Reshape data for LSTM [samples, time steps, features]
time_steps = 5  # Adjust the time steps as per your requirement
n_features = X_train.shape[1] // time_steps
X_train = X_train.reshape((X_train.shape[0], time_steps, n_features))
X_test = X_test.reshape((X_test.shape[0], time_steps, n_features))

# Build the simplified Bidirectional GRU model with Batch Normalization
model = Sequential()
model.add(Bidirectional(GRU(32, return_sequences=True, kernel_regularizer=l2(0.01)), input_shape=(time_steps, n_features)))
model.add(BatchNormalization())
model.add(Dropout(0.4))
model.add(Bidirectional(GRU(32, return_sequences=False, kernel_regularizer=l2(0.01))))
model.add(BatchNormalization())
model.add(Dropout(0.4))
model.add(Dense(16, activation='relu', kernel_regularizer=l2(0.01)))
model.add(Dense(1, activation='sigmoid'))

# Compile the model with Adam optimizer and learning rate scheduler
model.compile(loss='binary_crossentropy',
              optimizer=Adam(learning_rate=0.0001),
              metrics=['accuracy'])

# Early stopping, learning rate reduction, and model checkpointing
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.0001)
checkpoint = ModelCheckpoint('best_simplified_gru_model.h5', save_best_only=True)

# Train the Model
history = model.fit(X_train, y_train, epochs=100, batch_size=32, validation_split=0.2, callbacks=[early_stopping, reduce_lr, checkpoint])

# Evaluate the Model
loss, accuracy = model.evaluate(X_test, y_test)
print(f'Test Accuracy: {accuracy}')

# Make Predictions
y_pred = model.predict(X_test)
y_pred_classes = (y_pred > 0.5).astype("int32")

# Calculate accuracy
final_accuracy = accuracy_score(y_test, y_pred_classes)
print(f'Final Test Accuracy: {final_accuracy}')

# Plot the learning curves
plt.figure(figsize=(12, 4))

# Plot training & validation accuracy values
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend(['Train', 'Validation'], loc='upper left')

# Plot training & validation loss values
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(['Train', 'Validation'], loc='upper left')

plt.tight_layout()
plt.show()


In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, LSTM, Bidirectional, BatchNormalization, GRU
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.regularizers import l2
from sklearn.utils import shuffle
import matplotlib.pyplot as plt

# Assuming X and Y are already defined
X, Y = shuffle(X, Y, random_state=1234)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=1234)

# Feature scaling
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Dimensionality reduction with PCA
# pca = PCA(n_components=1000)  # Adjust n_components as needed
# X_train = pca.fit_transform(X_train)
# X_test = pca.transform(X_test)

# Reshape data for LSTM [samples, time steps, features]
time_steps = 5  # Adjust the time steps as per your requirement
n_features = X_train.shape[1] // time_steps
X_train = X_train.reshape((X_train.shape[0], time_steps, n_features))
X_test = X_test.reshape((X_test.shape[0], time_steps, n_features))

# Build the simplified Bidirectional GRU model with Batch Normalization
model = Sequential()
model.add(Bidirectional(GRU(32, return_sequences=True, kernel_regularizer=l2(0.01)), input_shape=(time_steps, n_features)))
model.add(BatchNormalization())
model.add(Dropout(0.4))
model.add(Bidirectional(GRU(32, return_sequences=False, kernel_regularizer=l2(0.01))))
model.add(BatchNormalization())
model.add(Dropout(0.4))
model.add(Dense(16, activation='relu', kernel_regularizer=l2(0.01)))
model.add(Dense(1, activation='sigmoid'))

# Compile the model with Adam optimizer and learning rate scheduler
model.compile(loss='binary_crossentropy',
              optimizer=Adam(learning_rate=0.0001),
              metrics=['accuracy'])

# Early stopping, learning rate reduction, and model checkpointing
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.0001)
checkpoint = ModelCheckpoint('best_simplified_gru_model.h5', save_best_only=True)

# Train the Model
history = model.fit(X_train, y_train, epochs=200, batch_size=32, validation_split=0.2, callbacks=[early_stopping, reduce_lr, checkpoint])

# Evaluate the Model
loss, accuracy = model.evaluate(X_test, y_test)
print(f'Test Accuracy: {accuracy}')

# Make Predictions
y_pred = model.predict(X_test)
y_pred_classes = (y_pred > 0.5).astype("int32")

# Calculate accuracy
final_accuracy = accuracy_score(y_test, y_pred_classes)
print(f'Final Test Accuracy: {final_accuracy}')

# Plot the learning curves
plt.figure(figsize=(12, 4))

# Plot training & validation accuracy values
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend(['Train', 'Validation'], loc='upper left')

# Plot training & validation loss values
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(['Train', 'Validation'], loc='upper left')

plt.tight_layout()
plt.show()


In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, LSTM
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.utils import shuffle

# Assuming new_X and Y are already defined

# Shuffle your dataset to ensure varied model training
X, Y = shuffle(X, Y, random_state=1234)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=1234)

# Feature scaling
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Optional: PCA for dimensionality reduction
# pca = PCA(n_components=20)
# X_train = pca.fit_transform(X_train)
# X_test = pca.transform(X_test)

# Reshape data for LSTM [samples, time steps, features]
# Assuming the data has a time step dimension. If not, you need to introduce it.
time_steps = 1  # You can adjust the time steps as per your requirement
X_train = X_train.reshape((X_train.shape[0], time_steps, X_train.shape[1]))
X_test = X_test.reshape((X_test.shape[0], time_steps, X_test.shape[1]))

# Build the LSTM model
model = Sequential()
model.add(LSTM(128, input_shape=(time_steps, X_train.shape[2]), return_sequences=True))
model.add(Dropout(0.5))
model.add(LSTM(128, return_sequences=False))
model.add(Dropout(0.5))
model.add(Dense(32, activation='relu'))
model.add(Dense(1, activation='sigmoid'))

# Compile the model
model.compile(loss='binary_crossentropy',
              optimizer=Adam(learning_rate=0.001),
              metrics=['accuracy'])

# Early stopping and model checkpointing
early_stopping = EarlyStopping(monitor='val_loss', patience=50, restore_best_weights=True)
checkpoint = ModelCheckpoint('best_lstm_model.h5', save_best_only=True)

# Train the Model
history = model.fit(X_train, y_train, epochs=50, batch_size=32, validation_split=0.2, callbacks=[early_stopping, checkpoint])

# Evaluate the Model
loss, accuracy = model.evaluate(X_test, y_test)
print(f'Test Accuracy: {accuracy}')

# Make Predictions
y_pred = model.predict(X_test)
y_pred_classes = (y_pred > 0.5).astype("int32")

# Calculate accuracy
final_accuracy = accuracy_score(y_test, y_pred_classes)
print(f'Final Test Accuracy: {final_accuracy}')


In [None]:
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend(['Train', 'Validation'], loc='upper left')

# Plot training & validation loss values
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(['Train', 'Validation'], loc='upper left')

plt.tight_layout()
plt.show()

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.utils import shuffle

# Shuffle your dataset to ensure varied model training
X_train, y_train = shuffle(X_train, y_train, random_state=1234)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(new_X, Y, test_size=0.2, random_state=1234)

# Shuffle your dataset to ensure varied model training
X_train, y_train = shuffle(X_train, y_train, random_state=1234)

# Feature scaling
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Optional: PCA for dimensionality reduction
pca = PCA(n_components=20)
X_train = pca.fit_transform(X_train)
X_test = pca.transform(X_test)

# Step 2: Build the Neural Network
model = Sequential()
model.add(Dense(128, input_dim=X_train.shape[1], activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01)))
model.add(Dropout(0.5))
model.add(Dense(64, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01)))
model.add(Dropout(0.5))
model.add(Dense(32, activation='relu'))
model.add(Dense(1, activation='sigmoid'))

# Compile the model
model.compile(loss='binary_crossentropy',
              optimizer=Adam(learning_rate=0.001),
              metrics=['accuracy'])

# Early stopping and model checkpointing
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
checkpoint = ModelCheckpoint('best_model.h5', save_best_only=True)

# Step 3: Train the Model
history = model.fit(X_train, y_train, epochs=50, batch_size=32, validation_split=0.2, callbacks=[early_stopping, checkpoint])

# Step 4: Evaluate the Model
loss, accuracy = model.evaluate(X_test, y_test)
print(f'Test Accuracy: {accuracy}')

# Step 5: Make Predictions
y_pred = model.predict(X_test)
y_pred_classes = (y_pred > 0.5).astype("int32")

# Calculate accuracy
final_accuracy = accuracy_score(y_test, y_pred_classes)
print(f'Final Test Accuracy: {final_accuracy}')

Added fourier transform

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
from scipy.fft import fft
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.optimizers import Adam

# Function to compute Fourier Transform and extract phase features
def compute_fourier_features(data, sampling_rate):
    """
    Compute Fourier Transform and extract phase features.

    Parameters:
    - data: Input data (2D array where rows are samples and columns are features)
    - sampling_rate: Sampling rate of the data

    Returns:
    - phase_features: Extracted phase features
    """
    num_samples, num_features = data.shape
    phase_features = np.zeros_like(data, dtype=np.float32)

    for i in range(num_samples):
        # Perform Fourier Transform
        yf = fft(data[i])

        # Extract phase information
        phase = np.angle(yf)

        # Use phase information as features
        phase_features[i] = phase

    return phase_features

# Example data preparation (replace with your actual data)
# X, Y = your_data_loading_function()

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(new_X, Y, test_size=0.2, random_state=1234)

# Feature scaling
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Optional: PCA for dimensionality reduction (if needed)
from sklearn.decomposition import PCA
pca = PCA(n_components=20)
X_train = pca.fit_transform(X_train)
X_test = pca.transform(X_test)

# Compute Fourier features
sampling_rate = 1  # Adjust according to your data
phase_features_train = compute_fourier_features(X_train, sampling_rate)
phase_features_test = compute_fourier_features(X_test, sampling_rate)

# Standardize phase features
scaler_phase = StandardScaler()
phase_features_train = scaler_phase.fit_transform(phase_features_train)
phase_features_test = scaler_phase.transform(phase_features_test)

# Combine original and phase features
X_train_combined = np.hstack((X_train, phase_features_train))
X_test_combined = np.hstack((X_test, phase_features_test))

# Build the Neural Network
model = Sequential()
model.add(Dense(128, input_dim=X_train_combined.shape[1], activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(64, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(32, activation='relu'))
model.add(Dense(1, activation='sigmoid'))

# Compile the model
model.compile(loss='binary_crossentropy',
              optimizer=Adam(learning_rate=0.001),
              metrics=['accuracy'])

# Train the Model
history = model.fit(X_train_combined, y_train, epochs=50, batch_size=32, validation_split=0.2)

# Evaluate the Model
loss, accuracy = model.evaluate(X_test_combined, y_test)
print(f'Test Accuracy: {accuracy}')

# Make Predictions
y_pred = model.predict(X_test_combined)
y_pred_classes = (y_pred > 0.5).astype("int32")

# Calculate accuracy
final_accuracy = accuracy_score(y_test, y_pred_classes)
print(f'Final Test Accuracy: {final_accuracy}')

NameError: name 'new_X' is not defined

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
from scipy.fft import fft
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.optimizers import Adam

# Function to compute Fourier Transform and extract magnitude and phase features
def compute_fourier_features(data, sampling_rate):
    """
    Compute Fourier Transform and extract magnitude and phase features.

    Parameters:
    - data: Input data (2D array where rows are samples and columns are features)
    - sampling_rate: Sampling rate of the data

    Returns:
    - magnitude_features: Extracted magnitude features
    - phase_features: Extracted phase features
    """
    num_samples, num_features = data.shape
    magnitude_features = np.zeros_like(data, dtype=np.float32)
    phase_features = np.zeros_like(data, dtype=np.float32)

    for i in range(num_samples):
        # Perform Fourier Transform
        yf = fft(data[i])

        # Extract magnitude and phase information
        magnitude = np.abs(yf)
        phase = np.angle(yf)

        # Use magnitude and phase information as features
        magnitude_features[i] = magnitude
        phase_features[i] = phase

    return magnitude_features, phase_features
# Train-test split
X_train, X_test, y_train, y_test = train_test_split(new_X, Y, test_size=0.2, random_state=1234)

# Feature scaling
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Compute Fourier features
sampling_rate = 1  # Adjust according to your data
magnitude_features_train, phase_features_train = compute_fourier_features(X_train, sampling_rate)
magnitude_features_test, phase_features_test = compute_fourier_features(X_test, sampling_rate)

# Standardize magnitude and phase features
scaler_magnitude = StandardScaler()
magnitude_features_train = scaler_magnitude.fit_transform(magnitude_features_train)
magnitude_features_test = scaler_magnitude.transform(magnitude_features_test)

scaler_phase = StandardScaler()
phase_features_train = scaler_phase.fit_transform(phase_features_train)
phase_features_test = scaler_phase.transform(phase_features_test)

# Combine original, magnitude, and phase features
X_train_combined = np.hstack((X_train, magnitude_features_train, phase_features_train))
X_test_combined = np.hstack((X_test, magnitude_features_test, phase_features_test))

# Build the Neural Network
model = Sequential()
model.add(Dense(128, input_dim=X_train_combined.shape[1], activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(64, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(32, activation='relu'))
model.add(Dense(1, activation='sigmoid'))

# Compile the model
model.compile(loss='binary_crossentropy',
              optimizer=Adam(learning_rate=0.001),
              metrics=['accuracy'])

# Train the Model
history = model.fit(X_train_combined, y_train, epochs=50, batch_size=32, validation_split=0.2)

# Evaluate the Model
loss, accuracy = model.evaluate(X_test_combined, y_test)
print(f'Test Accuracy: {accuracy}')

# Make Predictions
y_pred = model.predict(X_test_combined)
y_pred_classes = (y_pred > 0.5).astype("int32")

# Calculate accuracy
final_accuracy = accuracy_score(y_test, y_pred_classes)
print(f'Final Test Accuracy: {final_accuracy}')

NameError: name 'new_X' is not defined

#1

In [None]:
import numpy as np

def split_classes(data, stim_id_1, stim_id_2):
    """
    Parameters:
    - data: Dictionary containing the data.
    - stim_id_1: Stimulus ID for class 1.
    - stim_id_2: Stimulus ID for class 2.

    Returns:
    - class_1_data: Data dictionary for class 1.
    - class_2_data: Data dictionary for class 2.
    """
    # Initialize dictionaries for the two classes
    class_1_data = {'V': [], 't_on': [], 't_off': [], 'stim_id': []}
    class_2_data = {'V': [], 't_on': [], 't_off': [], 'stim_id': []}

    # Iterate through the stimulus IDs and split the data
    for i, stim_id in enumerate(data['stim_id']):
        t_on = data['t_on'][i]
        t_off = data['t_off'][i]
        if stim_id == stim_id_1:
            class_1_data['V'].append(data['V'][t_on:t_off])
            class_1_data['t_on'].append(t_on)
            class_1_data['t_off'].append(t_off)
            class_1_data['stim_id'].append(stim_id)
        elif stim_id == stim_id_2:
            class_2_data['V'].append(data['V'][t_on:t_off])
            class_2_data['t_on'].append(t_on)
            class_2_data['t_off'].append(t_off)
            class_2_data['stim_id'].append(stim_id)

    # Convert lists to numpy arrays
    class_1_data['V'] = np.array(class_1_data['V'])
    class_1_data['t_on'] = np.array(class_1_data['t_on'])
    class_1_data['t_off'] = np.array(class_1_data['t_off'])
    class_1_data['stim_id'] = np.array(class_1_data['stim_id'])

    class_2_data['V'] = np.array(class_2_data['V'])
    class_2_data['t_on'] = np.array(class_2_data['t_on'])
    class_2_data['t_off'] = np.array(class_2_data['t_off'])
    class_2_data['stim_id'] = np.array(class_2_data['stim_id'])

    return class_1_data, class_2_data

stim_id_1 = 11
stim_id_2 = 12

class_1_data, class_2_data = split_classes(dat1, stim_id_1, stim_id_2)

print("Class 1 Data Keys:", class_1_data.keys())
print("Class 2 Data Keys:", class_2_data.keys())
print("Class 1 V Shape:", class_1_data['V'].shape)
print("Class 2 V Shape:", class_2_data['V'].shape)

# Dataset info #

This is one of multiple ECoG datasets from Miller 2019, recorded in a clinical settings with a variety of tasks. Raw data and dataset paper are here:

https://exhibits.stanford.edu/data/catalog/zk881ps0522
https://www.nature.com/articles/s41562-019-0678-3

This particular dataset was originally described in this paper:

- Miller, Kai J., Gerwin Schalk, Eberhard E. Fetz, Marcel Den Nijs, Jeffrey G. Ojemann, and Rajesh PN Rao. "Cortical activity during motor execution, motor imagery, and imagery-based online feedback." Proceedings of the National Academy of Sciences (2010): 200913697. doi: [10.1073/pnas.0913697107](https://doi.org/10.1073/pnas.0913697107)

<br>

`dat1` and `dat2` are data from the two blocks performed in each subject. The first one was the actual movements, the second one was motor imagery. For the movement task, from the original dataset instructions:

*Patients performed simple, repetitive, motor tasks of hand (synchronous flexion and extension of all fingers, i.e., clenching and releasing a fist at a self-paced rate of ~1-2 Hz) or tongue (opening of mouth with protrusion and retraction of the tongue, i.e., sticking the tongue in and out, also at ~1-2 Hz). These movements were performed in an interval-based manner, alternating between movement and rest, and the side of move- ment was always contralateral to the side of cortical grid placement.*

<br>

For the imagery task, from the original dataset instructions:

*Following the overt movement experiment, each subject performed an imagery task, imagining making identical movement rather than executing the movement. The imagery was kinesthetic rather than visual (“imagine yourself performing the actions like you just did”; i.e., “don’t imagine what it looked like, but imagine making the motions”).*

<br>

Sample rate is always 1000Hz, and the ECoG data has been notch-filtered at 60, 120, 180, 240 and 250Hz, followed by z-scoring across time and conversion to float16 to minimize size. Please convert back to float32 after loading the data in the notebook, to avoid unexpected behavior.

Both experiments:
* `dat['V']`: continuous voltage data (time by channels)
* `dat['srate']`: acquisition rate (1000 Hz). All stimulus times are in units of this.  
* `dat['t_on']`: time of stimulus onset in data samples
* `dat['t_off']`: time of stimulus offset, always 400 samples after `t_on`
* `dat['stim_id`]: identity of stimulus (11 = tongue, 12 = hand), real or imaginary stimulus
* `dat['scale_uv']`: scale factor to multiply the data values to get to microvolts (uV).
* `dat['locs`]`: 3D electrode positions on the brain surface

In [None]:
from nilearn import plotting
from nimare import utils

plt.figure(figsize=(8, 8))
locs = dat1['locs']
view = plotting.view_markers(utils.tal2mni(locs),
                             marker_labels=['%d'%k for k in np.arange(locs.shape[0])],
                             marker_color='purple',
                             marker_size=5)
view

In [None]:
# quick way to get broadband power in time-varying windows
from scipy import signal

# pick subject 0 and experiment 0 (real movements)
dat1 = alldat[0][0]

# V is the voltage data
V = dat1['V'].astype('float32')

# high-pass filter above 50 Hz
b, a = signal.butter(3, [50], btype='high', fs=1000)
V = signal.filtfilt(b, a, V, 0)

# compute smooth envelope of this signal = approx power
V = np.abs(V)**2
b, a = signal.butter(3, [10], btype='low', fs=1000)
V = signal.filtfilt(b, a, V, 0)

# normalize each channel so its mean power is 1
V = V/V.mean(0)

In [None]:
# average the broadband power across all tongue and hand trials
nt, nchan = V.shape
nstim = len(dat1['t_on'])

trange = np.arange(0, 2000)
ts = dat1['t_on'][:, np.newaxis] + trange
V_epochs = np.reshape(V[ts, :], (nstim, 2000, nchan))

V_tongue = (V_epochs[dat1['stim_id'] == 11]).mean(0)
V_hand = (V_epochs[dat1['stim_id'] == 12]).mean(0)

In [None]:
# let's find the electrodes that distinguish tongue from hand movements
# note the behaviors happen some time after the visual cue

plt.figure(figsize=(20, 10))
for j in range(46):
  ax = plt.subplot(5, 10, j+1)
  plt.plot(trange, V_tongue[:, j])
  plt.plot(trange, V_hand[:, j])
  plt.title('ch%d'%j)
  plt.xticks([0, 1000, 2000])
  plt.ylim([0, 4])
plt.show()

In [None]:
# let's look at all the trials for electrode 20 that has a good response to hand movements
# we will sort trials by stimulus id
plt.subplot(1, 3, 1)
isort = np.argsort(dat1['stim_id'])
plt.imshow(V_epochs[isort, :, 20].astype('float32'),
           aspect='auto',
           vmax=7, vmin=0,
           cmap='magma')
plt.colorbar()
plt.show()

In [None]:
# Electrode 42 seems to respond to tongue movements
isort = np.argsort(dat1['stim_id'])
plt.subplot(1, 3, 1)
plt.imshow(V_epochs[isort, :, 42].astype('float32'),
           aspect='auto',
           vmax=7, vmin=0,
           cmap='magma')
plt.colorbar()
plt.show()