<a href="https://colab.research.google.com/github/nickelodeon20/Test-Model-Performances/blob/main/ATCNet/ATCNet_BCI_IV2a.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Authors: Lukas Gemein
#          Hubert Banville
#          Simon Brandt
#          Daniel Wilson
#
# License: BSD (3-clause)
!pip install braindecode # Install braindecode package
from braindecode.datasets import MOABBDataset

In [None]:
subject_id = 3

!pip install moabb
!pip install --upgrade numpy==1.26.0
dataset = MOABBDataset(dataset_name="BNCI2014_001", subject_ids=[subject_id])

In [None]:
!pip install --upgrade numpy

from numpy import multiply

from braindecode.preprocessing import (Preprocessor,
                                       exponential_moving_standardize,
                                       preprocess)

# Band-pass corner frequencies in Hz (lower, upper)
low_cut_hz, high_cut_hz = 4., 38.
# Settings for the exponential moving standardization step
factor_new = 1e-3
init_block_size = 1000
# Multiplier that takes values in V to values in uV
factor = 1e6


def convert_to_microvolts(data, factor=factor):
    """Return ``data`` multiplied element-wise by ``factor``.

    Args:
        data: Scalar or array-like values to scale.
        factor: Multiplier; defaults to the module-level volt-to-microvolt
            factor captured when this function is defined.

    Returns:
        The result of ``numpy.multiply(data, factor)``.
    """
    scaled = multiply(data, factor)
    return scaled

# Individual preprocessing steps, named so the pipeline below reads as a list
# of stages. The list order is the order in which the steps are meant to run.
keep_eeg_channels = Preprocessor('pick_types', eeg=True, meg=False, stim=False)
scale_to_microvolts = Preprocessor(convert_to_microvolts, apply_on_array=True)
bandpass_filter = Preprocessor('filter', l_freq=low_cut_hz, h_freq=high_cut_hz)
moving_standardize = Preprocessor(
    exponential_moving_standardize,
    factor_new=factor_new,
    init_block_size=init_block_size,
)

preprocessors = [
    keep_eeg_channels,    # keep EEG sensors only
    scale_to_microvolts,  # V -> uV via the named helper function
    bandpass_filter,      # band-pass between low_cut_hz and high_cut_hz
    moving_standardize,   # exponential moving standardization
]

# Run the pipeline on the dataset; n_jobs=-1 is passed through to preprocess
preprocess(dataset, preprocessors, n_jobs=-1)

In [None]:
from braindecode.preprocessing import create_windows_from_events

# Offset (in seconds) applied to the start of every trial window
trial_start_offset_seconds = -0.5

# Every recording has to share one sampling frequency, otherwise a single
# offset expressed in samples would not mean the same thing everywhere.
recording_sfreqs = [ds.raw.info['sfreq'] for ds in dataset.datasets]
sfreq = recording_sfreqs[0]
assert all(rate == sfreq for rate in recording_sfreqs)

# Same offset, converted from seconds to a whole number of samples
trial_start_offset_samples = int(trial_start_offset_seconds * sfreq)

# Cut the continuous recordings into windows around the events, using the
# braindecode helper; the offsets describe how each trial is trimmed.
windows_dataset = create_windows_from_events(
    dataset,
    preload=True,
    trial_start_offset_samples=trial_start_offset_samples,
    trial_stop_offset_samples=0,
)

In [None]:
from braindecode.datasets import BaseConcatDataset

# Debugging function to check shapes of datasets
def get_baseconcatdataset_shape(dataset):
    """
    Gets the shape of a BaseConcatDataset.

    The channel and time dimensions are read from the first example of the
    first non-empty sub-dataset; all examples are assumed to share that shape.

    Args:
        dataset: The BaseConcatDataset object. It must support ``len()`` and
            expose its sub-datasets through a ``datasets`` attribute.

    Returns:
        A tuple representing the shape of the dataset:
        (num_examples, num_channels, num_timepoints). The last two entries are
        0 when there is no example to inspect.
    """
    num_examples = len(dataset)

    # Skip empty sub-datasets so that an empty first entry does not raise
    # IndexError while later entries still hold examples.
    first_non_empty = next(
        (sub for sub in dataset.datasets if len(sub) > 0), None
    )
    if first_non_empty is None:
        return (num_examples, 0, 0)

    # Fetch the example once: indexing a sub-dataset may load data, so the
    # same item should not be retrieved separately for each dimension.
    first_x = first_non_empty[0][0]
    num_channels = first_x.shape[0]
    num_timepoints = first_x.shape[1]

    return (num_examples, num_channels, num_timepoints)



# Split the windows by session: '0train' is the training session, while the
# '1test' session is further divided into a validation part and a test part.

splitted = windows_dataset.split('session')
train_set = splitted['0train']  # Session train
valid_set = splitted['1test']  # Session evaluation
print("Windows_dataset before split: " + str(get_baseconcatdataset_shape(windows_dataset)))
print("------------------------------")
print("Valid_set before split: " + str(get_baseconcatdataset_shape(valid_set)))
print("------------------------------")
print("Train_set (not involved in split): " + str(get_baseconcatdataset_shape(train_set)) + "\n")

# Sub-datasets that make up the evaluation session
valid_set_datasets = valid_set.datasets

# NOTE: despite its name, n_valid_trials counts the sub-datasets held in
# valid_set.datasets, not individual trials/windows, so the 80/20 split below
# is made at sub-dataset granularity.
n_valid_trials = len(valid_set_datasets)
valid_fraction = 0.8  # share of the sub-datasets kept for validation
n_valid_kept = int(valid_fraction * n_valid_trials)
valid_subset1_ids = list(range(0, n_valid_kept))  # First 80% -> validation
valid_subset2_ids = list(range(n_valid_kept, n_valid_trials))  # Remaining 20% -> test

print("Valid Set ids: " + str(valid_subset1_ids))
print("------------------------------")
print("Test Set ids: " + str(valid_subset2_ids) + "\n")

# Build the validation and test sets from the selected sub-datasets
valid_set_subset = BaseConcatDataset([valid_set_datasets[i] for i in valid_subset1_ids])
test_set = BaseConcatDataset([valid_set_datasets[i] for i in valid_subset2_ids])

# Point valid_set at the reduced validation set. Rebinding the name (instead
# of assigning to `valid_set.datasets`) leaves the object stored in
# splitted['1test'] intact: its `datasets` attribute is expected to stay a
# list of sub-datasets consistent with its cached sizes.
valid_set = valid_set_subset

print("Valid_set after split: " + str(get_baseconcatdataset_shape(valid_set_subset)))
print("------------------------------")
print("Test_set after split (N/A before): " + str(get_baseconcatdataset_shape(test_set)))
print("------------------------------")
print("Train_set (not involved in split): " + str(get_baseconcatdataset_shape(train_set)))


In [None]:
import torch

from braindecode.models import ShallowFBCSPNet
from braindecode.util import set_random_seeds

# Use the GPU whenever PyTorch reports one as available
cuda = torch.cuda.is_available()
if cuda:
    device = 'cuda'
    torch.backends.cudnn.benchmark = True
else:
    device = 'cpu'

# A fixed seed makes results roughly reproducible. With cudnn benchmark
# enabled, GPU non-determinism can still lead to substantially different
# results between runs. For more consistent results (at the cost of more
# computation time) either pass `cudnn_benchmark=False` to `set_random_seeds`
# or drop the `torch.backends.cudnn.benchmark = True` line above.
seed = 20200220
set_random_seeds(seed=seed, cuda=cuda)

n_classes = 4
classes = [*range(n_classes)]

# Channel count and window length are read off the first training example
first_window_shape = train_set[0][0].shape
n_chans = first_window_shape[0]
input_window_samples = first_window_shape[1]

# NOTE(review): `input_window_samples` is the keyword used by this notebook;
# confirm the installed braindecode version still accepts it.
model = ShallowFBCSPNet(
    n_chans,
    n_classes,
    final_conv_length='auto',
    input_window_samples=input_window_samples,
)

# Print a description of the model
print(model)

# Move the model parameters onto the GPU when one is in use
if cuda:
    model = model.cuda()

In [None]:
from skorch.callbacks import LRScheduler, EpochScoring
from skorch.helper import predefined_split

from braindecode import EEGClassifier

# Hyper-parameters found to work well for the shallow network.
# (For the deep4 network the suggested values are instead
#  lr = 1 * 0.01 and weight_decay = 0.5 * 0.001.)
lr = 0.0625 * 0.01
weight_decay = 0

batch_size = 64
n_epochs = 50

# Training-set metrics tracked every epoch, described as
# (callback name, sklearn scoring string, key under which it is logged).
train_metric_specs = [
    ('train_acc', 'accuracy', 'train_acc'),
    ('train_f1', 'f1_macro', 'train_f1'),
    ('precision', 'precision_macro', 'train_precision'),
    ('recall', 'recall_macro', 'train_recall'),
]
train_metric_callbacks = [
    (callback_name, EpochScoring(scoring=scoring, on_train=True, name=logged_name))
    for callback_name, scoring, logged_name in train_metric_specs
]

# Cosine-annealed learning rate followed by the training-metric callbacks
training_callbacks = [
    ("lr_scheduler", LRScheduler('CosineAnnealingLR', T_max=n_epochs - 1)),
] + train_metric_callbacks

# NOTE(review): NLLLoss expects log-probabilities as model output -- confirm
# that the installed braindecode version's ShallowFBCSPNet provides them.
clf = EEGClassifier(
    model,
    criterion=torch.nn.NLLLoss,
    optimizer=torch.optim.AdamW,
    optimizer__lr=lr,
    optimizer__weight_decay=weight_decay,
    train_split=predefined_split(valid_set_subset),  # validate on the reduced validation set
    batch_size=batch_size,
    callbacks=training_callbacks,
    device=device,
    classes=classes,
)

# Train for n_epochs epochs. `y` stays None because the targets are already
# part of the dataset.
_ = clf.fit(train_set, y=None, epochs=n_epochs)

In [None]:
import matplotlib.pyplot as plt
from numpy import array, arange
from matplotlib import cm

# Per-epoch metric curves taken from the classifier's training history
train_loss = clf.history_[:, 'train_loss']
valid_loss = clf.history_[:, 'valid_loss']
train_acc = clf.history_[:, 'train_acc']
valid_acc = clf.history_[:, 'valid_acc']
train_f1 = clf.history_[:, 'train_f1']
precision = clf.history_[:, 'train_precision']
recall = clf.history_[:, 'train_recall']

# Misclassification rate is the complement of accuracy (a fraction, not a percentage)
train_misclass = 1 - array(train_acc)
valid_misclass = 1 - array(valid_acc)

# `matplotlib.cm.get_cmap` was removed in Matplotlib 3.9; `pyplot.get_cmap`
# accepts the same (name, lut) arguments and is still available.
cmap = plt.get_cmap('viridis', 10)

# One x position per recorded epoch
epochs = arange(len(train_loss))

# One row per curve:
# (values, legend label, end-of-curve annotation, colormap index, ha, va)
curves = [
    (train_loss, 'Training Loss', 'Train Loss', 0, 'left', 'center'),
    (valid_loss, 'Validation Loss', 'Val. Loss', 2, 'left', 'center'),
    (train_acc, 'Training Accuracy', 'Train Acc.', 4, 'left', 'bottom'),
    (valid_acc, 'Validation Accuracy', 'Val. Acc.', 6, 'left', 'center'),
    (train_f1, 'Training F1 Score', 'Train F1', 8, 'left', 'top'),
    (precision, 'Precision', 'Train Precision', 1, 'right', 'top'),
    (recall, 'Recall', 'Train Recall', 3, 'right', 'bottom'),
    # The annotations of the two misclassification curves now match their data
    # (they were previously swapped).
    (train_misclass, 'Training Misclassification Rate', 'Train Misclass. Rate', 5, 'left', 'center'),
    (valid_misclass, 'Validation Misclassification Rate', 'Val. Misclass. Rate', 7, 'left', 'center'),
]

fig, ax = plt.subplots(figsize=(12, 8))

for values, legend_label, end_label, color_index, ha, va in curves:
    ax.plot(epochs, values, label=legend_label, color=cmap(color_index))
    # Name each curve next to its final point
    ax.text(epochs[-1], values[-1], end_label, ha=ha, va=va)

ax.set_ylim(0, 2)

ax.set_xlabel('Epochs')
# The curves are losses and fractions in [0, 1], so the axis carries no "%" unit
ax.set_ylabel('Value')
ax.set_title('Training and Validation Metrics')
ax.legend()
ax.grid(True)
plt.show()