In [1]:
def transform_data_nt(feature, df):

    word_target = df[df['feat'] == feature]
    word_target = word_target.drop(['feat'],axis = 1)
    word_target = word_target.groupby(['subject','channel']).mean('correlation')
    word_target = word_target.reset_index()
    
    subj_ar = []
    for s in word_target['subject'].unique():
        sdf = word_target[word_target['subject'] == s]
    
        pv = sdf.pivot(index = 'subject',columns=[ 'channel'], values='correlation')
        ar = pv.values
        subj_ar.append(ar)
    
    res = np.array(subj_ar)
    return res,word_target

  

def create_adjacency(feature_df, time_resolved = True):
    from mne.stats import combine_adjacency

    
    f = feature_df.groupby("channel").mean("correlation")
    f = f.reset_index()

    
    info = mne.create_info(ch_names=f['channel'].tolist(), sfreq=1000, ch_types='eeg')
     
    # Load the standard 10-20 montage
    montage = mne.channels.make_standard_montage('biosemi64')
    
    # Extract all positions from the montage
    pos = montage.get_positions()['ch_pos']
    
    # Filter positions to include only those channels in your DataFrame
    pos = {ch: pos[ch] for ch in f['channel'].tolist()}
    
    
    info.set_montage(montage)
    adj_channels = mne.channels.find_ch_adjacency(info = info, ch_type= "eeg")

    if time_resolved == True:
        n_times = 4
    else:
        n_times = 1
    adj = combine_adjacency(n_times, adj_channels[0])
    
    return adj,adj_channels


def cluster_test_fits_nt(var, adj):
    import numpy as np 
    import scipy.stats as stats
    from mne.stats import spatio_temporal_cluster_1samp_test, summarize_clusters_stc, ttest_1samp_no_p
    from functools import partial
    
   
    
    var = np.array(var)
  
    print("Clustering.")
    res_d = mne.stats.permutation_cluster_1samp_test(
        var,
        n_jobs=None,
        seed=1,
        verbose=True, 
        tail = 1, 
        adjacency = adj, 
        step_down_p = 0.1
    )
    
    
    cor_mean =np.mean(np.array(var), axis = 0)
    incor_mean = np.mean(np.array(var), axis = 0)
    diff_trf = cor_mean-incor_mean
    
    t_obs, clusters, pvals, h0 = res_d
    print(res_d)
    cluster_alpha = 0.05
    print(len(res_d[1]))
    if len(res_d[1]) > 0:
        nan_trf = np.ones_like(cor_mean) * np.nan
        nan_trf = np.ones_like(cor_mean) * np.nan
        nan_trf_list = np.ones_like(cor_mean) * np.nan
        nan_diff = nan_trf.copy()
        
        for k_cluster, c in enumerate(clusters):
            if pvals[k_cluster] < cluster_alpha:
                lags = c
                if abs(cor_mean[lags].mean()) > abs(incor_mean[lags].mean()):
                    nan_trf[lags] = cor_mean[lags]
                else:
                    nan_trf_list[lags] = incor_mean[lags]
                nan_diff[lags] = diff_trf[lags]
                
            signi_times_t = (~np.isnan(nan_diff)).astype(float)
            signi_times_t[signi_times_t == 0.] = np.nan
            signi_chans_bool_t = (~np.all(np.isnan(nan_diff), axis=0))
    else:
        signi_times_t = diff_trf
        signi_chans_bool_t = np.nan

    
    return signi_times_t, signi_chans_bool_t,res_d





import scipy.stats as stats
def create_topo_fits_sig_single(feature_df,  signi_times, feature = None,  time_resolved = True, palette= "Reds"):
    import pandas as pd
    from matplotlib.colors import TwoSlopeNorm
    


    fig,ax = plt.subplots(ncols = 1, figsize = (2,2))
    
 
    minimum = np.min(feature_df['correlation'])
    maximum = np.max(feature_df['correlation'])

    mask = signi_times[0]
    if not np.any(signi_times) == False: 
        mask = np.isfinite(mask)
    else:  
        mask = np.empty((61))
        mask[:] = False
    mask = np.array(mask)

    data_window = feature_df

    
    
    f = data_window.groupby("channel", sort = False).mean("correlation")
    f = f['correlation']
    
    f = f.reset_index()
    
    info = mne.create_info(ch_names=f['channel'].tolist(), sfreq=1000, ch_types='eeg')
    
   
    data = f['correlation']
    data = stats.zscore(data)
    
    # Load the standard 10-20 montage
    montage = mne.channels.make_standard_montage('biosemi64')
    
    # Extract all positions from the montage
    pos = montage.get_positions()['ch_pos']
    
    # Filter positions to include only those channels in your DataFrame
    pos = {ch: pos[ch] for ch in f['channel'].tolist()}
    
    
    info.set_montage(montage)
  
    

  


    im,cm = mne.viz.plot_topomap(data, pos=info, size = 5,contours = 0,image_interp = "cubic",
                                 mask = mask,
                                 sensors = False,
                        extrapolate="head", ch_type = "eeg", show=False, axes=ax, 
                        mask_params = dict(marker='o', markerfacecolor='white', markeredgecolor='black',
                            markersize=6),
                        cmap = palette, 
    )
    

 
    ax_x_start = 1.2
    ax_x_width = 0.05
    ax_y_start = 0.4
    ax_y_height = 0.6
    cbar_ax = fig.add_axes([ax_x_start, ax_y_start, ax_x_width, ax_y_height])
    clb = fig.colorbar(im, cax=cbar_ax)
    clb.ax.tick_params(labelsize=25)
   
    clb.ax.set_title("Encoding\naccuracy",fontsize=25, pad = 30) # title on top of colorbar
    


def plot_fits_clusters(feature):
    
 
    df = dfl
    k = feature 
    cdict = {"ac_target": "#006D2C","ac_dis":"#54278F", 
            "onsets_target": "#006D2C",  "onsets_dis":"#54278F", 
            "ling_word_target": "#006D2C", "ling_word_dis":"#54278F",
            "ling_phone_target": "#006D2C", "ling_phone_dis":"#54278F"}

    cmap = clr.LinearSegmentedColormap.from_list('custom blue', ['white','white','white', cdict[k], cdict[k]], N=256)
        
    word_target_ar,feature_df = transform_data_nt(k, df)
    
    adj,adjc = create_adjacency(feature_df, time_resolved = False)
    ##perform cluster test
    res = cluster_test_fits_nt(word_target_ar,adj)

  

    montage = mne.channels.make_standard_montage('biosemi64')
    dict_channels = montage.get_positions()['ch_pos']
    
    channels_used = feature_df['channel'].unique()
    chanord = [s for s in dict_channels.keys() if s in channels_used]
    
    dict_channels_used = {key: dict_channels[key] for key in channels_used}


    featn = feature 
    

    create_topo_fits_sig_single(feature_df, res[0], feature, time_resolved = False, palette = cmap)
    
