In [1]:
import os

import h5py
import numpy as np
from himalaya.backend import set_backend
from himalaya.ridge import ColumnTransformerNoStack, BandedRidgeCV
from himalaya.scoring import l2_neg_loss
from matplotlib import pyplot as plt
from scipy.stats import zscore
from sklearn import config_context
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.model_selection import ValidationCurveDisplay
from sklearn.pipeline import make_pipeline
from voxelwise_tutorials.delayer import Delayer
from voxelwise_tutorials.io import load_hdf5_array

In [None]:
# Compute backend and plotting style.
# on_error="throw" is passed so a backend problem surfaces here rather than later.
backend = set_backend("torch_cuda", on_error="throw")
plt.style.use("nord-light-talk")

# NOTE(review): scipy has already been imported by the import cell when this
# line runs; presumably the flag has to be set before that import to take
# effect -- TODO confirm and, if so, move it above the imports.
os.environ["SCIPY_ARRAY_API"] = "1"

# Dataset selection.
data_dir = "../../data"
subject, modality = 1, "reading"

# Preprocessing settings.
trim = 5  # samples cut from the end of each story's responses
number_of_delays = 4  # number of feature delays

In [None]:
# Search settings used by the cells below.
# 10 log-spaced regularisation strengths between 1e-5 and 1e5.
alphas = np.logspace(start=-5, stop=5, num=10)
# n_iter: iteration budget (presumably for the banded-ridge solver -- confirm);
# cv: number of cross-validation folds.
n_iter, cv = 25, 5

# Load features

In [None]:
# Open the training and validation feature files read-only.
# The handles stay open because the next cell reads the per-story datasets from them.
features_train = h5py.File(
    os.path.join(data_dir, 'features', 'features_trn_NEW.hdf'),
    mode='r',
)
features_val = h5py.File(
    os.path.join(data_dir, 'features', 'features_val_NEW.hdf'),
    mode='r',
)

In [None]:
# For every story, z-score its 'english1000' features, then stack all stories
# row-wise and store the result as float32.
semantic_train = np.vstack(
    [zscore(features_train[story]['english1000']) for story in features_train.keys()]
).astype(np.float32)

semantic_val = np.vstack(
    [zscore(features_val[story]['english1000']) for story in features_val.keys()]
).astype(np.float32)

# Load brain data

In [None]:
# Load the per-story fMRI responses of the configured subject and modality.
Y_train_filename = os.path.join(data_dir, 'responses', f'subject{subject:02}_{modality}_fmri_data_trn.hdf')
Y_train = load_hdf5_array(Y_train_filename)

Y_test_filename = os.path.join(data_dir, 'responses', f'subject{subject:02}_{modality}_fmri_data_val.hdf')
Y_test = load_hdf5_array(Y_test_filename)

# `array[:-trim]` yields an EMPTY array when trim == 0, so translate
# "no trimming" into an open-ended slice instead.
trim_stop = -trim if trim else None

# Trim and z-score every story separately, then stack the stories row-wise.
Y_train = np.vstack([zscore(Y_train[story][:trim_stop]) for story in Y_train.keys()])
# The validation file is indexed along its leading axis with 0 and 1
# (presumably two repeated presentations -- confirm); each one becomes its own matrix.
Ys_test = [
    np.vstack([zscore(Y_test[story][i][:trim_stop]) for story in Y_test.keys()])
    for i in range(2)
]

# Replace NaNs (e.g. from z-scoring constant columns) and cast to float32.
# Each repeat is cleaned on its own so the list never has to be stacked into a
# single array, which would require all repeats to share one shape.
Y_train = np.nan_to_num(Y_train).astype(np.float32)
Ys_test = [np.nan_to_num(Y_repeat).astype(np.float32) for Y_repeat in Ys_test]

# Plot validation curves

In [None]:
class CustomEstimator(RegressorMixin, BaseEstimator):
    """Delayed-feature banded ridge regression exposed as a scikit-learn regressor.

    ``fit`` builds and fits a two-step pipeline: a ``ColumnTransformerNoStack``
    that applies a ``Delayer`` to all columns of ``X`` (one single feature band),
    followed by a ``BandedRidgeCV``. The solver settings are constructor
    parameters so scikit-learn tooling can clone the estimator and vary them by
    name through ``set_params``.

    The mixin is listed before ``BaseEstimator``, following the scikit-learn
    convention that mixins go on the left of the base class.

    Parameters
    ----------
    alphas, n_iter
        Forwarded to ``BandedRidgeCV`` through ``solver_params``.
    cv
        Forwarded to ``BandedRidgeCV(cv=...)``.
    n_targets_batch, n_alphas_batch, n_targets_batch_refit : default=None
        Batch sizes forwarded through ``solver_params`` as given.
    score_func : callable or None, default=None
        Scoring function forwarded through ``solver_params``. When ``None`` the
        entry is omitted so the solver falls back to its own default.
    number_of_delays : int, default=4
        The delays ``1 .. number_of_delays`` are applied to the features.

    Attributes
    ----------
    pipeline_ : sklearn.pipeline.Pipeline
        The fitted pipeline; only available after ``fit``. Also reachable
        through the backward-compatible ``pipeline`` property.
    """

    def __init__(self, alphas, n_iter, cv, n_targets_batch=None, n_alphas_batch=None,
                 n_targets_batch_refit=None, score_func=None, number_of_delays=4):
        # scikit-learn convention: store constructor arguments unmodified.
        self.alphas = alphas
        self.n_iter = n_iter
        self.cv = cv
        self.n_targets_batch = n_targets_batch
        self.n_alphas_batch = n_alphas_batch
        self.n_targets_batch_refit = n_targets_batch_refit
        self.score_func = score_func
        self.number_of_delays = number_of_delays

    @property
    def pipeline(self):
        """Backward-compatible alias for the fitted ``pipeline_`` attribute."""
        return self.pipeline_

    def fit(self, X, y):
        """Build the delayer + banded-ridge pipeline and fit it on ``X`` and ``y``.

        Returns
        -------
        self
        """
        delayer = Delayer(delays=range(1, self.number_of_delays + 1))

        # All columns of X form one single feature band.
        all_columns = slice(0, X.shape[1])
        ct = ColumnTransformerNoStack(transformers=[('feature_0', delayer, all_columns)])

        solver_params = dict(
            alphas=self.alphas,
            n_iter=self.n_iter,
            n_targets_batch=self.n_targets_batch,
            n_alphas_batch=self.n_alphas_batch,
            n_targets_batch_refit=self.n_targets_batch_refit,
        )
        # Only forward an explicit scoring function; None cannot be called.
        if self.score_func is not None:
            solver_params['score_func'] = self.score_func
        banded_ridge_cv = BandedRidgeCV(cv=self.cv, groups="input", solver_params=solver_params)

        # Trailing underscore: scikit-learn's naming for attributes learned in fit.
        self.pipeline_ = make_pipeline(ct, banded_ridge_cv)
        self.pipeline_.fit(X, y)
        return self

    def predict(self, X):
        """Return the fitted pipeline's predictions for ``X``."""
        return self.pipeline_.predict(X)

In [None]:
# scikit-learn needs an estimator INSTANCE here (it is cloned and `n_iter` is
# set for every point of `param_range`); passing the class itself cannot work.
# The `n_iter` given to the constructor is therefore only a placeholder.
estimator = CustomEstimator(
    alphas=alphas,
    n_iter=n_iter,
    cv=cv,
    # No batching; NOTE(review): set these to integers if GPU memory is a concern.
    n_targets_batch=None,
    n_alphas_batch=None,
    n_targets_batch_refit=None,
    score_func=l2_neg_loss,
    number_of_delays=number_of_delays,
)

with config_context(array_api_dispatch=True):
    ValidationCurveDisplay.from_estimator(
        estimator, semantic_train, Y_train, param_name="n_iter", param_range=[1, 2, 4, 8, 16],
        score_name=r"$R^2$", cv=cv
    )