In [2]:
# %pip install pyriemann_qiskit

In [3]:
import numpy as np

from src.utils import *
from src.covariance_means import generalized_eigenvalue_covariance_mean

from sklearn.pipeline import make_pipeline
from sklearn import svm
from sklearn.model_selection import StratifiedKFold, cross_val_score, GridSearchCV, train_test_split, KFold, StratifiedShuffleSplit, StratifiedGroupKFold, GroupKFold
from sklearn.metrics import balanced_accuracy_score
from sklearn.base import clone

from scipy.stats import mode

from pyriemann.spatialfilters import CSP, SPoC
from pyriemann.tangentspace import TangentSpace
from pyriemann.classification import MDM

from pyriemann_qiskit.classification import QuanticSVM, QuanticNCH
from pyriemann_qiskit.utils.hyper_params_factory import gen_x_feature_map, gen_z_feature_map

%load_ext autoreload
%autoreload 2

[QClass]  Initializing Quantum Classifier


  warn("mne not available. get_mne_sample will fail.")


# Training process, using pipelines

In [4]:
# Unpack the five values returned by load_and_split_subjects_by_group: four
# subject collections (control / parkinson for trainval and test) followed by
# the subject_covariances object used by the cells below.
# NOTE(review): the helper is defined outside this notebook; how it splits
# the subjects cannot be seen from here.
(
    trainval_control_subjects,
    trainval_parkinson_subjects,
    test_control_subjects,
    test_parkinson_subjects,
    subject_covariances,
) = load_and_split_subjects_by_group(
    pickle_file_path='data/densetnet201-4-CovsDict.pkl',
)

In [5]:
# Run calculate_mean_covariances_per_subject once per subject collection.
# The generator yields results in the same order as the target names on the
# left, so each name receives the result for the matching id collection.
# NOTE: `trainval_parkinnson_covs` keeps its original spelling because later
# cells reference that exact name.
(
    trainval_control_covs,
    trainval_parkinnson_covs,
    test_control_covs,
    test_parkinson_covs,
) = (
    calculate_mean_covariances_per_subject(
        subject_covariances=subject_covariances,
        ids=subject_ids,
    )
    for subject_ids in (
        trainval_control_subjects,
        trainval_parkinson_subjects,
        test_control_subjects,
        test_parkinson_subjects,
    )
)

In [6]:
# Merge the control and parkinson mappings of each split into one dictionary.
# Control entries are inserted first; on a duplicate key the parkinson entry
# overrides the control one.
trainval_covs = dict(trainval_control_covs)
trainval_covs.update(trainval_parkinnson_covs)

test_covs = dict(test_control_covs)
test_covs.update(test_parkinson_covs)

In [7]:
# Run the per-subject video-count check on every subset.
for _subset_covs in (trainval_control_covs, trainval_parkinnson_covs, test_control_covs):
    check_subject_video_count(_subset_covs)

# The last call stays a bare expression so the notebook still displays its
# return value as the cell output.
check_subject_video_count(test_parkinson_covs)

All subjects have exactly 8 videos.
All subjects have exactly 8 videos.
All subjects have exactly 8 videos.
Subject PG_0028 has 16 videos
Subject PG_0029 has 16 videos


2

In [8]:
# Build the X / y / ids / groups arrays for the training+validation subset
# and report their shapes.
X_trainval, y_trainval, ids_trainval, groups_trainval = create_X_y_groups_arrays(trainval_covs)
print('Training/validation data...')
print(
    f"X shape: {X_trainval.shape}, y shape: {y_trainval.shape}, "
    f"ids shape: {ids_trainval.shape}, groups shape: {groups_trainval.shape}"
)

# Same arrays for the hold-out test subset.
X_test, y_test, ids_test, groups_test = create_X_y_groups_arrays(test_covs)
print('Test data...')
print(
    f"X shape: {X_test.shape}, y shape: {y_test.shape}, "
    f"ids shape: {ids_test.shape}, groups shape: {groups_test.shape}"
)

Training/validation data...
X shape: (176, 20, 20), y shape: (176,), ids shape: (176,), groups shape: (176,)
Test data...
X shape: (136, 20, 20), y shape: (136,), ids shape: (136,), groups shape: (136,)


In [9]:
# Spatial-filter template. It is never placed in a pipeline directly: each
# pipeline below receives its own clone, because make_pipeline stores the
# estimator objects it is given and Pipeline.fit fits them in place. Sharing
# one CSP instance would let fitting one pipeline overwrite the filters that
# the other pipeline relies on.
# log=False keeps the CSP output as matrices for the TangentSpace step
# (see the pyriemann CSP documentation).
spatial_filter = CSP(nfilter=4, log=False)

# Metric passed to every TangentSpace step.
METRIC = "logeuclid"

# Classical baseline: CSP -> tangent space -> SVC with default settings.
pipeline_classical = make_pipeline(
    clone(spatial_filter),
    TangentSpace(metric=METRIC),
    svm.SVC(),
)

# Quantum variant: same preprocessing, followed by QuanticSVM configured with
# gen_x_feature_map(reps=2).
pipe_ts_qsvm2 = make_pipeline(
    # Disabled alternatives, kept for reference:
    # Whitening(metric="logeuclid", dim_red={'n_components': 2}),
    # PCA_SPD(n_components=2),
    clone(spatial_filter),
    TangentSpace(metric=METRIC),
    QuanticSVM(gen_feature_map=gen_x_feature_map(reps=2)),
)


[QClass]  Initializing Quantum Classifier


In [10]:
# Stratified, group-aware 10-fold splitter shared by both evaluations below.
cv = StratifiedGroupKFold(n_splits=10)

# Cross-validate the classical pipeline on the training/validation subset only.
scores_svm = cross_val_score(
    estimator=pipeline_classical,
    X=X_trainval,
    y=y_trainval,
    groups=groups_trainval,
    scoring="balanced_accuracy",
    cv=cv,
)

print(f'scores svm: {scores_svm.mean()} {scores_svm}')

# Cross-validate again on every sample: the training/validation arrays are
# stacked with the test arrays, train/validation rows first.
X_all = np.concatenate([X_trainval, X_test])
y_all = np.concatenate([y_trainval, y_test])
ids_all = np.concatenate([ids_trainval, ids_test])
# NOTE(review): assumes the group labels produced for the two subsets never
# collide (e.g. both starting at 0); otherwise unrelated subjects would be
# treated as one group after concatenation — TODO confirm against
# create_X_y_groups_arrays.
groups_all = np.concatenate([groups_trainval, groups_test])

scores_svm_all = cross_val_score(
    estimator=pipeline_classical,
    X=X_all,
    y=y_all,
    groups=groups_all,
    scoring="balanced_accuracy",
    cv=cv,
)
print(f'scores svm all: {scores_svm_all.mean()} {scores_svm_all}')

scores svm: 0.990625 [0.96875 1.      1.      1.      1.      1.      0.9375  1.      1.
 1.     ]
scores svm all: 0.8354166666666668 [0.5        0.5        0.875      1.         0.85416667 1.
 0.875      1.         0.75       1.        ]


# Manual testing

In [83]:
# NOTE(review): this whole cell is disabled (commented out). Before re-enabling:
#   * `X`, `y` and `groups` (and `ids` in the debug print) are not defined in
#     the visible cells; the arrays built earlier are named X_trainval,
#     y_trainval, groups_trainval and ids_trainval.
#   * the loop rebinds `X_test` / `y_test` to the per-fold validation split, so
#     the majority-vote evaluation at the bottom would score against the last
#     fold's split instead of the hold-out test arrays created earlier. Use
#     distinct names (e.g. X_val / y_val) inside the loop.
# cv = StratifiedGroupKFold(n_splits=10)
# models = []
# scores = []

# for fold, (train_idx, test_idx) in enumerate(cv.split(X, y, groups)):    
#     # Split the data
#     # print(f'Test ids: {ids[test_idx]}')
#     X_train, X_test = X[train_idx], X[test_idx]
#     y_train, y_test = y[train_idx], y[test_idx]
    
#     # Clone the model
#     model = clone(pipeline_classical)
    
#     # Fit the model on the training data
#     model.fit(X_train, y_train)
    
#     # Save the trained model
#     models.append(model)
    
#     # Evaluate the model on the validation data
#     y_pred = model.predict(X_test)
#     score = balanced_accuracy_score(y_test, y_pred)
#     scores.append(score)
    
#     # print(f"Fold {fold + 1} balanced accuracy score: {score}")

# print(f'Avg scores: {np.mean(scores)} | Scores: {scores}')

# y_preds = []
# for idx, model in enumerate(models):
#     predictions = model.predict(X_test)
#     y_preds.append(predictions)
#     # print(f"Predictions from model {idx + 1}: {predictions}")
# y_preds = np.array(y_preds)
# final_preds = mode(y_preds, axis=0)[0].flatten() # Get the majority vote prediction

# score_test = balanced_accuracy_score(y_test, final_preds)
# print(f"Final test score: {score_test}")
