<a href="https://colab.research.google.com/github/lcasxw/Embodied-Intelligent-Imitation-Learning-Framework/blob/main/%E6%96%B0%E7%94%9F.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import torch
import torch.nn as nn
import torch.optim as optim

# 定义模型结构
class SimpleEEGNet(nn.Module):
    def __init__(self):
        super(SimpleEEGNet, self).__init__()
        # 3层全连接层
        self.model = nn.Sequential(
            nn.Linear(310, 128),  # 输入层 (310) -> 隐藏层1 (128)
            nn.ReLU(),           # 激活函数，增加非线性能力
            nn.Linear(128, 64),   # 隐藏层1 -> 隐藏层2 (64)
            nn.ReLU(),
            nn.Linear(64, 3)      # 输出层 (3类：积极、中性、消极)
        )

    def forward(self, x):
        # x 的形状是 (batch_size, 310)
        return self.model(x)

# 实例化模型
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SimpleEEGNet().to(device)
print(model)

SimpleEEGNet(
  (model): Sequential(
    (0): Linear(in_features=310, out_features=128, bias=True)
    (1): ReLU()
    (2): Linear(in_features=128, out_features=64, bias=True)
    (3): ReLU()
    (4): Linear(in_features=64, out_features=3, bias=True)
  )
)


In [4]:
import scipy.io as sio

# 读取你截图中的第一个文件
test_file = '/content/drive/MyDrive/SEED/Preprocessed_EEG/1_20131027.mat'
data = sio.loadmat(test_file)

# 打印所有变量名
print("当前文件包含的变量名：")
print([k for k in data.keys() if not k.startswith('__')])

当前文件包含的变量名：
['djc_eeg1', 'djc_eeg2', 'djc_eeg3', 'djc_eeg4', 'djc_eeg5', 'djc_eeg6', 'djc_eeg7', 'djc_eeg8', 'djc_eeg9', 'djc_eeg10', 'djc_eeg11', 'djc_eeg12', 'djc_eeg13', 'djc_eeg14', 'djc_eeg15']


In [None]:
import numpy as np
import scipy.io as sio
import torch
from torch.utils.data import DataLoader, TensorDataset

# 1. 基础设置
# SEED 官方 15 个 Trial 的标签序列
seed_labels = [1, 0, -1, -1, 0, 1, -1, 0, 1, 1, 0, -1, 0, 1, -1]
label_map = {-1: 0, 0: 1, 1: 2} # 映射为 PyTorch 识别的 0,1,2

def compute_de(signal):
    """计算简易微分熵: DE = log(std(信号))"""
    return np.log(np.std(signal))

def load_and_preprocess(file_path):
    data = sio.loadmat(file_path)
    all_x = []
    all_y = []

    # 采样率 200Hz，我们按 1 秒 (200个点) 作为一个样本
    window_size = 200

    print(f"正在处理文件: {file_path.split('/')[-1]}")

    for i in range(1, 16):
        key = f'djc_eeg{i}'
        if key in data:
            # 原始信号形状通常是 (62, 采样点数)
            trial_signal = data[key]
            n_channels = trial_signal.shape[0]
            n_samples = trial_signal.shape[1]

            # 按照 1 秒一个窗口切分
            for start in range(0, n_samples - window_size, window_size):
                end = start + window_size
                window = trial_signal[:, start:end] # 形状 (62, 200)

                # 对 62 个通道分别计算 DE 特征
                de_feat = [compute_de(window[ch, :]) for ch in range(n_channels)]

                all_x.append(de_feat)
                all_y.append(label_map[seed_labels[i-1]])

    return np.array(all_x), np.array(all_y)

# 2. 执行加载（先拿你截图中的第一个文件试跑）
file_path = '/content/drive/MyDrive/SEED/Preprocessed_EEG/1_20131027.mat'
X, Y = load_and_preprocess(file_path)

# 3. 转换为 Tensor
X_tensor = torch.FloatTensor(X) # 形状应该是 (样本数, 62)
Y_tensor = torch.LongTensor(Y)

# 4. 封装
train_ds = TensorDataset(X_tensor, Y_tensor)
train_loader = DataLoader(train_ds, batch_size=64, shuffle=True)

print(f"\n处理完成！")
print(f"特征形状: {X_tensor.shape} (每个样本 62 个通道的 DE 特征)")
print(f"标签形状: {Y_tensor.shape}")

正在处理文件: 1_20131027.mat

处理完成！
特征形状: torch.Size([3394, 62]) (每个样本 62 个通道的 DE 特征)
标签形状: torch.Size([3394])


In [6]:
# 修改输入层为 62
class SimpleEEGNet(nn.Module):
    def __init__(self):
        super(SimpleEEGNet, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(62, 128),  # 注意这里改成了 62
            nn.ReLU(),
            nn.Dropout(0.3),     # 加入 Dropout 防止过拟合，这在论文里是加分项
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 3)     # 3类输出
        )

    def forward(self, x):
        return self.model(x)

model = SimpleEEGNet().to(device)

In [9]:
# 假设你的特征变量叫 X_tensor
print(f"特征矩阵的形状是: {X_tensor.shape}")

NameError: name 'X_tensor' is not defined

In [10]:
import numpy as np
import scipy.io as sio
import torch
from scipy.signal import butter, lfilter
from torch.utils.data import DataLoader, TensorDataset

# ---------------------------------------------------------
# 1. 定义滤波器：这是提取 310 维特征的关键
# ---------------------------------------------------------
def butter_bandpass_filter(data, lowcut, highcut, fs, order=3):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return lfilter(b, a, data)

# SEED 数据集定义的 5 个核心频段
BANDS = {
    'delta': [1, 4],
    'theta': [4, 8],
    'alpha': [8, 14],
    'beta': [14, 31],
    'gamma': [31, 50]
}

def compute_de(signal):
    """计算微分熵"""
    return np.log(np.std(signal) + 1e-8) # 加一个极小值防止 log(0)

# ---------------------------------------------------------
# 2. 升级版加载函数：输出形状为 (样本数, 310)
# ---------------------------------------------------------
def load_and_preprocess_310(file_path):
    data = sio.loadmat(file_path)
    seed_labels = [1, 0, -1, -1, 0, 1, -1, 0, 1, 1, 0, -1, 0, 1, -1]
    label_map = {-1: 0, 0: 1, 1: 2}

    all_x = []
    all_y = []
    window_size = 200 # 1秒一个窗口 (200Hz)

    print(f"正在进行多频段特征提取: {file_path.split('/')[-1]}...")

    for i in range(1, 16):
        key = f'djc_eeg{i}'
        if key in data:
            trial_signal = data[key] # (62, 采样点)
            n_samples = trial_signal.shape[1]

            for start in range(0, n_samples - window_size, window_size):
                end = start + window_size
                window = trial_signal[:, start:end]

                # --- 核心提取逻辑 ---
                sample_features = []
                for ch in range(62):
                    ch_signal = window[ch, :]
                    for band_name, (low, high) in BANDS.items():
                        # 每一秒、每个通道，都滤出5个频段分别算DE
                        filtered = butter_bandpass_filter(ch_signal, low, high, 200)
                        sample_features.append(compute_de(filtered))

                all_x.append(sample_features) # 长度 62 * 5 = 310
                all_y.append(label_map[seed_labels[i-1]])

    return np.array(all_x), np.array(all_y)

# 3. 运行（以第一个文件为例）
file_path = '/content/drive/MyDrive/SEED/Preprocessed_EEG/1_20131027.mat'
X_310, Y_310 = load_and_preprocess_310(file_path)

print(f"\n✅ 升级成功！")
print(f"特征形状: {X_310.shape}  <-- 看到 310 了吗？")

正在进行多频段特征提取: 1_20131027.mat...

✅ 升级成功！
特征形状: (3394, 310)  <-- 看到 310 了吗？


In [15]:
import torch.nn as nn

class ImprovedEEGNet(nn.Module):
    def __init__(self):
        super(ImprovedEEGNet, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(310, 256),  # 输入维度改为 310
            nn.BatchNorm1d(256),  # 增加批标准化，让训练更稳
            nn.ReLU(),
            nn.Dropout(0.4),      # 增加 Dropout 防止过拟合
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 4)     # 3分类输出
        )

    def forward(self, x):
        return self.model(x)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ImprovedEEGNet().to(device)

In [None]:
epochs = 50 # 训练轮数

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss() # 适用于多分类任务
optimizer = optim.Adam(model.parameters(), lr=0.001) # Adam优化器，学习率设为0.001

for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        # 1. 梯度清零
        optimizer.zero_grad()

        # 2. 前向传播
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # 3. 反向传播与优化
        loss.backward()
        optimizer.step()

        # 统计
        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

    if (epoch+1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {running_loss/len(train_loader):.4f}, Acc: {100.*correct/total:.2f}%')

Epoch [10/50], Loss: 0.1104, Acc: 96.17%
Epoch [20/50], Loss: 0.0749, Acc: 97.20%
Epoch [30/50], Loss: 0.0583, Acc: 97.86%
Epoch [40/50], Loss: 0.0315, Acc: 98.97%
Epoch [50/50], Loss: 0.0398, Acc: 98.67%


In [12]:
from sklearn.model_selection import train_test_split

# 将 310 维数据转为张量
X_tensor = torch.FloatTensor(X_310)
Y_tensor = torch.LongTensor(Y_310)

# 划分 80% 训练，20% 测试
X_train, X_test, y_train, y_test = train_test_split(
    X_tensor, Y_tensor, test_size=0.2, random_state=42
)

# 包装成 DataLoader
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=64, shuffle=True)
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=64, shuffle=False)

In [13]:
def load_multimodal_data(eeg_file_path, eye_dir):
    # 1. 加载脑电 (我们之前生成的 310 维特征)
    # 注意：这里假设你已经运行了之前的提取 310 维的代码，或者直接读取提取好的 eeg_feature_smooth
    eeg_data = sio.loadmat(eeg_file_path)

    # 2. 找到对应的眼动文件
    # 比如脑电文件名是 1_20131027.mat，眼动通常也叫这个名
    file_name = eeg_file_path.split('/')[-1]
    eye_file_path = os.path.join(eye_dir, file_name)
    eye_data = sio.loadmat(eye_file_path)

    all_combined_x = []
    all_y = []

    # SEED 标签映射
    seed_labels = [1, 0, -1, -1, 0, 1, -1, 0, 1, 1, 0, -1, 0, 1, -1]
    label_map = {-1: 0, 0: 1, 1: 2}

    for i in range(1, 16):
        eeg_key = f'de_moving_ave{i}' # 或者是你之前生成的特征 key
        eye_key = f'de_moving_ave{i}' # 眼动特征在 mat 里的 key 通常也叫这个，或者叫 'eye_1'

        if eeg_key in eeg_data and eye_key in eye_data:
            eeg_feat = eeg_data[eeg_key] # 形状 (Windows, 310)
            eye_feat = eye_data[eye_key] # 形状 (Windows, 31)

            # --- 核心融合步骤：特征拼接 ---
            # 确保两个模态的时间窗口（Windows）数量一致
            min_windows = min(eeg_feat.shape[0], eye_feat.shape[0])
            combined = np.hstack((eeg_feat[:min_windows], eye_feat[:min_windows]))

            all_combined_x.append(combined)
            all_y.append(np.full(min_windows, label_map[seed_labels[i-1]]))

    return np.concatenate(all_combined_x), np.concatenate(all_y)

# 假设输入维度变成了 310 + 31 = 341

In [14]:
import os
import numpy as np
import scipy.io as sio

def fuse_eeg_and_eye(eeg_features_310, eeg_labels, eye_dir, file_name):
    """
    将提取好的 310 维 EEG 与眼动特征对齐
    eeg_features_310: 你之前跑出的 (N, 310) 矩阵
    eye_dir: '/content/drive/MyDrive/SEED/SEED_IV/eye_feature_smooth/'
    file_name: 当前处理的文件名, 如 '1_20131027.mat'
    """
    eye_data = sio.loadmat(os.path.join(eye_dir, file_name))

    # SEED 常见的眼动特征 key 是 'de_moving_ave1' 到 'de_moving_ave15'
    # 或者直接在数据字典里找符合形状的 key
    all_eye_feat = []
    for i in range(1, 16):
        key = f'de_moving_ave{i}'
        if key in eye_data:
            # 眼动特征形状通常是 (Windows, 31)
            feat = eye_data[key]
            # 展平处理，确保维度一致
            feat = feat.reshape(feat.shape[0], -1)
            all_eye_feat.append(feat)

    eye_features_all = np.concatenate(all_eye_feat, axis=0)

    # --- 关键对齐步骤 ---
    # 脑电是你自己切的，眼动是官方切的，长度可能微差 1-2 秒
    min_samples = min(eeg_features_310.shape[0], eye_features_all.shape[0])

    eeg_part = eeg_features_310[:min_samples]
    eye_part = eye_features_all[:min_samples]
    labels_part = eeg_labels[:min_samples]

    # 特征级融合：直接拼接 (Concatenation)
    fused_features = np.hstack((eeg_part, eye_part)) # 维度: 310 + 31 = 341

    return fused_features, labels_part

print("多模态对齐逻辑已就绪。")

多模态对齐逻辑已就绪。


In [16]:
import torch.nn.functional as F

class GatedMultimodalNet(nn.Module):
    def __init__(self):
        super(GatedMultimodalNet, self).__init__()
        # 脑电分支
        self.eeg_path = nn.Sequential(nn.Linear(310, 128), nn.ReLU())
        # 眼动分支
        self.eye_path = nn.Sequential(nn.Linear(31, 64), nn.ReLU())

        # 门控权重计算：让模型自己决定看脑电多一点还是看眼动多一点
        self.gate = nn.Sequential(
            nn.Linear(128 + 64, 2),
            nn.Softmax(dim=1)
        )

        self.classifier = nn.Sequential(
            nn.Linear(128 + 64, 64),
            nn.ReLU(),
            nn.Linear(64, 4) # SEED-IV 设为 4
        )

    def forward(self, eeg, eye):
        e_h = self.eeg_path(eeg)
        y_h = self.eye_path(eye)

        # 计算权重
        combined = torch.cat([e_h, y_h], dim=1)
        weights = self.gate(combined)

        # 加权融合
        fused = torch.cat([e_h * weights[:, 0:1], y_h * weights[:, 1:2]], dim=1)
        return self.classifier(fused)

In [33]:
import os
import scipy.io as sio
import numpy as np
import torch

# 1. 路径配置 (根据你的云端硬盘路径修改)
eeg_dir = '/content/drive/MyDrive/SEED/Preprocessed_EEG/'
eye_dir = '/content/drive/MyDrive/SEED/SEED_IV/eye_feature_smooth/1/'

# SEED-IV Session 1 的官方标签序列 (来自你的 Excel 截图)
session1_labels = [1, 2, 3, 0, 2, 0, 0, 1, 0, 1, 2, 1, 1, 1, 2, 3, 2, 2, 3, 3, 3, 0, 3, 0]

def get_fused_data(eeg_file_name):
    eye_path = os.path.join(eye_dir, eeg_file_name)
    if not os.path.exists(eye_path):
        raise FileNotFoundError(f"眼动文件未找到: {eye_path}")

    eye_mat = sio.loadmat(eye_path)
    print(f"文件 '{eeg_file_name}' 中包含的键: { [k for k in eye_mat.keys() if not k.startswith('__')] }")

    all_eye_feat = []
    found_keys = False
    expected_eye_features_dim = 31 # 模型期望的眼动特征维度

    for i in range(1, 25):
        key = f'eye_{i}'
        if key in eye_mat:
            raw_feat = eye_mat[key]
            print(f"  Trial {i} ({key}) 原始形状: {raw_feat.shape}") # 打印原始形状

            # 目标是得到 (num_windows, expected_eye_features_dim) 的形状
            reshaped_feat = None
            if raw_feat.ndim == 2 and raw_feat.shape[1] == expected_eye_features_dim:
                # Case 1: 已经是 (num_windows, 31) 形状
                reshaped_feat = raw_feat
            elif raw_feat.ndim == 2 and raw_feat.shape[0] == expected_eye_features_dim and raw_feat.shape[1] > 1:
                # Case 2: (31, num_windows) 形状，需要转置
                reshaped_feat = raw_feat.T
                print(f"  已尝试将 Trial {i} ({key}) 从原始形状 {raw_feat.shape} 转置为 ({reshaped_feat.shape[0]}, {reshaped_feat.shape[1]})。")
            elif raw_feat.ndim == 3 and raw_feat.shape[-1] == expected_eye_features_dim:
                # Case 3: (num_windows, X, 31) 形状，或者 (num_windows, 1, 31)
                reshaped_feat = raw_feat.reshape(-1, expected_eye_features_dim)
            elif raw_feat.ndim == 1 and raw_feat.size == expected_eye_features_dim:
                # Case 4: (31,) 形状，表示单个样本
                reshaped_feat = raw_feat.reshape(1, expected_eye_features_dim)
            else:
                # 对于其他复杂的形状，需要更具体的处理，这里先抛出错误
                raise ValueError(f"Trial {i} ({key}) 的原始数据形状 {raw_feat.shape} 无法解析为 {expected_eye_features_dim} 维特征。请检查数据结构。")

            if reshaped_feat.shape[1] != expected_eye_features_dim:
                raise ValueError(f"Trial {i} ({key}) 最终特征维度为 {reshaped_feat.shape[1]}，与预期 {expected_eye_features_dim} 不符。请检查数据是否正确。")

            all_eye_feat.append(reshaped_feat)
            print(f"  Trial {i} ({key}) 最终特征形状: {reshaped_feat.shape}")
            found_keys = True

    if not found_keys:
        raise ValueError(f"未在文件 '{eeg_file_name}' 中找到任何 'eye_{i}' 形式的眼动特征。请检查文件内容或 eye_dir 路径。")

    if not all_eye_feat:
        raise ValueError("没有收集到任何眼动特征，无法进行拼接。")

    # 在拼接前，由于我们上面已经强制检查了每个特征都是 expected_eye_features_dim，所以维度必然一致
    eye_features_all = np.concatenate(all_eye_feat, axis=0)

    return eye_features_all

# 测试读取眼电
try:
    test_eye = get_fused_data('1_20160518.mat')
    print(f"眼电特征提取完成，形状: {test_eye.shape}")
except FileNotFoundError as e:
    print(f"错误: {e}")
    print(f"请确认路径 '{eye_dir}' 是否正确，并且文件 '1_20160518.mat' 存在。")
    if os.path.exists(eye_dir):
        print(f"当前目录 '{eye_dir}' 下的文件有: {os.listdir(eye_dir)}")
    else:
        print(f"错误：目录 '{eye_dir}' 不存在。")
except ValueError as e:
    print(f"错误: {e}")
except Exception as e:
    print(f"发生未知错误: {e}")

文件 '1_20160518.mat' 中包含的键: ['eye_1', 'eye_2', 'eye_3', 'eye_4', 'eye_5', 'eye_6', 'eye_7', 'eye_8', 'eye_9', 'eye_10', 'eye_11', 'eye_12', 'eye_13', 'eye_14', 'eye_15', 'eye_16', 'eye_17', 'eye_18', 'eye_19', 'eye_20', 'eye_21', 'eye_22', 'eye_23', 'eye_24']
  Trial 1 (eye_1) 原始形状: (31, 42)
  已尝试将 Trial 1 (eye_1) 从原始形状 (31, 42) 转置为 (42, 31)。
  Trial 1 (eye_1) 最终特征形状: (42, 31)
  Trial 2 (eye_2) 原始形状: (31, 23)
  已尝试将 Trial 2 (eye_2) 从原始形状 (31, 23) 转置为 (23, 31)。
  Trial 2 (eye_2) 最终特征形状: (23, 31)
  Trial 3 (eye_3) 原始形状: (31, 49)
  已尝试将 Trial 3 (eye_3) 从原始形状 (31, 49) 转置为 (49, 31)。
  Trial 3 (eye_3) 最终特征形状: (49, 31)
  Trial 4 (eye_4) 原始形状: (31, 32)
  已尝试将 Trial 4 (eye_4) 从原始形状 (31, 32) 转置为 (32, 31)。
  Trial 4 (eye_4) 最终特征形状: (32, 31)
  Trial 5 (eye_5) 原始形状: (31, 22)
  已尝试将 Trial 5 (eye_5) 从原始形状 (31, 22) 转置为 (22, 31)。
  Trial 5 (eye_5) 最终特征形状: (22, 31)
  Trial 6 (eye_6) 原始形状: (31, 40)
  已尝试将 Trial 6 (eye_6) 从原始形状 (31, 40) 转置为 (40, 31)。
  Trial 6 (eye_6) 最终特征形状: (40, 31)
  Trial 7 (eye_7) 原始形