In [1]:
import os

import numpy as np
import torch

In [2]:
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import StratifiedKFold

In [3]:
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

In [20]:
from Extract_data import Extract_data_from_subject
from preprocessing import Select_time_window, bandpass_filter
from TOL_dataset_utils import Transform_for_classificator
from constants import *
from models.CNN import CNN
from loops import cnn_train_loop, cnn_test_loop

In [6]:
# Shared scaler instance; it is re-fitted on the training split of every CV fold in the loop below.
scaler = MinMaxScaler()
# Loss function handed to the CNN train and test loops.
loss_fn = nn.CrossEntropyLoss()

In [7]:
def save_results_for_participant(module, model_name, results_save, save_dir, Condition, subject, accuracy_list):
    """Write a participant's accuracy value(s) to a ``.npy`` file.

    Nothing is created or written when ``results_save`` is falsy.

    Args:
        module: Label placed in the file name (the caller passes e.g. "TORCH").
        model_name: Model label placed in the file name (e.g. "CNN").
        results_save: Flag that enables saving.
        save_dir: Directory that is created if missing; it is also used as
            the prefix of the output file name.
        Condition: Condition label placed in the file name.
        subject: Subject identifier; converted to text for the file name.
        accuracy_list: Value(s) handed to ``np.save``.
    """
    if not results_save:
        return

    # exist_ok=True replaces the separate "exists?" check, so there is no
    # window between checking and creating the directory.
    os.makedirs(save_dir, exist_ok=True)

    # NOTE(review): the name is built by plain concatenation, so unless
    # save_dir ends with a path separator the file is written next to
    # save_dir rather than inside it. Kept unchanged so previously saved
    # results stay discoverable -- confirm this is intended.
    file_name = f"{save_dir}_{module}_{model_name}_AVG_CV_{Condition}_Subject_{subject}.npy"

    np.save(file_name, accuracy_list)

## Klasifikacija

In [None]:
# Per-subject cross-validated CNN training and evaluation.
for subject in SUBJECT_NUMBERS:

    X, y = Extract_data_from_subject(ROOT, subject, DATA_TYPE)
    # Keep only the time window in which the stimulus is present.
    X = Select_time_window(X, START_T, END_T, SAMPLE_FREQ)
    # Keep only the covert trials for all 4 directions.
    X, y = Transform_for_classificator(X, y, CONDITIONS, CLASSES)

    # NOTE(review): `bands` is not defined in the visible cells; presumably it
    # comes from `from constants import *` -- confirm.
    X = bandpass_filter(bands, SAMPLE_FREQ, X, DEVICE)

    outer_cv = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=42)

    # One entry per fold: test accuracy averaged over all epochs of that fold.
    cv_metrics_log = [0 for _ in range(N_SPLITS)]

    # Cross-validation
    for i, (train_indices, test_indices) in enumerate(outer_cv.split(X, y)):

        # A fresh model and optimizer for every fold. Creating them once per
        # subject would carry weights (and Adam state) over from the previous
        # fold, i.e. from a model already trained on samples that are in this
        # fold's test split, which inflates the reported accuracy.
        model = CNN()
        model.to(DEVICE)
        optimizer = optim.Adam(model.parameters(), lr=LR_STEP_SIZE)

        # Preprocessing: split into train/test for this fold.
        y_train, y_test = y[train_indices], y[test_indices]
        X_train, X_test = X[train_indices], X[test_indices]

        # Scaling: the scaler works on 2-D input, so flatten every sample,
        # scale, then restore the original shape.
        train_shape = X_train.shape
        test_shape = X_test.shape

        flattened_X_train = X_train.reshape(train_shape[0], -1)
        flattened_X_test = X_test.reshape(test_shape[0], -1)

        # Fit on the training split only; the test split reuses those statistics.
        scaled_train = scaler.fit_transform(flattened_X_train)
        X_train = scaled_train.reshape(train_shape)

        scaled_test = scaler.transform(flattened_X_test)
        X_test = scaled_test.reshape(test_shape)

        # Tensors
        X_train = torch.tensor(X_train, dtype=torch.float32)
        y_train = torch.tensor(y_train, dtype=torch.int64)
        X_test = torch.tensor(X_test, dtype=torch.float32)
        y_test = torch.tensor(y_test, dtype=torch.int64)

        # Datasets and loaders for this fold.
        train_dataset = TensorDataset(X_train, y_train)
        test_dataset = TensorDataset(X_test, y_test)
        # NOTE(review): the training loader is not shuffled; kept as in the
        # original so results stay comparable -- consider shuffle=True.
        train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False)
        test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

        for _ in range(EPOCHS):
            cnn_train_loop(train_dataloader, model, loss_fn, optimizer, DEVICE)
            # Accumulate the value returned by the test loop after every epoch.
            cv_metrics_log[i] += cnn_test_loop(test_dataloader, model, loss_fn, DEVICE)

        # Average over epochs for the current fold.
        cv_metrics_log[i] /= EPOCHS

    # End of CV loop: summarize and optionally persist the subject's result.
    best_accuracy = max(cv_metrics_log)
    avg_model_accuracy = sum(cv_metrics_log)/N_SPLITS
    print(f'Subject: {subject} NN: Avg accuracy: {avg_model_accuracy} Best accuracy: {best_accuracy}')
    save_results_for_participant("TORCH", "CNN", SAVE, SAVE_DIR, CONDITIONS[0], subject, avg_model_accuracy)