In [1]:
import os
import pandas as pd
import numpy as np
import mne
from mne import io
from mne.datasets import sample
from mne.preprocessing import ICA
from matplotlib import pyplot as plt
from imblearn.over_sampling import RandomOverSampler
import torch
from PIL import Image
from torch.utils.data import DataLoader
import torch.nn as nn

In [2]:
# 设置通道数和样本数
channels = 14
samples = 384
# 存放数据的文件夹路径
data_folder = './EEGData/'
# 存储所有数据的列表和标签列表
data_list = []
label_list = []

# 遍历文件夹中的所有CSV文件
for file_name in os.listdir(data_folder):
    if file_name.endswith(".csv"):
        file_path = os.path.join(data_folder, file_name)
        
        # 使用pandas读取CSV文件，只读取2-15列的数据
        df = pd.read_csv(file_path, usecols=list(range(1, 15)), header=0)
        sfreq = 128  # 采样率为128Hz
        ch_names = ['AF3', 'F7', 'F3', 'FC5', 'T7', 'P7', 'O1', 'O2', 'P8', 'T8', 'FC6', 'F4', 'F8', 'AF4']
        ch_types = ['eeg'] * channels

        # 创建MNE的Raw对象
        info = mne.create_info(ch_names=ch_names, ch_types=ch_types, sfreq=sfreq)
        raw = mne.io.RawArray(df.T, info=info)
        # 滤波
        raw.filter(l_freq=1.0, h_freq=50.0)
        # 创建ICA对象并拟合数据
        ica = ICA(n_components=channels, random_state=0, max_iter=1000)  # 调整参数
        ica.fit(raw)
        # 应用ICA滤波
        ica.exclude = []  # 将排除的独立成分列表设置为空
        ica.apply(raw)
        # 获取滤波后的数据
        data = raw.get_data().T

        # 将数据调整为每个trial的形状
        # 假设数据的样本数为samples
        num_trials = data.shape[0] // samples
        data = data[:num_trials * samples, :]
        # 重新将数据分成每个trial的形状
        data = data.reshape(num_trials, samples, channels)
        # 添加数据到列表中
        data_list.append(data)
        # 添加标签到列表中
        labels = np.zeros(num_trials, dtype=int)
        labels[3:18] = 1  # Trials 4-18 labeled as 1
        labels[18:] = 2   # Trials 19-21 labeled as 2
        label_list.append(labels)

# 将数据列表转换为numpy数组并按顺序连接
data_array = np.concatenate(data_list, axis=0)
# 将标签列表转换为numpy数组并按顺序连接
label_array = np.concatenate(label_list, axis=0)

print("Data shape:", data_array.shape) # (378, 384, 14) 14个通道，每个通道384个样本，总共399个试验数量（18人 x 21个/人）
print("Label shape:", label_array.shape) # (378,) 399个试验数量

# 先不用扩充数据
# ros = RandomOverSampler(sampling_strategy='auto', random_state=42)
# data_array = data_array.reshape(-1, 384*14)
# data, labels = ros.fit_resample(data_array, label_array)
data, labels = data_array, label_array
data = data.reshape(-1,384,14)
data = data.transpose(0,2,1)
print(data.shape)
print(labels.shape)
# print(labels)

eeg_data = data

Creating RawArray with float64 data, n_channels=14, n_times=8064
    Range : 0 ... 8063 =      0.000 ...    62.992 secs
Ready.
Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 1 - 50 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 1.00
- Lower transition bandwidth: 1.00 Hz (-6 dB cutoff frequency: 0.50 Hz)
- Upper passband edge: 50.00 Hz
- Upper transition bandwidth: 12.50 Hz (-6 dB cutoff frequency: 56.25 Hz)
- Filter length: 423 samples (3.305 s)

Fitting ICA to data using 14 channels (please be patient, this may take a while)


In [1]:


# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(len(labels[labels == 0]))
print(len(labels[labels == 1]))
print(len(labels[labels == 2]))

eeg_data = data

# 创建一个空的numpy数组，用于存放图像数据
image_data = np.empty((378, 3, 128, 128))
# 读取图片并存放到numpy数组中
for i in range(1, 379):
    filename = f"./facesFromFrames_new/{i}.jpg"
    image = Image.open(filename)
    image = image.resize((128, 128))
    # 将图像数据转换为数组
    image_array = np.asarray(image)
    # 将数组添加到image_data中
    image_data[i-1] = image_array.transpose((2, 0, 1))
print(image_data.shape)

# 使用zip函数将它们组合在一起
combined_data = np.array(list(zip(eeg_data, image_data)))

# 现在，combined_data是一个长度为399的数组，每个元素都是一个元组，元组中包含一个14x384的数组和一个3x128x128的数组
print(combined_data.shape)  # 输出：(399, 2)
print(combined_data[0][0].shape)  # 输出：(14, 384)
print(combined_data[0][1].shape)  # 输出：(3, 128, 128)

data = combined_data

class MultiModalDataset(torch.utils.data.Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        eeg_data = self.data[index][0]
        image_data = self.data[index][1]
        label = self.labels[index]
        return eeg_data, image_data, label



# 随机划分训练集和测试集
from sklearn.model_selection import train_test_split
train_data, test_data, train_labels, test_labels = train_test_split(data, labels, test_size=0.3, random_state=42)

    
# 分类模型
class MultiModelClassifier(nn.Module):
    def __init__(self, input_size=384, num_classes=3, 
                 num_heads=14, dim_feedforward=2048, num_encoder_layers=6,
                 in_c=3, embed_dim=384, patch_size=16,
                 ):
        super(MultiModelClassifier, self).__init__()
        self.proj = nn.Conv2d(in_c, embed_dim, kernel_size=patch_size, stride=patch_size) # [n, 384, 8, 8]
        self.norm = nn.Identity()
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=input_size, nhead=num_heads, dim_feedforward=dim_feedforward, batch_first=True),
            num_layers=num_encoder_layers
        )
        self.cls_token = nn.Parameter(torch.zeros(1, 1, input_size))

        self.fc = nn.Linear(input_size, num_classes)

        
    def forward(self, eeg_data, image_data):

        image_embedding = self.proj(image_data).flatten(2).transpose(1, 2)
        image_embedding = self.norm(image_embedding)

        multi_embedding = torch.cat((eeg_data, image_embedding), dim=1)

        cls_tokens = self.cls_token.expand(multi_embedding.shape[0], -1, -1)
        multi_embedding = torch.cat((cls_tokens, multi_embedding), dim=1)

        multi_embedding = self.transformer_encoder(multi_embedding)

        # 取出cls token的输出
        multi_embedding = multi_embedding[:, 0, :]
        res = self.fc(multi_embedding)

        return res
    
classifier = MultiModelClassifier()



# 创建Dataset
train_dataset = EEGDataset(train_data, train_labels)
test_dataset = EEGDataset(test_data, test_labels)

# 创建DataLoader
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(classifier.parameters(), lr=0.001)

# 训练模型
num_epochs = 10
total_step = len(train_loader)
for epoch in range(num_epochs):
    for i, (eeg_data, image_data, labels) in enumerate(train_loader):
        eeg_data = eeg_data.to(device)
        image_data = image_data.to(device)
        labels = labels.to(device)
        
        # Forward pass
        outputs = classifier(eeg_data, image_data)
        loss = criterion(outputs, labels)
        
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if (i+1) % 100 == 0:
            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
                   .format(epoch+1, num_epochs, i+1, total_step, loss.item()))


Creating RawArray with float64 data, n_channels=14, n_times=8064
    Range : 0 ... 8063 =      0.000 ...    62.992 secs
Ready.
Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 1 - 50 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 1.00
- Lower transition bandwidth: 1.00 Hz (-6 dB cutoff frequency: 0.50 Hz)
- Upper passband edge: 50.00 Hz
- Upper transition bandwidth: 12.50 Hz (-6 dB cutoff frequency: 56.25 Hz)
- Filter length: 423 samples (3.305 s)

Fitting ICA to data using 14 channels (please be patient, this may take a while)
