In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from minepy import MINE
from scipy.stats import skew, kurtosis

from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.linear_model import LogisticRegression, BayesianRidge, Ridge, ElasticNet, Lasso
from sklearn.ensemble import RandomForestRegressor, AdaBoostRegressor, GradientBoostingRegressor, \
    HistGradientBoostingRegressor
from sklearn.svm import SVR
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, classification_report
from sklearn.model_selection import StratifiedKFold, LeaveOneGroupOut, GroupKFold
from sklearn.decomposition import PCA

In [None]:
# Regression metrics for a fitted estimator
def evaluate_model(model, X_test, y_test):
    """Score a fitted model on one evaluation set.

    Parameters
    ----------
    model : object exposing ``predict(X)``.
    X_test : features passed to ``model.predict``.
    y_test : ground-truth values compared against the predictions.

    Returns
    -------
    tuple
        ``(mae, mse, rmse, r2)`` where ``rmse`` is the square root of ``mse``.
    """
    predictions = model.predict(X_test)
    abs_error = mean_absolute_error(y_test, predictions)
    sq_error = mean_squared_error(y_test, predictions)
    determination = r2_score(y_test, predictions)
    return abs_error, sq_error, np.sqrt(sq_error), determination


def smape(A, F):
    """Symmetric mean absolute percentage error, scaled to [0, 1].

    Computes ``mean(|F - A| / (|A| + |F|))`` over all elements. Terms whose
    ratio is undefined (e.g. ``A == F == 0``, which yields 0/0) count as 0.

    Parameters
    ----------
    A : array-like
        Actual values. Lists, scalars and arrays of any shape are accepted.
    F : array-like
        Forecast values with the same number of elements as ``A``.

    Returns
    -------
    float
        The mean ratio, or NaN when the inputs are empty.

    Notes
    -----
    Both inputs are flattened before comparison so that a column vector
    ``(N, 1)`` paired with a flat vector ``(N,)`` is compared element by
    element instead of being broadcast to an ``(N, N)`` matrix.
    """
    actual = np.asarray(A, dtype=float).ravel()
    forecast = np.asarray(F, dtype=float).ravel()

    # 0/0 produces NaN here; the warnings are silenced because those terms
    # are replaced by 0 right below.
    with np.errstate(divide='ignore', invalid='ignore'):
        ratio = np.abs(forecast - actual) / (np.abs(actual) + np.abs(forecast))
    ratio = np.where(np.isnan(ratio), 0.0, ratio)

    if ratio.size == 0:
        # Nothing to average; avoid a division-by-zero warning.
        return np.nan
    return np.sum(ratio) / ratio.size

In [None]:
# Slicing constants shared with the test-set cell.
FS = 400       # presumably the RSP/PPG sampling rate in Hz; only the commented-out variant below uses it -- TODO confirm
s, e = 15, 25  # window kept along axis 2 of the fMRI array
a, b = 0, 246  # window kept along axis 1 of the fMRI array

# The .npy file stores a pickled dict; .item() unwraps it from the 0-d array.
# NOTE(review): allow_pickle=True unpickles arbitrary objects -- only load trusted files.
data = np.load("../data/processed/train_img.npy", allow_pickle=True).item()

fmri, rsp, ppg = data["fMRI"], data["RSP"], data["PPG"]
subject, level = data["subject"], data["level"]
target = 1 + data["class"].astype(int)  # class labels shifted up by one

# Physiological features: RSP and PPG joined on the last axis, one flat row per sample.
# Alternative preprocessing kept for reference:
# rsp = data["RSP"][:, 5*FS:]
# ppg = data["PPG"][:, 5*FS:]
# bio = np.stack((rsp[:, ::10], ppg[:, ::10]), axis=-1)
bio = np.concatenate([rsp, ppg], axis=-1)
bio = bio.reshape(bio.shape[0], -1)

# fMRI features: crop axes 1 and 2, swap them, then flatten each sample.
# train = np.expand_dims(np.swapaxes(fmri[:, :, :], 1, 2),  axis=-1)
train = np.swapaxes(fmri[:, a:b, s:e], 1, 2)
train = train.reshape(train.shape[0], -1)

# Combined design matrix: fMRI columns followed by physiological columns.
train2 = np.concatenate([train, bio], axis=-1)

print(f"Data shape: {train.shape}")
print(f"Bio shape: {bio.shape}")
print(f"Subject shape: {subject.shape}")
print(f"Target shape: {target.shape}")
print(f"Level shape: {level.shape}")
print(np.unique(target))

In [None]:
# NOTE(review): allow_pickle=True unpickles arbitrary objects -- only load trusted files.
data_test = np.load("../data/processed/test_img.npy", allow_pickle=True).item()

# Positions of the test samples whose class label is not NaN.
test_idx = np.nonzero(~np.isnan(data_test["class"]))[0]

# Ground truth restricted to those labelled samples.
y_test_level = data_test["level"][test_idx]
y_test = 1 + data_test["class"][test_idx].astype(int)

# Features are built for ALL test samples (labelled or not), with the same
# crop / swap / flatten steps as the training features.
fmri_test = np.swapaxes(data_test["fMRI"][:, a:b, s:e], 1, 2)
fmri_test = fmri_test.reshape(fmri_test.shape[0], -1)

bio_test = np.concatenate([data_test["RSP"], data_test["PPG"]], axis=-1)
bio_test = bio_test.reshape(bio_test.shape[0], -1)

test2 = np.concatenate([fmri_test, bio_test], axis=-1)

print(f"fMRI shape: {fmri_test.shape}")
print(f"Bio shape: {bio_test.shape}")
print(f"Test target shape: {y_test.shape}")
print("Level shape:", y_test_level.shape)
print(np.unique(y_test))

In [None]:
# Per-fold accumulators (one entry per cross-validation fold).
val_scores = []         # [mae, mse, rmse, r2] on the held-out group
test_scores = []        # SMAPE on the labelled test samples
preds = []              # raw predictions for the full test matrix
smape_scores = []       # SMAPE on the held-out group
smape_scores_test = []  # not filled in this cell
test_mae = []           # MAE on the labelled test samples

X = train2
X_test = test2
y = level
g1 = subject

# Ground truth for the labelled test samples. The models below are fitted on
# `level`, so every test metric is computed against `level` as well.
true_levels = data_test["level"][test_idx]

# One fold per distinct value in `subject`: that group is held out for validation.
cv = LeaveOneGroupOut()
# cv = GroupKFold(n_splits=5)
# cv = StratifiedGroupKFold(n_splits=10, shuffle=True, random_state=42)
for i, (tidx, vidx) in enumerate(cv.split(X, y, g1)):
    X_train, X_val = X[tidx], X[vidx]
    y_train, y_val = y[tidx], y[vidx]

    # Fit the model
    model = SVR(kernel="linear", C=1).fit(X_train, y_train)
    # model = BayesianRidge().fit(X_train, y_train)
    # model = Ridge().fit(X_train, y_train)

    # Validation metrics on the held-out group
    mae, mse, rmse, r2 = evaluate_model(model, X_val, y_val)
    print(f"MAE: {mae:.3f}, MSE: {mse:.3f}, RMSE: {rmse:.3f}, R2: {r2:.3f}")
    val_scores.append([mae, mse, rmse, r2])
    smape_scores.append(smape(y_val, model.predict(X_val)))

    # Inference on the full test matrix; only labelled rows are scored.
    Y_pred = model.predict(X_test)
    preds.append(Y_pred)
    Y_pred_labelled = Y_pred.flatten()[test_idx]
    # test_count.append(np.sum(np.round(Y_pred_labelled) == true_levels))
    # MAE must use the same target the model was trained on (`level`);
    # `y_test` holds the shifted class labels and is not comparable.
    test_mae.append(mean_absolute_error(true_levels, Y_pred_labelled))
    test_scores.append(smape(true_levels, Y_pred_labelled))

print("#" * 100)
print(np.mean(val_scores, axis=0), np.std(val_scores, axis=0))
print("#" * 100)
print("Val SMAPE:")
print(np.mean(smape_scores), np.std(smape_scores))
print("#" * 100)
print("Test stats:")
print("Scores:")
print(np.mean(test_scores), np.std(test_scores))
print("MAE:")
print(np.mean(test_mae), np.std(test_mae))