In [7]:
import numpy as np
import matplotlib.pyplot as plt
import pickle
from pathlib import Path
from IPython.display import Audio
import IPython.display as ipd
from scipy.io import wavfile
import tempfile
import os
import librosa
import pandas as pd
import seaborn as sns
import h5py
import mne
from scipy.stats import zscore
from mne_bids import BIDSPath, read_raw_bids
from matplotlib_venn import venn2,venn2_circles
from tqdm import tqdm

In [8]:
cm = 1/2.54
plt.rcParams['svg.fonttype'] = 'none'

fontdict = dict(fontsize=7)
fontsize = 7

red = '#A9373B'
blue = '#2369BD'
orange = '#CC8963'
green = '#009944'

stg_color = '#20B2AA'
smc_color = '#6A5ACD'
insula_color = '#D4AF37'

reds = sns.light_palette(red, as_cmap=True)
blues = sns.light_palette(blue, as_cmap=True)
oranges = sns.light_palette(orange, as_cmap=True)
greens = sns.light_palette(green, as_cmap=True)

recon_dir = '/cwork/ns458/ECoG_Recon/'
mne.viz.set_3d_backend('notebook')                    # MNE 3D in-notebook static backend
# text svg

Using notebook 3d backend.


# Production

In [9]:
import pandas as pd
import numpy as np
from mne_bids import BIDSPath
from tqdm import tqdm

# 1. 加载所有 time 数据
task = ['PhonemeSequence', 'SentenceRep', 'TIMIT', 'LexicalDelay']
ref = 'bipolar'

time_paths = []
for t in task:
    time_paths.extend(
        BIDSPath(
            root=f'../results/{t}({ref})',
            datatype='HGA',
            suffix='time',
            check=False,
        ).match()
    )

# 2. 加载数据
time_dfs = []
for path in tqdm(time_paths):
    df = pd.read_csv(path)
    time_dfs.append(df)

time_data = pd.concat(time_dfs)

# rename both phase : Resp and Response to Resp
time_data.loc[time_data.phase == 'Resp', 'phase'] = 'Response'
time_data.loc[time_data.phase == 'Response', 'phase'] = 'Response'

# rename phase of perception to audio
time_data.loc[time_data.description == 'passive', 'phase'] = 'Audio'
time_data.loc[time_data.description == 'perception', 'phase'] = 'Audio'
time_data.loc[time_data.description == 'production', 'phase'] = 'Response'

time_data = time_data[time_data.phase=='Response']

# rename ROIs
time_data.loc[time_data.roi == 'INS', 'roi'] = 'Insula'
time_data.loc[time_data.roi == 'HG', 'roi'] = 'STG'
time_data.loc[time_data.roi == 'PrG', 'roi'] = 'SMC'
time_data.loc[time_data.roi == 'PoG', 'roi'] = 'SMC'
time_data.loc[time_data.roi == 'Subcentral', 'roi'] = 'SMC'

# 只保留 time >= -0.5
speak_data = time_data[(time_data['time'] <= 1) & 
                        (time_data['time'] >= -1)]

print(f"筛选后数据量: {len(speak_data)}")
print(f"description 分布: {speak_data.description.unique()}")
print(f"phase 分布: {speak_data.phase.unique()}")
print(f"task 分布: {speak_data.task.unique()}")

# 4. groupby 计算 peak HGA
speak_hga = speak_data.groupby('channel').agg({
    'value': 'mean', 
    'label': 'first', 
    'x': 'first',
    'y': 'first', 
    'z': 'first',
    'roi': 'first',
    'hemi': 'first',
    'subject': 'first',
    'description': 'first',
    'phase':'first',
    'task':'first'
}).reset_index()

speak_hga.rename(columns={'value': 'HGA'}, inplace=True)
speak_hga['HGA'] = speak_hga['HGA'].astype(float)

speak_hga.head()

100%|██████████| 975/975 [00:35<00:00, 27.59it/s]


筛选后数据量: 1855899
description 分布: ['production' 'LS' 'Decision' 'Repeat']
phase 分布: ['Response']
task 分布: ['PhonemeSequence' 'SentenceRep' 'LexicalDelay']


Unnamed: 0,channel,HGA,label,x,y,z,roi,hemi,subject,description,phase,task
0,D0005_LTG14-15,0.317243,ctx_lh_G_temp_sup-Lateral,-68.886726,-28.295294,0.243498,STG,L,D0005,LS,Response,SentenceRep
1,D0005_LTG15-16,0.231774,ctx_lh_G_temp_sup-Lateral,-69.63974,-38.151283,4.609248,STG,L,D0005,LS,Response,SentenceRep
2,D0005_LTG24-26,-0.04925,ctx_lh_G_temporal_inf,-57.666666,-21.463809,-25.843526,ITG,L,D0005,LS,Response,SentenceRep
3,D0005_LTG29-30,0.123241,ctx_lh_G_temporal_inf,-54.534528,-29.265764,-28.07843,ITG,L,D0005,LS,Response,SentenceRep
4,D0005_LTG30-31,0.16612,ctx_lh_G_temporal_inf,-55.770381,-38.61574,-23.875112,ITG,L,D0005,LS,Response,SentenceRep


In [11]:
# 3D Insula HGA Video Generation
import numpy as np
import pyvista as pv
import matplotlib.pyplot as plt
from mne.viz import Brain
from scipy.spatial import cKDTree
import matplotlib.colors as mcolors
from tqdm import tqdm
import imageio.v2 as imageio
from io import BytesIO
from PIL import Image

# 读取aparc.a2009s labels
labels = mne.read_labels_from_annot(
    subject='fsaverage', parc='aparc.a2009s',  
    hemi='both', subjects_dir=recon_dir
)

# Insula相关的labels
insula_patterns = ['G_insular_short', 'G_Ins_lg_and_S_cent_ins', 
                   'S_circular_insula_ant', 'S_circular_insula_inf', 'S_circular_insula_sup']

# 加载pial表面用于投影
lh_pial_coords, _ = mne.read_surface(f"{recon_dir}/fsaverage/surf/lh.pial")
rh_pial_coords, _ = mne.read_surface(f"{recon_dir}/fsaverage/surf/rh.pial")
lh_tree = cKDTree(lh_pial_coords)
rh_tree = cKDTree(rh_pial_coords)

# 计算insula区域的中心点
def get_insula_center(labels, hemi, pial_coords):
    vertices = []
    for lab in labels:
        if lab.hemi == hemi and any(p in lab.name for p in insula_patterns):
            vertices.extend(lab.vertices)
    if vertices:
        return pial_coords[vertices].mean(axis=0)
    return None

lh_insula_center = get_insula_center(labels, 'lh', lh_pial_coords)
rh_insula_center = get_insula_center(labels, 'rh', rh_pial_coords)

# 筛选insula数据
insula_data = speak_data[speak_data['roi'] == 'Insula'].copy()
print(f"Insula electrodes: {insula_data['channel'].nunique()}")

# 参数设置
window_size = 0.1  # 100ms window
step_size = 0.05   # 50ms step
time_min, time_max = insula_data['time'].min(), insula_data['time'].max()
time_points = np.arange(time_min, time_max - window_size + step_size, step_size)

# 固定的vmin, vmax (across all windows)
all_values = insula_data['value'].values
vmin, vmax = 0, np.percentile(all_values[all_values > 0], 95)
print(f"Fixed colormap range: [{vmin:.3f}, {vmax:.3f}]")

# 点大小映射参数
size_min, size_max = 8, 25
cmap = reds

def hga_to_size(hga, vmin, vmax, size_min, size_max):
    normalized = np.clip((hga - vmin) / (vmax - vmin), 0, 1)
    return size_min + normalized * (size_max - size_min)

def generate_frame(t_start, insula_data, lh_tree, rh_tree, lh_pial_coords, rh_pial_coords,
                   labels, insula_patterns, lh_insula_center, rh_insula_center,
                   cmap, vmin, vmax, size_min, size_max, window_size, time_min, time_max):
    """生成单帧图像"""
    t_end = t_start + window_size
    
    # 计算当前窗口的HGA
    window_data = insula_data[(insula_data['time'] >= t_start) & (insula_data['time'] < t_end)]
    hga_df = window_data.groupby('channel').agg({
        'value': 'mean',
        'x': 'first', 'y': 'first', 'z': 'first'
    }).reset_index()
    
    if len(hga_df) == 0:
        return None
    
    cord = hga_df[['x', 'y', 'z']].values
    hga_values = hga_df['value'].values
    mask_lh = cord[:, 0] < 0
    mask_rh = cord[:, 0] > 0
    
    norm = mcolors.Normalize(vmin=vmin, vmax=vmax)
    
    # 创建Brain对象
    lh_brain = Brain("fsaverage", subjects_dir=recon_dir, surf="pial",
                     hemi="lh", background="white", show=False,
                     cortex=(0.9, 0.9, 0.9), alpha=0.05, size=(800, 800))
    rh_brain = Brain("fsaverage", subjects_dir=recon_dir, surf="pial",
                     hemi="rh", background="white", show=False,
                     cortex=(0.9, 0.9, 0.9), alpha=0.05, size=(800, 800))
    
    # 添加insula labels
    for lab in labels:
        if any(pattern in lab.name for pattern in insula_patterns):
            if lab.hemi == 'lh':
                lh_brain.add_label(lab, borders=False, color=(0.9, 0.9, 0.9), alpha=0.6)
            elif lab.hemi == 'rh':
                rh_brain.add_label(lab, borders=False, color=(0.9, 0.9, 0.9), alpha=0.6)
    
    # 添加电极
    def add_electrodes(brain, coords, hga_vals, tree, pial_coords):
        if len(coords) == 0:
            return
        _, indices = tree.query(coords)
        coords_proj = pial_coords[indices]
        for i in range(len(coords_proj)):
            pt = coords_proj[i:i+1]
            hga = hga_vals[i]
            color = cmap(norm(hga))[:3]
            size = hga_to_size(hga, vmin, vmax, size_min, size_max)
            cloud = pv.PolyData(pt)
            brain._renderer.plotter.add_mesh(cloud, render_points_as_spheres=True,
                                             point_size=size, color=color, lighting=False)
    
    if mask_lh.any():
        add_electrodes(lh_brain, cord[mask_lh], hga_values[mask_lh], lh_tree, lh_pial_coords)
    if mask_rh.any():
        add_electrodes(rh_brain, cord[mask_rh], hga_values[mask_rh], rh_tree, rh_pial_coords)
    
    # 设置视角
    lh_brain.show_view(azimuth=180, elevation=90, distance=180, focalpoint=lh_insula_center)
    rh_brain.show_view(azimuth=0, elevation=90, distance=180, focalpoint=rh_insula_center)
    
    # 创建图像
    fig, axes = plt.subplots(1, 2, figsize=(16*cm, 10*cm))
    
    axes[0].imshow(lh_brain.screenshot(mode="rgb"))
    axes[0].axis("off")
    axes[0].set_title("Left Insula", fontsize=fontsize)
    
    axes[1].imshow(rh_brain.screenshot(mode="rgb"))
    axes[1].axis("off")
    axes[1].set_title("Right Insula", fontsize=fontsize)
    
    lh_brain.close()
    rh_brain.close()
    
    # 添加colorbar
    sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
    sm.set_array([])
    cbar = fig.colorbar(sm, ax=axes, orientation='horizontal', fraction=0.04, pad=0.02, aspect=30)
    cbar.set_label('HGA (z-score)', fontsize=fontsize)
    cbar.ax.tick_params(labelsize=fontsize)
    
    # 添加时间轴
    ax_time = fig.add_axes([0.15, 0.02, 0.7, 0.03])
    ax_time.set_xlim(time_min, time_max)
    ax_time.set_ylim(0, 1)
    ax_time.axvline(x=0, color='gray', linestyle='--', linewidth=0.5)
    
    # 当前时间窗口
    ax_time.axvspan(t_start, t_end, color=red, alpha=0.5)
    ax_time.plot((t_start + t_end) / 2, 0.5, 'ko', markersize=5)
    
    ax_time.set_xlabel('Time (s)', fontsize=fontsize)
    ax_time.set_yticks([])
    ax_time.tick_params(labelsize=fontsize)
    for spine in ['top', 'left', 'right']:
        ax_time.spines[spine].set_visible(False)
    
    plt.suptitle(f"Production HGA: {t_start:.2f}s - {t_end:.2f}s", fontsize=fontsize+1)
    
    # 转换为图像数组 (使用PIL方式)
    buf = BytesIO()
    fig.savefig(buf, format='png', dpi=100, bbox_inches='tight')
    buf.seek(0)
    img = np.array(Image.open(buf).convert('RGB'))
    buf.close()
    plt.close(fig)
    
    return img

# 生成所有帧
frames = []
for t in tqdm(time_points, desc="Generating frames"):
    img = generate_frame(t, insula_data, lh_tree, rh_tree, lh_pial_coords, rh_pial_coords,
                         labels, insula_patterns, lh_insula_center, rh_insula_center,
                         cmap, vmin, vmax, size_min, size_max, window_size, time_min, time_max)
    if img is not None:
        frames.append(img)

# 保存视频 (使用gif格式避免codec问题)
output_path = '../img/insula_hga_production.gif'
imageio.mimsave(output_path, frames, duration=100)  # 100ms per frame = 10fps
print(f"Video saved to {output_path}")

Reading labels from parcellation...


   read 75 labels from /cwork/ns458/ECoG_Recon/fsaverage/label/lh.aparc.a2009s.annot
   read 75 labels from /cwork/ns458/ECoG_Recon/fsaverage/label/rh.aparc.a2009s.annot
Insula electrodes: 207
Fixed colormap range: [0.000, 0.807]


Generating frames:   0%|          | 0/39 [00:00<?, ?it/s]VMware: No 3D enabled (0, Success).
VMware: No 3D enabled (0, Success).

[0m[33m2025-12-15 18:24:24.565 ( 207.955s) [    7F5842EE7440] vtkEGLRenderWindow.cxx:353   WARN| vtkEGLRenderWindow (0x6b83d080): EGL device index: 0 could not be initialized. Trying other devices...[0m
VMware: No 3D enabled (0, Success).
VMware: No 3D enabled (0, Success).

[0m[33m2025-12-15 18:24:24.676 ( 208.067s) [    7F5842EE7440] vtkEGLRenderWindow.cxx:353   WARN| vtkEGLRenderWindow (0x6b83d080): EGL device index: 0 could not be initialized. Trying other devices...[0m
VMware: No 3D enabled (0, Success).
VMware: No 3D enabled (0, Success).

[0m[33m2025-12-15 18:24:24.792 ( 208.183s) [    7F5842EE7440] vtkEGLRenderWindow.cxx:353   WARN| vtkEGLRenderWindow (0x53923fa0): EGL device index: 0 could not be initialized. Trying other devices...[0m
VMware: No 3D enabled (0, Success).
VMware: No 3D enabled (0, Success).

[0m[33m2025-12-15 18:24:25.035 

Video saved to ../img/insula_hga_production.gif


# Perception

In [None]:
# 3D Insula HGA Video Generation - Perception
# 重新加载perception数据
task = ['PhonemeSequence', 'SentenceRep', 'TIMIT', 'LexicalDelay']
ref = 'bipolar'

time_paths = []
for t in task:
    time_paths.extend(
        BIDSPath(
            root=f'../results/{t}({ref})',
            datatype='HGA',
            suffix='time',
            check=False,
        ).match()
    )

time_dfs = []
for path in tqdm(time_paths):
    df = pd.read_csv(path)
    time_dfs.append(df)

listen_data = pd.concat(time_dfs)

# rename ROIs
listen_data.loc[listen_data.roi == 'INS', 'roi'] = 'Insula'
listen_data.loc[listen_data.roi == 'HG', 'roi'] = 'STG'
listen_data.loc[listen_data.roi == 'PrG', 'roi'] = 'SMC'
listen_data.loc[listen_data.roi == 'PoG', 'roi'] = 'SMC'
listen_data.loc[listen_data.roi == 'Subcentral', 'roi'] = 'SMC'

# 筛选perception/passive数据
listen_data = listen_data[~listen_data.description.isin(['production'])]
listen_data = listen_data[~listen_data.phase.isin(['Go', 'Response'])]

# 筛选时间范围和insula
listen_data = listen_data[(listen_data['time'] >= -1) & (listen_data['time'] <= 1)]
insula_listen = listen_data[listen_data['roi'] == 'Insula'].copy()
print(f"Insula electrodes (perception): {insula_listen['channel'].nunique()}")


 98%|█████████▊| 954/975 [00:52<00:01, 13.49it/s]

In [None]:
# 参数设置
window_size = 0.1  # 100ms window
step_size = 0.05   # 50ms step
time_min_p, time_max_p = insula_listen['time'].min(), insula_listen['time'].max()
time_points_p = np.arange(time_min_p, time_max_p - window_size + step_size, step_size)

# 固定的vmin, vmax (across all windows)
all_values_p = insula_listen['value'].values
vmin_p, vmax_p = 0, np.percentile(all_values_p[all_values_p > 0], 95)
print(f"Fixed colormap range: [{vmin_p:.3f}, {vmax_p:.3f}]")

# 使用blues colormap
cmap_p = blues

def generate_frame_perception(t_start, insula_data, lh_tree, rh_tree, lh_pial_coords, rh_pial_coords,
                               labels, insula_patterns, lh_insula_center, rh_insula_center,
                               cmap, vmin, vmax, size_min, size_max, window_size, time_min, time_max):
    """生成单帧图像 - Perception"""
    t_end = t_start + window_size
    
    # 计算当前窗口的HGA
    window_data = insula_data[(insula_data['time'] >= t_start) & (insula_data['time'] < t_end)]
    hga_df = window_data.groupby('channel').agg({
        'value': 'mean',
        'x': 'first', 'y': 'first', 'z': 'first'
    }).reset_index()
    
    if len(hga_df) == 0:
        return None
    
    cord = hga_df[['x', 'y', 'z']].values
    hga_values = hga_df['value'].values
    mask_lh = cord[:, 0] < 0
    mask_rh = cord[:, 0] > 0
    
    norm = mcolors.Normalize(vmin=vmin, vmax=vmax)
    
    # 创建Brain对象
    lh_brain = Brain("fsaverage", subjects_dir=recon_dir, surf="pial",
                     hemi="lh", background="white", show=False,
                     cortex=(0.9, 0.9, 0.9), alpha=0.05, size=(800, 800))
    rh_brain = Brain("fsaverage", subjects_dir=recon_dir, surf="pial",
                     hemi="rh", background="white", show=False,
                     cortex=(0.9, 0.9, 0.9), alpha=0.05, size=(800, 800))
    
    # 添加insula labels
    for lab in labels:
        if any(pattern in lab.name for pattern in insula_patterns):
            if lab.hemi == 'lh':
                lh_brain.add_label(lab, borders=False, color=(0.9, 0.9, 0.9), alpha=0.6)
            elif lab.hemi == 'rh':
                rh_brain.add_label(lab, borders=False, color=(0.9, 0.9, 0.9), alpha=0.6)
    
    # 添加电极
    def add_electrodes(brain, coords, hga_vals, tree, pial_coords):
        if len(coords) == 0:
            return
        _, indices = tree.query(coords)
        coords_proj = pial_coords[indices]
        for i in range(len(coords_proj)):
            pt = coords_proj[i:i+1]
            hga = hga_vals[i]
            color = cmap(norm(hga))[:3]
            size = hga_to_size(hga, vmin, vmax, size_min, size_max)
            cloud = pv.PolyData(pt)
            brain._renderer.plotter.add_mesh(cloud, render_points_as_spheres=True,
                                             point_size=size, color=color, lighting=False)
    
    if mask_lh.any():
        add_electrodes(lh_brain, cord[mask_lh], hga_values[mask_lh], lh_tree, lh_pial_coords)
    if mask_rh.any():
        add_electrodes(rh_brain, cord[mask_rh], hga_values[mask_rh], rh_tree, rh_pial_coords)
    
    # 设置视角
    lh_brain.show_view(azimuth=180, elevation=90, distance=180, focalpoint=lh_insula_center)
    rh_brain.show_view(azimuth=0, elevation=90, distance=180, focalpoint=rh_insula_center)
    
    # 创建图像
    fig, axes = plt.subplots(1, 2, figsize=(16*cm, 10*cm))
    
    axes[0].imshow(lh_brain.screenshot(mode="rgb"))
    axes[0].axis("off")
    axes[0].set_title("Left Insula", fontsize=fontsize)
    
    axes[1].imshow(rh_brain.screenshot(mode="rgb"))
    axes[1].axis("off")
    axes[1].set_title("Right Insula", fontsize=fontsize)
    
    lh_brain.close()
    rh_brain.close()
    
    # 添加colorbar
    sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
    sm.set_array([])
    cbar = fig.colorbar(sm, ax=axes, orientation='horizontal', fraction=0.04, pad=0.02, aspect=30)
    cbar.set_label('HGA (z-score)', fontsize=fontsize)
    cbar.ax.tick_params(labelsize=fontsize)
    
    # 添加时间轴
    ax_time = fig.add_axes([0.15, 0.02, 0.7, 0.03])
    ax_time.set_xlim(time_min, time_max)
    ax_time.set_ylim(0, 1)
    ax_time.axvline(x=0, color='gray', linestyle='--', linewidth=0.5)
    
    # 当前时间窗口
    ax_time.axvspan(t_start, t_end, color=blue, alpha=0.5)
    ax_time.plot((t_start + t_end) / 2, 0.5, 'ko', markersize=5)
    
    ax_time.set_xlabel('Time (s)', fontsize=fontsize)
    ax_time.set_yticks([])
    ax_time.tick_params(labelsize=fontsize)
    for spine in ['top', 'left', 'right']:
        ax_time.spines[spine].set_visible(False)
    
    plt.suptitle(f"Perception HGA: {t_start:.2f}s - {t_end:.2f}s", fontsize=fontsize+1)
    
    # 转换为图像数组
    buf = BytesIO()
    fig.savefig(buf, format='png', dpi=100, bbox_inches='tight')
    buf.seek(0)
    img = np.array(Image.open(buf).convert('RGB'))
    buf.close()
    plt.close(fig)
    
    return img

# 生成所有帧
frames_p = []
for t in tqdm(time_points_p, desc="Generating perception frames"):
    img = generate_frame_perception(t, insula_listen, lh_tree, rh_tree, lh_pial_coords, rh_pial_coords,
                                     labels, insula_patterns, lh_insula_center, rh_insula_center,
                                     cmap_p, vmin_p, vmax_p, size_min, size_max, window_size, time_min_p, time_max_p)
    if img is not None:
        frames_p.append(img)

# 保存视频
output_path_p = '../img/insula_hga_perception.gif'
imageio.mimsave(output_path_p, frames_p, duration=100)
print(f"Video saved to {output_path_p}")

Fixed colormap range: [0.000, 1.161]


Generating perception frames:   0%|          | 0/39 [00:00<?, ?it/s]VMware: No 3D enabled (0, Success).
VMware: No 3D enabled (0, Success).

[0m[33m2025-12-14 18:03:07.707 (7222.243s) [    7F38A9534440] vtkEGLRenderWindow.cxx:353   WARN| vtkEGLRenderWindow (0x3ea0ebb0): EGL device index: 0 could not be initialized. Trying other devices...[0m
VMware: No 3D enabled (0, Success).
VMware: No 3D enabled (0, Success).

[0m[33m2025-12-14 18:03:07.779 (7222.314s) [    7F38A9534440] vtkEGLRenderWindow.cxx:353   WARN| vtkEGLRenderWindow (0x3ea0ebb0): EGL device index: 0 could not be initialized. Trying other devices...[0m
VMware: No 3D enabled (0, Success).
VMware: No 3D enabled (0, Success).

[0m[33m2025-12-14 18:03:07.841 (7222.377s) [    7F38A9534440] vtkEGLRenderWindow.cxx:353   WARN| vtkEGLRenderWindow (0x5d4317e0): EGL device index: 0 could not be initialized. Trying other devices...[0m
VMware: No 3D enabled (0, Success).
VMware: No 3D enabled (0, Success).

[0m[33m2025-12-14 18

Video saved to ../img/insula_hga_perception.gif
