My code is adapted from the work of Dr. Xiang Zhang (xiang_zhang@hms.harvard.edu) and Prof. Lina Yao (lina.yao@unsw.edu.au).
Citations for the material it draws on:
@article{zhang2020survey,
  title={A survey on deep learning-based non-invasive brain signals: recent advances and new frontiers},
  author={Zhang, Xiang and Yao, Lina and Wang, Xianzhi and Monaghan, Jessica JM and Mcalpine, David and Zhang, Yu},
  journal={Journal of Neural Engineering},
  year={2020},
  publisher={IOP Publishing}
}

@book{zhang2021deep,
  title={Deep Learning for EEG-based Brain-Computer Interface: Representations, Algorithms and Applications},
  author={Zhang, Xiang and Yao, Lina},
  year={2021},
  publisher={World Scientific Publishing}
}

In [1]:
!git clone https://github.com/xiangzhang1015/Deep-Learning-for-BCI.git
%cd Deep-Learning-for-BCI/dataset
!ls

!mkdir -p unzipped_data
!unzip "*.zip" -d /content/Deep-Learning-for-BCI/dataset/unzipped_data


!ls /content/Deep-Learning-for-BCI/dataset/unzipped_data


Cloning into 'Deep-Learning-for-BCI'...
remote: Enumerating objects: 448, done.
remote: Counting objects: 100% (3/3), done.
remote: Compressing objects: 100% (3/3), done.
remote: Total 448 (delta 0), reused 1 (delta 0), pack-reused 445 (from 1)
Receiving objects: 100% (448/448), 2.14 GiB | 25.37 MiB/s, done.
Resolving deltas: 100% (188/188), done.
Updating files: 100% (137/137), done.
/content/Deep-Learning-for-BCI/dataset
100.zip  10.zip  1.zip	 29.zip  38.zip  47.zip  56.zip  65.zip  74.zip  83.zip  92.zip
101.zip  11.zip  20.zip  2.zip	 39.zip  48.zip  57.zip  66.zip  75.zip  84.zip  93.zip
102.zip  12.zip  21.zip  30.zip  3.zip	 49.zip  58.zip  67.zip  76.zip  85.zip  94.zip
103.zip  13.zip  22.zip  31.zip  40.zip  4.zip	 59.zip  68.zip  77.zip  86.zip  95.zip
104.zip  14.zip  23.zip  32.zip  41.zip  50.zip  5.zip	 69.zip  78.zip  87.zip  96.zip
105.zip  15.zip  24.zip  33.zip  42.zip  51.zip  60.zip  6.zip	 79.zip  88.zip  97.zip
106.zip  16.zip  25.zip  34.zip  43.zip

**LSTM**

Long short-term memory (LSTM) is an RNN architecture whose cell has an input gate, an output gate and a forget gate. The cell remembers values over arbitrary time intervals, and the three gates regulate the flow of information into and out of it. This code aims to extract the time-series information hidden in EEG signals. My goal was simply to experiment with building an RNN, since a CNN might take too much time. The original code is so well documented that I could annotate it and run it step by step, then debug it; time permitting, I will make tweaks of my own. This notebook is meant as an exploration.

Adapted from: Dr. Xiang Zhang (xiang_zhang@hms.harvard.edu), Prof. Lina Yao (lina.yao@unsw.edu.au) at https://github.com/xiangzhang1015/Deep-Learning-for-BCI.
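
To make the gate description concrete, here is a minimal NumPy sketch of a single LSTM time step. It follows the standard textbook formulation, not anything specific to the adapted repository. The names `lstm_step`, `W`, `U`, `b` and the toy sizes are assumptions chosen for illustration.

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def lstm_step(x_t, h_prev, c_prev, W, U, b):
    """One LSTM time step (standard formulation, illustrative only).

    W: (4*hidden, n_in), U: (4*hidden, hidden), b: (4*hidden,)
    Rows are stacked in the order: input gate, forget gate, candidate, output gate.
    """
    hidden = h_prev.shape[0]
    z = W @ x_t + U @ h_prev + b
    i = sigmoid(z[0 * hidden:1 * hidden])   # input gate: how much new content to write
    f = sigmoid(z[1 * hidden:2 * hidden])   # forget gate: how much old cell state to keep
    g = np.tanh(z[2 * hidden:3 * hidden])   # candidate cell content
    o = sigmoid(z[3 * hidden:4 * hidden])   # output gate: how much of the cell to expose
    c_t = f * c_prev + i * g                # updated cell state (the "memory")
    h_t = o * np.tanh(c_t)                  # updated hidden state (the output)
    return h_t, c_t

# Toy sizes (assumed): 64 input features, 8 hidden units, 16 time steps.
rng = np.random.default_rng(0)
n_in, hidden, n_steps = 64, 8, 16
W = rng.normal(scale=0.1, size=(4 * hidden, n_in))
U = rng.normal(scale=0.1, size=(4 * hidden, hidden))
b = np.zeros(4 * hidden)

h = np.zeros(hidden)
c = np.zeros(hidden)
for x_t in rng.normal(size=(n_steps, n_in)):
    h, c = lstm_step(x_t, h, c, W, U, b)

print(h.shape, c.shape)
```

The hidden state after the last time step plays the same role as `r_out[:, -1, :]` in the classifier further down: a summary of the whole segment.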

In [None]:
# How the original authors loaded their data:
# dataset_1 = np.load('1.npy')
# print('dataset_1 shape:', dataset_1.shape)


In [None]:
# Import libraries and load the dataset cloned from GitHub (my first time doing this!)
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import time
import torch
import torch.nn as nn
import torch.utils.data as Data
import torch.nn.functional as F
from sklearn.metrics import roc_auc_score,accuracy_score,classification_report
import os

# Path to the directory containing .npy files
data_path = '/content/Deep-Learning-for-BCI/dataset/unzipped_data'

# List all .npy files in the directory
file_list = [file for file in os.listdir(data_path) if file.endswith('.npy')]

# Load all .npy files into a list
datasets = []
for file_name in file_list:
    file_path = os.path.join(data_path, file_name)
    data = np.load(file_path)
    datasets.append(data)
    print(f'{file_name} shape: {data.shape}')

print(f"Total number of files loaded: {len(datasets)}")

95.npy shape: (259520, 65)
50.npy shape: (255680, 65)
11.npy shape: (255680, 65)
30.npy shape: (257600, 65)
85.npy shape: (255680, 65)
39.npy shape: (255680, 65)
69.npy shape: (255520, 65)
96.npy shape: (259520, 65)
67.npy shape: (255680, 65)
78.npy shape: (255680, 65)
102.npy shape: (255840, 65)
82.npy shape: (255680, 65)
71.npy shape: (259520, 65)
9.npy shape: (255680, 65)
99.npy shape: (255680, 65)
88.npy shape: (209984, 65)
81.npy shape: (255680, 65)
56.npy shape: (255680, 65)
40.npy shape: (255680, 65)
49.npy shape: (255680, 65)
79.npy shape: (259520, 65)
18.npy shape: (255680, 65)
61.npy shape: (259520, 65)
97.npy shape: (255520, 65)
105.npy shape: (255680, 65)
62.npy shape: (255680, 65)
36.npy shape: (255680, 65)
16.npy shape: (255680, 65)
7.npy shape: (259520, 65)
46.npy shape: (259520, 65)
33.npy shape: (255680, 65)
15.npy shape: (255680, 65)
77.npy shape: (255680, 65)
1.npy shape: (259520, 65)
60.npy shape: (255680, 65)
3.npy shape: (259520, 65)
26.npy shape: (255680, 65)
24.
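
Loading every file eagerly keeps all of the arrays in RAM at once. One possible alternative, sketched here on a small temporary file instead of the real dataset, is NumPy's `mmap_mode` option, which lets you read the shape and individual slices without loading the whole array. The file name `demo.npy` and the toy shape are assumptions.

```python
import os
import tempfile
import numpy as np

with tempfile.TemporaryDirectory() as tmp_dir:
    # Stand-in for one recording: 10 rows, 65 columns (assumed toy size).
    demo_path = os.path.join(tmp_dir, 'demo.npy')
    np.save(demo_path, np.arange(10 * 65).reshape(10, 65))

    # mmap_mode='r' maps the file read-only instead of reading it all into memory.
    lazy = np.load(demo_path, mmap_mode='r')
    demo_shape = lazy.shape            # available without reading the data
    first_rows = np.array(lazy[:3])    # copy only the rows that are needed
    del lazy                           # drop the mapping before the folder is removed

print(demo_shape, first_rows.shape)
```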

In [2]:
# !pip install torch torchvision torchaudio



In [7]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

# Custom Dataset Class to Load Multiple .npy Files
class NPYDataset(Dataset):
    def __init__(self, data_path):
        self.data_path = data_path
        # List all .npy files in the directory
        self.file_list = [file for file in os.listdir(data_path) if file.endswith('.npy')]

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, index):
        # Load each .npy file dynamically
        file_name = self.file_list[index]
        file_path = os.path.join(self.data_path, file_name)
        data = np.load(file_path)  # Load numpy array

        # Convert numpy array to PyTorch tensor
        x_data = torch.tensor(data, dtype=torch.float32)

        # Placeholder label: alternates 0/1 by file index and carries no real information.
        # Replace it with actual labels before training on this dataset.
        y_label = torch.tensor(index % 2, dtype=torch.long)

        return x_data, y_label

# Define a custom collate function to pad sequences
def collate_fn(batch):
    # Separate inputs and labels
    x_batch, y_batch = zip(*batch)

    # Pad the input sequences
    x_batch_padded = pad_sequence(x_batch, batch_first=True, padding_value=0)

    # Stack the labels
    y_batch = torch.stack(y_batch, dim=0)

    return x_batch_padded, y_batch

# Path where your .npy files are stored
data_path = '/content/Deep-Learning-for-BCI/dataset/unzipped_data'

# Create an instance of the custom Dataset
dataset = NPYDataset(data_path)

# Use DataLoader to batch the data for training or testing
# Use the custom collate_fn
data_loader = DataLoader(dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)

# Iterate through the DataLoader to load batches of data
for x_batch, y_batch in data_loader:
    print("Batch X shape:", x_batch.shape)  # Shape of input data
    print("Batch Y shape:", y_batch.shape)  # Shape of labels
    break  # Print the first batch and stop

Batch X shape: torch.Size([16, 259520, 65])
Batch Y shape: torch.Size([16])
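
The batch shape above shows that every recording in the batch is padded to the length of the longest one (259520 rows here). The NumPy sketch below mimics zero-padding to a batch-first layout on toy arrays. It also returns the true lengths, which the padded batch alone no longer reveals. `pad_batch` is a hypothetical helper written for this illustration, not part of PyTorch.

```python
import numpy as np

def pad_batch(arrays, padding_value=0):
    """Zero-pad 2-D arrays of shape (length_i, n_features) to a common length."""
    lengths = np.array([a.shape[0] for a in arrays])
    n_features = arrays[0].shape[1]
    padded = np.full((len(arrays), lengths.max(), n_features),
                     padding_value, dtype=arrays[0].dtype)
    for k, a in enumerate(arrays):
        padded[k, :a.shape[0]] = a   # rows beyond the true length stay at padding_value
    return padded, lengths

# Three toy "recordings" with 5, 2 and 4 time steps and 3 features each.
toy = [np.ones((5, 3)), np.ones((2, 3)), np.ones((4, 3))]
padded, lengths = pad_batch(toy)

print(padded.shape)   # (3, 5, 3)
print(lengths)        # [5 2 4]
```

Keeping `lengths` around makes it possible to mask out the padded rows later, so they are not treated as real signal.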


In [10]:


dataset_1 = np.load(os.path.join(data_path, '1.npy'))
print('The shape of Dataset_1:', dataset_1.shape)
dataset_1

The shape of Dataset_1: (259520, 65)


array([[-16, -29,   2, ..., -11,  15,   0],
       [-56, -54, -27, ...,   1,  21,   0],
       [-55, -55, -29, ...,  18,  35,   0],
       ...,
       [  0,   0,   0, ...,   0,   0,   9],
       [  0,   0,   0, ...,   0,   0,   9],
       [  0,   0,   0, ...,   0,   0,   9]])
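
The printed array suggests that the first 64 columns hold the channel values and the last column holds the label, which is how the next cell treats it (`dataset_1[:, -1]`). The sketch below uses a made-up toy array to show the split, a quick label count, and a vectorised way to keep only selected labels with `np.isin`. The array `toy`, the list `keep_labels` and all values are illustrative assumptions.

```python
import numpy as np

# Toy stand-in: 2 "channel" columns plus a label column (the real data has 64 + 1).
toy = np.array([
    [1, 2, 0],
    [3, 4, 1],
    [5, 6, 9],
    [7, 8, 10],
    [9, 1, 1],
])

features = toy[:, :-1]   # every column except the last
labels = toy[:, -1]      # last column

# Count how many rows carry each label.
values, counts = np.unique(labels, return_counts=True)
label_counts = dict(zip(values.tolist(), counts.tolist()))
print(label_counts)

# Keep only the rows whose label is in keep_labels.
keep_labels = [0, 1]
mask = np.isin(labels, keep_labels)
filtered = toy[mask]
print(filtered)
```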

In [15]:
# check if a GPU is available - I don't need to do this on colab but it's fine
import torch
import torch.nn as nn
import torch.utils.data as Data
import torch.nn.functional as F
from sklearn.metrics import roc_auc_score,accuracy_score,classification_report
from sklearn.model_selection import train_test_split
import os
from sklearn.preprocessing import StandardScaler
import time

with_gpu = torch.cuda.is_available()
if with_gpu:
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print('We are using %s now.' %device)

# keep only labels 0 and 1: drop every row labelled 2-10 (10 = rest)
removed_label = [2,3,4,5,6,7,8,9,10]
for ll in removed_label:
    keep_mask = dataset_1[:, -1]!=ll  # True for rows whose label is not ll
    dataset_1 = dataset_1[keep_mask]

# data segmentation
n_class = int(11-len(removed_label))  # 11 labels (0~10) minus the removed ones; 2 classes remain here
no_feature = 64  # the number of the features
segment_length = 16  # selected time window; 16=160*0.1
LR = 0.005  # learning rate
EPOCH = 101
n_hidden = 128  # number of neurons in hidden layer
l2 = 0.001  # the coefficient of l2-norm regularization

def one_hot(y_):
    # Function to encode output labels from number indexes
    # e.g.: [[5], [0], [3]] --> [[0, 0, 0, 0, 0, 1], [1, 0, 0, 0, 0, 0], [0, 0, 0, 1, 0, 0]]
    y_ = y_.reshape(len(y_))
    y_ = [int(xx) for xx in y_]
    n_values = np.max(y_) + 1
    return np.eye(n_values)[np.array(y_, dtype=np.int32)]

def extract(input, n_classes, n_fea, time_window, moving):
    xx = input[:, :n_fea]
    yy = input[:, n_fea:n_fea + 1]
    new_x = []
    new_y = []
    number = int((xx.shape[0] / moving) - 1)
    for i in range(number):
        ave_y = np.average(yy[int(i * moving):int(i * moving + time_window)])
        if ave_y in range(n_classes + 1):
            new_x.append(xx[int(i * moving):int(i * moving + time_window), :])
            new_y.append(ave_y)
        else:
            new_x.append(xx[int(i * moving):int(i * moving + time_window), :])
            new_y.append(0)

    new_x = np.array(new_x)
    new_x = new_x.reshape([-1, n_fea * time_window])
    new_y = np.array(new_y)
    new_y.shape = [new_y.shape[0], 1]
    data = np.hstack((new_x, new_y))
    data = np.vstack((data, data[-1]))  # add the last sample again, to make the sample number round
    return data

data_seg = extract(dataset_1, n_classes=n_class, n_fea=no_feature, time_window=segment_length, moving=(segment_length/2))  # 50% overlapping
print('After segmentation, the shape of the data:', data_seg.shape)

# split training and test data
no_longfeature = no_feature*segment_length
data_seg_feature = data_seg[:, :no_longfeature]
data_seg_label = data_seg[:, no_longfeature:no_longfeature+1]
train_feature, test_feature, train_label, test_label = train_test_split(data_seg_feature, data_seg_label,test_size=0.2, shuffle=True)

# normalization
# before normalize reshape data back to raw data shape
train_feature_2d = train_feature.reshape([-1, no_feature])
test_feature_2d = test_feature.reshape([-1, no_feature])

scaler1 = StandardScaler().fit(train_feature_2d)
train_fea_norm1 = scaler1.transform(train_feature_2d) # normalize the training data
test_fea_norm1 = scaler1.transform(test_feature_2d) # normalize the test data
print('After normalization, the shape of training feature:', train_fea_norm1.shape,
      '\nAfter normalization, the shape of test feature:', test_fea_norm1.shape)

# after normalization, reshape data to 3d in order to feed in to LSTM
train_fea_norm1 = train_fea_norm1.reshape([-1, segment_length, no_feature])
test_fea_norm1 = test_fea_norm1.reshape([-1, segment_length, no_feature])
print('After reshape, the shape of training feature:', train_fea_norm1.shape,
      '\nAfter reshape, the shape of test feature:', test_fea_norm1.shape)

BATCH_size = test_fea_norm1.shape[0] # batch size is set to the number of test segments

# feed data into dataloader
train_fea_norm1 = torch.tensor(train_fea_norm1).to(device)
train_label = torch.tensor(train_label.flatten()).to(device)
train_data = Data.TensorDataset(train_fea_norm1, train_label)
train_loader = Data.DataLoader(dataset=train_data, batch_size=BATCH_size, shuffle=False)

test_fea_norm1 = torch.tensor(test_fea_norm1).to(device)
test_label = torch.tensor(test_label.flatten()).to(device)

# classifier
class LSTM(nn.Module):
    def __init__(self):
        super(LSTM, self).__init__()

        self.lstm_layer = nn.LSTM(
            input_size=no_feature,
            hidden_size=n_hidden,         # LSTM hidden unit
            num_layers=2,           # number of LSTM layer
            bias=True,
            batch_first=True,       # input & output have batch size as the 1st dimension, e.g. (batch, segment_length, no_feature)
        )

        self.out = nn.Linear(n_hidden, n_class)

    def forward(self, x):
        r_out, (h_n, h_c) = self.lstm_layer(x.float(), None)
        r_out = F.dropout(r_out, 0.3, training=self.training)  # apply dropout only while the module is in training mode

        test_output = self.out(r_out[:, -1, :]) # choose r_out at the last time step
        return test_output

lstm = LSTM()
lstm.to(device)
print(lstm)

optimizer = torch.optim.Adam(lstm.parameters(), lr=LR, weight_decay=l2)   # optimize all parameters
loss_func = nn.CrossEntropyLoss()

best_acc = []
best_auc = []

# training and testing
start_time = time.perf_counter()
for epoch in range(EPOCH):
    for step, (train_x, train_y) in enumerate(train_loader):

        output = lstm(train_x)  # LSTM output of training data
        loss = loss_func(output, train_y.long())  # cross entropy loss
        optimizer.zero_grad()  # clear gradients for this training step
        loss.backward()  # backpropagation, compute gradients
        optimizer.step()  # apply gradients

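    if epoch % 10 == 0:
        lstm.eval()  # switch the module to evaluation mode for the test pass
        with torch.no_grad():  # gradients are not needed for evaluation
            test_output = lstm(test_fea_norm1)  # LSTM output of test data
            test_loss = loss_func(test_output, test_label.long())
        lstm.train()  # switch back to training mode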

        test_y_score = one_hot(test_label.data.cpu().numpy())  # .cpu() can be removed if your device is cpu.
        pred_score = F.softmax(test_output, dim=1).data.cpu().numpy()  # normalize the output
        auc_score = roc_auc_score(test_y_score, pred_score)

        pred_y = torch.max(test_output, 1)[1].data.cpu().numpy()
        pred_train = torch.max(output, 1)[1].data.cpu().numpy()

        test_acc = accuracy_score(test_label.data.cpu().numpy(), pred_y)
        train_acc = accuracy_score(train_y.data.cpu().numpy(), pred_train)


        print('Epoch: ', epoch, '|train loss: %.4f' % loss.item(),
              ' train ACC: %.4f' % train_acc, '| test loss: %.4f' % test_loss.item(),
              'test ACC: %.4f' % test_acc, '| AUC: %.4f' % auc_score)
        best_acc.append(test_acc)
        best_auc.append(auc_score)

current_time = time.perf_counter()
running_time = current_time - start_time
print(classification_report(test_label.data.cpu().numpy(), pred_y))
print('BEST TEST ACC: {}, AUC: {}'.format(max(best_acc), max(best_auc)))
print("Total Running Time: {} seconds".format(round(running_time, 2)))
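
The `extract` function above slides a window of `time_window` rows over the recording in steps of `moving` rows, so a window of 16 with a step of 8 gives 50% overlap. Each flattened segment then has 64 × 16 = 1024 feature values plus one label column. The sketch below reproduces only the windowing arithmetic on a toy array. `sliding_windows` is a hypothetical helper, not the original function, and it leaves out the label handling.

```python
import numpy as np

def sliding_windows(x, time_window, moving):
    """Cut x (n_rows, n_features) into overlapping windows of time_window rows."""
    n_windows = (x.shape[0] - time_window) // moving + 1
    return np.stack([x[i * moving:i * moving + time_window]
                     for i in range(n_windows)])

# Toy recording: 40 time steps, 2 features (the real data has 64 features).
toy = np.arange(40 * 2).reshape(40, 2)
segments = sliding_windows(toy, time_window=16, moving=8)
print(segments.shape)   # (4, 16, 2)

# With a step of half the window, the second half of one segment
# is the first half of the next one.
print(np.array_equal(segments[0, 8:], segments[1, :8]))   # True

# Flatten each segment into one row, as extract does before adding the label.
flat = segments.reshape(len(segments), -1)
print(flat.shape)       # (4, 32)
```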

We are using cpu now.
After segmentation, the shape of the data: (2440, 1025)
After normalization, the shape of training feature: (31232, 64) 
After normalization, the shape of test feature: (7808, 64)
After reshape, the shape of training feature: (1952, 16, 64) 
After reshape, the shape of test feature: (488, 16, 64)
LSTM(
  (lstm_layer): LSTM(64, 128, num_layers=2, batch_first=True)
  (out): Linear(in_features=128, out_features=2, bias=True)
)
Epoch:  0 |train loss: 0.6795  train ACC: 0.5963 | test loss: 0.6819 test ACC: 0.5635 | AUC: 0.5955
Epoch:  10 |train loss: 0.1324  train ACC: 0.9508 | test loss: 0.1596 test ACC: 0.9426 | AUC: 0.9821
Epoch:  20 |train loss: 0.0619  train ACC: 0.9836 | test loss: 0.1583 test ACC: 0.9570 | AUC: 0.9843
Epoch:  30 |train loss: 0.1438  train ACC: 0.9488 | test loss: 0.1818 test ACC: 0.9201 | AUC: 0.9847
Epoch:  40 |train loss: 0.0179  train ACC: 0.9939 | test loss: 0.1115 test ACC: 0.9590 | AUC: 0.9942
Epoch:  50 |train loss: 0.0095  train ACC: 0.9