In [None]:

import scipy.io as sio
import os
import numpy as np
import pickle
import h5py
import scipy.stats as stats
from sklearn.model_selection import check_cv
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
import yaml

with open('./util/config__drama_data.yaml', 'r') as f_yml:
    config = yaml.safe_load(f_yml)

from util import util_dataload as udl
from util import util_ridge as uridge

from himalaya.ridge import RidgeCV, GroupRidgeCV
from himalaya.ridge import ColumnTransformerNoStack
import itertools


## Listing available features and movie titles

In [None]:

print('\n***** features *****')
for featCode in range(3): # 0–3
    for layerCode in range(4): # 0–3 [0)6th, 1)12th, 2)18th, 3)24th layer]
        annot_type = config['annotInfo']['annotTypes'][featCode]
        model_type = config['annotInfo']['modelNames'][featCode]
        model_subtype = config['ridgeInfo']['modelSubNames'][model_type][layerCode]
        featName = annot_type + '_' + model_type + '_' + model_subtype
        print(featName)

print('\n***** movie titles *****')
t_movietitle_list = list( np.unique(config['movieInfo']['titleNames']) )
print('\n'.join(t_movietitle_list))

# (Demo) Model fitting

## Param: Setting parameters

subject_name : S01-S06<br>
feat_names :  Selecting ~3 features from the above list. 
movietitle_test : Selecting a single movie title from the above list. (e.g. 'BigBangTheory')

In [None]:

subject_name = 'S02'
feat_names =  ['objectiveAnnot50chara_GPT2medium_jp_layer12_mean', 'speechTranscription_GPT2medium_jp_layer12_mean', 'story_GPT2medium_jp_layer12_mean']
movietitle_test = 'bigbangtheory'


In [None]:
###
### other parameters

#  ---------------- himalaya  ----------------
# You can specify the following parameters related to "himalaya."
# https://gallantlab.org/himalaya/
onsets = []
alphas = np.logspace(-4, 15, 8)
intercept = False # Seeing False of True

#  ---------------- other OPTION related to ridge reg ----------------
stim_delays = [2, 4, 6]
nCV = 5


## Main: Predicting BOLD responses from the features.

In [None]:
###
### Data setting

### Setting items for data loading
subjectID = udl.get_subjectID_from_subjectName(config, subject_name)
datasize = udl.get_datasize(config, subjectID)

# Setting training subset and the corresponding onset
movIDs_train, movIDs_test = udl.get_train_test_movIDs(config, movietitle_test)

n_samples_ = udl.get_n_samples_from_movIDs(config, movIDs_train)
cv_shuffled_samples, cv_chunk_onset = uridge.get_subset_samples_and_onset(n_samples_, chunkLen=100, seedno=33)

# Loading BOLD resp and stim
# loading
t_movIDs_train_test = {'train': movIDs_train, 'test':movIDs_test}    
tmp_data_out = uridge.get_resp_stim(config, subjectID, feat_names, t_movIDs_train_test, stim_delays=stim_delays)
# get items
Resp_train= tmp_data_out['Resp']['train']
Resp_test= tmp_data_out['Resp']['test']
Stim_train= tmp_data_out['Stim']['train']
Stim_test= tmp_data_out['Stim']['test']
dimIDs = tmp_data_out['dimIDs']['ids']
dimID_onset = tmp_data_out['dimIDs']['onset']
del tmp_data_out
# n_samples_train/ test
n_samples_train = Resp_train.shape[0]
n_samples_test = Resp_test.shape[0]

# Nomarizing　stim using M and SD of stim_train
m_Stim_train, s_Stim_train, Stim_train = uridge.my_zscore(Stim_train)
_, _, Stim_test = uridge.my_zscore(Stim_test, mX=m_Stim_train, sX=s_Stim_train)


In [None]:
###
### Train

# (train data) shuffle resp and stim (100s chunk): and divided for 5-fold cv
if len(onsets) == 0:
    cv_onsets = [np.array_split(cv_chunk_onset, nCV)[cv_id][0] for cv_id in range(nCV)]
else: # 'nCV' is not used in this case.
    cv_onsets = onsets
cv = uridge.generate_leave_one_run_out(n_samples_train, cv_onsets, random_state=0, n_runs_out=1)
cv = check_cv(cv)
ncv_save = len(cv_onsets)

# Separate the first three columns and the last two
# columns, creating two groups of shape (n_samples, n_feature_i).
groupList = []
for tfeatID in range(len(feat_names)):
    groupList.append( ("group_{:d}".format(tfeatID), StandardScaler(), dimIDs[feat_names[0]]) )
ct = ColumnTransformerNoStack( groupList )

print('Model fitting ...')
solver_params={'alphas':alphas}
model = GroupRidgeCV(cv=cv, fit_intercept=intercept, groups="input", solver_params=solver_params)
pipe = make_pipeline(ct, model)
_ = pipe.fit(Stim_train[cv_shuffled_samples,:], Resp_train[cv_shuffled_samples,:])
print('done.')


In [None]:
###
### Test

# Computing pred accuracy: R2
r2, r2_split = uridge.get_test_results_r2(model, Stim_test, dimIDs, Resp_test, intercept=intercept)
    
# Computing pred accuracy: Pearson's corrcoef
rs, ps, ps_correction, sig_voxel_inds, ps_alpha = uridge.get_test_results_corr(model, Stim_test, Resp_test, ps_alpha = 0.05, intercept=intercept)
result_FDR_correction = {'ps_correction':ps_correction, 'sig_voxel_inds':sig_voxel_inds, 'ps_alpha':ps_alpha}


In [None]:
###
### Save

accs = dict()
accs['r2'] = {'r2':r2, 'r2_split':r2_split}
accs['corrcoef'] = {'type':'Pearson', 'rs':rs, 'ps':ps, 'result_FDR_correction':result_FDR_correction}
model_items = {'featNames':feat_names, 'dimIDs':dimIDs, 'stim_delays':stim_delays, 'nCV':ncv_save, 'alphas':alphas, 'fit_intercept':intercept, 'mean_Stim_train':m_Stim_train, 'std_Stim_train':s_Stim_train, 'movIDs_train':movIDs_train, 'movIDs_test':movIDs_test}
results_model = {'model_type':'ridge_regression', 'model':model, 'model_items':model_items, 'accs':accs}

dir_model = config['dir']['ridge'].replace('{derivative_dir}', config['dir']['derivative'])
os.makedirs(dir_model, exist_ok=True)
featNames_label = udl.get_featNames_label(config, feat_names)
path_model = '{:s}/test__banded_ridge__sub-{:s}__{:s}.pkl'.format(dir_model, subject_name, featNames_label)

pickle.dump(results_model, open(path_model, 'wb'))
