In [1]:
import nilearn.image, nilearn.decoding

import numpy as np
import pandas as pd

from sklearn.svm import LinearSVC
from sklearn.model_selection import GroupKFold



In [10]:
#TM_dir = '/clmnlab/TM'
TM_dir = 'Z:/TM'

behav_dir = TM_dir + '/behav_data/'
fan_roi_dir = TM_dir + '/fMRI_data/masks/Fan/Fan280/'
stats_dir_6 = TM_dir + '/fMRI_data/stats/Reg6_MVPA2_IM_VWM/'
stats_dir_7 = TM_dir + '/fMRI_data/stats/Reg7_MVPA3_IM_COY/'
preproc_dir = TM_dir + '/fMRI_data/preproc_data/'
result_dir = TM_dir
Ttest_ROI_mask = TM_dir + '/Clust_mask_binary.nii'

subj_list = ["TML04_PILOT","TML05_PILOT","TML06_PILOT","TML07_PILOT", 
            "TML08_PILOT","TML09_PILOT","TML10_PILOT","TML11_PILOT",
            "TML12_PILOT","TML13","TML14","TML15","TML16","TML18","TML19","TML20",
            "TML21","TML22","TML23","TML24","TML25","TML26","TML28","TML29"]
subj_list = ["TML04_PILOT"]
subj = subj_list[0]

freq_range = range(10,20+1,1)
freq_c = freq_range[5]

In [3]:
def calc_freq_class(freq_range, freq):
    #freq_range = range(10,20+1,1)
    freqs = [x for x in freq_range]
    #print(freqs)
    f_mid = int((freqs[0]+freqs[-1])*0.5)
    return int(freq - f_mid)

In [17]:
def get_dataframe(subj, run):
    df = pd.read_csv(behav_dir + subj + '/behav_data_Dis.dat', sep='\t', header=None)
    df.columns=['trial', 'Freq.1', 'ISI1', 'Freq.2', 'ISI2', 'decision', 'correctness', 'RT', 'ISI3']
    df['Freq.1.class'] = [np.int64(np.sign(f-freq_c)) for f in df['Freq.1']]
    df['Freq.2.class'] = [np.int64(np.sign(f-freq_c)) for f in df['Freq.2']]
    df['Freq.other.index'] = [1 if a != 0 else 2 for a in df['Freq.1.class']]
    df['Freq.other.class'] = [a+b for a, b in zip(df['Freq.1.class'], df['Freq.2.class'])]
    df['answer.index'] = [1 if a>b else 2 for a, b in zip(df['Freq.1'], df['Freq.2'])]
    df['decision.index'] = [1 if x == 'before' else (2 if x=='after' else 'NaN') for i, x in enumerate(df['decision'])]
    df['Freq.other_decision.class'] = ['NaN' if b=='NaN' else (1 if a==b else -1) for a,b in zip(df['Freq.other.index'],df['decision.index'])]

    df['Freq.1.rank'] = [calc_freq_class(freq_range,f) for f in df['Freq.1']]
    df['Freq.2.rank'] = [calc_freq_class(freq_range,f) for f in df['Freq.2']]
    df['Freq.other.rank'] = [a+b for a, b in zip(df['Freq.1.rank'], df['Freq.2.rank'])]
    df['F1<F2.class'] = [int(np.sign(b-a)) for a, b in zip(df['Freq.1'], df['Freq.2'])]

    temp = []
    trials = [40, 30, 30]
    fin = 0
    for x in trials:
        ini = fin
        fin = ini + x
        temp.append(df.loc[ini:fin-1])

    # Note, Freq.other_answer.class == Freq.other_updown.class
    #validation = df['Freq.other_answer.class'] == df['Freq.other.classifier']
    #assert validation.all() == True
    #assert df['Freq.other_decision.class'].shape[0] == sum(trials)
    assert len(temp[run-1]) == trials[run-1]

    return temp[run-1]

In [13]:
def load_freq_all(subj, run):
    df = get_dataframe(subj, run)
    df = pd.DataFrame.reset_index(df)

    temp = [[df.loc[row,'Freq.1'],df.loc[row,'Freq.2']] for row in range(len(df))]
    temp = np.concatenate(temp)
    #temp = pd.DataFrame(temp)

    return temp

In [34]:
def get_label_index(subj, run, num_beta, target_beta):
## labeling betas of others indices ##
    ntrials = [40,30,30]
    nbeta = num_beta   # number of betas in one trial
    tb = target_beta   # target_beta, (The index starts from 1)
    idx = [ nbeta*i + tb[0] for i in range(ntrials[run-1])]
        
    return np.array(idx)

In [37]:
def load_beta_image(subj, run):
## load nilearn image ##
    img = nilearn.image.load_img(stats_dir_7 + '%s/r%02d.LSSout.nii.gz' % (subj, run))
    idx = get_label_index(subj, run, 3, [2])
    img = nilearn.image.index_img(img, idx)

    return img

In [42]:
def load_target(subj, run):
## load behavior data and make up them ##
    df = get_dataframe(subj, run)
    classifier = [ int(2*(idx-1.5)) for idx in df['decision.index'] ]

    return classifier

In [50]:
### X: neural data / y: answers / group: run number
def get_X_y_group(subj, runs):      
    
    Xs = [load_beta_image(subj, run) for run in runs]
    ys = [load_target(subj, run) for run in runs]

    group = [
        i for i, y in enumerate(ys) for j in range(len(y))
    ]
    Xs = nilearn.image.concat_imgs(Xs)
    ys = np.concatenate(ys)
    
    assert Xs.shape[-1] == ys.shape[0]
    assert ys.shape[0] == len(group)
    return Xs, ys, np.array(group)

In [9]:
def run_searchlight(full_mask, X, y, group, estimator, group_k, radius, chance_level):
    cv = GroupKFold(group_k)

    searchlight = nilearn.decoding.SearchLight(
    full_mask,
    radius=radius,
    estimator=estimator,
    n_jobs=4,
    verbose=False,
    cv=cv,
    scoring='balanced_accuracy'
    )

    searchlight.fit(X, y, group)
    scores = searchlight.scores_ - chance_level

    return nilearn.image.new_img_like(full_mask, scores)

In [56]:
for subj in subj_list:
    runs = [1,2,3]
    fullmask_datum = preproc_dir + subj + '/preprocessed/full_mask.%s.nii.gz' % (subj)
    #n_subj = len(subj_list)

    # full mask 를 사용하시면 됩니다. 여기에서는 processing time 줄이려고 작은 마스크 가져옴.
    print("Loading %s's mask_img..." % (subj))
    mask_img = nilearn.image.load_img(fullmask_datum)
    print("Done!")

    estimator = LinearSVC(max_iter=10000)
    estimator_name = 'svc'
    radius = 8  # 적절한 크기를 사용하세요.
    
    #pd.show_versions()

    print("Executing searchlight_svc_yellow")
    X, y, group = get_X_y_group(subj, runs)
    searchlight_img = run_searchlight(mask_img, X, y, group, group_k=3, estimator=estimator, radius=radius, chance_level=1/2)
    searchlight_img.to_filename(result_dir + '%s_r%d_yellow_%s.nii.gz' % (subj, radius, estimator_name))

    print('%s finished' % (subj))

Loading TML04_PILOT's mask_img...
Done!
Executing searchlight_svc_movement


KeyboardInterrupt: 