In [73]:
import os
import numpy as np
import h5py
from scipy import stats
import scipy.io
import mne

mne.set_log_level('error')

from random import shuffle
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
from sklearn.model_selection import KFold


import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from torchsummary import summary

import optuna


from utils.load import Load
from utils.eval import accuracy
from config.default import cfg


%load_ext autoreload
%autoreload 2


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [50]:
subject_id = 0

In [51]:
device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(device_name)
print(device)

cuda


In [52]:
# Load the data  from the HDF5 file
target_dir = 'features'
tag = '0_25powers' # with_bad
file_path = os.path.join(target_dir, tag+'_'+cfg['subjects'][subject_id] + '.h5')


data = {}
with h5py.File(file_path, 'r') as h5file:
    for key in h5file.keys():
        data[key] = np.array(h5file[key])

# Print the loaded data dictionary
for key, value in data.items():
    print(key, value.shape)

index (50, 158, 30, 2)
little (50, 158, 30, 2)
middle (50, 158, 30, 2)
ring (50, 158, 30, 2)
thumb (50, 158, 30, 2)


In [151]:
class CustomDataset(Dataset):
    def __init__(self, data, flatten = False, normalize = False, train_percent=0.8, seed=42, device=None, is_train=True):
        self.device = device
        self.is_train = is_train
        self.flatten = flatten
        self.normalize = normalize

        self.train_X, self.train_y, self.test_X, self.test_y = self.preprocess_data(data, train_percent, seed)
        self.train_X = torch.tensor(self.train_X, dtype=torch.float32, device=self.device)
        self.train_y = torch.tensor(self.train_y, dtype=torch.long, device=self.device)
        self.test_X = torch.tensor(self.test_X, dtype=torch.float32, device=self.device)
        self.test_y = torch.tensor(self.test_y, dtype=torch.long, device=self.device)

        print(f'Train data shape: {self.train_X.shape}')
        print(f'Train labels shape: {self.train_y.shape}')
        print(f'Test data shape: {self.test_X.shape}')
        print(f'Test labels shape: {self.test_y.shape}')
        self.dim = self.train_X[0].shape.numel()

    def get_dim():
        return self.dim

    def preprocess_data(self, data, train_percent, seed):
        train_features = []
        train_labels = []
        test_features = []
        test_labels = []

        for i, finger in enumerate(data):
            features = data[finger]
            if self.flatten:
                features = features.reshape(features.shape[0], -1)
            if self.normalize:
                original_shape = features.shape
                features = features.reshape(features.shape[0], -1)
                features = StandardScaler().fit_transform(features)
                features = features.reshape(original_shape)


            # Generate labels
            labels = torch.tensor(np.ones((len(features))) * i)

            
            
            train_features.extend(features[:int(len(features) * train_percent)])
            train_labels.extend(labels[:int(len(features) * train_percent)])
            test_features.extend(features[int(len(features) * train_percent):])
            test_labels.extend(labels[int(len(labels) * train_percent):])

         
        train_features = np.stack(arrays=train_features, axis=0)
        train_labels = np.stack(arrays=train_labels, axis=0)
        test_features = np.stack(arrays=test_features, axis=0)
        test_labels = np.stack(arrays=test_labels, axis=0)

        return train_features, train_labels, test_features, test_labels



    
    def __len__(self):
        return len(self.train_y) if self.is_train else len(self.test_y)

    def __getitem__(self, idx):
        if self.is_train:
            return self.get_train_item(idx)
        else:
            return self.get_test_item(idx)

    def get_train_item(self, idx):
        features = self.train_X[idx]
        label = self.train_y[idx]

        return features, label

    def get_test_item(self, idx):
        features = self.test_X[idx]
        label = self.test_y[idx]

        return features, label




train_dataset = CustomDataset(data, flatten = True, normalize = False, device=device, is_train=True)
test_dataset = CustomDataset(data, flatten = True, normalize = False, device=device, is_train=False)

train_dataloader = DataLoader(train_dataset, batch_size=50, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=50, shuffle=False)

Train data shape: torch.Size([200, 9480])
Train labels shape: torch.Size([200])
Test data shape: torch.Size([50, 9480])
Test labels shape: torch.Size([50])
Train data shape: torch.Size([200, 9480])
Train labels shape: torch.Size([200])
Test data shape: torch.Size([50, 9480])
Test labels shape: torch.Size([50])


In [152]:
# Test data loader
for i, (feature, label) in enumerate(train_dataloader):
    print(feature.shape)
    print(label.shape)
    print('---------------')
    break

torch.Size([50, 9480])
torch.Size([50])
---------------


In [153]:
class SingleLayerMLP(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, activation):
        super(SingleLayerMLP, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)

        self.batchnorm = nn.BatchNorm1d(hidden_size)
        self.dropout = nn.Dropout(0.3)
        self.activation = activation

    def forward(self, x):
        x = self.fc1(x)
        x = self.batchnorm(x)
        x = self.activation(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [154]:
# Create model
model = SingleLayerMLP(train_dataset.dim, 5, 5,  nn.ReLU())

model.to(device)
summary(model, input_size=(5, 10, *next(iter(train_dataloader))[0][0].shape));

Layer (type:depth-idx)                   Param #
├─Linear: 1-1                            47,405
├─Linear: 1-2                            30
├─BatchNorm1d: 1-3                       10
├─Dropout: 1-4                           --
├─ReLU: 1-5                              --
Total params: 47,445
Trainable params: 47,445
Non-trainable params: 0


In [155]:
learning_rate = 1e-3
epochs = 200

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=0.1)

# Training loop
for epoch in range(epochs):
    epoch_loss = 0.0

    for batch_features, batch_labels in train_dataloader:
        optimizer.zero_grad()
        outputs = model(batch_features)

        loss = criterion(outputs, batch_labels.long())
          
        # Backward propagation
        loss.backward()
        # Update the weights
        optimizer.step()

        epoch_loss += loss.item()

   

    if epoch % 10 == 9:
        train_accuracy = accuracy(model, train_dataloader)
        test_accuracy = accuracy(model, test_dataloader)
        print(f"Epoch {epoch + 1}/{epochs}, Loss: {epoch_loss}, Train accuracy: {train_accuracy:.2f}%, Test accuracy: {test_accuracy:.2f}%")

print("#"*50)
print(f'Final_loss: {epoch_loss}')
print(f'Final train accuracy: {accuracy(model, train_dataloader):.2f}%')
print(f'Final test accuracy: {accuracy(model, test_dataloader):.2f}%')

Epoch 10/200, Loss: 3.3863431811332703, Train accuracy: 64.50%, Test accuracy: 36.00%
Epoch 20/200, Loss: 1.2877494990825653, Train accuracy: 97.50%, Test accuracy: 24.00%
Epoch 30/200, Loss: 0.5222935155034065, Train accuracy: 99.50%, Test accuracy: 24.00%
Epoch 40/200, Loss: 0.5021945312619209, Train accuracy: 99.50%, Test accuracy: 20.00%
Epoch 50/200, Loss: 0.4554506689310074, Train accuracy: 99.50%, Test accuracy: 20.00%
Epoch 60/200, Loss: 0.44104553014039993, Train accuracy: 99.50%, Test accuracy: 22.00%
Epoch 70/200, Loss: 0.41934122145175934, Train accuracy: 99.50%, Test accuracy: 28.00%
Epoch 80/200, Loss: 0.413548544049263, Train accuracy: 99.50%, Test accuracy: 24.00%
Epoch 90/200, Loss: 0.40208467841148376, Train accuracy: 99.50%, Test accuracy: 26.00%
Epoch 100/200, Loss: 0.39754699915647507, Train accuracy: 99.50%, Test accuracy: 28.00%
Epoch 110/200, Loss: 0.38988155871629715, Train accuracy: 99.50%, Test accuracy: 26.00%
Epoch 120/200, Loss: 0.38674963265657425, Train 

### Hyperparam optimalization

In [158]:
def train(train_dataloader, test_dataloader, model, criterion, optimizer, num_epochs=100):
    for epoch in range(num_epochs):
        for batch_features, batch_labels in train_dataloader:
            optimizer.zero_grad()
            outputs = model(batch_features)
            loss = criterion(outputs, batch_labels.long())
            loss.backward()
            optimizer.step()

    return accuracy(model, test_dataloader)

def objective(trial, train_dataloader, test_dataloader):
    learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True)
    num_epochs = trial.suggest_int("num_epochs", 100, 2000)
    hidden_size = trial.suggest_int("hidden_size", 16, 128)
    activation_name = trial.suggest_categorical("activation", ["relu", "elu", "leaky_relu"])
    optimizer = trial.suggest_categorical("optimizer", ["SGD", "Adam"])
    weight_decay = trial.suggest_float("weight_decay", 1e-5, 1e-1, log=True)

    if activation_name == "relu":
        activation = nn.ReLU()
    elif activation_name == "elu":
        activation = nn.ELU()
    elif activation_name == "leaky_relu":
        activation = nn.LeakyReLU()

    if optimizer == "SGD":
        optimizer = optim.SGD
    elif optimizer == "Adam":
        optimizer = optim.Adam

    
    model = SingleLayerMLP(train_dataset.dim, hidden_size, 5, activation)
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optimizer(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    return train(train_dataloader, test_dataloader, model, criterion, optimizer, num_epochs=num_epochs)


def train_MLP(n_trials = 100):


    study = optuna.create_study(direction="maximize")
    study.optimize(lambda trial: objective(trial,train_dataloader, test_dataloader), n_trials=n_trials)

    best_trial = study.best_trial

    print(f'Best trial params: {best_trial.params}')
    print(f'Best trial accuracy: {best_trial.value :.2f}%')



In [159]:
train_MLP(n_trials=1)

[32m[I 2023-04-27 20:55:03,981][0m A new study created in memory with name: no-name-a0aa4f6c-3abf-43ed-92ba-50d6cb10f0b5[0m
[32m[I 2023-04-27 20:55:05,308][0m Trial 0 finished with value: 32.0 and parameters: {'learning_rate': 0.09902304412740562, 'num_epochs': 195, 'hidden_size': 98, 'activation': 'elu', 'optimizer': 'SGD', 'weight_decay': 1.9466944305815796e-05}. Best is trial 0 with value: 32.0.[0m


Best trial params: {'learning_rate': 0.09902304412740562, 'num_epochs': 195, 'hidden_size': 98, 'activation': 'elu', 'optimizer': 'SGD', 'weight_decay': 1.9466944305815796e-05}
Best trial accuracy: 32.00%


In [10]:
'''
tag = 'gpt4freq_all'
Best trial params: {'learning_rate': 0.009669058999906542, 'num_epochs': 1671, 'hidden_size': 111, 'activation': 'relu', 'optimizer': 'SGD'}
Best trial accuracy: 48.00%

tag = 'reproduced_with_bad'
Best trial params: {'learning_rate': 0.0021345711699235683, 'num_epochs': 732, 'hidden_size': 37, 'activation': 'relu', 'optimizer': 'Adam'}
Best trial accuracy: 48.00%
'''


"\ntag = 'gpt4freq_all'\nBest trial params: {'learning_rate': 0.009669058999906542, 'num_epochs': 1671, 'hidden_size': 111, 'activation': 'relu', 'optimizer': 'SGD'}\nBest trial accuracy: 48.00%\n\ntag = 'reproduced_with_bad'\nBest trial params: {'learning_rate': 0.0021345711699235683, 'num_epochs': 732, 'hidden_size': 37, 'activation': 'relu', 'optimizer': 'Adam'}\nBest trial accuracy: 48.00%\n"