# 脑磁图 / 脑电图数据的多重变量分析 (解码 / 微波增强)

In [1]:
%matplotlib qt

In [2]:
import pathlib
import matplotlib
import matplotlib.pyplot as plt
import mne

matplotlib.use('Qt5Agg')
mne.set_log_level('Warning')

## 加载分段数据

In [3]:
sample_data_dir = 'F:/Database/Multimodal_data/mne_data/MNE-sample-data/' # 设置已有数据路径
sample_data_dir = pathlib.Path(sample_data_dir)
epochs = mne.read_epochs(pathlib.Path(sample_data_dir / 'out_data') / 'epochs_epo.fif')
epochs.apply_baseline((None, 0))
epochs

0,1
Number of events,320
Events,Auditory/Left: 72 Auditory/Right: 73 Button: 16 Smiley: 15 Visual/Left: 73 Visual/Right: 71
Time range,-0.300 – 0.499 s
Baseline,-0.300 – 0.000 s


## 挑选感兴趣的段落
此处打算分析听觉段落

In [4]:
epochs_auditory = epochs['Auditory']
epochs_auditory 

0,1
Number of events,145
Events,Auditory/Left: 72 Auditory/Right: 73
Time range,-0.300 – 0.499 s
Baseline,-0.300 – 0.000 s


## 计算经验诱发差

In [6]:
evoked_diff = mne.combine_evoked(
    [epochs_auditory['Auditory/Left'].average(),
     epochs_auditory['Auditory/Right'].average()],
     weights=[1, -1] # 左边减去右边，做差值
)

evoked_diff.plot(gfp=True)
mne.viz.plot_compare_evokeds(
    [epochs_auditory['Auditory/Left'].average(),
     epochs_auditory['Auditory/Right'].average(),
     evoked_diff]
)

[<Figure size 800x600 with 1 Axes>,
 <Figure size 800x600 with 1 Axes>,
 <Figure size 800x600 with 1 Axes>]

只是计算诱发电位差还不够，下面进行一些机器学习的方法

## 平衡各分段数目

In [7]:
# 为了使概率水平保持在50%的准确度，我们首先使每个条件下的epoch数量相等。
epochs_auditory.equalize_event_counts(epochs_auditory.event_id)
epochs_auditory

0,1
Number of events,144
Events,Auditory/Left: 72 Auditory/Right: 72
Time range,-0.300 – 0.499 s
Baseline,-0.300 – 0.000 s


## 创建输入`X`和对应的标签`y`

分类器接收矩阵`X`作为输入，并返回向量`y`（由`0`和`1`组成）。这里的`X`将是**所有梯度计上一个时间点的数据**（因此称为多元）。我们想训练我们的模型来区分`听觉/左`和`听觉/右`试验。

我们与所有传感器协同工作，试图在两种条件之间找到一种判别模式，以预测单个试验的实验条件。

对于分类，我们将使用`scikit-learn`包(http://scikit-learn.org/)以及MNE Python函数。

让我们首先创建响应向量`y`。

In [8]:
import numpy as np

# 创建一个长度为试验次数的向量
y = np.empty(len(epochs_auditory.events), dtype=int)

# 找出哪个试次是左声音，哪个试次是右声音
idx_left = epochs_auditory.events[:, 2] == epochs_auditory.event_id['Auditory/Left']
idx_right = epochs_auditory.events[:, 2] == epochs_auditory.event_id['Auditory/Right']

# 将左声音试次标记为0，右声音试次标记为1
y[idx_left] = 0
y[idx_right] = 1

print(y)
print(f'\nSize of y: {y.size}')

[1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1
 0 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0
 1 0 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 1 0 1 0 1 0 1 0 1 0 0 1 0 1 0 1 0
 1 0 1 0 1 0 1 0 1 0 1 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0]

Size of y: 144


现在创建输入矩阵`X`。

我们只想关注这里的梯度计通道，所以我们使用`pick_types（meg='grad'）`。对于磁力计通道，我们需要传递`meg='mag'`；对于EEG通道：`meg=False，eeg=True`。我们创建了一个epochs的副本，因为`pick_types（）`在原地操作，但我们希望保持原始epochs对象不变。

In [9]:
epochs_auditory_grad = epochs_auditory.copy().pick_types(meg='grad')

# 以 NumPy 数组的形式检索数据，数组形状为：(n_trials, n_channels, n_timepoints)
data = epochs_auditory_grad.get_data()
print(data.shape)

(144, 203, 481)


  data = epochs_auditory_grad.get_data()


我们需要重塑数组，以便对于每个试验，我们都有一个向量`[channel_1_time_1，channel_1_time_2，…，channel_m_time_n]`，即我们的目标是将`X`重塑为维度`（n_trials，n_channels*n_timepoints）`。

In [10]:
n_trials = data.shape[0]

X = data.reshape(n_trials, -1)
print(X.shape)

(144, 97643)


## 创建一个分类器

我们将使用简单的`scikit-learn`进行第一轮分类。这是为了证明您可以简单地将MNE的预处理数据输入到`scikit-learn`中。

In [11]:
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline

# 分类器管道：对数据进行缩放非常重要
# 在运行实际的分类器之前（在我们的例子中是逻辑回归）。
clf = make_pipeline(StandardScaler(), LogisticRegression())

# 为了避免过拟合，我们使用交叉验证来评估分类器的性能。
# 这里使用不进行数据打乱的交叉验证，即‘块交叉验证’，sklearn中默认不打乱数据，这对我们的实验比较好
n_splits = 5
scoring = 'roc_auc' # 评估指标 
cv = StratifiedKFold(n_splits=n_splits) # 交叉验证策略
scores = cross_val_score(clf, X=X, y=y, cv=cv, scoring=scoring) # 交叉验证

# 交叉验证运行中ROC AUC的平均值和标准差
roc_auc_mean = round(np.mean(scores), 3) # 平均值，保留三位小数
roc_auc_std = round(np.std(scores), 3) # 标准差，保留三位小数

print(f'CV scores: {scores}')
print(f'Mean ROC AUC = {roc_auc_mean:.3f} (STD = {roc_auc_std:.3f})')


CV scores: [0.94285714 0.84761905 0.86666667 0.90952381 0.9744898 ]
Mean ROC AUC = 0.908 (STD = 0.047)


可视化交叉验证结果

In [16]:
fig, ax = plt.subplots()
# 绿色三角表示平均值
ax.boxplot(scores, showmeans=True, whis=(0, 100), labels=['Left vs Right'])
ax.set_ylabel('Score')
ax.set_title('Cross_Validation Scores')

Text(0.5, 1.0, 'Cross_Validation Scores')

## 我们可以使用`mne.decodering`模块更简单地做到这一点。 Let's go. 🚀

In [17]:
from sklearn.pipeline import make_pipeline
from mne.decoding import Scaler, Vectorizer, cross_val_multiscore

# 首先创建X和y
epochs_auditory_grad = epochs_auditory.copy().pick_types(meg='grad')
X = epochs_auditory_grad.get_data()
y = epochs_auditory_grad.events[:, 2]

# 分类器管道
clf = make_pipeline(
    # 正确处理不同通道类型的MNE缩放器
    Scaler(epochs_auditory_grad.info),
    # 使用MNE矢量器就可以不再进行numpy数组变形了
    Vectorizer(),
    # 分类器
    LogisticRegression()
)

# 运行交叉验证
# 注意这里使用的是MNE的cross_val_multiscore()函数，而不是前面用的scikit-learn的cross_val_score()函数。因此只需要设置交叉验证的分割数，MNE就会自动完成剩下的工作。
n_splits = 5
scores = 'roc_auc'
scores = cross_val_multiscore(clf, X, y, cv=5, scoring='roc_auc')

# 计算ROC AUC交叉验证的均值和标准差
roc_auc_mean = round(np.mean(scores), 3)
roc_auc_std = round(np.std(scores), 3)

print(f'CV scores: {scores}')
print(f'Mean ROC AUC= {roc_auc_mean:.3f} (STD = {roc_auc_std:.3f})')

  X = epochs_auditory_grad.get_data()


CV scores: [0.94285714 0.80952381 0.9        0.91428571 0.9744898 ]
Mean ROC AUC= 0.908 (STD = 0.056)


## 随时间解码：每个时间点的比较。

在前面的例子中，我们训练了一个分类器，通过使用**整个试验**的时空模式来区分实验条件。因此，分类器（希望如此！）能够预测哪些激活模式属于哪种条件

In [18]:
from sklearn.preprocessing import StandardScaler
from mne.decoding import SlidingEstimator

# 首先还是创建X和y
epochs_auditory_grad = epochs_auditory.copy().pick_types(meg='grad')
X = epochs_auditory_grad.get_data()
y = epochs_auditory_grad.events[:, 2]

# 分类器管道，但是不需要向量化
clf = make_pipeline(StandardScaler(), LogisticRegression())

# “滑动估计器”将在每个时间点训练分类器
scoring = 'roc_auc'
time_decoder = SlidingEstimator(clf, scoring=scoring, n_jobs=1, verbose=True)

# 运行交叉验证
n_splits = 5
scores = cross_val_multiscore(time_decoder, X, y, cv=5, n_jobs=1)

# 每个时间点的交叉验证平均得分。
mean_scores = np.mean(scores, axis=0)

# 所有时间点的平均得分
mean_across_all_time = round(np.mean(scores), 3)
print(f'\n=> Mean CV scores across all time points: {mean_across_all_time:.3f}')

  X = epochs_auditory_grad.get_data()



=> Mean CV scores across all time points: 0.590


## 绘制出分类结果图

In [19]:
fig, ax = plt.subplots()

ax.axhline(0.5, color='k', linestyle='--', label='chance') # AUC = 0.5
ax.axvline(0, color='k', linestyle='-') # 标记时间点0
ax.plot(epochs.times, mean_scores, label='score')

ax.set_xlabel('Time (s)')
ax.set_ylabel('Mean ROC AUC')
ax.legend()
ax.set_title('Left vs Right')
fig.suptitle('Sensor Space Decoding') 

Text(0.5, 0.98, 'Sensor Space Decoding')

TODO:暂时还不太明白最后的分类结果图的具体含义，但是可以看出，从时间0之后，平均分数明显有提高，这可能说明了采取的刺激有效果（存疑）