# 第七章 MNE-Python处理脑电/脑磁数据

In [None]:
"""
本代码为《Python心理学应用》一书第七章MNE-Python处理脑电/脑磁数据中的全部代码
建议结合书本内容进行实操练习，以增进理解
魏楚光，weicg@psych.ac.cn 
8-Dec-2024
"""

### 本章内容基于 MNE-Python 1.9 版本，所有示例代码均使用 Jupyter Notebook 进行演示和说明

## 代码1.1

In [None]:
python -m ensurepip --upgrade # 确保系统已经安装了 Python 和 pip
pip install mne # 安装 MNE-Python

## 代码1.2（需在Anaconda里运行）

In [None]:
conda update --name=base conda  # update conda
conda —version

## 代码1.4

In [None]:
import mne
mne.sys_info()

### 代码1.4

In [None]:
import mne
mne.sys_info()

### 代码2.1

In [None]:
import mne
# 加载 .fif 格式数据
raw = mne.io.read_raw_fif(r'C:\weicg\example_raw.fif', preload=True)
# 加载 .edf 格式数据
raw = mne.io.read_raw_edf(r'C:\weicg\example_raw.edf', preload=True)
# 加载 .set 格式数据
raw = mne.io.read_raw_eeglab(r'C:\weicg\example_raw.set', preload=True)
# 加载 .vhdr/.eeg 数据
raw = mne.io.read_raw_brainvision(r'C:\weicg\example_raw.vhdr', preload=True)
# 加载 .cnt数据
raw = mne.io.read_raw_cnt(r'C:\weicg\example_raw.cnt')

### 代码2.2

In [None]:
###### Read raw fif format data ######
raw_fif_file = r"C:\weicg\Desktop\Python心理学应用\raw_data\fif\sub1.fif"
raw_fif = mne.io.read_raw_fif(raw_fif_file, preload=True)
raw_fif

### 代码2.3

In [None]:
print(raw_fif.info)  # 打印所有元信息
print(raw_fif.info['sfreq'])  # 获取采样频率
print(raw_fif.info['bads'])  # 获取坏导
print(raw_fif.info['ch_names'])  # 以列表形式获取通道名称
print(raw_fif.info['ch_names'][0])  # 获取第一个通道的详细信息
print(raw_fif.info['chs'][0])  # 获取第一个通道的详细信息

### 代码2.4

In [None]:
data, times = raw_fif[:]  # 从 Raw 对象中提取数据矩阵和时间点数组
print(f'data,{data}')  # 输出数据矩阵
print(f'times,{times}')  # 输出时间点数组

### 代码2.5

In [None]:
c  # 设置 matplotlib 的后端为 Qt5Agg，以支持交互式绘图
import mne

# 读取数据
raw_fif_file = r"C:\weicg\Desktop\Python心理学应用\raw_data\fif\sub1.fif"
raw_fif = mne.io.read_raw_fif(raw_fif_file, preload=True)
# 绘制原始数据
raw_fif.plot()

### 代码2.6

In [None]:
builtin_montages = mne.channels.get_builtin_montages(descriptions=True) 
for montage_name, montage_description in builtin_montages:
    print(f"{montage_name}: {montage_description}") 

### 代码2.7

In [None]:
# 加载标准 10-20 系统的 Montage
montage_1020 = mne.channels.make_standard_montage('standard_1020')
print(montage_1020.ch_names)  # 输出所有电极的名称
print(len(montage_1020.ch_names))  # 94，表示 Montage 中共有 94 个电极
montage_1020.plot()  # 绘制 2D 电极位置图
montage_1020.plot(kind='3d')  # 绘制 3D 电极位置图

### 代码2.8

In [None]:
raw_cnt_file = r"C:\weicg\Desktop\Python心理学应用\raw_data\cnt\sub1.cnt"
raw_cnt = mne.io.read_raw_cnt(raw_cnt_file)

# 为 Raw 对象设置 Montage（电极位置布置）
raw_cnt.set_montage(montage='standard_1020',match_case=False,on_missing='ignore')
raw_cnt.plot_sensors(show_names=True)  # 可视化呈现

### 代码2.9

In [None]:
raw_fif_file = r"C:\weicg\Desktop\Python心理学应用\raw_data\fif\sub1.fif"
raw_fif = mne.io.read_raw(raw_file)
raw_fif.plot_sensors(ch_type='eeg',show_names=True)

### 代码2.10

In [None]:
raw_fif_file = r"C:\weicg\Desktop\Python心理学应用\raw_data\fif\sub1.fif"
raw_fif = mne.io.read_raw(raw_fif_file)
fif_events = mne.find_events(raw_fif)
print(fif_events)

### 代码2.11

In [None]:
raw_cnt_file = r"C:\weicg\Desktop\Python心理学应用\raw_data\cnt\sub1.cnt"
raw_cnt = mne.io.read_raw_cnt(raw_cnt_file)
annotation_events,annotation_events_id=mne.events_from_annotations(raw_cnt)
print(f'annotation_events{annotation_events}')
print(f'annotation_events_id{annotation_events_id}')

### 代码2.12

In [None]:
event_id = {
	'conditionA_level1':1,
	'conditionA_level2':2,
	'response':99
}
print(event_id )

### 代码2.13

In [None]:
events_stim = mne.pick_events(fif_events, exclude=32)
print(events_stim)

### 代码2.14

In [None]:
fif_events = mne.find_events(raw_fif)  # 提取事件信息，返回一个形状为 (n_events, 3) 的数组
fif_events  # 事件数组，查看所有事件信息

# 查看事件数组的形状，返回 (n_events, 3)，表示有 n_events 个事件，每个事件包含 3 个值
print(fif_events.shape)  # (319, 3)

# 查看第一个事件的信息，返回一个包含 3 个值的数组：[采样点, 前一个触发值, 当前触发值]
print(fif_events[0]) 

# 查看最后一个事件的信息
print(fif_events[318]) 

# 提取所有事件的采样点（即每个事件的第一列）
print(fif_events[:, 0])  # 所有事件的采样点

# 提取所有事件（即每个事件的第三列）
print(fif_events[:, 2])

print(set(fif_events[:, 2]))  # 使用 set() 函数提取所有唯一的事件

# 也可以使用 numpy 的 unique() 函数提取所有唯一的事件触发值
import numpy as np
print(np.unique(fif_events[:, 2]))  # 所有唯一的事件触发值

# 判断每个事件的触发值是否等于 1，返回一个布尔数组
print(fif_events[:, 2] == 1 )

# 筛选出所有触发值为 1 的事件
print(fif_events[events[:, 2] == 1])  # 选择 event 1 的所有事件

# 筛选出触发值为 1 的事件的采样点（第一列）
print(fif_events[events[:, 2] == 1][:, 0]  )

# 计算触发值为 1 的事件出现的次数
print(len(fif_events[events[:, 2] == 1]))

### 代码2.15

In [None]:
# 从原始的FIF格式数据中提取事件信息
fif_events = mne.find_events(raw_fif) 

# 定义事件ID与实验条件的映射关系
event_id = {
    'Auditory/Left': 1,   # 听觉/左侧刺激，对应事件ID为1
    'Auditory/Right': 2,  # 听觉/右侧刺激，对应事件ID为2
    'Visual/Left': 3,     # 视觉/左侧刺激，对应事件ID为3
    'Visual/Right': 4,   # 视觉/右侧刺激，对应事件ID为4
    'Smiley': 5,         # 笑脸刺激，对应事件ID为5
    'Button': 32         # 按钮触发，对应事件ID为32
}

# 绘制事件的时间分布图（以样本点为单位）
mne.viz.plot_events(fif_events)

# 绘制事件的时间分布图，并根据event_id映射显示事件名称（以样本点为单位）
mne.viz.plot_events(fif_events, event_id=event_id)  # 示例

# 绘制事件的时间分布图，并根据event_id映射显示事件名称，同时将时间单位转换为秒
mne.viz.plot_events(
    fif_events, 
    event_id=event_id, 
    sfreq=raw.info['sfreq'],  # 使用数据的采样频率
    first_samp=raw_fif.first_samp  # 使用数据的起始样本点
)  # 时间单位为秒

### 代码3.1

In [None]:
bad_chan = raw_fif.info['bads']
print(bad_chan)  # 输出坏通道 ['MEG 2443', 'EEG 053']
bad_chan = raw_fif.info['bads'] =  ['MEG 2443', 'EEG 053', 'EEG 054'] # 手动标记坏通道
print(bad_chan)  # 输出坏通道 ['MEG 2443', 'EEG 053', 'EEG 054']

### 代码3.2

In [None]:
from mne.preprocessing import find_bad_channels_lof
raw_fif_file = r"C:\weicg\Desktop\Python心理学应用\raw_data\fif\sub1.fif"
raw_fif = mne.io.read_raw_fif(raw_file, preload=True)
# 检测坏通道
bad_channels = find_bad_channels_lof(raw_fif, picks="eeg")
print("检测到的坏通道：", bad_channels)
# 将坏通道添加到 raw.info['bads']
raw_fif.info['bads'] = bad_channels

### 代码3.3

In [None]:
raw_fif_file = r"C:\weicg\Desktop\Python心理学应用\raw_data\fif\sub1.fif"
raw_fif = mne.io.read_raw(raw_file)
print(raw_fif.info['bads'])  # 输出['MEG 2443', 'EEG 053']
raw_fif.drop_channels(raw_fif.info['bads'])
print(raw_fif.info['bads'])  # 输出[]

### 代码3.4

In [None]:
raw_fif_file = r"C:\weicg\Desktop\Python心理学应用\raw_data\fif\sub1.fif"
raw_fif= mne.io.read_raw(raw_file)
raw_fif.load_data()
raw_fif.interpolate_bads(reset_bads=True)
print(raw_fif.info['bads'])  # 输出[]

### 代码3.5

In [None]:
raw_fif_file = r"C:\weicg\Desktop\Python心理学应用\raw_data\fif\sub1.fif"
raw_fif= mne.io.read_raw(raw_file)

#此处 raw 是经过坏通道插值处理的 Raw 对象，即滤波处理在坏通道处理之后
raw_fif.load_data() # 调用 load_data() 将数据从磁盘加载到内存中以便后续处理
raw_fif.interpolate_bads(reset_bads=True)

# 使用 raw.filter() 方法对数据进行带通滤波，保留 0.1 Hz 到 40 Hz 之间的频率成分
# - l_freq=0.1：设置高通滤波的截止频率为 0.1 Hz，去除低频漂移（如直流偏移或运动伪迹）
# - h_freq=40：设置低通滤波的截止频率为 40 Hz，去除高频噪声（如肌电干扰或工频噪声）
raw_fif.filter(l_freq=0.1, h_freq=40)

### 代码3.6

In [None]:
cnt_file = r"C:\weicg\Desktop\Python心理学应用\raw_data\cnt\sub1.cnt"
raw_cnt=mne.io.read_raw_cnt(cnt_file)

# 并去除不需要的通道
cnt_bad_ch = ['HEO', 'VEO']
raw_cnt.drop_channels(cnt_bad_ch)

# 加载数据并滤波
raw_cnt.load_data()
raw_cnt.filter(l_freq=0.1, h_freq=40)


# 采用 M2作为参考，确保 M2 通道存在于数据中，否则代码会抛出错误
raw_cnt.set_eeg_reference(ref_channels=['M2']) # 也可指定其它通道或通道组合作为参考

### 代码3.7

In [None]:
raw_fif_file = r"C:\weicg\Desktop\Python心理学应用\raw_data\fif\sub1.fif"
raw_fif= mne.io.read_raw(raw_file)

#此处 raw 是经过坏通道插值处理的 Raw 对象，即滤波处理在坏通道处理之后
raw_fif.load_data() # 调用 load_data() 将数据从磁盘加载到内存中以便后续处理
raw_fif.interpolate_bads(reset_bads=True)

# 使用 raw.filter() 方法对数据进行带通滤波，保留 0.1 Hz 到 40 Hz 之间的频率成分
# - l_freq=0.1：设置高通滤波的截止频率为 0.1 Hz，去除低频漂移（如直流偏移或运动伪迹）
# - h_freq=40：设置低通滤波的截止频率为 40 Hz，去除高频噪声（如肌电干扰或工频噪声）
raw_fif.filter(l_freq=0.1, h_freq=40)
mne.set_eeg_reference(raw_fif, ref_channels='average')

### 代码3.8

In [None]:
cnt_file = r"C:\weicg\Desktop\Python心理学应用\raw_data\cnt\sub1.cnt"
raw_cnt=mne.io.read_raw_cnt(cnt_file)

# 并去除不需要的通道
cnt_bad_ch = ['HEO', 'VEO']
raw_cnt.drop_channels(cnt_bad_ch)

# 加载数据并滤波
raw_cnt.load_data()
raw_cnt.filter(l_freq=0.1, h_freq=40)


# 采用 F3 和 F4 作为双极参考
mne.set_bipolar_reference(raw_cnt, anode=["F3"], cathode=["F4"])


### 代码3.9

In [None]:
# 导入 MNE-Python 中的 ICA 模块
from mne.preprocessing import ICA

# 创建 ICA 对象，设置参数：
# n_components=30：提取 30 个独立成分
# method='fastica'：使用 FastICA 算法
# random_state=42：设置随机种子以确保结果可重复
ica = ICA(n_components=30, method='fastica', random_state=42)

# 拟合数据，此处是对经过坏道插值、滤波和重参考处理的数据 raw 进行 ICA 分解
# picks='eeg'：仅对 EEG 通道进行 ICA 分解
ica.fit(raw_fif, picks='eeg')

### 代码3.10

In [None]:
# 可视化所有独立成分的空间分布图
ica.plot_components()

### 代码3.11

In [None]:
# 可视化独立成分的时间序列（源信号）
ica.plot_sources(raw_fif)

### 代码3.12

In [None]:
# 查看特定独立成分的详细属性（如时间序列、频谱、空间分布等）
# picks=[0, 1]：查看第 0 和第 1 个独立成分
ica.plot_properties(raw_fif, picks=[0, 1])

### 代码3.13

In [None]:
# 标记需要排除的伪迹成分（如眼动、肌电伪迹等）
# 排除第 0、1、2、3 共4个独立成分
ica.exclude = [0, 1, 2, 3] 

# 可视化 ICA 去除伪迹后的效果
# exclude=ica.exclude：指定需要排除的成分
# picks='eeg'：仅显示 EEG 通道
ica.plot_overlay(raw_fif, exclude=ica.exclude, picks='eeg')

### 代码3.14

In [None]:
# 应用 ICA 去除伪迹，生成去伪迹后的数据
raw_fif_clean = ica.apply(raw_fif)

### 代码3.15

In [None]:
reject = dict(grad=4000e-13,  # 单位：T / m（梯度计）
              mag=4e-12,      # 单位：T（磁力计）
              eeg=40e-6,      # 单位：V（EEG 通道）
              eog=250e-6      # 单位：V（EOG 通道）
              )

### 代码3.16

In [None]:
epochs = mne.Epochs(raw_fif_clean, fif_events, event_id, tmin=-0.1, tmax=0.5, baseline=(None,
         0),  picks=['eeg'], reject=dict(eeg=120e-6), preload=True)

epochs.plot(events=fif_events,event_id=event_id)

### 代码3.17

In [None]:
# 生成伪事件，每个段 2 秒，无重叠
artificial_events = mne.make_fixed_length_events(raw_fif_clean, start=0, stop=None, duration=2.0)
artificial_epochs = mne.Epochs(raw_fif_clean, artificial_events, tmin=0, tmax=2, baseline=None)
artificial_epochs.plot()

### 代码3.18

In [None]:
import os
raw_data_path = r"C:\weicg\Desktop\Python心理学应用\raw_data\fif"
raw_path = raw_data_path
all_raw_files = os.listdir(raw_data_path)
all_raw_files

In [None]:
import mne

#####################################
# 1. 导入原始数据
# 使用 CNT 格式数据路径
raw_file = r"C:\weicg\Desktop\Python心理学应用\raw_data\cnt\sub1.cnt"

# 读取数据并加载到内存
raw = mne.io.read_raw_cnt(raw_file, preload=True)
print(raw.info)  # 查看数据的基本信息

#####################################
# 2. 设置电极布局 (Montage)
# 加载标准 1020 电极布局并设置到 raw 对象
montage = mne.channels.make_standard_montage('standard_1020')
raw.set_montage(montage,match_case=False,on_missing='ignore')
raw.plot_sensors(kind='topomap', show_names=True)  # 查看电极分布

#####################################
# 3. 检查和标记坏通道
# 可视化信号以手动标记坏通道
raw.plot()

# 手动设置无用通道或坏通道（示例）
raw.info['bads'] = ['HEO','VEO','M2']  # 此处为无用通道
print(f"无用通道: {raw.info['bads']}")

# 此处为丢弃无用通道，如果是坏通道建议插值修复
raw.drop_channels(['HEO','VEO','M2'])  # 丢弃无用通道


#####################################
# 4. 滤波
# 设置高通滤波和低通滤波 (例如，1-40 Hz)
raw.filter(l_freq=1.0, h_freq=40.0, fir_design='firwin')

#####################################
# 5. 重参考
# 设置平均参考
raw.set_eeg_reference('average', projection=True)

#####################################
# 6. 提取事件信息
# 从注释中提取事件
events, event_dict = mne.events_from_annotations(raw)
print("事件信息:", events)
print("事件字典:", event_dict)

#####################################
# 7. ICA 识别眼电等伪迹
# 创建 ICA 对象并拟合数据
ica = mne.preprocessing.ICA(n_components=30, random_state=97)
ica.fit(raw)

# 查看 ICA 成分
ica.plot_components()

# 找到伪影成分并排除
ica.exclude = []  # 根据图形结果设置伪影成分索引
ica.apply(raw)

#####################################
# 8. 提取数据段 (Epochs)
# 设置时间窗口和基线校正
epochs = mne.Epochs(
    raw, events, event_id=event_dict, tmin=-0.1, tmax=0.5, baseline=(None, 0),
    detrend=1, preload=True
)

#####################################
# 9. 数据保存
# 保存预处理后的数据
epochs_path = 'ICA_raw.fif'
epochs.save(epochs_path, overwrite=True)
print(f"预处理后的数据已保存到: {epochs_path}")

### 代码4.1

In [None]:
s1_epochs=mne.read_epochs(r"C:\weicg\Desktop\Python心理学应用\epoch\s1_epoch.fif")
s1_epochs.info

### 代码4.2

In [None]:
fif_epochs=mne.read_epochs(r"C:\weicg\Desktop\Python心理学应用\epoch\fif_epoch.fif")
auditory_epochs=fif_epochs['Auditory']
auditory_epochs

### 代码4.3

In [None]:
fif_epochs=mne.read_epochs(r"C:\weicg\Desktop\Python心理学应用\epoch\fif_epoch.fif")
# 从 epochs 中提取条件为 'Auditory/Right' 的所有 epochs
right_auditory_epochs=fif_epochs['Auditory/Right'] 
right_auditory_epochs

### 代码4.4

In [None]:
fif_epochs=mne.read_epochs(r"C:\weicg\Desktop\Python心理学应用\epoch\fif_epoch.fif")
# 从 epochs 中提取条件为 'Auditory/Right' 的所有 epochs
right_auditory_epochs=fif_epochs['Auditory/Right'] 

# 从 epochs 中提取前 25 个 epochs（索引 0 到 24），忽略事件条件，仅根据索引筛选
epochs[:25]

# 从 right_auditory_epochs 中提取索引 1 到 29 的 epochs，步长为 2
right_auditory_epochs[1:30:2] 


### 代码4.5

In [None]:
metadata_epochs = mne.read_epochs(r"C:\weicg\Desktop\Python心理学应用\epoch\metadata-epo.fif")
metadata_epochs.metadata

### 代码5.1

In [None]:
right_auditory_epochs = fif_epochs['Auditory/Right']
right_auditory_evoked = right_auditory_epochs.average()

### 代码5.2

In [None]:
right_auditory_evoked 

### 代码5.3

In [None]:
right_auditory_evoked_data = right_auditory_evoked.get_data()
print(f"get_data{right_auditory_evoked_data}") # 输出数据
print(f"data_shape{right_auditory_evoked_data.shape}") # 输出数组维度属性

### 代码5.4

In [None]:
fif_epochs=mne.read_epochs(r"C:\weicg\Desktop\Python心理学应用\epoch\fif_epoch.fif")
# 从 epochs 中提取条件为 'Auditory/Right' 的所有 epochs
right_auditory_epochs=fif_epochs['Auditory/Right'] 
left_auditory_epochs=fif_epochs['Auditory/Left'] 

right_auditory_evoked = right_auditory_epochs.average()
left_auditory_evoked = left_auditory_epochs.average()
# 使用mne.grand_average()
evoked_auditory_avg = mne.grand_average([right_auditory_evoked,left_auditory_evoked])

# 使用mne.combine_evoked()，每个evoked对象的权重为1/2，结果与上面一致
evoked_auditory_avg_equal = mne.combine_evoked([right_auditory_evoked,left_auditory_evoked], weights='equal')

# 使用mne.combine_evoked()计算差异波,right_auditory_evoked*1 + left_auditory_evoked*(-1) 
evoked_auditory_diff = mne.combine_evoked([right_auditory_evoked,left_auditory_evoked], weights=[1,-1]) 

### 代码5.5

In [None]:
right_auditory_evoked.plot(picks='eeg') 

### 代码5.6

In [None]:
right_auditory_evoked.plot_topomap(ch_type='eeg') 

### 代码5.7

In [None]:
right_auditory_evoked.plot_joint(title='plot_joint')

### 代码5.8

In [None]:
# 获取 right_auditory_evoked 对象中所有通道的名称
all_channel_names = right_auditory_evoked.ch_names

# 定义左侧和右侧 ROI（感兴趣区域）的通道名称列表
left = ['EEG 025', 'EEG 026', 'EEG 027']  # 左侧通道
right = ['EEG 033', 'EEG 034', 'EEG 035']  # 右侧通道

# 使用 mne.pick_channels 函数根据通道名称获取对应的索引
# left_idx 是左侧通道名称对应的索引列表
left_idx = mne.pick_channels(all_channel_names, include=left)
# right_idx 是右侧通道名称对应的索引列表
right_idx = mne.pick_channels(all_channel_names, include=right)

# 创建一个字典 roi_dict，将左右侧通道索引分别命名为 'left_ROI' 和 'right_ROI'
# 字典的键是 ROI 的名称，值是对应的通道索引列表
roi_dict = dict(left_ROI=left_idx, right_ROI=right_idx)

# 使用 mne.channels.combine_channels 函数将左右侧 ROI 的通道数据分别进行平均
# right_auditory_evoked 是输入数据，roi_dict 定义了分组，method="mean" 表示使用平均值作为聚合方法
# 返回的 roi_evoked 是一个新的 Evoked 对象，包含两个虚拟通道：'left_ROI' 和 'right_ROI'
roi_evoked = mne.channels.combine_channels(right_auditory_evoked, roi_dict, method="mean")

# 绘制 roi_evoked 的数据
roi_evoked.plot(titles='roi_evoked')

### 代码5.9

In [None]:
import mne
import matplotlib.pyplot as plt

# 读取 epochs 数据
epochs = mne.read_epochs(r"C:\weicg\Desktop\Python心理学应用\epoch\fif_epoch.fif")

# 提取听觉右刺激（Auditory/Right）对应的 trials 并计算平均
right_auditory_evoked = epochs['Auditory/Right'].average()

# 提取视觉右刺激（Visual/Right）对应的 trials 并计算平均
right_visual_evoked = epochs['Visual/Right'].average()

# 将两种条件的 Evoked 数据存储在字典中
evokeds = dict(auditory=right_auditory_evoked, visual=right_visual_evoked)

# 选择要绘制的 EEG 通道
picks = [f"EEG 0{n}" for n in range(10, 15)]

# 设置全局字体大小
plt.rcParams.update({'font.size': 14})

# 使用 mne.viz.plot_compare_evokeds 函数绘制两种条件的比较图
# evokeds 是包含两种条件的 Evoked 数据的字典
# picks 指定要绘制的通道
# combine="mean" 表示对选定的通道进行均值聚合，绘制一条平均曲线
# 绘制 evoked 数据比较图
mne.viz.plot_compare_evokeds(evokeds, title='Right Auditory VS Right Visual',invert_y=True,
                             picks=picks, combine='mean', time_unit='ms')

In [None]:
os.getcwd()

In [None]:
import mne
import matplotlib.pyplot as plt

# 读取 epochs 数据
epochs = mne.read_epochs(r"C:\weicg\Desktop\Python心理学应用\epoch\fif_epoch.fif")

# 提取听觉右刺激（Auditory/Right）对应的 trials 并计算平均
right_auditory_evoked = epochs['Auditory/Right'].average()

# 提取视觉右刺激（Visual/Right）对应的 trials 并计算平均
right_visual_evoked = epochs['Visual/Right'].average()

# 将两种条件的 Evoked 数据存储在字典中
evokeds = dict(auditory=right_auditory_evoked, visual=right_visual_evoked)

# 选择要绘制的 EEG 通道
picks = [f"EEG 0{n}" for n in range(10, 15)]

# 设置全局字体大小
plt.rcParams.update({'font.size': 14})

# 使用 mne.viz.plot_compare_evokeds 函数绘制两种条件的比较图
# evokeds 是包含两种条件的 Evoked 数据的字典
# picks 指定要绘制的通道
# combine="mean" 表示对选定的通道进行均值聚合，绘制一条平均曲线
# 绘制 evoked 数据比较图
mne.viz.plot_compare_evokeds(evokeds, title='Right Auditory VS Right Visual',invert_y=True,
                             picks=picks, combine='mean', time_unit='ms')


### 代码6.1

In [None]:
raw_fif_file = r"C:\weicg\Desktop\Python心理学应用\raw_data\fif\sub1.fif"
raw_fif = mne.io.read_raw(raw_fif_file)
raw_compute_psd = raw_fif.compute_psd()
print(f"Raw数据：{raw_compute_psd}\n")
epoch_spectrum = epochs.compute_psd()
print(f"Epoch数据：{epoch_spectrum}")

### 代码6.2

In [None]:
epochs = mne.read_epochs(r"C:\weicg\Desktop\Python心理学应用\epoch\fif_epoch.fif")
epoch_spectrum = epochs.compute_psd()
psds, freqs = epoch_spectrum.get_data(return_freqs=True)
print(f"PSDs shape: {psds.shape}, freqs shape: {freqs.shape}\n")
right_auditory_evoked = right_auditory_epochs.average()
evoked_spectrum = right_auditory_evoked .compute_psd()
print(f"evoked_spectrum shape: {evoked_spectrum.shape}")

### 代码6.3

In [None]:
evoked_spectrum.plot(picks="data", exclude="bads", amplitude=False)

### 代码6.4

In [None]:
evoked_spectrum.plot_topo(color="k", fig_facecolor="w", axis_facecolor="w")

### 代码6.5

In [None]:
evoked_spectrum.plot_topomap(show_names=True)

### 代码7.1

In [None]:
# 使用 np.logspace 生成在对数尺度上均匀分布的频率点
# 参数说明：
# - np.log10([6, 35])：将 6 和 35 转换为对数尺度
# - num=8：生成 8 个频率点
# 结果 freqs 是一个包含 8 个频率值的数组，范围从 6 Hz 到 35 Hz
import numpy as np
freqs = np.logspace(*np.log10([6, 35]), num=8)
# 设置频率范围和时间步长
n_cycles = freqs / 4.0  # 周期数

print(f"freqs:{freqs}")
print(f"n_cycles:{n_cycles}")

# 使用 compute_tfr 方法计算时频表示（Time-Frequency Representation, TFR）
# 参数说明：
# - method="morlet"：使用 Morlet 小波进行时频分析
# - freqs=freqs：指定分析的频率范围
# - n_cycles=n_cycles：指定每个频率对应的小波周期数
# - average=True：对所有 trials 进行平均，返回平均功率
# - return_itc=True：返回跨 trial 的相位一致性（ITC）
# - decim=3：对时间维度进行降采样，保留每 3 个时间点中的一个

power, itc = epochs.compute_tfr(
    method="morlet",
    freqs=freqs,
    n_cycles=n_cycles,
    average=True,
    return_itc=True,
    decim=3,
)

# 返回结果：
# - power：时频功率谱，表示信号在不同频率和时间点上的能量分布
# - itc：跨 trial 的相位一致性，表示信号在不同频率和时间点上的相位稳定性
print(f"power:{power}")
print(f"itc:{itc}")

### 代码7.2

In [None]:
# 使用 plot_topo 方法绘制所有通道的时频地形图
# 参数说明：
# - baseline=(-0.5, 0)：指定基线校正的时间范围为 -0.5 秒到 0 秒
# - mode="logratio"：使用对数比率（logratio）进行基线校正
# - title="Average power"：设置图的标题为 "Average power"
power.plot_topo(baseline=(-0.5, 0), mode="logratio", title="Average power")

# 使用 plot 方法绘制特定通道的时频图
# 参数说明：
# - picks=[56]：选择索引为 56 的通道进行绘图
# - baseline=(-0.5, 0)：指定基线校正的时间范围为 -0.5 秒到 0 秒
# - mode="logratio"：使用对数比率（logratio）进行基线校正
# - title=power.ch_names[56]：将图的标题设置为该通道的名称
power.plot(picks=[56], baseline=(-0.5, 0), mode="logratio", title=power.ch_names[56])

### 代码8.1

In [None]:
# 导入必要的Python库
import mne  # 用于处理和分析脑电（EEG）和脑磁（MEG）数据的库
import os  # 用于处理文件路径和操作系统相关功能
import numpy as np  # 用于数值计算和数组操作
import scipy.stats as stats  # 用于统计分析和假设检验
import matplotlib.pyplot as plt  # 用于数据可视化
# 从MNE库中导入FDR（False Discovery Rate）校正方法，用于多重比较校正
from mne.stats import fdr_correction 

# 定义数据文件夹的路径
path = r'C:\weicg\Desktop\Python心理学应用\epoch\group_epoch'  # 将 'your_folder_path' 替换为实际存储数据的文件夹路径

# 定义被试编号列表
subjects = [1, 2, 5, 6, 9, 10]  # 包含6名被试的编号

# 根据被试编号生成对应的文件名列表
# 使用列表推导式生成文件名，格式为 's1_epoch.fif', 's2_epoch.fif', ..., 's10_epoch.fif'
files = ['s' + str(x) + '_epoch.fif' for x in subjects]

# 将文件夹路径与文件名结合，生成完整的文件路径列表
# 使用 os.path.join 将路径和文件名拼接，确保路径格式正确
subject_files = [os.path.join(path, file) for file in files]

### 代码8.2

In [None]:
# 定义事件条件
condition_1_events = ['11', '12', '13', '14']  # 条件1的事件编号
condition_2_events = ['31', '32', '33', '34']  # 条件2的事件编号
# 这些事件编号通常对应于实验设计中的不同刺激或任务条件，用于后续提取特定条件下的EEG数据。

# 用字典regions定义6个兴趣区（ROI）及其对应的通道名称
regions = {
    "Left Frontal": ['F3', 'F7', 'FC5'],  # 左前额区，包含F3、F7和FC5通道
    "Left Central": ['C3', 'CP3', 'CP5'],  # 左中央区，包含C3、CP3和CP5通道
    "Left Posterior": ['P3', 'P7', 'O1'],  # 左后部区，包含P3、P7和O1通道
    "Right Frontal": ['F4', 'F8', 'FC6'],  # 右前额区，包含F4、F8和FC6通道
    "Right Central": ['C4', 'CP4', 'CP6'],  # 右中央区，包含C4、CP4和CP6通道
    "Right Posterior": ['P4', 'P8', 'O2']  # 右后部区，包含P4、P8和O2通道
}
# 兴趣区的划分基于电极的解剖学位置，便于分析神经活动的空间分布特征。

# 设定统计分析的时间窗口（200-400毫秒）
tmin_stats = 0.2  # 时间窗口的起点（200毫秒）
tmax_stats = 0.4  # 时间窗口的终点（400毫秒）

### 代码8.3

In [None]:
# 存储 evoked 数据用于绘图
# 创建两个字典，分别存储条件1和条件2在每个兴趣区的 evoked 数据
evokeds_condition_1 = {region: [] for region in regions.keys()}
evokeds_condition_2 = {region: [] for region in regions.keys()}

# 存储每个被试在每个兴趣区的平均电位数据
# 创建一个嵌套字典，用于存储每个兴趣区在条件1和条件2下的平均波幅数据
data_per_region = {region: {"Condition 1": [], "Condition 2": []} for region in regions.keys()}

# 循环加载每个被试的数据
for subj_file in subject_files:
    # 读取当前被试的 epochs 数据
    epochs = mne.read_epochs(subj_file)

    # 计算条件1和条件2的 evoked 数据（即平均 ERP）
    evoked_cond1 = epochs[condition_1_events].average()  # 条件1的平均 ERP
    evoked_cond2 = epochs[condition_2_events].average()  # 条件2的平均 ERP

    # 获取每个兴趣区的通道索引
    # 将通道名称转换为索引，方便后续提取数据
    channel_indices = {region: [epochs.ch_names.index(ch) for ch in channels] for region, channels in regions.items()}

    # 遍历每个兴趣区
    for region, indices in channel_indices.items():
        # 使用 combine_channels() 方法计算该兴趣区的平均 ERP
        combined_evoked_cond1 = mne.channels.combine_channels(
            evoked_cond1, groups={region: indices}, method="mean"  # 对条件1的 ERP 进行平均
        )
        combined_evoked_cond2 = mne.channels.combine_channels(
            evoked_cond2, groups={region: indices}, method="mean"  # 对条件2的 ERP 进行平均
        )

        # 将计算后的 evoked 数据存储到对应的字典中，供后续绘图使用
        evokeds_condition_1[region].append(combined_evoked_cond1)
        evokeds_condition_2[region].append(combined_evoked_cond2)

        # 提取该兴趣区在 200-400ms 时间窗口内的平均波幅
        # 使用 copy() 创建数据的副本，避免修改原始数据
        avg_1 = combined_evoked_cond1.copy().crop(tmin=tmin_stats, tmax=tmax_stats).data.mean()  # 条件1的平均波幅
        avg_2 = combined_evoked_cond2.copy().crop(tmin=tmin_stats, tmax=tmax_stats).data.mean()  # 条件2的平均波幅

        # 将平均波幅数据存储到 data_per_region 字典中
        data_per_region[region]["Condition 1"].append(avg_1)
        data_per_region[region]["Condition 2"].append(avg_2)

### 代码8.4

In [None]:
# 初始化存储 t 值和 p 值的字典
t_values = {}  # 用于存储每个兴趣区的 t 值
p_values = {}  # 用于存储每个兴趣区的 p 值

# 统计分析：计算每个兴趣区的 t 值和 p 值
for region in regions.keys():
    # 对条件1和条件2的平均波幅数据进行配对 t 检验
    t_stat, p_val = stats.ttest_rel(
        data_per_region[region]["Condition 1"],  # 条件1的平均波幅数据
        data_per_region[region]["Condition 2"]   # 条件2的平均波幅数据
    )
    # 将 t 值和 p 值存储到对应的字典中
    t_values[region] = t_stat
    p_values[region] = p_val

# FDR 校正
# 将 p 值从字典中提取为列表
p_array = list(p_values.values())
# 使用 FDR 校正方法对 p 值进行多重比较校正
# alpha=0.05 表示显著性水平为 0.05
# method='indep' 表示假设检验之间是独立的
_, p_fdr_corrected = fdr_correction(p_array, alpha=0.05, method='indep')

# 打印校正前后的 t 值和 p 值
print("原始 t 值: ", t_values)  # 打印未校正的 t 值
print("原始 p 值: ", p_values)  # 打印未校正的 p 值
print("FDR 校正后的 p 值: ", dict(zip(regions.keys(), p_fdr_corrected)))  # 打印 FDR 校正后的 p 值

### 代码8.5

In [None]:
# 计算所有被试的平均 evoked 数据（用于绘图）
# 使用 mne.grand_average() 对每个兴趣区的 evoked 数据进行平均
grand_evoked_condition_1 = {region: mne.grand_average(evokeds_condition_1[region]) for region in regions.keys()}  # 条件1的平均 evoked
grand_evoked_condition_2 = {region: mne.grand_average(evokeds_condition_2[region]) for region in regions.keys()}  # 条件2的平均 evoked

# 创建 2×3 子图布局（2行3列，6个脑区）
fig, axes = plt.subplots(2, 3, figsize=(15, 8))  # 创建一个包含6个子图的画布

# 脑区列表
region_list = list(regions.keys())  # 获取所有兴趣区的名称

# 遍历脑区，在对应的子图上绘制
for i, region in enumerate(region_list):
    row, col = divmod(i, 3)  # 计算当前脑区对应的子图位置（行和列）
    ax = axes[row, col]  # 获取当前子图的轴对象

    # 绘制条件1和条件2的 ERP 对比图
    mne.viz.plot_compare_evokeds(
        {"Condition 1": grand_evoked_condition_1[region],  # 条件1的平均 ERP
         "Condition 2": grand_evoked_condition_2[region]},  # 条件2的平均 ERP
        title=region,  # 设置子图标题为当前脑区名称
        show=False,  # 不立即显示图形
        axes=ax,  # 指定绘图的子图
        truncate_xaxis=False  # 不截断 x 轴
    )

    # 高亮分析时间窗口（200-400ms）
    ax.axvspan(tmin_stats, tmax_stats, color='yellow', alpha=0.3)  # 在时间窗口内添加黄色半透明背景

    # 设置 y 轴标签，仅在第一列显示
    if col == 0:
        ax.set_ylabel("Amplitude (µV)")  # 设置 y 轴标签为 "Amplitude (µV)"

# 设置整个图的 x 轴和 y 轴标签
fig.supxlabel("Time (s)")  # 设置整个图的 x 轴标签为 "Time (s)"
fig.supylabel("Amplitude (µV)")  # 设置整个图的 y 轴标签为 "Amplitude (µV)"
fig.suptitle("ERP Comparison Across Regions", fontsize=14, fontweight="bold")  # 设置整个图的标题

# 调整布局
plt.tight_layout()  # 自动调整子图间距，避免重叠
plt.show()  # 显示图形

### 代码8.6

In [None]:
# 导入必要的库
import mne  # 用于处理和分析EEG/MEG数据
import os  # 用于处理文件路径
import numpy as np  # 用于数值计算
from mne.channels import find_ch_adjacency  # 用于查找电极的邻接关系
from mne.stats import combine_adjacency, spatio_temporal_cluster_test  # 用于统计分析和簇检验
from mne.viz import plot_compare_evokeds  # 用于可视化ERP数据
from mne.time_frequency import tfr_multitaper  # 用于时频分析
from mne.stats import spatio_temporal_cluster_1samp_test  # 用于单样本簇检验
from mpl_toolkits.axes_grid1 import make_axes_locatable  # 用于调整子图布局
import matplotlib  # 用于绘图
matplotlib.use('Qt5Agg')  # 设置matplotlib的后端
import matplotlib.pyplot as plt  # 用于绘图

# 加载预处理后的epochs数据
path = r'C:\weicg\weicg\Desktop\data'  # 数据存储路径
subjects = [2, 3, 4, 5]  # 被试编号
files = ['s' + str(x) + '_epoch.fif' for x in subjects]  # 生成文件名列表
file_paths = [os.path.join(path, file) for file in files]  # 生成完整文件路径列表

# 存储每个被试的epochs数据
epochs_list = []

# 加载每个被试的epochs数据
for file_path in file_paths:
    epochs = mne.read_epochs(file_path)  # 读取epochs数据
    epochs_list.append(epochs)  # 将epochs数据添加到列表中

# 设置频率范围和时间步长
frequencies = np.arange(4, 30, 3)  # 频率范围：4-30 Hz，步长为3 Hz
n_cycles = frequencies / 4.0  # 每个频率对应的周期数

# 存储每个被试在每个条件下的平均时频数据
avg_power_list_cond1 = []  # 条件1的平均时频数据
avg_power_list_cond2 = []  # 条件2的平均时频数据

# 计算每个被试的时频数据
for epochs in epochs_list:
    # 计算条件1的时频数据
    power_cond1 = epochs[['11', '12']].compute_tfr(
        method="morlet",  # 使用Morlet小波
        freqs=frequencies,  # 频率范围
        n_cycles=n_cycles,  # 周期数
        average=False,  # 不进行平均
        return_itc=False,  # 不返回ITC
    )
    # 计算条件2的时频数据
    power_cond2 = epochs[['21', '22']].compute_tfr(
        method="morlet",
        freqs=frequencies,
        n_cycles=n_cycles,
        average=False,
        return_itc=False,
    )
    
    # 对时频数据进行平均
    avg_power_cond1 = power_cond1.data.mean(axis=0, keepdims=True)  # 条件1的平均时频数据
    avg_power_cond2 = power_cond2.data.mean(axis=0, keepdims=True)  # 条件2的平均时频数据
    
    # 将平均时频数据添加到列表中
    avg_power_list_cond1.append(avg_power_cond1)
    avg_power_list_cond2.append(avg_power_cond2)

# 提取两个条件的时频数据
X_condition1 = np.array([power.data for power in avg_power_list_cond1])  # 条件1的时频数据
X_condition2 = np.array([power.data for power in avg_power_list_cond2])  # 条件2的时频数据
X_condition1 = np.squeeze(X_condition1)  # 去除多余的维度
X_condition2 = np.squeeze(X_condition2)  # 去除多余的维度

# 确保数据形状正确 (n_subjects, n_times, n_freqs, n_channels)
X_condition1 = np.transpose(X_condition1, (0, 3, 2, 1))  # 调整维度顺序
X_condition2 = np.transpose(X_condition2, (0, 3, 2, 1))  # 调整维度顺序

# 计算两个条件的时频数据的差异
diff_mean = X_condition1.mean(axis=0) - X_condition2.mean(axis=0)  # 平均差异
diff = X_condition1 - X_condition2  # 每个被试的差异

# 进行簇的置换检验
threshold = 3  # 簇形成的阈值
adjacency, ch_names = find_ch_adjacency(epochs.info, ch_type="eeg")  # 查找电极的邻接关系
tfr_adjacency = combine_adjacency(len(power_cond1.times), len(frequencies), adjacency)  # 组合时间和频率的邻接关系

# 执行簇的置换检验
cluster_stats = spatio_temporal_cluster_1samp_test(
    diff,  # 差异数据
    n_permutations=1000,  # 置换次数
    threshold=threshold,  # 阈值
    tail=0,  # 双尾检验
    n_jobs=4,  # 并行计算
    buffer_size=None,  # 缓冲区大小
    adjacency=tfr_adjacency,  # 邻接关系
)

# 提取检验结果
T_obs, clusters, p_values, _ = cluster_stats
# 筛选显著的簇（本例仅为示意，故设定p值阈值为0.5，以获得显著的簇）
good_cluster_inds = np.where(p_values < 0.5)[0]
print(p_values)  # 输出p_values
print(good_cluster_inds)  # 显著的簇

# 可视化显著的簇
for i_clu, clu_idx in enumerate(good_cluster_inds):
    # 提取簇的时间、频率和空间索引
    time_inds, freq_inds, space_inds = clusters[clu_idx]
    ch_inds = np.unique(space_inds)  # 显著通道索引
    time_inds = np.unique(time_inds)  # 显著时间点索引
    freq_inds = np.unique(freq_inds)  # 显著频率索引

    # 计算显著簇的T值地形图
    T_map = T_obs[:, freq_inds, :].mean(axis=1)  # 平均频率维度
    T_map = T_map[time_inds, :].mean(axis=0)  # 平均时间维度

    # 获取显著时间点
    sig_times = epochs.times[time_inds]

    # 初始化图形
    fig, ax_topo = plt.subplots(1, 1, figsize=(10, 3), layout="constrained")

    # 创建空间掩码
    mask = np.zeros((T_map.shape[0], 1), dtype=bool)
    mask[ch_inds, :] = True  # 标记显著通道

    # 绘制显著簇的地形图
    T_evoked = mne.EvokedArray(T_map[:, np.newaxis], epochs.info, tmin=0)
    T_evoked.plot_topomap(
        times=0,
        mask=mask,
        axes=ax_topo,
        cmap="Reds",
        vlim=(np.min, np.max),
        show=False,
        colorbar=False,
        mask_params=dict(markersize=10),
    )
    image = ax_topo.images[0]

    # 创建额外的轴（用于ERF和颜色条）
    divider = make_axes_locatable(ax_topo)

    # 添加颜色条
    ax_colorbar = divider.append_axes("right", size="5%", pad=0.05)
    plt.colorbar(image, cax=ax_colorbar)
    ax_topo.set_xlabel(
        "Averaged T-map ({:0.3f} - {:0.3f} s)".format(*sig_times[[0, -1]])
    )

    # 移除默认标题
    ax_topo.set_title("")

    # 添加新的轴用于频谱图
    ax_spec = divider.append_axes("right", size="300%", pad=1.2)
    title = f"Cluster #{i_clu + 1}, {len(ch_inds)} spectrogram"
    if len(ch_inds) > 1:
        title += " (max over channels)"
    T_obs_plot = T_obs[..., ch_inds].max(axis=-1)  # 显著通道的最大T值

    # 绘制频谱图
    for T_image, cmap in zip([T_obs_plot], ["autumn"]):
        c = ax_spec.imshow(
            T_image,
            cmap=cmap,
            aspect="auto",
            origin="lower",
            extent=[epochs.times[0], epochs.times[-1], frequencies[0], frequencies[-1]],
        )
    ax_spec.set_xlabel("Time (ms)")
    ax_spec.set_ylabel("Frequency (Hz)")
    ax_spec.set_title(title)

    # 添加第二个颜色条
    ax_colorbar2 = divider.append_axes("right", size="5%", pad=0.05)
    plt.colorbar(c, cax=ax_colorbar2)
    ax_colorbar2.set_ylabel("T-stat")

# 显示图形
plt.show()