In [3]:
!pip install pyedflib

Collecting pyedflib
  Downloading pyEDFlib-0.1.37-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.7/2.7 MB[0m [31m20.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: pyedflib
Successfully installed pyedflib-0.1.37


In [4]:
import pandas as pd
import numpy as np
import os
import pyedflib
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from scipy import interpolate
from torch.utils.data import Dataset,DataLoader
from sklearn.metrics import roc_auc_score, precision_score, recall_score, accuracy_score
from torch.autograd import Variable
import torch.nn.functional as F
import torch.optim as optim
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import gc
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

In [6]:
# Pick the compute device once: CUDA when PyTorch can see a GPU, CPU otherwise.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

Using device: cpu


In [None]:
directory_path ="/content/eeg-during-mental-task-arithmetic"
def process_edf(file_path):
    """Read every signal of one EDF file.

    Parameters
    ----------
    file_path : str
        Path of the EDF file to open with ``pyedflib.EdfReader``.

    Returns
    -------
    tuple
        ``(signals, signal_labels)`` where ``signals`` is ``np.array`` built
        from the per-signal reads (one entry per signal in the file) and
        ``signal_labels`` is whatever ``getSignalLabels()`` returns.
    """
    f = pyedflib.EdfReader(file_path)
    try:
        num_signals = f.signals_in_file
        signal_labels = f.getSignalLabels()
        signals = [f.readSignal(i) for i in range(num_signals)]
    finally:
        # Always release the file handle, even when a read fails part-way.
        f.close()

    return np.array(signals), signal_labels


# Parallel lists: entry i of each list belongs to the same EDF file.
signals = []
signal_labels = []
file_name = []
# os.listdir returns entries in arbitrary order; iterate in sorted order so the
# three lists are deterministic and line up with a name-sorted table.
for filename in sorted(os.listdir(directory_path)):
    if not filename.endswith(".edf"):  # Extension of eeg files
        continue
    file_path = os.path.join(directory_path, filename)
    edf_signals, edf_labels = process_edf(file_path)
    # Append only after a successful read so the lists never get out of sync.
    file_name.append(filename)
    signals.append(edf_signals)
    signal_labels.append(edf_labels)

In [8]:
# Build one row per recording, ordered by file name, then derive the numeric
# 'State' and 'Person' fields from fixed character positions of the name.
df = (
    pd.DataFrame({'Signal': signals, 'Name': file_name})
    .sort_values(by='Name', ascending=True)
    .reset_index(drop=True)
    .assign(
        State=lambda frame: frame['Name'].str[10:11].astype(int),
        Person=lambda frame: frame['Name'].str[7:9].astype(int),
    )
    .drop(columns=['Name'])
)
# One-hot encode 'State' and drop the first level, leaving a single indicator column.
df = pd.get_dummies(df, columns=['State'], drop_first=True)

In [9]:
# Mean number of samples (second axis) per recording. Divide by the actual
# number of rows instead of a hard-coded file count so the value stays correct
# if the set of recordings changes.
avg = sum(signal.shape[1] for signal in df['Signal']) / len(df)
avg

60250.0

In [10]:
# Show the array shape stored in the first two rows of the table.
first_row_shape = df['Signal'][0].shape
second_row_shape = df['Signal'][1].shape
print(f"Shape of each each tensor in state 1 {first_row_shape}")
print(f"Shape of each each tensor in state 2 {second_row_shape}")

Shape of each each tensor in state 1 (21, 91000)
Shape of each each tensor in state 2 (21, 31000)


In [11]:
def pad_or_truncate(signal, target_length=60250):
    """Force a 2-D signal array to exactly ``target_length`` columns.

    Parameters
    ----------
    signal : np.ndarray
        2-D array; only the second axis is resized.
    target_length : int, optional
        Desired size of the second axis. Defaults to 60250, the value this
        notebook uses for every recording.

    Returns
    -------
    np.ndarray
        ``signal[:, :target_length]`` when the input is longer than the
        target; otherwise the input right-padded with zeros along the second
        axis up to ``target_length``.
    """
    current_length = signal.shape[1]
    if current_length > target_length:
        return signal[:, :target_length]

    pad_width = target_length - current_length
    # Pad only at the end of the second axis; the first axis is left untouched.
    return np.pad(signal, ((0, 0), (0, pad_width)), mode='constant')

# Resize every recording to the common length, then re-check the first two rows.
df['Signal'] = df['Signal'].apply(pad_or_truncate)
resized_shape_0 = df['Signal'][0].shape
resized_shape_1 = df['Signal'][1].shape
print(f"Shape of each each tensor in state 1 {resized_shape_0}")
print(f"Shape of each each tensor in state 2 {resized_shape_1}")

Shape of each each tensor in state 1 (21, 60250)
Shape of each each tensor in state 2 (21, 60250)


In [12]:
# Stratified 80/20 split on the 'State_2' indicator. Each part gets a fresh
# 0..n-1 index so it can later be addressed by integer position.
features, labels = df['Signal'], df['State_2']
x_train, x_test, y_train, y_test = [
    part.reset_index(drop=True)
    for part in train_test_split(
        features, labels, test_size=0.20, random_state=42, stratify=labels
    )
]


In [13]:
class CustomDataset(Dataset):
    """Dataset pairing one signal array with one label per index.

    Parameters
    ----------
    X_train : sequence or pandas.Series
        Per-sample inputs; each item must be convertible by ``torch.tensor``.
    y_train : sequence or pandas.Series
        Per-sample labels, same length and order as ``X_train``.
    transform : callable, optional
        Applied to the raw input item before it is converted to a tensor.
    """

    def __init__(self, X_train, y_train, transform=None):
        self.X_train = X_train
        self.y_train = y_train
        self.transform = transform

    def __len__(self):
        """Return the number of samples (length of ``X_train``)."""
        return len(self.X_train)

    @staticmethod
    def _item_at(container, idx):
        """Return the ``idx``-th item of ``container`` by position."""
        # Plain ``series[idx]`` is a *label* lookup on pandas objects, which
        # fails (or silently picks another row) when the index is not 0..n-1,
        # e.g. straight out of a shuffled split. ``.iloc`` is always positional.
        if hasattr(container, 'iloc'):
            return container.iloc[idx]
        return container[idx]

    def __getitem__(self, idx):
        """Return ``(x, y)`` float32 tensors for sample ``idx`` on ``device``.

        ``x`` gains a leading axis of size 1 via ``unsqueeze(0)``; ``y`` is
        converted as-is.
        """
        x = self._item_at(self.X_train, idx)
        y = self._item_at(self.y_train, idx)

        if self.transform:
            x = self.transform(x)

        x = torch.tensor(x, dtype=torch.float32).unsqueeze(0)
        y = torch.tensor(y, dtype=torch.float32)

        # Tensors are moved to the notebook-level ``device`` here, so consumers
        # of this dataset do not transfer batches themselves.
        return x.to(device), y.to(device)

In [14]:
class EEGNet(nn.Module):
    """Three-stage convolutional binary classifier.

    The size of the first fully connected layer (14072 inputs) is fixed, so
    the network only accepts inputs whose flattened feature map has exactly
    that many elements; an input of shape ``(batch, 1, 21, 60250)`` satisfies
    this. The output has shape ``(batch, 1)`` with values in ``(0, 1)``.
    """

    def __init__(self):
        super(EEGNet, self).__init__()

        # NOTE: the second positional argument of BatchNorm2d is ``eps``.
        # Passing ``False`` there sets eps to 0 and removes the guard against
        # division by a zero variance, so the default eps is used instead.
        # The layers keep their learnable affine parameters either way, so the
        # state_dict layout is unchanged.

        self.conv1 = nn.Conv2d(1, 16, (1, 1000), padding=0, stride =(3,1)) #Temporal filter
        self.batchnorm1 = nn.BatchNorm2d(16)
        self.padding1 = nn.ZeroPad2d((16, 17, 0, 1))

        self.conv2 = nn.Conv2d(16, 4, (2, 1000)) #Spatial Filter
        self.batchnorm2 = nn.BatchNorm2d(4)
        self.pooling2 = nn.MaxPool2d(2, 4)
        self.padding2 = nn.ZeroPad2d((2, 1, 4, 3))

        self.conv3 = nn.Conv2d(4, 4, (8, 500)) #Spatial Filter
        self.batchnorm3 = nn.BatchNorm2d(4)
        self.pooling3 = nn.MaxPool2d((2, 4))

        # 14072 = flattened size of the last pooled feature map for the input
        # shape given in the class docstring.
        self.fc1 = nn.Linear(14072, 128)
        self.fc2 = nn.Linear(128, 1)


    def forward(self, x):
        """Run the network on ``x`` and return per-sample probabilities.

        Each stage is convolution -> ELU -> batch norm, followed by zero
        padding and/or max pooling; the result is flattened per sample and
        passed through two linear layers with a final sigmoid.
        """
        # Stage 1
        x = F.elu(self.conv1(x))
        x = self.batchnorm1(x)
        x = self.padding1(x)

        # Stage 2
        x = F.elu(self.conv2(x))
        x = self.batchnorm2(x)
        x = self.pooling2(x)
        x = self.padding2(x)

        # Stage 3
        x = F.elu(self.conv3(x))
        x = self.batchnorm3(x)
        x = self.pooling3(x)

        # Classifier head: flatten everything except the batch axis.
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        # torch.sigmoid replaces the deprecated F.sigmoid; same values.
        x = torch.sigmoid(self.fc2(x))

        return x


In [15]:
# Smoke test: push one random tensor with the same shape as a single padded
# recording (batch of 1, 1 channel, 21 x 60250) through a freshly created
# network to confirm the layer sizes line up end to end.
random = torch.randn(1,1,21,60250)
model = EEGNet()
model(random)

tensor([[0.5422]], grad_fn=<SigmoidBackward0>)

In [16]:
# Training data: wrap the training split and serve it in fixed-order batches of 16.
dataset = CustomDataset(X_train=x_train, y_train=y_train)
dataloader = DataLoader(dataset=dataset, batch_size=16, shuffle=False)

In [17]:
# Model to train, placed on the selected device.
net = EEGNet()
net = net.to(device)
# Binary cross-entropy on probabilities, optimised with Adam.
criterion = nn.BCELoss()
optimizer = optim.Adam(params=net.parameters(), lr=1e-4)

In [None]:
num_epochs = 5
loss_e = []  # mean training loss of each completed epoch

for epoch in range(num_epochs):
    # Make sure the network is in training mode: evaluate_model() switches it
    # to eval mode, so re-running this cell after an evaluation would
    # otherwise train with batch norm frozen on its running statistics.
    net.train()
    running_loss = 0.0
    progress_bar = tqdm(dataloader, desc=f'Epoch {epoch+1}/{num_epochs}')

    # Count batches explicitly; tqdm refreshes its own counter lazily, so it is
    # not a reliable denominator for the running mean.
    for batch_idx, (x, y) in enumerate(progress_bar, start=1):
        optimizer.zero_grad()
        outputs = net(x)
        # Labels come as (batch,); add an axis to match the (batch, 1) outputs.
        loss = criterion(outputs, y.unsqueeze(1))
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        progress_bar.set_postfix(loss=running_loss / batch_idx)

        # Drop references to this batch before the next one is built to keep
        # peak memory down.
        del x, y, outputs, loss
        gc.collect()
        torch.cuda.empty_cache()

    avg_epoch_loss = running_loss / len(dataloader)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_epoch_loss:.4f}')
    loss_e.append(avg_epoch_loss)


In [None]:
def evaluate_model(model, dataloader):
    """Print accuracy, precision, recall and F1 of ``model`` on ``dataloader``.

    Parameters
    ----------
    model : torch.nn.Module
        Network returning one probability per sample; a sample is predicted
        positive when its output is >= 0.5.
    dataloader : iterable
        Yields ``(inputs, targets)`` batches of tensors.

    Returns
    -------
    None
        The four metrics are printed, not returned.

    Notes
    -----
    The model is evaluated in eval mode with gradients disabled, and is put
    back into whichever mode (train/eval) it was in when the call started.
    """
    was_training = model.training
    model.eval()
    all_targets = []
    all_predictions = []

    try:
        with torch.no_grad():
            for inputs, targets in dataloader:
                outputs = model(inputs)
                predictions = (outputs >= 0.5).float()

                # Flatten both so targets and predictions end up as matching
                # 1-D label vectors regardless of a trailing singleton axis.
                all_targets.extend(targets.cpu().numpy().reshape(-1))
                all_predictions.extend(predictions.cpu().numpy().reshape(-1))
    finally:
        # Do not leave the caller's model silently stuck in eval mode.
        model.train(was_training)

    all_targets = np.array(all_targets)
    all_predictions = np.array(all_predictions)

    # zero_division=0 keeps the score at 0.0 when a denominator is empty
    # (e.g. no positive predictions) without emitting a warning.
    accuracy = accuracy_score(all_targets, all_predictions)
    precision = precision_score(all_targets, all_predictions, zero_division=0)
    recall = recall_score(all_targets, all_predictions, zero_division=0)
    f1 = f1_score(all_targets, all_predictions, zero_division=0)

    print(f'Accuracy: {accuracy:.4f}')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1 Score: {f1:.4f}')

In [None]:
# Held-out split: report the metrics on data the network was not trained on.
test_dataset = CustomDataset(X_train=x_test, y_train=y_test)
test_loader = DataLoader(dataset=test_dataset, batch_size=4, shuffle=False)
evaluate_model(model=net, dataloader=test_loader)

In [None]:
# Same metrics on the training batches, for comparison with the held-out split.
evaluate_model(model=net, dataloader=dataloader)

In [None]:
# Metrics over every row of the table (this includes the rows used for training).
x, y = df['Signal'], df['State_2']
t_dataset = CustomDataset(X_train=x, y_train=y)
t_loader = DataLoader(dataset=t_dataset, batch_size=4, shuffle=False)
evaluate_model(model=net, dataloader=t_loader)