## Data loading

In [1]:
import h5py
import numpy as np
from pathlib import Path

In [2]:
import torch
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device('cpu')
print("Using {}.".format(device))

Using cuda.


In [4]:
colab = True

if colab == True:
    from google.colab import drive
    drive.mount('/content/drive')
    !unzip '/content/drive/My Drive/PatternRecognition/data.zip' -d '/content/data'
    data_dir = Path("./data/") / "Final Project data"
else:
  data_dir = Path("./data/")

assert data_dir.is_dir()
intra_dir = data_dir / "Intra"
cross_dir = data_dir / "Cross"
intra_train_glob = list((intra_dir / "train").glob("*.h5"))
intra_test_glob = list((intra_dir / "test").glob("*.h5"))
cross_train_glob = list((cross_dir / "train").glob("*.h5"))
cross_test1_glob = list((cross_dir / "test1").glob("*.h5"))
cross_test2_glob = list((cross_dir / "test2").glob("*.h5"))
cross_test3_glob = list((cross_dir / "test3").glob("*.h5"))
print(len(cross_train_glob), len(cross_test1_glob), len(cross_test2_glob))

64 16 16


In [5]:
def load_labels(path: Path) -> np.ndarray:
    *task, subject_identifier, chunk = path.stem.split("_")
    if "rest" in task:
        y = 0
    elif 'math' in task:
        y = 1
    elif 'working' in task:
        y = 2
    elif 'motor' in task:
        y = 3
    else:
        assert False, 'unknown task'
    return np.array([y, int(subject_identifier), int(chunk)])

def load_h5(path: Path) -> np.ndarray:
    with h5py.File(path) as f:
        keys = f.keys()
        assert len(keys) == 1, f"Only one key per file, right? {cross_train_glob[0]}"
        matrix = f.get(next(iter(keys)))[()]
    return matrix

In [6]:
load = "intra"

In [7]:
if load == 'intra':
  print("Loading the raw intra-subject dataset...")
  intra_train_X = np.stack(list(map(load_h5, intra_train_glob)))
  intra_train_labels = np.stack(list(map(load_labels, intra_train_glob)))[:, 0]

  intra_test_X = np.stack(list(map(load_h5, intra_test_glob)))
  intra_test_labels = np.stack(list(map(load_labels, intra_test_glob)))[:, 0]
  print("Done!")

elif load == 'cross':
  print("Loading the raw cross-subject dataset...")
  cross_train_X = np.stack(list(map(load_h5, cross_train_glob)))
  cross_train_labels = np.stack(list(map(load_labels, cross_train_glob)))[:, 0]

  #Load first test dataset
  cross_test1_X = np.stack(list(map(load_h5, cross_test1_glob)))
  cross_test1_labels = np.stack(list(map(load_labels, cross_test1_glob)))[:, 0]

  #Load second test dataset
  cross_test2_X = np.stack(list(map(load_h5, cross_test2_glob)))
  cross_test2_labels = np.stack(list(map(load_labels, cross_test2_glob)))[:, 0]

  #Load third test dataset
  cross_test3_X = np.stack(list(map(load_h5, cross_test3_glob)))
  cross_test3_labels = np.stack(list(map(load_labels, cross_test3_glob)))[:, 0]

  # Combine two datasets (1 and 2)
  cross_test_X = np.concatenate([cross_test1_X, cross_test2_X], axis=0)
  cross_test_labels = np.concatenate([cross_test1_labels, cross_test2_labels], axis=0)

  print(cross_test1_X.shape, cross_test1_labels.shape)
  print("Done!")

else:
  warnings.warn("No datasets preprocessed. Choose between intra and cross subject.", Warning)

Loading the raw intra-subject dataset...
Done!


## Data preprocessing

In [8]:
import numpy as np
import torch

def downsample(data, old_freq, new_freq):
    # Calculate the downsampling factor
    downsample_factor = int(np.round(old_freq / new_freq))
    # Ensure that timesteps are divisible by the downsampling factor
    data = data[:,:,:data.shape[2]//downsample_factor*downsample_factor]
    # Reshape
    reshaped_data = data.reshape(data.shape[0], data.shape[1], -1, downsample_factor)
    # Take the mean along the last axis
    downsampled_data = reshaped_data.mean(axis=-1)
    return downsampled_data

def z_score_normalize(data):
    # Convert to PyTorch tensor
    data_tensor = torch.tensor(data, dtype=torch.float32)
    # Calculate mean and std along the timesteps
    mean = torch.mean(data_tensor, dim=2, keepdim=True)
    std = torch.std(data_tensor, dim=2, keepdim=True)
    # Perform z-score norm
    normalized_data = (data_tensor - mean) / std
    return normalized_data

In [9]:
if load == 'intra':
  print("Preprocessing the intra-subject dataset...")
  intra_train_X_norm = z_score_normalize(downsample(intra_train_X, 2034, 30))
  intra_test_X_norm = z_score_normalize(downsample(intra_test_X, 2034, 30))
  print("Done!")

elif load == 'cross':
  print("Preprocessing the cross-subject dataset...")
  cross_train_X_norm = z_score_normalize(downsample(cross_train_X, 2034, 30))
  cross_test1_X_norm = z_score_normalize(downsample(cross_test1_X, 2034, 30))
  cross_test2_X_norm = z_score_normalize(downsample(cross_test2_X, 2034, 30))
  cross_test3_X_norm = z_score_normalize(downsample(cross_test3_X, 2034, 30))
  cross_test_X_norm = z_score_normalize(downsample(cross_test_X, 2034, 30)) # test1 and test2 combined
  print("Done!")

else:
  warnings.warn("No datasets preprocessed. Choose between intra and cross subject.", Warning)

Preprocessing the intra-subject dataset...
Done!


## VAR-CNN Architecture
implemented from: https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6609925/

In [12]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam

# Define the neural network module
class VectorAutoregressiveCNN(nn.Module):
    def __init__(self, k, l, l1, input_width = 1, input_channels=248, n_classes=4, input_height=523):
        super(VectorAutoregressiveCNN, self).__init__()
        #2D Conv
        self.conv = nn.Conv2d(input_channels, k, (l, 1))
        # Max Pooling
        self.pool = nn.MaxPool2d((2, 1), stride=(2, 1))
        # Calculate output shape after conv and pool
        conv_output_height = (input_height - l + 1) // 2
        conv_output_width = 1
        ninputs = k * conv_output_height * conv_output_width
        #Fully Connected Layer
        self.fc = nn.Linear(ninputs, n_classes)
        self.l1_penalty = l1

    def forward(self, x):
        x = self.conv(x)
        x = F.relu(x)
        x = self.pool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return F.log_softmax(x, dim=1)

    def l1_regularization(self):
        l1_norm = sum(p.abs().sum() for p in self.parameters())
        return self.l1_penalty * l1_norm

#define cost
loss_function = nn.CrossEntropyLoss()

## Prepare data for training

In [11]:
from torch.utils.data import DataLoader
from torch.utils.data.dataset import TensorDataset
import warnings
batch_size = 16

if load == 'intra':
  print("Loading the intra-subject tensor...")
  intra_dataset = TensorDataset(torch.tensor(intra_train_X_norm.unsqueeze(-1)), torch.tensor(intra_train_labels).long())
  intra_train_loader = DataLoader(intra_dataset, batch_size=batch_size, shuffle=True)

  intra_test_dataset = TensorDataset(torch.tensor(intra_test_X_norm.unsqueeze(-1)), torch.tensor(intra_test_labels).long())
  intra_test_loader = DataLoader(intra_test_dataset, batch_size=batch_size, shuffle=True)

  num_datasets = 1
  print("Done!")

elif load == 'cross':
  print("Loading the cross-subject tensor...")
  dataset = TensorDataset(torch.tensor(cross_train_X_norm.unsqueeze(-1)), torch.tensor(cross_train_labels).long())
  train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

  test_dataset_1 = TensorDataset(torch.tensor(cross_test1_X_norm.unsqueeze(-1)), torch.tensor(cross_test1_labels).long())
  test_dataset_2 = TensorDataset(torch.tensor(cross_test2_X_norm.unsqueeze(-1)), torch.tensor(cross_test2_labels).long())
  test_dataset_3 = TensorDataset(torch.tensor(cross_test3_X_norm.unsqueeze(-1)), torch.tensor(cross_test3_labels).long())
  test_dataset_12 = TensorDataset(torch.tensor(cross_test_X_norm.unsqueeze(-1)), torch.tensor(cross_test_labels).long())

  test_loader_1 = DataLoader(test_dataset_1, batch_size=batch_size, shuffle=True)
  test_loader_2 = DataLoader(test_dataset_2, batch_size=batch_size, shuffle=True)
  test_loader_3 = DataLoader(test_dataset_3, batch_size=batch_size, shuffle=True)
  test_loader_12 = DataLoader(test_dataset_12, batch_size=batch_size, shuffle=True)

  test_loaders = [test_loader_1, test_loader_2, test_loader_3]
  num_datasets = len(test_loaders)
  print("Done!")

else:
  warnings.warn("No datasets loaded. Choose between intra and cross subject.", Warning)


Loading the intra-subject tensor...
Done!


  intra_dataset = TensorDataset(torch.tensor(intra_train_X_norm.unsqueeze(-1)), torch.tensor(intra_train_labels).long())
  intra_test_dataset = TensorDataset(torch.tensor(intra_test_X_norm.unsqueeze(-1)), torch.tensor(intra_test_labels).long())


## GridSearch

In [29]:
from sklearn.model_selection import ParameterGrid
import numpy as np
from sklearn.model_selection import ParameterGrid

#The following hyperparameters were searched before arriving at the current ones:
param_grid = {
    'lr': [0.0001, 0.001, 0.01],
    'weight_decay': [1e-4, 1e-3, 1e-2],
    'num_epochs': [10, 20, 30],
    'k':[32, 64, 128],
    'l':[3, 5, 7],
    'l1': [1e-4, 1e-3, 1e-2]
}

param_grid = {
    'lr': [0.001],
    'weight_decay': [1e-2, 1e-1],
    'num_epochs': [30, 50],
    'k':[64],
    'l':[7, 14],
    'l1': [1e-2, 1e-1]
}

grid = ParameterGrid(param_grid)
best_params = None
best_test_loss = np.inf
best_test_acc = 0
train_accuracies = []


for params in grid:
  #batch_size = params['batch_size']
  num_epochs = params['num_epochs']
  model = VectorAutoregressiveCNN(params['k'], params['l'], params['l1'] )
  optimizer = Adam(model.parameters(), lr=params['lr'], weight_decay=params['weight_decay'])
  model.to(device)
  train_accuracies = []

  for epoch in range(num_epochs):

    train_loss = 0.0
    correct_predictions = 0
    total_predictions = 0

    # Training
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = loss_function(output, target) + model.l1_regularization()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

        # Curr Accuracy
        _, predicted = torch.max(output.data, 1)
        total_predictions += target.size(0)
        correct_predictions += (predicted == target).sum().item()

    # Average loss and accuracy over the epoch
    train_loss /= len(dataset)
    train_accuracy = correct_predictions / total_predictions
    train_accuracies.append(train_accuracy)

    # Validation
    model.eval()
    test_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    per_class_accuracies = []
    n_classes = 4
    class_correct = [0] * n_classes
    class_total = [0] * n_classes
    with torch.no_grad():
        for data, target in test_loader_12:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += loss_function(output, target).item()  # Sum up batch loss

            # Curr Accuracy
            _, predicted = torch.max(output.data, 1)
            total_predictions += target.size(0)
            correct_predictions += (predicted == target).sum().item()
            # Per-class accuracy
            for c in range(n_classes):
                class_total[c] += (target == c).sum().item()
                class_correct[c] += (predicted == c)[target == c].sum().item()


    test_loss /= len(test_loader_12.dataset)  # Get the average loss
    test_accuracy = correct_predictions / total_predictions

    # Per-class accuracy
    per_class_accuracy = [class_correct[c] / class_total[c] if class_total[c] != 0 else 0 for c in range(n_classes)]
    per_class_accuracies.append(per_class_accuracy)


    # Early stopping logic
    if test_accuracy > best_test_acc:
        best_test_acc = test_accuracy
        print(f"For params: {params}")
        print(f"Training loss: {train_loss}, Training accuracy: {train_accuracy}")
        print(f"Test loss: {test_loss}, Test accuracy: {test_accuracy}")
        print("Accuracy per class:", per_class_accuracies)
        print("-----------------------------")
        print()
        best_params = params
        #save the model
        checkpoint = {
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'epoch': epoch,
            'best_loss': test_loss,
            'best_accuracy': test_accuracy}

        torch.save(checkpoint, f'cnn_checkpoint.pt')
        #print(f"Checkpoint saved at epoch {epoch}")


For params: {'k': 32, 'l': 3, 'l1': 0.0001, 'lr': 0.0001, 'num_epochs': 10, 'weight_decay': 0.0001}
Training loss: 0.0911006685346365, Training accuracy: 0.265625
Test loss: 0.08457968756556511, Test accuracy: 0.28125
Accuracy per class: [[0.375, 0.125, 0.25, 0.375]]
-----------------------------

For params: {'k': 32, 'l': 3, 'l1': 0.0001, 'lr': 0.0001, 'num_epochs': 10, 'weight_decay': 0.0001}
Training loss: 0.07815599627792835, Training accuracy: 0.640625
Test loss: 0.08331136405467987, Test accuracy: 0.4375
Accuracy per class: [[0.875, 0.5, 0.25, 0.125]]
-----------------------------

For params: {'k': 32, 'l': 3, 'l1': 0.0001, 'lr': 0.0001, 'num_epochs': 10, 'weight_decay': 0.0001}
Training loss: 0.03177200257778168, Training accuracy: 0.984375
Test loss: 0.07667113468050957, Test accuracy: 0.46875
Accuracy per class: [[0.75, 0.375, 0.5, 0.25]]
-----------------------------

For params: {'k': 32, 'l': 3, 'l1': 0.0001, 'lr': 0.0001, 'num_epochs': 10, 'weight_decay': 0.0001}
Trainin

## Training Intra

In [18]:
import numpy as np

#Define hyperparameters
k, l, l1, lr, num_epochs, weight_decay = 64, 3, 0.001, 0.001, 30, 0.001

seed_value = 2001
torch.manual_seed(seed_value)

#Instantiate model
model = VectorAutoregressiveCNN(k=k, l=l, l1=l1)

#Instantiate optimizer
optimizer = Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

# Set device to GPU
device = torch.device("cuda")
model.to(device)

# Define cost
loss_function = nn.CrossEntropyLoss()

# Initialize lists to store results
train_losses = []
train_accuracies = []
test_losses = [[] for _ in range(num_datasets)]
test_accuracies = [[] for _ in range(num_datasets)]
train_class_accuracies = []
per_class_accuracies = [[] for _ in range(num_datasets)]

num_epochs = 5
n_classes = 4

for epoch in range(num_epochs):
    # Training
    model.train()
    train_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    class_correct = [0] * n_classes
    class_total = [0] * n_classes

    for batch_idx, (data, target) in enumerate(intra_train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = loss_function(output, target) + model.l1_regularization()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

        # Curr Accuracy
        _, predicted = torch.max(output.data, 1)
        total_predictions += target.size(0)
        correct_predictions += (predicted == target).sum().item()

        # Per-class accuracy
        for i in range(n_classes):
            class_total[i] += (target == i).sum().item()
            class_correct[i] += (predicted == i)[target == i].sum().item()

    # Average loss and accuracy over the epoch
    train_loss /= len(intra_train_loader)
    train_accuracy = correct_predictions / total_predictions
    train_losses.append(train_loss)
    train_accuracies.append(train_accuracy)

    # Per-class accuracy
    per_class_accuracy = [class_correct[i] / class_total[i] if class_total[i] != 0 else 0 for i in range(n_classes)]
    train_class_accuracies.append(per_class_accuracy)

# Testing
test_loss = 0.0
correct_predictions = 0
total_predictions = 0
class_correct = [0] * n_classes
class_total = [0] * n_classes
model.eval()
with torch.no_grad():
    for data, target in intra_test_loader:
        data, target = data.to(device), target.to(device)
        output = model(data)
        test_loss += loss_function(output, target).item()  # Sum up batch loss

        # Current Accuracy
        _, predicted = torch.max(output.data, 1)
        total_predictions += target.size(0)
        correct_predictions += (predicted == target).sum().item()

        # Per-class accuracy
        for c in range(n_classes):
            class_total[c] += (target == c).sum().item()
            class_correct[c] += (predicted == c)[target == c].sum().item()

# Average loss and accuracy over the dataset
test_loss /= len(intra_test_loader)
test_accuracy = correct_predictions / total_predictions

# Per-class accuracy
per_class_accuracy = [class_correct[c] / class_total[c] if class_total[c] != 0 else 0 for c in range(n_classes)]

# Store or print the results
print(f'Training accuracy: {train_accuracy}')
print(f'Test accuracy: {test_accuracy}')
print(f'Per-class accuracy: {per_class_accuracy}')


Training accuracy: 1.0
Test accuracy: 0.5
Per-class accuracy: [1.0, 0.5, 0.0, 0.5]


## Training Cross

In [83]:
import numpy as np

#Define hyperparameters
k, l, l1, lr, num_epochs, weight_decay = 64, 3, 0.001, 0.001, 30, 0.001

seed_value = 2002
torch.manual_seed(seed_value)

#Instantiate model
model = VectorAutoregressiveCNN(k=k, l=l, l1=l1)
model.to(device)

#Instantiate optimizer
optimizer = Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

# Initialize lists to store results
train_losses = []
train_accuracies = []
test_losses = []
test_accuracies = []
train_class_accuracies = []
per_class_accuracies = []

n_classes = 4

for epoch in range(num_epochs):
    # Training
    model.train()
    train_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    class_correct = [0] * n_classes
    class_total = [0] * n_classes

    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = loss_function(output, target) + model.l1_regularization()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

        # Curr Accuracy
        _, predicted = torch.max(output.data, 1)
        total_predictions += target.size(0)
        correct_predictions += (predicted == target).sum().item()

        # Per-class accuracy
        for i in range(n_classes):
            class_total[i] += (target == i).sum().item()
            class_correct[i] += (predicted == i)[target == i].sum().item()

    # Average loss and accuracy over the epoch
    train_loss /= len(train_loader)
    train_accuracy = correct_predictions / total_predictions

    # Per-class accuracy
    per_class_accuracy = [class_correct[i] / class_total[i] if class_total[i] != 0 else 0 for i in range(n_classes)]

    if epoch == num_epochs - 1:
      train_accuracies.append(train_accuracy)
      train_losses.append(train_loss)
      train_class_accuracies += per_class_accuracy

# Testing on multiple datasets
for i, test_loader in enumerate(test_loaders):
    model.eval()
    test_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    class_correct = [0] * n_classes
    class_total = [0] * n_classes

    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += loss_function(output, target).item()  # Sum up batch loss

            # Curr Accuracy
            _, predicted = torch.max(output.data, 1)
            total_predictions += target.size(0)
            correct_predictions += (predicted == target).sum().item()

            # Per-class accuracy
            for c in range(n_classes):
                class_total[c] += (target == c).sum().item()
                class_correct[c] += (predicted == c)[target == c].sum().item()

    # Average loss and accuracy over the epoch
    test_loss /= len(test_loader.dataset)
    test_accuracy = correct_predictions / total_predictions

    # Per-class accuracy
    per_class_accuracy = [class_correct[c] / class_total[c] if class_total[c] != 0 else 0 for c in range(n_classes)]

    test_accuracies.append(test_accuracy)
    test_losses.append(test_loss)
    per_class_accuracies.append(per_class_accuracy)

# Print results
print(f"Train Loss: {round(train_losses[0],3)}, Train Accuracy: {round(train_accuracies[0],3)}")

print(f"\n Overall Test Loss: {round(np.mean(test_losses),3)}, Overall Accuracy: {round(np.mean(test_accuracies),3)}")

for i in range(len(test_loaders)):
  print(f"\nTest Dataset {i+1}")
  print(f"Test Loss: {round(test_losses[i],3)}, Test Accuracy: {round(test_accuracies[i],3)}")


avg_class_test = np.mean(per_class_accuracies, axis=0)
print("\nPer-Class Accuracies:")
for i in range(n_classes):
    print(f"Class {i}: Train {round(train_class_accuracies[i],3):.4f}, Test {round(avg_class_test[i],3):.4f}")


Train Loss: 0.584, Train Accuracy: 1.0

 Overall Test Loss: 0.078, Overall Accuracy: 0.625

Test Dataset 1
Test Loss: 0.093, Test Accuracy: 0.5

Test Dataset 2
Test Loss: 0.06, Test Accuracy: 0.688

Test Dataset 3
Test Loss: 0.08, Test Accuracy: 0.688

Per-Class Accuracies:
Class 0: Train 1.0000, Test 0.9170
Class 1: Train 1.0000, Test 0.2500
Class 2: Train 1.0000, Test 0.7500
Class 3: Train 1.0000, Test 0.5830
