In [1]:
import numpy as np
import pandas as pd
import os
import bdpy
from tqdm import tqdm


data_path = "/home/acg17270jl/projects/fmri-reconstruction-with-dmvae/data/deeprecon/"
output_path = "/home/acg17270jl/projects/fmri-reconstruction-with-dmvae/results/deeprecon/conversion/"

subj_list = [1, 2, 3, 4, 5]

In [2]:
def save_result(result_data, output_dir, output_filename):
    """
    Save the results to a CSV file.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    df = pd.DataFrame(result_data)
    df.to_csv(os.path.join(output_dir, output_filename), index=None)

### Noise Ceiling

In [3]:
def calculate_pattern_noise_ceiling(brain, label, rep=24):
    '''
    The noise ceiling is simply the average of the cross-correlations between 
    fMRI responses to an identical stimulus.
    '''

    sort_idx = np.argsort(label.flatten())
    brain = brain[sort_idx]
    label = label[sort_idx]
    unique_label = np.unique(label)
    
    pattern_nc = []
    for image_idx in unique_label:
        pattern = brain[(label == image_idx).flatten(), :] # For a given stimulus (image_idx), the [rep, pattern]: 24 × total number of voxels (~15000)
        
        corrs = np.corrcoef(pattern)
        corr = np.mean(corrs)
        corr = np.maximum(corr, 0) # correlation below zero is set to be zero noise ceiling.

        pattern_nc.append(corr)

    return pattern_nc

In [4]:
def calculate_profile_noise_ceiling(brain, label, rep=24):
    '''
    The noise ceiling is simply the average of the cross-correlations between 
    fMRI responses to an identical stimulus.
    '''

    sort_idx = np.argsort(label.flatten())
    brain = brain[sort_idx]
    
    profile_nc = []
    for voxel_idx in range(brain.shape[1]):
        profile = brain[:, voxel_idx].reshape(rep, -1, order='F') # For a given voxel (voxel_idx), the [rep, profile]: 24 × total number of stimuli (=50).
        
        corrs = np.corrcoef(profile)
        corr = np.mean(corrs)
        corr = np.maximum(corr, 0) # correlation below zero is set to be zero noise ceiling.

        profile_nc.append(corr)

    return profile_nc

In [5]:
pattern_nc_result = []
profile_nc_result = []

for subj in subj_list:
    data = bdpy.BData(os.path.join(data_path, f"sub-{int(subj):02d}_NaturalImageTest.h5"))
    brain = data.select("ROI_VC")
    label = data.select("image_index")

    pattern_nc = calculate_pattern_noise_ceiling(brain,label)
    for i, corr in enumerate(pattern_nc):
        pattern_nc_result.append({
            'Subject': subj, 
            'Correlation': corr, 
            'Image_idx': i+1}
        )

    profile_nc = calculate_profile_noise_ceiling(brain,label)
    for i, corr in enumerate(profile_nc):
        profile_nc_result.append({
            'Subject': subj, 
            'Correlation': corr, 
            'Voxel_idx': i}
        )

save_result(pattern_nc_result, output_path, "pattern_noise_ceiling.csv")
save_result(profile_nc_result, output_path, "profile_noise_ceiling.csv")

### Noise Ceiling Threshold

In [6]:
def calculate_pattern_noise_ceiling_thd(brain, label, rep=24, n_sumpling=10000):
    '''
    To calculate the 99th percentile threshold for pattern correlation of single trial samples.
    If a noise ceiling is below this threshold, the noise ceiling is considered caused by the random noise.
    In other words, the measurement is not realiable and the signals contain no information.
    '''

    sort_idx = np.argsort(label.flatten())
    brain = brain[sort_idx]
    label = label[sort_idx]
    unique_label = np.unique(label)
    
    pattern_nc_thd99 = []
    pattern_nc_thd95 = []
    for image_idx in tqdm(unique_label):
        pattern = brain[(label == image_idx).flatten(), :]
        
        corr_samplings = []
        
        for _ in range(n_sumpling):
            random_pattern = np.zeros_like(pattern)

            # shuffle the pattern order for each rep
            keys = np.random.random(size=pattern.shape)
            idx  = np.argsort(keys, axis=1) 
            random_pattern = np.take_along_axis(pattern, idx, axis=1)

            corrs = np.corrcoef(pattern, random_pattern)[:rep, rep:]
            corr = np.mean(corrs)

            corr_samplings.append(corr)
        
        corr_samplings = np.sort(corr_samplings)

        pattern_nc_thd99.append(np.percentile(corr_samplings, 99))
        pattern_nc_thd95.append(np.percentile(corr_samplings, 95))
        
    pattern_nc_thd99 = np.array(pattern_nc_thd99) 
    pattern_nc_thd95 = np.array(pattern_nc_thd95)
    
    return pattern_nc_thd99, pattern_nc_thd95

In [7]:
def calculate_profile_noise_ceiling_thd(brain, label, rep=24, n_sumpling=10000):
    '''
    To calculate the 99th percentile threshold for profile correlation of single trial samples.
    If a noise ceiling is below this threshold, the noise ceiling is considered caused by the random noise.
    In other words, the measurement is not realiable and the signals contain no information.
    '''
    
    sort_idx = np.argsort(label.flatten())
    brain = brain[sort_idx]
    
    profile_nc_thd99 = []
    profile_nc_thd95 = []
    for voxel_idx in tqdm(range(brain.shape[1])):
        profile = brain[:, voxel_idx].reshape(rep, -1, order='F')
        
        corr_samplings = []
        
        for _ in range(n_sumpling):
            # shuffle the pattern order for each rep
            keys = np.random.random(size=profile.shape)
            idx  = np.argsort(keys, axis=1) 
            random_profile = np.take_along_axis(profile, idx, axis=1)

            corrs = np.corrcoef(profile, random_profile)[:rep, rep:]
            corr = np.mean(corrs)

            corr_samplings.append(corr)
        
        corr_samplings = np.sort(corr_samplings)

        profile_nc_thd99.append(np.percentile(corr_samplings, 99))
        profile_nc_thd95.append(np.percentile(corr_samplings, 95))
        
    profile_nc_thd99 = np.array(profile_nc_thd99) 
    profile_nc_thd95 = np.array(profile_nc_thd95)
    
    return profile_nc_thd99, profile_nc_thd95

In [8]:
pattern_nc_thd_result = []

for subj in subj_list:
    data = bdpy.BData(os.path.join(data_path, f"sub-{int(subj):02d}_NaturalImageTest.h5"))
    brain = data.select("ROI_VC")
    label = data.select("image_index")

    pattern_nc_thd99, pattern_nc_thd95 = calculate_pattern_noise_ceiling_thd(brain,label)
    
    for i, corr in enumerate(pattern_nc_thd99):
        pattern_nc_thd_result.append({
            'Subject': subj, 
            'Threshold_99': corr, 
            'Threshold_95': pattern_nc_thd95[i], 
            'Image_idx': i+1}
        )

save_result(pattern_nc_thd_result, output_path, "pattern_noise_ceiling_threshold.csv")

100%|██████████| 50/50 [1:14:39<00:00, 89.60s/it] 
100%|██████████| 50/50 [1:06:44<00:00, 80.09s/it]
100%|██████████| 50/50 [56:47<00:00, 68.16s/it]
100%|██████████| 50/50 [1:00:03<00:00, 72.08s/it]
100%|██████████| 50/50 [58:52<00:00, 70.65s/it]  


In [None]:
profile_nc_thd_result = []

for subj in subj_list:
    data = bdpy.BData(os.path.join(data_path, f"sub-{int(subj):02d}_NaturalImageTest.h5"))
    brain = data.select("ROI_VC")
    label = data.select("image_index")

    profile_nc_thd99, profile_nc_thd95 = calculate_profile_noise_ceiling_thd(brain,label)
    for i, corr in enumerate(profile_nc_thd99):
        profile_nc_thd_result.append({
            'Subject': subj, 
            'Threshold_99': corr, 
            'Threshold_95': profile_nc_thd95[i], 
            'Vox_idx': i}
        )

save_result(profile_nc_thd_result, output_path, "profile_noise_ceiling_threshold.csv")

In [None]:
# pattern_nc_thd_result = []
# profile_nc_thd_result = []

# for subj in subj_list:
#     data = bdpy.BData(os.path.join(data_path, f"sub-{int(subj):02d}_NaturalImageTest.h5"))
#     brain = data.select("ROI_VC")
#     label = data.select("image_index")

#     pattern_nc_thd99, pattern_nc_thd95 = calculate_pattern_noise_ceiling_thd(brain,label)
#     for i, corr in enumerate(pattern_nc_thd99):
#         pattern_nc_thd_result.append({'Subject': subj, 'Threshold_99': corr, 'Threshold_95': pattern_nc_thd95[i], 'Image_idx': i+1})

#     profile_nc_thd99, profile_nc_thd95 = calculate_profile_noise_ceiling_thd(brain,label)
#     for i, corr in enumerate(profile_nc_thd99):
#         profile_nc_thd_result.append({'Subject': subj, 'Threshold_99': corr, 'Threshold_95': profile_nc_thd95[i], 'Vox_idx': i})

# save_result(pattern_nc_thd_result, output_path, "pattern_noise_ceiling_threshold.csv")
# save_result(profile_nc_thd_result, output_path, "profile_noise_ceiling_threshold.csv")