In [1]:
# Set to True when running on Google Colab: Google Drive is then mounted here,
# and the data-loading cell switches `data_root` to the Drive path.
GOOGLE = False
if GOOGLE:
    # Colab-only import, kept inside the guard so local runs do not need it.
    from google.colab import drive
    drive.mount('/content/drive')

In [3]:
from shared.data_loader import *
from shared.utils import *

In [5]:
# GPU code
import torch
import torch.nn.functional as F

# Pick the compute device once: CUDA when a GPU is visible, otherwise the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Report the outcome so a missing GPU is noticed before training starts
print('Good to go!' if device.type == 'cuda'
      else 'Please set GPU via Edit -> Notebook Settings.')

Please set GPU via Edit -> Notebook Settings.


In [4]:
# Transform code
import numpy as np
from scipy import signal

def scale_high_freq(input_signal, cutoff_frequency, factor):
    """Rescale the high-frequency content of a signal.

    A 4th-order Butterworth high-pass filter is applied forwards and
    backwards (``signal.filtfilt``) to isolate the high-frequency component.
    That component is removed from the input and added back multiplied by
    ``factor``, so ``factor < 1`` attenuates it and ``factor > 1`` boosts it.

    Args:
        input_signal: Array-like signal; filtering runs along the last axis
            (the ``filtfilt`` default).
        cutoff_frequency: Critical frequency handed to ``signal.butter``.
            No sampling rate is supplied, so it is interpreted by SciPy as a
            normalized frequency.
        factor: Gain applied to the high-pass component.

    Returns:
        The input signal with its high-pass component scaled by ``factor``.
    """
    numerator, denominator = signal.butter(4, cutoff_frequency, 'high')
    high_band = signal.filtfilt(numerator, denominator, input_signal)
    # Same evaluation order as (input - high) + (high * factor)
    return input_signal - high_band + high_band * factor

In [6]:
# EEG DATA
import os
import numpy as np
from torch.utils.data import DataLoader, Dataset
import torch
import pandas as pd

class EEG_Data(Dataset):
    """EEG trials and labels loaded from ``.npy`` files under ``root_dir``.

    The ``train`` and ``val`` splits are both carved out of
    ``X_train_valid.npy`` / ``y_train_valid.npy`` with a fixed-seed 80/20
    random split, so the two splits are always disjoint and reproducible.
    The ``test`` split is read from ``X_test.npy`` / ``y_test.npy``.
    Stored labels are the raw label values minus 769.
    """

    # Class names used when no ``label_dict`` is supplied.
    _DEFAULT_LABEL_NAMES = ["Cue Onset left", "Cue Onset right", "Cue onset foot", "Cue onset tongue"]
    # Value subtracted from every raw label.
    _LABEL_OFFSET = 769
    # Fraction of the train/valid pool held out for validation.
    _VALID_FRACTION = 0.2
    # Seed of the private RNG that draws the validation indices.
    _SPLIT_SEED = 0

    def __init__(self, root_dir, split, preprocess=lambda x,y:train_data_prep(x,y,2,2,True), transform=None, label_dict=None):
        """
        Load one split of the EEG data into memory.

        Args:
            root_dir (str): Directory that contains the ``.npy`` files. A
                trailing path separator is optional.
            split (str): Split to use ('train', 'val', or 'test').
            preprocess (callable, optional): ``(x, y) -> (x, y)`` applied to
                the selected train or val arrays. Pass ``None`` to skip it.
                It is not applied to the 'test' split.
            transform (callable, optional): Applied to each sample in
                ``__getitem__``.
            label_dict (optional): Mapping/sequence from integer label to
                class name. Defaults to the four built-in cue names.

        Raises:
            AssertionError: If ``split`` is not one of the supported names.
        """
        assert split in ['train', 'val', 'test'], f"unknown split: {split!r}"
        self.root_dir = root_dir
        self.split = split
        self.transform = transform
        self.datastorch = []  # unused; kept so existing attribute access does not break
        if label_dict is None:
            label_dict = list(self._DEFAULT_LABEL_NAMES)
        self.label_dict = label_dict

        if split == 'test':
            x_split, y_split = self._load_pair("X_test.npy", "y_test.npy")
        else:
            x_pool, y_pool = self._load_pair("X_train_valid.npy", "y_train_valid.npy")
            ind_train, ind_valid = self._split_indices(len(x_pool))
            chosen = ind_train if split == 'train' else ind_valid
            x_split, y_split = x_pool[chosen], y_pool[chosen]
            if preprocess is not None:
                x_split, y_split = preprocess(x_split, y_split)

        self.datas = torch.from_numpy(x_split)
        self.labels = [int(raw - self._LABEL_OFFSET) for raw in torch.from_numpy(y_split)]

    def _load_pair(self, x_name, y_name):
        """Load the data array and label array with the given file names."""
        # os.path.join tolerates root_dir with or without a trailing separator.
        x_arr = np.load(os.path.join(self.root_dir, x_name))
        y_arr = np.load(os.path.join(self.root_dir, y_name))
        return x_arr, y_arr

    @classmethod
    def _split_indices(cls, data_length):
        """Return ``(train_indices, valid_indices)`` for a pool of ``data_length`` items."""
        # A private RandomState yields the same draw as np.random.seed(0)
        # followed by np.random.choice, without resetting the global RNG that
        # other code in the notebook may rely on.
        rng = np.random.RandomState(cls._SPLIT_SEED)
        ind_valid = rng.choice(data_length, int(data_length * cls._VALID_FRACTION), replace=False)
        # setdiff1d returns the remaining indices in ascending order, which
        # does not depend on set iteration order.
        ind_train = np.setdiff1d(np.arange(data_length), ind_valid)
        return ind_train, ind_valid

    def __len__(self):
        """
        Return the number of samples in this split.

        Returns:
            int: Number of samples in the dataset.
        """
        return len(self.datas)

    def __getitem__(self, idx):
        """
        Retrieve one sample and its label.

        Args:
            idx (int): Index of the sample to retrieve.

        Returns:
            tuple: ``(data, label)``; ``data`` has been passed through
            ``transform`` when one was supplied, ``label`` is an int.
        """
        data = self.datas[idx]
        label = self.labels[idx]

        if self.transform:
            data = self.transform(data)
        return data, label

In [7]:
# 2d Convolution
import torch
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm


def fc_block(i, o):
    """Build a fully connected layer mapping ``i`` input features to ``o`` outputs."""
    return nn.Linear(i, o)


class ConvBlock2D(nn.Module):
    """Plain convolutional block: Conv2d -> ReLU -> MaxPool2d -> BatchNorm2d -> Dropout.

    Args:
        input_size (int): Number of input channels.
        output_size (int): Number of output channels.
        kernel_size (int or pair of int): Convolution kernel size. The
            padding is ``(k - 1) // 2`` per dimension, which keeps the
            spatial size for odd kernels and shrinks it by one for even ones.
        dropout (float): Dropout probability applied at the end of the block.
    """

    def __init__(self, input_size, output_size, kernel_size, dropout):
        super().__init__()
        try:
            padding = (kernel_size-1)//2
        except TypeError:
            # kernel_size is a (height, width) pair rather than a single int.
            # Only TypeError is caught so unrelated errors are not hidden.
            padding = ((kernel_size[0]-1)//2, (kernel_size[1]-1)//2)
        self.conv_block = nn.Sequential(
            nn.Conv2d(in_channels=input_size, out_channels=output_size, kernel_size=kernel_size, padding=padding),
            nn.ReLU(),
            # No explicit stride: MaxPool2d then strides by its kernel size.
            nn.MaxPool2d(kernel_size=3, padding=1),
            nn.BatchNorm2d(output_size),
            nn.Dropout(dropout)
        )

    def forward(self, x, **kwargs):
        """Run ``x`` through the block; extra keyword arguments are accepted and ignored."""
        # Call the module (not .forward) so registered hooks are honoured.
        # This block has no residual connection.
        return self.conv_block(x)


class ResBlock2D(nn.Module):
    """Basic 2-D residual block with an optional 1x1 projection shortcut."""

    def __init__(self, in_channels, out_channels, kernel_size = 3, dropout=0.6,  downsample=False):
        '''
        Initialize a basic residual block.

        Every convolution uses stride 1, so the spatial size of the input is
        preserved. ``downsample`` only selects the shortcut type: when it is
        True, or when ``in_channels != out_channels``, the shortcut is a
        1x1 convolution followed by batch normalization; otherwise the input
        is added back unchanged.

        Layers:
        1) conv1 (no bias): (batch_size, in_channels, H, W) -> (batch_size, out_channels, H, W)
        2) conv2 (no bias, followed by its own BatchNorm2d): (batch_size, out_channels, H, W) -> (batch_size, out_channels, H, W)
        3) conv3 (projection shortcut only, 1*1 kernel, no bias): (batch_size, in_channels, H, W) -> (batch_size, out_channels, H, W)

        Args:
            in_channels (int): Number of input channels.
            out_channels (int): Number of output channels.
            kernel_size (int or pair of int): Kernel size of conv2, and of
                conv1 when the projection shortcut is used.
            dropout (float): Dropout probability applied to the block output.
            downsample (bool): Force the projection shortcut.
        '''
        super().__init__()
        # A projection is mandatory when the channel counts differ, otherwise
        # the residual addition would not be shape-compatible.
        self.downsample = downsample or (in_channels != out_channels)
        try:
            padding = (kernel_size-1)//2
        except TypeError:
            # kernel_size is a (height, width) pair rather than a single int.
            padding = ((kernel_size[0]-1)//2, (kernel_size[1]-1)//2)
        self.kernel_size = kernel_size
        self.dropout = dropout
        # conv2 bundles its own BatchNorm2d and bn2 is applied on top of it in
        # forward(); the layout is kept so existing state_dict keys still load.
        self.conv2 = nn.Sequential(
                        nn.Conv2d(out_channels, out_channels, kernel_size = kernel_size, stride = 1, padding = padding, bias=False),
                        nn.BatchNorm2d(out_channels))
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.bn3 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        self.relu2 = nn.ReLU()
        if self.downsample:
            # Padding follows kernel_size; a fixed padding of 1 only matches a
            # 3x3 kernel and breaks the residual addition for any other size.
            self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, stride=1, padding=padding, bias=False)
            self.conv3 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, bias=False)
        else:
            # The identity-shortcut variant keeps its fixed 3x3 first
            # convolution so parameter shapes of existing models are unchanged.
            self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        # Registered as a submodule so train()/eval() switch it on and off.
        # Dropout has no parameters, so state_dict keys are unaffected.
        self.drop = nn.Dropout(dropout)

    def forward(self, x):
        """
        Forward pass of the basic residual block.

        Steps:
        1) conv1 -> batch norm -> relu
        2) conv2 (with its built-in batch norm) -> batch norm
        3) if the projection shortcut is active, conv3 -> batch norm on the original input
        4) add the shortcut to the main path
        5) relu, then dropout (active only in training mode)

        Args:
            x: Tensor of shape (batch_size, in_channels, H, W).

        Returns:
            Tensor of shape (batch_size, out_channels, H, W).
        """
        og = x
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        if self.downsample:
            og = self.conv3(og)
            og = self.bn3(og)
        x = x + og
        x = self.relu2(x)
        return self.drop(x)



In [8]:
# Data loading
data_root = "../project_data/project/"
if GOOGLE:
    data_root = "/content/drive/MyDrive/project/"

# Per-sample transform: add a leading channel axis so a 2-D sample of shape
# (rows, cols) becomes (1, rows, cols), matching the single input channel of
# the first ConvBlock2D of the model.
# Two earlier experiments were assigned to this name and then immediately
# overwritten, so they never ran: an FFT magnitude (abs(np.fft.fft(x))) and
# scale_high_freq(x, 0.06, 0.4).
data_transform = lambda x: x.reshape(1,x.shape[0],x.shape[1])

# Create eeg dataset objects. preprocess is the identity here, so the raw
# arrays are used as loaded.
eeg_train = EEG_Data(data_root,
                              split='train',
                              preprocess=lambda x,y:(x,y),
                              transform=data_transform)

eeg_val = EEG_Data(data_root,
                            split='val',
                            preprocess=lambda x,y:(x,y),
                            transform=data_transform)
# The test split gets the same transform as train/val so every loader yields
# batches with the same layout. EEG_Data never calls `preprocess` for the
# test split, so none is passed.
eeg_test = EEG_Data(data_root,
                            split='test',
                            preprocess=None,
                            transform=data_transform)
# Create the dataloaders
# Define the batch size and number of workers
batch_size = 64
num_workers=2
# Only the training loader shuffles; validation and test keep a fixed order
train_loader = DataLoader(eeg_train,
                          batch_size=batch_size,
                          num_workers=num_workers,
                        shuffle=True)
val_loader = DataLoader(eeg_val,
                        batch_size=batch_size,
                        num_workers=num_workers,
                        shuffle=False)
test_loader = DataLoader(eeg_test,
                        batch_size=batch_size,
                        num_workers=num_workers,
                        shuffle=False)

NameError: name 'GOOGLE' is not defined

In [8]:
# Model: three ConvBlock2D stages with a (22, 70) kernel and 0.5 dropout,
# followed by a small fully connected classifier with 4 outputs.
# Despite its name, this network contains no ResBlock2D.
# NOTE(review): 1800 must equal the flattened size of the last conv stage,
# which depends on the input shape - confirm against the data actually fed in.
resnetv2 = nn.Sequential(
    *(ConvBlock2D(channels_in, channels_out, (22, 70), 0.5)
      for channels_in, channels_out in [(1, 50), (50, 100), (100, 120)]),
    nn.Flatten(start_dim=1),
    fc_block(1800, 40),
    nn.BatchNorm1d(40),
    nn.ReLU(),
    fc_block(40, 4),
)

In [12]:
import tqdm
from tqdm import tqdm

# Seed the RNGs before training. The model was already constructed in an
# earlier cell, so this does not affect its initial weights.
seed_everything(0)

model = resnetv2

model.to(device)

# Adam optimizer over all model parameters
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Cross-entropy loss for the 4-class classification head
criterion = nn.CrossEntropyLoss()

# Train the model, monitoring on the validation split so the test set stays
# untouched until the final evaluation below.
# NOTE(review): assumes the third positional argument of train() is the loader
# used for per-epoch evaluation - confirm against shared.utils.
train(model,
      train_loader,
      val_loader,
      optimizer,
      criterion,
      device,
      num_epochs=30)

# Final, one-off evaluation on the held-out test set
avg_loss, accuracy = evaluate(model, test_loader, criterion, device)

print("avg_loss", avg_loss)
print("accuracy", accuracy)



Epoch 1/30:   0%|          | 0/27 [00:00<?, ?it/s]