In [1]:
import glob #For getting mat files from our directory

import torch #For building our model
import torch.nn as nn #For building our model
import scipy.io as sio #For loading mat files on this case
import mne #For EEG data handling
import numpy as np #For arrays and mathematical functions on arrays
import sklearn #To train our model
import torchmetrics #To assess model accuracy
from sklearn.model_selection import GroupKFold #Gets GroupKFold Algorithm / Iterator
from sklearn.preprocessing import StandardScaler #Gets StandardScalar to standardize our dataset
from sklearn.base import TransformerMixin,BaseEstimator #To create our StandardScalar3D class
from pytorch_lightning import LightningModule,Trainer #LightningModule to organize our Pytorch code for Traininer, to train our model.
from torch.utils.data import TensorDataset,DataLoader #To create a dataset from Tensors (our features and labels); To load our data in batches and iterate over it.

In [2]:
input=torch.randn(3, 14, 512) #3 is the "depth" of the 3d Tensor, 22 is the "rows / height" of the 3D Tensor, and 15000 are the "columns / width" of the 3D Tensor
input.shape

torch.Size([3, 14, 512])

In [3]:
class InceptionBlock(nn.Module): # Creating the class for our inception block
    def __init__(self, in_channels):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels = 32, kernel_size = 2, stride = 2, padding = 0) # Defines 1st Convolution
        self.conv2 = nn.Conv1d(in_channels, out_channels = 32, kernel_size = 4, stride = 2, padding = 1) # Defines 2nd Convolution
        self.conv3 = nn.Conv1d(in_channels, out_channels = 32, kernel_size = 8, stride = 2, padding = 3) # Defines 3rd Convolution
        self.relu=nn.ReLU() # Defines the ReLU Activation Function

    #Here, xn is the output of the nth layer.
    def forward(self,x): #Defining the forward function
        x1 = self.relu(self.conv1(x)) #performing 1st conv and outputting x1
        x2 = self.relu(self.conv2(x)) #performing 2nd conv and outputting x2
        x3 = self.relu(self.conv3(x)) #performing 3rd conv and outputting x3
        x = torch.cat((x1,x2,x3), dim = 1) #taking all outputs of convolutions and concatenating them on 1 Dimension
        return x

In [4]:
class ChronoNet(nn.Module):
    def __init__(self, channel):
        super().__init__()
        self.inception_block1=InceptionBlock(channel) # 1st Inception Block
        self.inception_block2=InceptionBlock(96) # 2nd Inception Block
        self.inception_block3=InceptionBlock(96) # 3rd Inception Block
        self.gru1 = nn.GRU(input_size = 96, hidden_size = 32, batch_first = True) # 1st GRU layer
        self.gru2 = nn.GRU(input_size = 32, hidden_size = 32, batch_first = True) # 2nd GRU layer
        self.gru3 = nn.GRU(input_size = 64, hidden_size = 32, batch_first = True) # 3rd GRU layer
        self.gru4 = nn.GRU(input_size = 96, hidden_size = 32, batch_first = True) # 4th GRU layer
        self.relu = nn.ReLU() # ReLU Activation Function
        self.gru_linear=nn.Linear(in_features = 64, out_features = 1) # Linear Layer for the 4th GRU
        self.flatten = nn.Flatten() # Flattening Layer
        self.fc1 = nn.Linear(32,1) # Fully Connected Layer / Output Layer.

    def forward(self,x): # Defining the feed forward function
        x=self.inception_block1(x) # Fed to Inception Block 1
        x=self.inception_block2(x) # Fed to Inception Block 2
        x=self.inception_block3(x) # Fed to Inception Block 3
        x=x.permute(0,2,1) # Permuted for GRU layers
        gru_out1,_=self.gru1(x) # Fed into GRU layer 1
        gru_out2,_=self.gru2(gru_out1) # Fed into GRU layer 2
        gru_out=torch.cat((gru_out1, gru_out2), dim = 2) # Concatenated, defining the skip connection
        gru_out3,_=self.gru3(gru_out)  # Fed into GRU layer 3
        gru_out = torch.cat((gru_out1, gru_out2, gru_out3), dim = 2) #C Concatenated, defining the next 2 skip connections
        gru_out = gru_out.permute(0,2,1) # Permuted for the linear layer
        linear_out=self.relu(self.gru_linear(gru_out)) # Fed into the linear layer to reduce dimensionality
        linear_out = linear_out.permute(0,2,1) # Permuted for the 4th GRU layer
        gru_out4,_=self.gru4(linear_out) # Fed into the 4th GRU Layer
        x=self.flatten(gru_out4) # Data is Flattened for Fully Connected Layer
        x=self.fc1(x) # Fed into the Fully Connected Layer
        return x # Output

In [5]:
model = ChronoNet(14)
out = model(input)
out.data

tensor([[0.0270],
        [0.0246],
        [0.0260]])

Now we get our data from our directory

In [6]:
IDD = 'Data/Data/CleanData/CleanData_IDD/Rest'
TDC = 'Data/Data/CleanData/CleanData_TDC/Rest'

And we define our function for converting our Matlab data for a readable MNE format

In [7]:
def MatMNE(data):
    ch_names = ['AF3', 'F7', 'F3', 'FC5', 'T7', 'P7', 'O1', 'O2', 'P8', 'T8', 'FC6', 'F4', 'F8', 'AF4'] #Defining Channel Names
    sampling_freq=128
    info = mne.create_info(ch_names, sfreq=sampling_freq, ch_types='eeg') #Creating an MNE Info Object
    info.set_montage('standard_1020') #Setting our EEG montage
    data = mne.io.RawArray(data, info) #Creating a RawArray to make our data readable for MNE
    data.filter(l_freq=1, h_freq=30) #Defining a bandpass filter
    data.set_eeg_reference() #Setting our reference EEG by taking the average of all channels
    epochs = mne.make_fixed_length_epochs(data, duration = 4) #Creating epochs
    return epochs.get_data()

In [8]:
%%capture

idd_subject=[] #Creating a list for IDD subject data

for idd in glob.glob(IDD+'/*.mat'): #For each .mat file, read it as idd
    data = sio.loadmat(idd)['clean_data'] #for each .mat file, idd, get data under the clean_data key in a NumPy array.
    data = MatMNE(data) #Passing our data through our MatMNE function and reassigning to 'data'.
    idd_subject.append(data) #Appending the patient data into a singular array.

In [9]:
%%capture

tdc_subject=[] #Creating a list for IDD subject data

for tdc in glob.glob(TDC+'/*.mat'): #For each .mat file, read it as idd
    data = sio.loadmat(tdc)['clean_data'] #for each .mat file, idd, get data under the clean_data key in a NumPy array.
    data = MatMNE(data) #Passing our data through our MatMNE function and reassigning to 'data'.
    tdc_subject.append(data) #Appending the patient data into a singular array.

In [10]:
control_epochs_labels=[len(i)*[0] for i in tdc_subject] #Labels our data for TDC as '0'
patients_epochs_labels=[len(i)*[1] for i in idd_subject] #Labels our data for IDD as '0'

In [11]:
print(len(control_epochs_labels), len(patients_epochs_labels))

7 7


In [12]:
data_list = tdc_subject + idd_subject #Adds the length of our 2 datasets to a singlular variable
label_list=control_epochs_labels + patients_epochs_labels #Adds the labels of our 2 datasets to a singular variable 
groups_list = [[i]*len(j) for i, j in enumerate(data_list)] #Indexes the test subjects / patients
print(len(data_list),len(label_list),len(groups_list))

14 14 14


In [13]:
gkf = sklearn.model_selection.GroupKFold()  # Assigning GroupKFold Function to gkf
class StandardScaler3D(BaseEstimator,TransformerMixin): #3D data shape of [Batch, Sequence, Channels]
    def __init__(self):
        self.scaler = StandardScaler()

    def fit(self,X,y=None):
            self.scaler.fit(X.reshape(X.shape[0], -1))
            return self
        
    def transform(self,X):
        return self.scaler.transform(X.reshape(X.shape[0], -1)).reshape(X.shape)

In [14]:
data_array= np.concatenate(data_list) 
label_array=np.concatenate(label_list)
group_array =np.concatenate(groups_list) 
data_array = np.moveaxis(data_array, 1, 2)

print(data_array.shape, label_array.shape, group_array.shape)

(420, 512, 14) (420,) (420,)


In [15]:
accuracy = [] #Creates a list to store our model accuracies accross iterations
for train_index, val_index in gkf.split(data_array, label_array, groups=group_array):
    # In GroupKFold cross-validation, get unique indices for training (train_index) and validation (val_index) sets.
    # The 'groups' variable ensures that there is no data leakage between different subjects/patients.
    train_features, train_labels = data_array[train_index], label_array[train_index]
    # Gets training features and labels based on the indices obtained at the kth split.
    val_features, val_labels = data_array[val_index], label_array[val_index]
    # Gets validation features and labels based on the indices obtained at the kth split.
    scaler = StandardScaler3D()  # Initializes a StandardScaler instance for feature scaling.
    train_features = scaler.fit_transform(train_features) #Fits the data and then transforms (standardizes) it
    val_features = scaler.transform(val_features) #Transforms (standardizes) data
    train_features = np.moveaxis(train_features,1,2) #Flip Axis to fit into ChronoNet Architecture
    val_features = np.moveaxis(val_features,1,2) #Flip Axis to fit into ChronoNet Architecture
    break

In [16]:
train_features = torch.Tensor(train_features)
val_features = torch.Tensor(val_features)
train_labels = torch.Tensor(train_labels)
val_labels = torch.Tensor(val_labels)

In [17]:
len(val_features),len(val_labels)

(90, 90)

In [18]:
len(train_features), len(train_labels)

(330, 330)

In [19]:
class ChronoModel(LightningModule):
    def __init__(self):
        super(ChronoModel,self).__init__()
        self.model=ChronoNet(14)
        self.lr=1e-3 ##Defining learning rate of our model. .0001 per step size
        self.bs=12 ## Defining the batch size
        self.worker=2 ## Defining the # of workers, a parallel process
        self.acc=torchmetrics.Accuracy(task='binary') ## For measuring accuracy of our model.
        self.criterion = nn.BCEWithLogitsLoss() ## For measuring accuracy of our model based on the final Sigmoid Activation function.
        self.train_outputs = [] #To store our outputs from training the model
        self.val_outputs = [] # To store our outputs from validating the model
        
        
    def forward(self,x): ## Defining forward function, for feeding our data into the model. 
        x=self.model(x)
        return x

    def configure_optimizers(self): ## Defining our optimizer
        return torch.optim.Adam(self.parameters(), lr=self.lr) #Implementing the Adam optimizer on our model.

    def train_dataloader(self): #Loads our training data
        dataset = TensorDataset(train_features,train_labels) #Creates a tensor data object from our Tensors representing our training features
        dataloader = DataLoader(dataset, batch_size=self.bs,num_workers = self.worker,shuffle=True) #Loads our data using the dataset, batch size, workers (parallel processes). Our data will be shuffled at each Epoch (per shuffle = True) to prevent overfitting.
        return dataloader

    def training_step(self,batch,batch_idx): #Defining our function for a single training step
        signal,label = batch # From each batch, we unpack signal and label data
        output=self(signal.float()) # Output of data given a signal
        loss=self.criterion(output.flatten(),label.float().flatten()) #Calculating loss using the output and BCEWithLogitsLoss function
        acc=self.acc(output.flatten(),label.long().flatten()) #Calculating the accuracy using the output and the Accuracy() function from torchmetrics
        self.train_outputs.append({'loss': loss, 'acc': acc}) # To append / add our output onto our training output list
        return {'loss':loss,'acc':acc} # Returns our model loss and accuracy

    def on_train_epoch_end(self):
        acc=torch.stack([x['acc'] for x in self.train_outputs]).mean().detach().cpu().numpy().round(2) # Takes the average accuracy for the outputs and stacks it onto a singular Tensor. Then detaches it from the gpu, converts it into a NumPy array, round it to the nearest hundreth, and passes it onto the cpu
        loss=torch.stack([x['loss'] for x in self.train_outputs]).mean().detach().cpu().numpy().round(2) # Takes the average loss for the outputs and stacks it onto a singular Tensor. Then detaches it from the gpu, converts it into a NumPy array, round it to the nearest hundreth, and passes it onto the cpu
        self.train_outputs.clear() #To free up memory after each Epoch
        print('train acc loss', acc,loss) # Printing our final training accuracy and loss
    
    def val_dataloader(self): #Loads our validation data
        dataset = TensorDataset(val_features,val_labels) #Creates a tensor data object from our Tensors representing our validation features
        dataloader = DataLoader(dataset, batch_size=self.bs,num_workers = self.worker,shuffle=True) #Loads our data using the dataset, batch size, workers (parallel processes). Our data will be shuffled at each Epoch (per shuffle = True) to prevent overfitting.
        return dataloader

    def validation_step(self,batch,batch_idx): #Defining our function for a single step
        signal,label = batch # From each batch, we unpack signal and label data
        output=self(signal.float()) # Output of data given a signal
        loss=self.criterion(output.flatten(),label.float().flatten()) #Calculating loss using the output and BCEWithLogitsLoss function
        acc=self.acc(output.flatten(),label.long().flatten()) #Calculating the accuracy using the output and the Accuracy() function from torchmetrics
        self.val_outputs.append({'loss': loss, 'acc': acc}) #To append / add our output onto our validation output list.
        return {'loss':loss,'acc':acc} # Returns our model loss and accuracy

    def on_validation_epoch_end(self):
        acc=torch.stack([x['acc'] for x in self.val_outputs]).mean().detach().cpu().numpy().round(2) # Takes the average accuracy for the outputs and stacks it onto a singular Tensor. Then detaches it from the gpu, converts it into a NumPy array, round it to the nearest hundreth, and passes it onto the cpu
        loss=torch.stack([x['loss'] for x in self.val_outputs]).mean().detach().cpu().numpy().round(2) # Takes the average loss for the outputs and stacks it onto a singular Tensor. Then detaches it from the gpu, converts it into a NumPy array, round it to the nearest hundreth, and passes it onto the cpu
        self.val_outputs.clear() # To free up memory after each epoch.
        print('val acc loss', acc, loss)


In [20]:
model=ChronoModel()

In [21]:
trainer=Trainer(max_epochs=1) # Setting up to finally train our model.

GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
/Users/juanvera/miniconda3/lib/python3.11/site-packages/pytorch_lightning/trainer/connectors/logger_connector/logger_connector.py:67: Starting from v1.9.0, `tensorboardX` has been removed as a dependency of the `pytorch_lightning` package, due to potential conflicts with other packages in the ML ecosystem. For this reason, `logger=True` will use `CSVLogger` as the default logger, unless the `tensorboard` or `tensorboardX` packages are found. Please `pip install lightning[extra]` or one of them to enable TensorBoard support by default


In [22]:
trainer.fit(model) #Training our Model


  | Name      | Type              | Params
------------------------------------------------
0 | model     | ChronoNet         | 133 K 
1 | acc       | BinaryAccuracy    | 0     
2 | criterion | BCEWithLogitsLoss | 0     
------------------------------------------------
133 K     Trainable params
0         Non-trainable params
133 K     Total params
0.534     Total estimated model params size (MB)


Sanity Checking: |                                        | 0/? [00:00<?, ?it/s]

/Users/juanvera/miniconda3/lib/python3.11/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:492: Your `val_dataloader`'s sampler has shuffling enabled, it is strongly recommended that you turn shuffling off for val/test dataloaders.
/Users/juanvera/miniconda3/lib/python3.11/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:436: Consider setting `persistent_workers=True` in 'val_dataloader' to speed up the dataloader worker initialization.


val acc loss 0.29 0.71


/Users/juanvera/miniconda3/lib/python3.11/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:436: Consider setting `persistent_workers=True` in 'train_dataloader' to speed up the dataloader worker initialization.
/Users/juanvera/miniconda3/lib/python3.11/site-packages/pytorch_lightning/loops/fit_loop.py:293: The number of training batches (28) is smaller than the logging interval Trainer(log_every_n_steps=50). Set a lower value for log_every_n_steps if you want to see logs for the training epoch.


Training: |                                               | 0/? [00:00<?, ?it/s]

Validation: |                                             | 0/? [00:00<?, ?it/s]

`Trainer.fit` stopped: `max_epochs=1` reached.


val acc loss 0.32 0.71
train acc loss 0.57 0.69
