In [1]:
from tqdm.notebook import tqdm
from fetcher import fetch_difumo
import numpy as np
from nilearn.maskers import NiftiMapsMasker
import dask.array as da
from dask import delayed
import nibabel as nib
from dask.distributed import Client, LocalCluster
from nilearn.interfaces.fmriprep import load_confounds

In [2]:
# Atlas dimensionality requested from the local `fetcher` helper; also used
# further down as the expected number of signals per image.
difumo_dim = 64

# Only the `.maps` attribute of the fetched atlas object is needed here.
maps_img = fetch_difumo(
    dimension=difumo_dim,
).maps

# Shared masker that extracts one standardized signal per atlas map.
maps_masker = NiftiMapsMasker(
    maps_img=maps_img,
    standardize=True,
    verbose=0,
)

In [None]:
# Extract atlas time series for every subject whose BOLD file can be opened.
# `all_sub_time_series[i]` is the masker output for subject `loaded_subjects[i]`;
# keeping the two lists in lockstep preserves the subject <-> row mapping even
# when some subjects are skipped.
all_sub_time_series = []
loaded_subjects = []

fmriprep_dir = '/home/jovyan/shared/ds003097/derivatives/fmriprep'

for sub in tqdm(range(1, 101), desc='Subjects'):
    sub_label = f'sub-{sub:04}'
    bold_fname = (
        f'{fmriprep_dir}/{sub_label}/func/'
        f'{sub_label}_task-moviewatching_space-MNI152NLin2009cAsym_desc-preproc_bold.nii.gz'
    )

    try:
        nib.load(bold_fname)  # Check if we have access to the file
    except Exception as err:
        # `Exception` (not a bare `except`) so that Ctrl-C / kernel interrupts
        # still stop the loop; the cause is reported instead of being hidden.
        print(f'Error for {sub}: {err!r}')
        continue

    # NOTE(review): the variable name says "no_gsr", but the strategy below
    # does include "global_signal" -- confirm which one is intended.
    confounds_minimal_no_gsr, sample_mask = load_confounds(
        bold_fname,
        strategy=["high_pass", "motion", "wm_csf", "global_signal"],
        motion="basic", wm_csf="basic", global_signal="basic")

    time_series = maps_masker.fit_transform(bold_fname, confounds=confounds_minimal_no_gsr,
                                            sample_mask=sample_mask)
    all_sub_time_series.append(time_series)
    loaded_subjects.append(sub)




Subjects:   0%|          | 0/100 [00:00<?, ?it/s]

In [16]:
# subjects_fmri = []

# for sub in range(1, 101):
#     fname = f'/home/jovyan/shared/ds003097/derivatives/fmriprep/sub-{sub:04}/func/sub-{sub:04}_task-moviewatching_space-MNI152NLin2009cAsym_desc-preproc_bold.nii.gz'
#     try: 
#         subjects_fmri.append(nib.load(fname))
#     except:
#         print(f'Error for {sub}')


In [17]:
# subjects_fmri

In [18]:
# num_subjects_data = len(subjects_fmri)
# num_subjects_data

In [9]:
# num_frames = 100
# signals_list = []


# for sub_fmri in subjects_fmri:



# for subj_id in tqdm(range(num_subjects_data), desc='Time'):

#     all_subs_time = []
#     for sub_fmri in subjects_fmri:
#         time_slice = sub_fmri.slicer[:, :, :, time]
#         all_subs_time.append(time_slice)

#     signals = maps_masker.fit_transform(all_subs_time)
#     signals_list.append(signals)

# all_signals = np.array(signals_list)


In [19]:
# all_data = no_parallel()

In [20]:
# with open('all_data.npy', 'wb') as f:
#     np.save(f, all_data)

In [None]:
def parallel_work(subjects_fmri=None, num_frames=290, n_workers=2):
    """Extract atlas signals frame by frame on a local Dask cluster.

    For every frame index, the same 3D volume is sliced out of each subject's
    4D image and the resulting list of volumes is passed through a fresh
    ``NiftiMapsMasker``. One Dask task is created per frame.

    Parameters
    ----------
    subjects_fmri : sequence of 4D images, optional
        Images exposing a ``.slicer`` attribute (e.g. as returned by
        ``nib.load``). When omitted, the notebook-level variable
        ``subjects_fmri`` is used, which keeps the original zero-argument
        call working.
    num_frames : int, default 290
        Number of leading frames (indices ``0 .. num_frames - 1``) to process.
    n_workers : int, default 2
        Number of workers started by the ``LocalCluster``.

    Returns
    -------
    The gathered result of the stacked computation, declared with shape
    ``(num_frames, len(subjects_fmri), difumo_dim)``.

    Raises
    ------
    NameError
        If no images are passed and no notebook-level ``subjects_fmri`` exists.
    ValueError
        If there are no images or ``num_frames`` is smaller than 1.
    """
    if subjects_fmri is None:
        # Backward compatibility with the original implementation, which read
        # this name straight from the notebook namespace.
        try:
            subjects_fmri = globals()['subjects_fmri']
        except KeyError:
            raise NameError(
                "parallel_work() needs the loaded images: pass them as "
                "`subjects_fmri=` or define a notebook-level `subjects_fmri`"
            ) from None

    subjects_fmri = list(subjects_fmri)
    # Derived from the input so the declared chunk shape can never disagree
    # with the number of images actually processed.
    num_subjects = len(subjects_fmri)
    if num_subjects == 0 or num_frames < 1:
        raise ValueError(
            "parallel_work() needs at least one image and num_frames >= 1"
        )

    def process_time_slice(frame_num):
        # A separate masker per task, so tasks do not share a fitted object.
        masker = NiftiMapsMasker(maps_img=maps_img, verbose=0, standardize=True)
        frame_imgs = [img.slicer[:, :, :, frame_num] for img in subjects_fmri]
        # NOTE(review): with standardize=True the list of per-subject volumes
        # is presumably standardized across subjects for this frame -- confirm
        # this is the intended normalization.
        return masker.fit_transform(frame_imgs)

    process_time_slice_delayed = delayed(process_time_slice)

    per_frame_signals = [
        da.from_delayed(
            process_time_slice_delayed(frame_n),
            (num_subjects, difumo_dim),
            dtype=float,
        )
        for frame_n in range(num_frames)
    ]
    all_signals = da.stack(per_frame_signals, 0)

    # Context managers shut the workers and the client down even when the
    # computation fails; otherwise every call leaves worker processes behind.
    with LocalCluster(n_workers=n_workers) as cluster, Client(cluster) as client:
        pending = client.compute(all_signals)
        return client.gather(pending)
