In [1]:
## Env

import nibabel as nib
from nilearn import datasets, input_data, plotting, connectome, image
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.spatial.distance import cdist
from scipy.stats import pearsonr, zscore
import pandas as pd
import os
import json
import glob
import gc
from joblib import Parallel, delayed
from tqdm import tqdm

## Functions

def process_sliding_windows_dfc(img_original, sub, movie, start, step, end, overlap_factor = 4):
    """ Dynamic Functional connectivity with slide windows """
    list_df = []
    for t in tqdm(range(start, end, step), desc=f' Processing dFC sub {sub} {movie}'):
        for i in range(overlap_factor): 
            # update the time for windows overlap
            t = round(t+i*step/overlap_factor) 
            if t+step <= end:
                # slice img
                img = img_original.dataobj[:,:,:,t:t+step]
                img = nib.Nifti1Image(img, img_original.affine, img_original.header)
                # get fc of the slice
                correlation_matrix = extract_fc(img)
                # format dataframe
                df = pd.DataFrame(correlation_matrix)
                df['node'] = df.index
                df['sub'] = sub
                df['movie'] = movie
                df['start'] = t #start do intervalo
                df['step'] = step
                list_df.append(df)
                # Clear intermediate variables to free memory
                del img, correlation_matrix, df
                gc.collect()
    # save
    final_df = pd.concat(list_df, ignore_index=True)
    return final_df


def load_image(sub, movie):
    path = os.path.join(f'/home/tamires/projects/rpp-aevans-ab/tamires/data/fmri_datasets/ds002837/derivatives/sub-{sub}/func', 
                        f'sub-{sub}_task-{movie}_bold_blur_censor_ica.nii.gz')
    return nib.load(path, mmap=True)


def extract_fc(fmri_img):
    # Use a predefined atlas from Nilearn (e.g., Harvard-Oxford atlas)
    atlas = datasets.fetch_atlas_basc_multiscale_2015(resolution=122,version='asym')
    atlas_img = atlas.maps
    # Define the masker to extract time series from ROIs and get time series
    masker = input_data.NiftiLabelsMasker(labels_img=atlas_img, standardize=True)
    time_series = masker.fit_transform(fmri_img)
    # Compute the correlation matrix
    correlation_measure = connectome.ConnectivityMeasure(kind='correlation')
    correlation_matrix = correlation_measure.fit_transform([time_series])[0]
    # Clear intermediate variables to free memory
    del masker, time_series, correlation_measure
    gc.collect()
    return correlation_matrix


def save_data(df, sub, movie): 
    # file name
    base_filename = f'/home/tamires/projects/rpp-aevans-ab/tamires/data/fmri_derived/dfcs/sub-{sub}_task-{movie}'
    version = 1
    filename = f"{base_filename}_v{version}.parquet"
    # check if there is a previous version, duplicate if there is a previous version
    while os.path.exists(filename):
        version += 1
        filename = f"{base_filename}_v{version}.parquet"
    # save file
    df.to_parquet(filename)
    

def process_participant(sub, movie, start, step, end):
    """
    start: onde vai comecar a ser processado
    step: os intervalos/janelas de processamento
    end: onde para de processar
    """
    # Load: 153 ms
    print(f"Initializing processing participant {sub} | {movie} | Estimated processing time: {round(16*4*((end-start)/step)/60,0)} minutes")
    img_original = load_image(sub, movie)
    # Process: 10s per interval per interval
    dfc = process_sliding_windows_dfc(img_original, sub, movie, start, step, end)
    # Save: 600ms
    save_data(dfc, sub, movie)
    print(f"File saved participant {sub} | {movie}")
    # Clear all variables to free memory after saving
    del img_original, dfc
    gc.collect()
    print(f"Memory cleared for participant {sub} | {movie}")

### Testes

In [17]:
%%time
process_participant(sub=1, movie="500daysofsummer", start=0, step=300, end=5470)

Initializing processing participant 1 | 500daysofsummer | Estimated processing time: 19.0 minutes


 Processing dFC sub 1 500daysofsummer:  32%|██████▉               | 6/19 [04:35<09:57, 45.95s/it]


KeyboardInterrupt: 

In [16]:
df = pd.read_parquet('../data/fmri_derived/dfcs/sub-1_task-500daysofsummer_v1.parquet')
df

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,117,118,119,120,121,node,sub,movie,start,step
0,1.000000,0.060413,0.196270,0.289575,0.475760,0.183561,0.191599,0.456325,0.313982,0.313079,...,0.379743,0.315935,0.132061,0.050542,0.374164,0,1,500daysofsummer,0,300
1,0.060413,1.000000,0.017021,0.028853,0.169921,0.098313,-0.006544,0.117295,0.086816,0.279415,...,0.141616,0.139375,0.126241,0.135390,0.125738,1,1,500daysofsummer,0,300
2,0.196270,0.017021,1.000000,0.064746,0.311003,0.117132,0.155627,0.084538,0.196306,0.090565,...,0.315722,0.202416,0.061081,0.079564,0.287519,2,1,500daysofsummer,0,300
3,0.289575,0.028853,0.064746,1.000000,0.247037,-0.083019,0.428500,0.227953,-0.066432,0.012066,...,0.004838,0.163086,0.115399,0.260602,0.001617,3,1,500daysofsummer,0,300
4,0.475760,0.169921,0.311003,0.247037,1.000000,0.343433,0.134583,0.535226,0.148585,0.289322,...,0.586331,0.399925,0.262945,0.165106,0.482578,4,1,500daysofsummer,0,300
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3411,0.511176,0.123597,0.313876,-0.032185,0.459026,0.484130,0.263664,0.476179,0.291669,0.250157,...,1.000000,0.349181,0.218011,0.278398,0.686393,117,1,500daysofsummer,2100,300
3412,0.305754,0.075156,0.285600,0.053915,0.332975,-0.063196,0.252464,0.415485,0.265049,0.068074,...,0.349181,1.000000,0.530792,0.426976,0.325041,118,1,500daysofsummer,2100,300
3413,0.253799,0.029160,0.262035,-0.032169,0.242554,-0.000514,0.367970,0.348342,0.077064,0.062919,...,0.218011,0.530792,1.000000,0.348151,0.288482,119,1,500daysofsummer,2100,300
3414,0.267518,0.096084,0.298997,0.242263,0.216427,0.033696,0.338480,0.290356,0.391545,0.184190,...,0.278398,0.426976,0.348151,1.000000,0.242148,120,1,500daysofsummer,2100,300


In [8]:
# imagem de teste
#fmri_path = '/home/tamires/projects/rpp-aevans-ab/tamires/data/fmri_datasets/ds002837/sub-1/func/sub-1_task-500daysofsummer_run-01_bold.nii.gz'
#fmri_img = nib.load(fmri_path)


### Processamento em Escala

In [2]:
# Data
participants = pd.read_csv("/home/tamires/projects/rpp-aevans-ab/tamires/data/fmri_datasets/ds002837/participants.tsv", sep='\t')
participants['sub'] = range(1,87)
participants = participants.rename(columns={'task':'movie'})
participants = participants[participants['sub'] != 49] # ta corrompido
participants['end_movie'] = [load_image(sub=row['sub'], movie=row['movie']).shape[3] for index, row in participants.iterrows()]
participants


Unnamed: 0,participant_id,age,sex,movie,sub,end_movie
0,sub-1,23,M,500daysofsummer,1,5470
1,sub-2,25,F,500daysofsummer,2,5470
2,sub-3,23,M,500daysofsummer,3,5470
3,sub-4,23,M,500daysofsummer,4,5470
4,sub-5,22,F,500daysofsummer,5,5470
...,...,...,...,...,...,...
81,sub-82,50,M,12yearsaslave,82,7715
82,sub-83,18,F,12yearsaslave,83,7715
83,sub-84,22,F,12yearsaslave,84,7715
84,sub-85,23,F,12yearsaslave,85,7715


In [44]:
# Parameters
participants_test = participants.iloc[4:86]
start = 0
step = 300

# Set the number of jobs (parallel workers)
n_jobs = 5  # Adjust this number based on your system's capacity
os.environ["OMP_NUM_THREADS"] = "1"  # Ensure thread limiting if needed

# Process
results = Parallel(n_jobs=n_jobs)(
    delayed(process_participant)(sub, movie, start, step, end) 
    for sub, movie, end in participants_test[['sub', 'movie','end_movie']].values
)

Initializing processing participant 2 | 500daysofsummer
Amount of windows/intervals: 18.233333333333334
Estimated processing time: 5.0 minutes
Initializing processing participant 3 | 500daysofsummer
Amount of windows/intervals: 18.233333333333334
Estimated processing time: 5.0 minutes


  table = self.api.Table.from_pandas(df, **from_pandas_kwargs)


File saved participant 3 | 500daysofsummer
File saved participant 2 | 500daysofsummer


  table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
