In [1]:
import numpy as np
import mne
import os
from scipy.spatial.distance import cdist

In [2]:


def get_neighbors_within_radius(pos:list, radius:float) -> list:
    """
    获取通道位置后输入该函数，设定sphere半径
    计算在sphere中的channel的索引并返回

    Parameters
    ----------
    pos: 所有通道的三维坐标
    radius: searchlight sphere半径


    Returns
    -------

    neighbors: 每个sphere内的mag索引组成的嵌套列表
    """
    dist_mat = cdist(pos, pos)
    neighbors = [np.where(dist_mat[i] <= radius)[0].tolist() for i in range(len(pos))]

    return neighbors

def get_chanpo_fif(subj:int, radius: float = .04):
    """
    通过被试的epoch信息，提取以每个channel为中心的sphere内mag、grad索引

    Parameters
    ----------
    subj: 单一被试索引
    radius: searchlight sphere半径，默认值0.04

    Returns
    -------
    mag_idx_idx: list, 每个sphere内的mag索引组成的嵌套列表
    grad_idx_idx: list, 每个sphere内的grad索引组成的嵌套列表

    """
    datadir = r'../data'
    sub_fif_dir = r'cleaned/unconscious-session_1-block_1-epo.fif'
    epo_fif_dir = os.path.join(datadir,f'subject{subj}',sub_fif_dir)
    epo_fif = mne.read_epochs(epo_fif_dir, preload=True)

    # 提取磁强计索引
    mag_idx = mne.pick_types(epo_fif.info, meg='mag')

    # 提取梯度计索引
    grad_idx = mne.pick_types(epo_fif.info, meg='grad')

    # 获取磁强计的位置
    mag_pos = [epo_fif.info['chs'][i]['loc'][:3] for i in mag_idx]

    # 获取梯度计的位置
    grad_pos = [epo_fif.info['chs'][i]['loc'][:3] for i in grad_idx]


    mag_idx_idx = get_neighbors_within_radius(mag_pos, radius)

    # 判断sphere中是否只有中心点
    for idx in range( len( mag_idx_idx ) ):
        if len( mag_idx_idx[idx] )  <2:
            raise ValueError("radius值过小")

    grad_idx_idx = np.arange(len(grad_pos)).reshape(-1,2)
    grad_idx_idx = [grad_idx_idx[item].flatten() for item in mag_idx_idx]

    # 判断sphere中是否只有中心点
    for idx in range( len( grad_idx_idx ) ) :
        if len( grad_idx_idx[idx] ) <4:
            raise ValueError("radius值过小")

    return mag_idx_idx, grad_idx_idx

In [4]:
mag_idx_idx, grad_idx_idx = get_chanpo_fif(17,radius = .04)


Reading D:\Archive\Github\COSN-Hackathon2025\scripts\..\data\subject17\cleaned\unconscious-session_1-block_1-epo.fif ...
    Found the data of interest:
        t =   -1000.00 ...     998.00 ms
        0 CTF compensation matrices available
Not setting metadata
98 matching events found
No baseline correction applied
0 projection items activated


In [5]:
mag_idx_idx

[[0, 1, 2, 3],
 [0, 1, 2, 11],
 [0, 1, 2, 3, 4, 54],
 [0, 2, 3, 54, 57],
 [2, 4, 5, 7, 11],
 [4, 5, 6, 9, 12],
 [5, 6, 7, 15, 59],
 [4, 6, 7, 54, 58],
 [8, 9, 11, 16],
 [5, 8, 9, 10, 11, 19],
 [9, 10, 12, 23],
 [1, 4, 8, 9, 11],
 [5, 10, 12, 13, 15],
 [12, 13, 14, 22, 23],
 [13, 14, 15, 24, 67],
 [6, 12, 14, 15, 66],
 [8, 16, 17, 19],
 [16, 17, 18, 28],
 [17, 18, 19, 20, 29],
 [9, 16, 18, 19],
 [18, 20, 23, 34],
 [21, 22, 23, 34, 36, 37],
 [13, 21, 22, 23, 24, 37],
 [10, 13, 20, 21, 22, 23],
 [14, 22, 24, 25, 27],
 [24, 25, 26, 37, 41],
 [25, 26, 27, 82, 85],
 [24, 26, 27, 67, 68],
 [17, 28, 29, 30],
 [18, 28, 29, 33, 34],
 [28, 30, 31, 33],
 [30, 31, 32, 42],
 [31, 32, 33, 44],
 [29, 30, 32, 33, 35],
 [20, 21, 29, 34, 35],
 [33, 34, 35, 36],
 [21, 35, 36, 37, 38, 45],
 [21, 22, 25, 36, 37, 38],
 [36, 37, 38, 39, 41],
 [38, 39, 40, 45, 46],
 [39, 40, 41, 49, 83],
 [25, 38, 40, 41, 82],
 [31, 42, 43, 44],
 [42, 43, 44, 47, 50],
 [32, 42, 43, 44, 45, 46],
 [36, 39, 44, 45],
 [39, 44, 46,

In [150]:
grad_idx_idx

[array([0, 1, 2, 3, 4, 5, 6, 7]),
 array([ 0,  1,  2,  3,  4,  5, 22, 23]),
 array([  0,   1,   2,   3,   4,   5,   6,   7,   8,   9, 108, 109]),
 array([  0,   1,   4,   5,   6,   7, 108, 109, 114, 115]),
 array([ 4,  5,  8,  9, 10, 11, 14, 15, 22, 23]),
 array([ 8,  9, 10, 11, 12, 13, 18, 19, 24, 25]),
 array([ 10,  11,  12,  13,  14,  15,  30,  31, 118, 119]),
 array([  8,   9,  12,  13,  14,  15, 108, 109, 116, 117]),
 array([16, 17, 18, 19, 22, 23, 32, 33]),
 array([10, 11, 16, 17, 18, 19, 20, 21, 22, 23, 38, 39]),
 array([18, 19, 20, 21, 24, 25, 46, 47]),
 array([ 2,  3,  8,  9, 16, 17, 18, 19, 22, 23]),
 array([10, 11, 20, 21, 24, 25, 26, 27, 30, 31]),
 array([24, 25, 26, 27, 28, 29, 44, 45, 46, 47]),
 array([ 26,  27,  28,  29,  30,  31,  48,  49, 134, 135]),
 array([ 12,  13,  24,  25,  28,  29,  30,  31, 132, 133]),
 array([16, 17, 32, 33, 34, 35, 38, 39]),
 array([32, 33, 34, 35, 36, 37, 56, 57]),
 array([34, 35, 36, 37, 38, 39, 40, 41, 58, 59]),
 array([18, 19, 32, 33, 36, 

In [113]:
# mag_idx_idx
grad_idx_idx

[array([0, 1, 2, 3, 4, 5, 6, 7]),
 array([ 0,  1,  2,  3,  4,  5, 22, 23]),
 array([  0,   1,   2,   3,   4,   5,   6,   7,   8,   9, 108, 109]),
 array([  0,   1,   4,   5,   6,   7, 108, 109, 114, 115]),
 array([ 4,  5,  8,  9, 10, 11, 14, 15, 22, 23]),
 array([ 8,  9, 10, 11, 12, 13, 18, 19, 24, 25]),
 array([ 10,  11,  12,  13,  14,  15,  30,  31, 118, 119]),
 array([  8,   9,  12,  13,  14,  15, 108, 109, 116, 117]),
 array([16, 17, 18, 19, 22, 23, 32, 33]),
 array([10, 11, 16, 17, 18, 19, 20, 21, 22, 23, 38, 39]),
 array([18, 19, 20, 21, 24, 25, 46, 47]),
 array([ 2,  3,  8,  9, 16, 17, 18, 19, 22, 23]),
 array([10, 11, 20, 21, 24, 25, 26, 27, 30, 31]),
 array([24, 25, 26, 27, 28, 29, 44, 45, 46, 47]),
 array([ 26,  27,  28,  29,  30,  31,  48,  49, 134, 135]),
 array([ 12,  13,  24,  25,  28,  29,  30,  31, 132, 133]),
 array([16, 17, 32, 33, 34, 35, 38, 39]),
 array([32, 33, 34, 35, 36, 37, 56, 57]),
 array([34, 35, 36, 37, 38, 39, 40, 41, 58, 59]),
 array([18, 19, 32, 33, 36, 

In [155]:
get_chanpo_fif(17,0.01)

Reading D:\Archive\Github\COSN-Hackathon2025\scripts\..\data\subject17\cleaned\unconscious-session_1-block_1-epo.fif ...
    Found the data of interest:
        t =   -1000.00 ...     998.00 ms
        0 CTF compensation matrices available
Not setting metadata
98 matching events found
No baseline correction applied
0 projection items activated


ValueError: radius值过小

In [67]:
subj = 17
datadir = r'../data'
sub_fif_dir = r'cleaned/unconscious-session_1-block_1-epo.fif'
epo_fif_dir = os.path.join(datadir,f'subject{subj}',sub_fif_dir)
epo_fif = mne.read_epochs(epo_fif_dir, preload=True)
grad = epo_fif.copy().pick(picks='grad')
print(grad.ch_names[:10])
one_grad = grad.copy().pick_channels([grad.ch_names[0]])
two_grad = grad.copy().pick_channels([grad.ch_names[1]])
# grad: 只包含梯度计
# epo_fif: 仍然保留所有通道


Reading D:\Archive\Github\COSN-Hackathon2025\scripts\..\data\subject17\cleaned\unconscious-session_1-block_1-epo.fif ...
    Found the data of interest:
        t =   -1000.00 ...     998.00 ms
        0 CTF compensation matrices available
Not setting metadata
98 matching events found
No baseline correction applied
0 projection items activated
['MEG0112', 'MEG0113', 'MEG0122', 'MEG0123', 'MEG0132', 'MEG0133', 'MEG0142', 'MEG0143', 'MEG0212', 'MEG0213']
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).


In [70]:
%matplotlib notebook


a=one_grad.get_data()
a.shape
b=two_grad.get_data()

print(a),print(''),

[[[ 3.74903831e-13  5.11195977e-13  8.95155441e-14 ... -4.85940974e-12
   -4.35796780e-12 -3.73933384e-12]]

 [[-2.49354241e-13  9.54754991e-13  1.68761499e-12 ...  3.50732404e-12
    1.56638656e-12 -7.56091536e-13]]

 [[-4.53442671e-12 -4.26807586e-12 -3.81078214e-12 ...  2.29370293e-12
    1.17673961e-12  1.50123421e-12]]

 ...

 [[-1.35120045e-12 -6.82906153e-13 -3.89456515e-13 ... -2.40964053e-12
   -4.14541373e-12 -4.27106052e-12]]

 [[ 4.84782477e-12  3.82662639e-12  2.77745036e-12 ...  4.46088249e-12
    6.15130515e-12  6.99256408e-12]]

 [[-3.74349954e-12 -3.81857726e-12 -3.89465007e-12 ...  2.15885758e-12
    1.54377039e-12 -9.06362062e-13]]]

[[[ 1.22292595e-12  8.83904649e-13  4.67807853e-13 ... -8.99858510e-13
   -1.85559393e-12 -4.08110303e-12]]

 [[-2.40165323e-12 -6.55412089e-14  2.63233812e-12 ... -7.20774822e-12
   -9.10750724e-12 -8.09188805e-12]]

 [[-3.61754800e-12 -7.74894116e-12 -9.40038732e-12 ...  1.42417405e-12
    5.78762505e-14  5.07014248e-13]]

 ...

 [[ 5.

(None, None, None)

In [71]:
print(b)

[[[ 1.22292595e-12  8.83904649e-13  4.67807853e-13 ... -8.99858510e-13
   -1.85559393e-12 -4.08110303e-12]]

 [[-2.40165323e-12 -6.55412089e-14  2.63233812e-12 ... -7.20774822e-12
   -9.10750724e-12 -8.09188805e-12]]

 [[-3.61754800e-12 -7.74894116e-12 -9.40038732e-12 ...  1.42417405e-12
    5.78762505e-14  5.07014248e-13]]

 ...

 [[ 5.05721354e-14 -3.74621104e-13  5.92791343e-13 ... -3.59850397e-12
   -8.08261655e-12 -1.15494588e-11]]

 [[ 5.83196512e-12  6.32443485e-12  5.94517010e-12 ...  1.42069276e-12
   -1.64087409e-14 -6.44987183e-13]]

 [[-1.27937446e-12 -4.77889594e-12 -7.00073080e-12 ... -2.74804890e-12
   -2.54332186e-12  1.65195875e-13]]]


In [73]:
print(np.shape(grad.ch_names))

(204,)


In [9]:
# -*- coding: utf-8 -*-
"""
Created on Sat Aug 23 2025

@author: 黄若杰、宋奕德、朱江岳、郝守彬、梅宁

下面的函数主要用于MEG searchlight temporal decoding任务中

"""
from glob import glob

import numpy as np
import mne
import itertools
import os
import re
import pandas as pd
from sklearn.decomposition import PCA
from scipy.spatial.distance import cdist
from sklearn.calibration import CalibratedClassifierCV
from sklearn.dummy import DummyClassifier
from sklearn.ensemble import BaggingClassifier, RandomForestClassifier
from sklearn.feature_selection import SelectFromModel, SelectPercentile, mutual_info_classif, VarianceThreshold
from sklearn.model_selection import StratifiedShuffleSplit, cross_validate
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import LinearSVC
from sklearn.tree import DecisionTreeClassifier

from joblib import Parallel, delayed

from typing import Any


def get_neighbors_within_radius(pos, radius: float, ) -> list[
    int | list[int] | list[list[int]] | list[list[list[Any]]]]:
    dist_mat = cdist(pos, pos)
    neighbors = [np.where(dist_mat[i] <= radius)[0].tolist() for i in range(len(pos))]
    return neighbors


def get_channel_position(example_epochs, radius: float = 0.04):
    """
    通过被试的epoch信息，提取以每个channel为中心的sphere内mag、grad索引

    Parameters
    ----------
    example_epochs: 被试的epochs数据
    radius: searchlight sphere半径，默认值0.04

    Returns
    -------
    mag_idx_idx: list, 每个sphere内的mag索引组成的嵌套列表
    grad_idx_idx: list, 每个sphere内的grad索引组成的嵌套列表

    """

    # 提取磁强计索引
    mag_idx = mne.pick_types(example_epochs.info, meg='mag')

    # 提取梯度计索引
    grad_idx = mne.pick_types(example_epochs.info, meg='grad')

    # 获取磁强计的位置
    mag_pos = [example_epochs.info['chs'][i]['loc'][:3] for i in mag_idx]

    # 获取梯度计的位置
    grad_pos = [example_epochs.info['chs'][i]['loc'][:3] for i in grad_idx]



    mag_idx_idx = get_neighbors_within_radius(mag_pos, radius)
    # 判断sphere中是否只有中心点
    for idx in range( len( mag_idx_idx ) ):
        if len( mag_idx_idx[idx] )  <2:
            raise ValueError("radius值过小")


    grad_idx_idx = np.arange(len(grad_pos)).reshape(-1, 2)
    grad_idx_idx = [grad_idx_idx[item].flatten() for item in mag_idx_idx]

    # 判断sphere中是否只有中心点
    for idx in range( len( grad_idx_idx ) ) :
        if len( grad_idx_idx[idx] ) <4:
            raise ValueError("radius值过小")

    return mag_idx_idx, grad_idx_idx


def str2int(x):
    if type(x) is str:
        return int(re.findall(r'\d+', x)[0])
    else:
        return x


def build_model_dictionary(model_name: str = 'None + Linear-SVM',
                           class_weight: str = 'balanced',
                           remove_invariant: bool = True,
                           scaler=None,
                           l1: bool = True,
                           C: float = 1,
                           tol: float = 1e-3,
                           ):
    """
    Parameters
    ----------
    model_name : str
        DESCRIPTION. The default is 'None + Linear-SVM', which means no feature extraction and linear SVM as decoder.
    class_weight : str, optional
        DESCRIPTION. The default is 'balanced'.
    remove_invariant : bool, optional
        DESCRIPTION. The default is True.
    scaler : TYPE, optional
        Defult scaler is always Standard Scaler if it is set to None. The default is None.
    l1 : bool, optional
        DESCRIPTION. The default is True.
    C : float, optional
        DESCRIPTION. The default is 1.
    tol : float, optional
        DESCRIPTION. The default is 1e-2.

    Returns
    -------
    sklearn.pipeline
        DESCRIPTION.

    """

    np.random.seed(12345)

    xgb = RandomForestClassifier(random_state=12345)
    RF = SelectFromModel(xgb,
                         prefit=False,
                         threshold='median'  # induce sparsity
                         )
    uni = SelectPercentile(score_func=mutual_info_classif,
                           percentile=50,
                           )  # so annoying that I cannot control the random state

    pipeline = []

    if remove_invariant:
        pipeline.append(VarianceThreshold())

    if scaler == None:
        scaler = StandardScaler()
    pipeline.append(scaler)

    feature_extractor, decoder = model_name.split(' + ')

    if feature_extractor == 'PCA':
        pipeline.append(PCA(n_components=.90,
                            random_state=12345))
        # l1 = False
    elif feature_extractor == 'Mutual':
        pipeline.append(uni)
        # l1 = False
    elif feature_extractor == 'RandomForest':
        pipeline.append(RF)
        # l1 = False

    if l1:
        svm = LinearSVC(penalty='l1',  # not default
                        dual=False,  # not default
                        tol=tol,  # not default
                        random_state=12345,  # not default
                        max_iter=int(1e4),  # default
                        class_weight=class_weight,  # not default
                        C=C,
                        )
    else:
        svm = LinearSVC(penalty='l2',  # default
                        dual=True,  # default
                        tol=tol,  # not default
                        random_state=12345,  # not default
                        max_iter=int(1e4),  # default
                        class_weight=class_weight,  # not default
                        C=C,
                        )
    svm = CalibratedClassifierCV(estimator=svm,
                                 method='sigmoid',
                                 cv=8)

    bagging = BaggingClassifier(estimator=svm,
                                n_estimators=30,  # not default
                                max_features=0.9,  # not default
                                max_samples=0.9,  # not default
                                bootstrap=True,  # default
                                bootstrap_features=True,  # default
                                random_state=12345,  # not default
                                )
    knn = KNeighborsClassifier()
    tree = DecisionTreeClassifier(random_state=12345,
                                  class_weight=class_weight)
    dummy = DummyClassifier(strategy='uniform', random_state=12345, )

    if decoder == 'Linear-SVM':
        pipeline.append(svm)
    elif decoder == 'Dummy':
        pipeline.append(dummy)
    elif decoder == 'KNN':
        pipeline.append(knn)
    elif decoder == 'Tree':
        pipeline.append(tree)
    elif decoder == 'Bagging':
        pipeline.append(bagging)

    return make_pipeline(*pipeline)

def get_neighbors_idx(epochs, radius:float):
    mag_idx_idx, grad_idx_idx = get_channel_position(epochs, radius)
    time = np.arange(epochs.times.shape[0])
    maga_for_loop_mag = list(itertools.product(mag_idx_idx, time))
    maga_for_loop_grad = list(itertools.product(grad_idx_idx, time))
    return maga_for_loop_mag, maga_for_loop_grad


def load_data(working_data, working_beha, radius: float = .04):
    """
    Load MEG data and behavioral data for a given subject.

    Parameters
    ----------
    subj : int
        Subject number to load the data for.
    radius : float, optional
        Radius for neighborhood search in the MEG data. Default is 0.04.

    Returns
    -------
    maga_for_loop : list
        List of tuples containing magnetic sensor indices and time points for decoding.
    epochs_unconscious : mne.Epochs
        MEG epochs data for unconscious trials.
    epochs_conscious : mne.Epochs
        MEG epochs data for conscious trials.
    df_unconscious : pandas.DataFrame
        Behavioral data for unconscious trials.
    df_conscious : pandas.DataFrame
        Behavioral data for conscious trials.
    """


    epochs = []
    df = []
    for epoch_file, beha_file in zip(working_data, working_beha):
        epochs.append(mne.read_epochs(epoch_file, preload=True, verbose=False))
        df_temp = pd.read_csv(beha_file)
        df.append(df_temp)
    epochs = mne.concatenate_epochs(epochs)
    #     epochs.resample(100)
    df = pd.concat(df)
    df["visible.keys_raw"] = df["visible.keys_raw"].apply(str2int)
    idx_unconscious = df['visible.keys_raw'] == 1
    idx_conscious = df['visible.keys_raw'] == 3
    epochs_unconscious = epochs[idx_unconscious]
    df_unconscious = df[idx_unconscious]
    epochs_conscious = epochs[idx_conscious]
    df_conscious = df[idx_conscious]

    mag_idx_idx, grad_idx_idx = get_channel_position(epochs, radius)
    time = np.arange(epochs.times.shape[0])
    maga_for_loop = list(itertools.product(mag_idx_idx, time))
    del epochs
    return maga_for_loop, epochs_unconscious, epochs_conscious, df_unconscious, df_conscious


#     if mag & grad:
#
#         mag_data=epochs.copy().pick(picks='mag').pick(mag_idx_idx[0])
#         grad_data=epochs.copy().pick(picks='grad').pick(grad_idx_idx[0])
#
#
#
#
#     mag_data=epochs.copy().pick(picks='mag').pick(mag_idx_idx[0])
#
label_map = {"Nonliving_Things": 1,
             "Living_Things": 0,
             }


def decode_within_sphere(epoch_data, labels, cv, model_name='None + Linear-SVM'):
    """
        Perform decoding within a sphere using a specified machine learning model.

        Parameters
        ----------
        epoch_data : ndarray
            The MEG data to be used for decoding. Shape is typically (n_samples, n_features).
        labels : ndarray
            The labels corresponding to the data samples.
        cv : sklearn.model_selection._BaseKFold
            Cross-validation splitting strategy.
        model_name : str, optional
            The name of the model to use for decoding. Default is 'None + Linear-SVM'.
            The format is 'FeatureExtractor + Decoder', where the feature extractor and decoder
            are specified as strings.

        Returns
        -------
        test_scores : ndarray
            The cross-validated test scores for the decoding task.
        """
    pipeline = build_model_dictionary(model_name=model_name)
    res = cross_validate(pipeline, epoch_data, labels, cv=cv, scoring='roc_auc', n_jobs=1)
    return res['test_score']



if __name__ == '__main__':
    subj = 9
    subject = 'subject' + str(subj)
    maga_for_loop, epochs_unconscious, epochs_conscious, df_unconscious, df_conscious = load_data(9, .04)
    labels_unconscious = df_unconscious['category'].map(label_map).values
    labels_conscious = df_conscious['category'].map(label_map).values

    cv = StratifiedShuffleSplit(n_splits=20, test_size=0.2, random_state=12345)

    scores = Parallel(n_jobs=2, verbose=1, )(
        delayed(decode_within_sphere)(**{'epoch_data': epochs_unconscious.get_data()[:, sphere, time_point],
                                         'labels': labels_unconscious,
                                         'cv': cv, }) for sphere, time_point in maga_for_loop)
    results_dir = os.path.join('../results/', 'MEG', subject)
    if not os.path.exists(results_dir):
        os.makedirs(results_dir)
    np.save(os.path.join(results_dir, 'searchlight_scores.npy'), scores)



Not setting metadata
1000 matching events found
Applying baseline correction (mode: mean)


[Parallel(n_jobs=2)]: Using backend LokyBackend with 2 concurrent workers.
[Parallel(n_jobs=2)]: Done  46 tasks      | elapsed:   15.1s


KeyboardInterrupt: 