In [4]:
import pandas as pd
import numpy as np
import nibabel as nib
import re
from scipy import stats
from statsmodels.stats.multitest import fdrcorrection

def load(file_path: str):
    """
    load available file
    available exts: .nii(.gz), .xls(x), .csv, .tsv, .json

    :param file_path: file want to load
    :type file_path: str
    :return: object
    """
    if file_path.endswith('.nii') or file_path.endswith('.nii.gz'):
        img = nib.Nifti1Image.load(file_path)
    else:
        if file_path.endswith('.1D'):
            img = pd.read_csv(file_path, header=None, sep=r'\s+')
        else:
            raise Exception('Input filetype is not compatible.')
    return img

def save_to_nii(data: np.ndarray,
                niiobj: nib.Nifti1Image,
                fpath: str,
                copy_header=False,
                space='scanner'):
    nii = nib.Nifti2Image(data, niiobj.affine)
    if copy_header:
        nii._header = niiobj.get_header().copy()
    else:
        if space == 'scanner':
            nii.header['qform_code'] = 1
            nii.header['sform_code'] = 0
    nii.to_filename(fpath)
    
class PathMan:
    def __init__(self, path):
        self._path = path
        self._check_exists()

    def _check_exists(self):
        if not os.path.exists(self._path):
            os.mkdir(self._path)

    def listdir(self, pattern=None):
        result = [p for p in os.listdir(self._path)]
        if pattern is not None:
            result = [p for p in result if re.match(pattern, p)]
        return {i: p for i, p in enumerate(result)}

    def chdir(self, dir_name):
        return PathMan(self(dir_name))

    def __call__(self, fname):
        return os.path.join(self._path, fname)

    def load(self, idx, pattern=None):
        return load(self(self.listdir(pattern=pattern)[idx]))
    
def polynomial_feature(data: np.ndarray,
                       order: int) -> np.ndarray:
    """ Generate polynomial features for input data

    Args:
        data: V x T data where V is voxels and T is time points
        order: order of polynomial

    Returns:
        model: polynomial features with given order
    """
    n_ft = order + 1
    n_dp = data.shape[-1]
    model = np.zeros([n_dp, n_ft])
    for o in range(n_ft):
        if o == 0:
            model[:,0] = np.ones(n_dp)
        else:
            x = np.arange(n_dp)
            model[:,o] = x ** o
            model[:,o] /= model[o,:].max()
    return model
    
def linear_regression(data: np.ndarray,
                      model: np.ndarray,
                      method: str = 'svd',
                      return_beta: bool = False):
    """ linear regression using linear decomposition algorithm

    Args:
        data: V x T data where V is number of voxels and T is number of time points
        model: T x F data where T is number of time points and F is number of features
        method: 'svd' or 'qr'
        return_beta: return beta coefficient if it is True

    Returns:
        fitted_curve : V x T x F
        beta_coefficient : V x F
    """
    if method == 'svd':
        bs = np.linalg.pinv(model).dot(data.T).T
    elif method == 'qr':
        q, r = np.linalg.qr(model)
        bs = np.linalg.inv(r).dot(q.T).dot(data.T).T
    else:
        raise InvalidApproach('Invalid input for "metrics"')

    fitted = np.zeros([bs.shape[0], model.shape[0], model.shape[1]])
    for i, b in enumerate(bs):
        fitted[i, ...] = model * b
    if return_beta:
        return fitted, bs
    else:
        return fitted
    
def glm(data, model):
    from scipy import stats
    import numpy as np
    predicted, coef = linear_regression(data, model, method='svd', return_beta=True)
    
    dof = model.shape[0] - model.shape[1]
    mse = np.square(predicted.sum(-1) - data).sum(-1) / float(dof)
    se = np.sqrt((mse * np.concatenate([np.linalg.inv(np.dot(model.T, model)).diagonal()[:, np.newaxis]], axis=-1)).T)

    t = coef.copy()
    t[se == 0] = 0
    t[np.nonzero(se)] /= se[np.nonzero(se)]
    p = 2 * (1 - stats.t.cdf(abs(t), df=dof))
    return predicted, coef, t, p

def r_to_t(r, size):
    """
    calculate T and correspond p-value from given correlation matrix
    """
    from scipy.stats import t
    tval = r * np.sqrt(size - 2) / np.sqrt(1 - np.square(r))
    pval = 1 - t.cdf(tval, size - 2)
    return tval, pval

In [9]:
# Threshold

pval = 0.05  # group level
corr_thr = 0.3  # correlation

# DBS

In [10]:
import os
pm_dbs = PathMan('DBS-700uA')
pm_oput = PathMan('201222_output')
mask_path = './Rat_BrainMask-4slices.nii.gz'
mask_nii = nib.load(mask_path)
mask_idx = np.nonzero(mask_nii.dataobj)

In [11]:
# Use GLM to analysis significant responsed regions (1st level statistics)

ptrn = r'scaled-(?P<subj>[0-9]{2})-700-1.nii'

for i, fname in pm_dbs.listdir(pattern=ptrn).items():
    subj = re.match(ptrn, fname).group('subj')
    nii = pm_dbs.load(i, pattern=ptrn)

    dataobj = np.asarray(nii.dataobj)
    dataobj_masked = dataobj[mask_idx]
    mparam = pm_dbs.load(0, pattern=f'{subj}-700-1-dfile.1D')
    mparam -= mparam.mean(0)
    mparam /= abs(mparam).max(0)

    o2fscv = pm_dbs.load(0, pattern=f'{subj}-700-1.1D')
    o2fscv -= o2fscv[:10].mean()
    o2fscv /= o2fscv.max(0)

    port = polynomial_feature(dataobj, order=3)
    port /= port.max(0)

    model = np.c_[port, mparam, o2fscv]
    predicted, beta, t, p = glm(dataobj_masked, model)
    dataobj_filtered = dataobj_masked - predicted[..., :-1].sum(-1)
    r = pd.DataFrame(dataobj_filtered.T).corrwith(o2fscv[0])
    dof = dataobj_filtered.shape[-1] - model.shape[-1] + 1
    t4r, p4r = r_to_t(r, size=dof)

    output_glm = np.zeros(nii.shape[:3])
    output_glm[mask_idx] = beta[..., -1]
    output_corr = output_glm.copy()
    output_corr[mask_idx] = r

    save_to_nii(output_glm, nii, pm_oput(f'{subj}_GLM.nii.gz'))
    save_to_nii(output_corr, nii, pm_oput(f'{subj}_Corr.nii.gz'))

In [17]:
# Use GLM to analysis significant responsed regions (2nd level statistics)

n_subj = len(pm_oput.listdir(pattern=r'[0-9]{2}_Corr.nii.gz'))
n_voxl = mask_idx[0].shape[0]

data_array = np.zeros([n_voxl, n_subj])

for i, fname in pm_oput.listdir(pattern=r'[0-9]{2}_Corr.nii.gz').items():
    nii = nib.load(pm_oput(fname))
    data_array[:, i] = np.asarray(nii.dataobj)[mask_idx]
    
predict, coef, t, p = glm(np.arctanh(data_array), np.ones([n_subj, 1]))

predicted = np.tanh(coef[:, 0])
q = fdrcorrection(p[:, 0], alpha=pval)
predicted[~q[0]] = 0
predicted[predicted < corr_thr] = 0
group_avr = np.zeros(mask_nii.shape)
group_avr[mask_idx] = predicted

save_to_nii(group_avr, mask_nii, pm_oput('DBS_groupavr.nii.gz'))
print(q)

(array([ True,  True,  True, ...,  True,  True, False]), array([0.02433444, 0.02247493, 0.02247493, ..., 0.03034985, 0.04482143,
       0.07026136]))


# Oxygen challenge

In [6]:
pm_oxy = PathMan('./Oxygen Challenge/')

In [7]:
# Use GLM to analysis significant responsed regions (1st level statistics)

ptrn = r'scaled_(?P<subj>[0-9]{2})-O2.nii'

for i, fname in pm_oxy.listdir(pattern=ptrn).items():
    subj = re.match(ptrn, fname).group('subj')
    nii = pm_oxy.load(i, pattern=ptrn)

    dataobj = np.asarray(nii.dataobj)
    dataobj_masked = dataobj[mask_idx]
    mparam = pm_oxy.load(0, pattern=f'{subj}-O2-dfile.r01.1D')
    mparam -= mparam.mean(0)
    mparam /= abs(mparam).max(0)

    o2fscv = pm_oxy.load(0, pattern=f'{subj}-O2.1D')
    o2fscv -= o2fscv[:10].mean()
    o2fscv /= o2fscv.max(0)

    port = polynomial_feature(dataobj, order=3)
    port /= port.max(0)

    model = np.c_[port, mparam, o2fscv]
    predicted, beta, t, p = glm(dataobj_masked, model)

    dataobj_filtered = dataobj_masked - predicted[..., :-1].sum(-1)
    r = pd.DataFrame(dataobj_filtered.T).corrwith(o2fscv[0])
    dof = dataobj_filtered.shape[-1] - model.shape[-1] + 1
    t4r, p4r = r_to_t(r, size=dof)

    output_glm = np.zeros(nii.shape[:3])
    output_glm[mask_idx] = beta[..., -1]
    output_corr = output_glm.copy()
    output_corr[mask_idx] = r

    save_to_nii(output_glm, nii, pm_oput(f'{subj}_O2_GLM.nii.gz'))
    save_to_nii(output_corr, nii, pm_oput(f'{subj}_O2_Corr.nii.gz'))

In [23]:
# Use GLM to analysis significant responsed regions (2nd level statistics)

n_subj = len(pm_oput.listdir(pattern=r'.*O2_Corr.nii.gz'))
n_voxl = mask_idx[0].shape[0]

data_array = np.zeros([n_voxl, n_subj])

for i, fname in pm_oput.listdir(pattern=r'.*O2_Corr.nii.gz').items():
    nii = nib.load(pm_oput(fname))
    data_array[:, i] = np.asarray(nii.dataobj)[mask_idx]
    
predict, coef, t, p = glm(np.arctanh(data_array), np.ones([n_subj, 1]))
predicted = np.tanh(coef[:, 0])
q = fdrcorrection(p[:, 0], alpha=pval)
predicted[~q[0]] = 0
predicted[predicted < corr_thr] = 0
group_avr = np.zeros(mask_nii.shape)
group_avr[mask_idx] = predicted

save_to_nii(group_avr, mask_nii, pm_oput('O2Chal_groupavr.nii.gz'))