In [None]:
# default_exp encoding

In [None]:
#hide
%load_ext autoreload
%autoreload 2

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
#hide
#export
import numpy as np
from sklearn.metrics import r2_score
from sklearn.model_selection import KFold

def product_moment_corr(x,y):
    '''Product-moment correlation for two ndarrays x, y'''
    from sklearn.preprocessing import StandardScaler
    x = StandardScaler().fit_transform(x)
    y = StandardScaler().fit_transform(y)
    n = x.shape[0]
    r = (1/(n-1))*(x*y).sum(axis=0)
    return r

# Training and validating voxel-wise encoding models
> Functions for training independent Ridge regressions for a large number of voxels and validating their performance

In [None]:
#export
def get_ridge_plus_scores(X, y, alphas=None, n_splits=8, scorer=None, voxel_selection=True, **kwargs):
    '''Returns ridge regressions trained in a cross-validation on n_splits of the data and scores on the left-out folds

    Parameters

        X : ndarray of shape (samples, features)
        y : ndarray of shape (samples, targets)
        alphas : None or list of floats, optional
                 Regularization parameters to be used for Ridge regression
        n_splits : int, optional
        scorer : None or any sci-kit learn compatible scoring function, optional
                 default uses product moment correlation
        voxel_selection : bool, optional, default True
                          Whether to only use voxels with variance larger than zero.
                          This will set scores for these voxels to zero.
        kwargs : additional arguments transferred to ridge_gridsearch_per_target

    Returns
        tuple of n_splits Ridge estimators trained on training folds
        and scores for all concatenated out-of-fold predictions'''
    if scorer is None:
        scorer = product_moment_corr
    kfold = KFold(n_splits=n_splits)
    if alphas is None:
        alphas = [1000]
    ridges = []
    score_list = []
    if voxel_selection:
        voxel_var = np.var(y, axis=0)
        y = y[:, voxel_var > 0.]
    for train, test in kfold.split(X, y):
        ridges.append(ridge_gridsearch_per_target(X[train], y[train], alphas, **kwargs))
        if voxel_selection:
            scores = np.zeros_like(voxel_var)
            scores[voxel_var > 0.] =  scorer(y[test], ridges[-1].predict(X[test]))
        else:
            scores = scorer(y[test], ridges[-1].predict(X[test]))
        score_list.append(scores[:, None])
    return ridges, np.concatenate(score_list, axis=-1)

`get_ridge_plus_scores` is a convenience function that trains `n_splits` Ridge regressions in a cross-validation scheme and evaluates their performance on the respective test set.

In [None]:
#export
def ridge_gridsearch_per_target(X, y, alphas, n_splits=5, **kwargs):
    '''Runs Ridge gridsearch across alphas for each target in y

    Parameters

        X : ndarray of shape (samples, features)
        y : ndarray of shape (samples, targets)
        alphas : None or list of floats, optional
                 Regularization parameters to be used for Ridge regression
        n_splits : int, optional
        kwargs : keyword parameters to be transferred to Ridge regression

    Returns
        Ridge regression trained on X, y with optimal alpha per target
        determined by KFold cross-validation
    '''
    from sklearn.linear_model import Ridge
    from sklearn.metrics import mean_squared_error
    cv_results = {'alphas': []}
    cv = KFold(n_splits=n_splits)
    for alpha in alphas:
        scores = []
        for train, test in cv.split(X, y):
            ridge = Ridge(alpha=alpha, **kwargs)
            scores.append(mean_squared_error(y[test], ridge.fit(X[train], y[train]).predict(X[test]),
                              multioutput='raw_values'))
        scores = np.vstack(scores).mean(axis=0)
        cv_results['alphas'].append(scores)
    cv_results['alphas'] = np.vstack(cv_results['alphas'])
    best_alphas = np.array(alphas)[np.argmin(cv_results['alphas'], axis=0)]
    return Ridge(alpha=best_alphas, **kwargs).fit(X, y)

# Example

First, we create some simulated `stimulus` and `fmri` data.

In [None]:
stimulus = np.random.randn(1000, 5)
fmri = np.random.randn(1000, 10)

We can now use `get_ridge_plus_scores` to estimate multiple [Ridge](https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.Ridge.html) regressions, one for each voxel (that maps the stimulus representation to this voxel) and one for each split (trained on a different training set and evaluated on the held-out set).
Since sklearn's `Ridge` estimator allows multi-output, we get one `Ridge` object per split.

In [None]:
ridges, scores = get_ridge_plus_scores(stimulus, fmri, n_splits=3)
print(len(ridges))
ridges

3


[Ridge(alpha=array([1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000])),
 Ridge(alpha=array([1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000])),
 Ridge(alpha=array([1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000]))]

Each `Ridge` estimator maps from the feature space to each voxel.
In our example, that means it has 10 (the number of voxels-9 independently trained regression models with 5 coeficients each (the number of features).

In [None]:
assert ridges[0].coef_.shape == (10, 5)
print(ridges[0].coef_)

[[ 0.03091656 -0.00552985  0.01743601  0.01279568  0.01053411]
 [ 0.00238455  0.0045204   0.00256562 -0.00748261 -0.00547277]
 [-0.00422868 -0.02666707  0.01187212 -0.02378444  0.03065698]
 [ 0.03035251 -0.01177954  0.01668209 -0.0109779   0.01566495]
 [ 0.0076802  -0.01518692 -0.01891325 -0.00329026 -0.00599254]
 [-0.01751964  0.00928253  0.00283076  0.01285903 -0.02664727]
 [ 0.03044979  0.02130243  0.01450193 -0.00512366  0.0111223 ]
 [-0.00812324 -0.00502767 -0.00471382  0.00727634 -0.02175445]
 [ 0.03337194  0.01006145 -0.01272059 -0.01134752 -0.01544005]
 [ 0.00368844  0.01035971 -0.03002989 -0.01830897 -0.00994489]]


We also get a set of scores (by default the [product moment correlation](https://en.wikipedia.org/wiki/Pearson_correlation_coefficient), but you can supply your own via the `scorer` argument) that specifies how well we predict left-out data (with the usual caveats of using a correlation coefficient for evaluating it). In our case it is of shape (10, 3) because we predict 10 voxels and use a 3-fold cross-validation, i.e. we split 3 times.

In [None]:
assert scores.shape == (10, 3)
scores

array([[-0.01731239, -0.03907288,  0.04493863],
       [ 0.00467211, -0.06636354, -0.0427261 ],
       [-0.02978713,  0.06524192,  0.06223558],
       [-0.03216798, -0.00690511, -0.02197701],
       [ 0.07491743,  0.06247747, -0.00317411],
       [-0.0297277 , -0.0455451 , -0.01514241],
       [ 0.09316656,  0.0493194 ,  0.0818891 ],
       [-0.01266644, -0.06847899, -0.06770772],
       [ 0.04326812,  0.08139216,  0.04508078],
       [ 0.06647038,  0.0544588 ,  0.02366174]])

Of course we can also call specify which hyperparameters we want to use.
For example the values of the regularization parameter $\alpha$ we want to perform a gridsearch over or whether we want to normalize features.

In [None]:
alphas = [1, 10, 100]
ridges, scores = get_ridge_plus_scores(stimulus, fmri, n_splits=3,
                                       alphas=alphas,
                                       normalize=True)
assert ridges[0].normalize