In [1]:
import numpy as np 
import matplotlib.pyplot as plt
import scipy.io as sio
import csv
import glob
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from torch import nn
import os 

import torch.optim as optim
import torch.nn.functional as F

from torch.autograd import Variable
from torch.utils.data.dataset import Dataset
from torch.utils.data import DataLoader,random_split


  from .autonotebook import tqdm as notebook_tqdm


In [67]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import torchvision.transforms as transforms

# 定义EEG数据集类
class EEGDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        eeg = self.data[index]
        label = self.labels[index]
        return eeg, label

# 数据文件夹路径
data_folder = './EEGFeature4/'

# 读取并组合数据
data = None
labels = []

# 遍历文件夹中的文件
for filename in os.listdir(data_folder):
    file_path = os.path.join(data_folder, filename)
    
    # 仅处理CSV文件
    if filename.endswith('.csv'):
        # 加载数据
        file_data = pd.read_csv(file_path, header=0)
        
        # 提取特征和标签
        features = file_data.iloc[:, 0:210].values
        # print(features)
        file_labels = file_data.iloc[:, 210].values
        # print(file_labels)
        
        # 将数据添加到整体数据集中
        if data is None:
            data = features
        else:
            data = np.concatenate((data, features), axis=0)
        
        labels.extend(file_labels)

# 将标签转换为NumPy数组
labels = np.array(labels)
print(data.shape)
print(labels.shape)

# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(data, labels, test_size=0.3, random_state=42)

# 数据预处理
X_train = np.expand_dims(X_train, axis=1)  # 添加通道维度
X_test = np.expand_dims(X_test, axis=1)    # 添加通道维度

# 计算均值和标准差
mean = np.mean(X_train)
std = np.std(X_train)

# 数据标准化
normalize = transforms.Normalize(mean=[mean], std=[std])

# 转换为PyTorch张量并移动到GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
X_train = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train = torch.tensor(y_train, dtype=torch.long).to(device)
X_test = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test = torch.tensor(y_test, dtype=torch.long).to(device)

# 创建数据加载器
batch_size = 32
train_dataset = EEGDataset(X_train, y_train)
test_dataset = EEGDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# 构建LSTM模型
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)
        self.dropout = nn.Dropout(0.8)
        self.activation = nn.ReLU()
    
    def forward(self, x):
        h0 = torch.zeros(1, x.size(0), self.hidden_size)
        c0 = torch.zeros(1, x.size(0), self.hidden_size)
        out, _ = self.lstm(x, (h0, c0))
        out = self.activation(out)
        out = self.dropout(out) # 添加Dropout层
        out = self.fc(out[:, -1, :])  # 只使用最后一个时间步的输出
        return out

# 构建LSTM模型并移动到GPU
input_size = X_train.shape[2]
hidden_size = 256
num_classes = 3
model = LSTMModel(input_size, hidden_size, num_classes).to(device)

# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1)

# 训练模型
num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    for batch_data, batch_labels in train_loader:
        # 数据标准化并移动到GPU
        batch_data = normalize(batch_data.to(device))
        
        # 前向传播
        outputs = model(batch_data)
        loss = criterion(outputs, batch_labels)
        
        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    # 在测试集上计算准确率
    model.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for batch_data, batch_labels in test_loader:
            # 数据标准化并移动到设备
            batch_data = normalize(batch_data.to(device))
            
            outputs = model(batch_data)
            _, predicted = torch.max(outputs.data, 1)
            total += batch_labels.size(0)
            correct += (predicted == batch_labels).sum().item()
        
        accuracy = correct / total
        print(f'Epoch {epoch+1}/{num_epochs}, 测试集准确率: {accuracy:.6f}')


# 在完整测试集上计算准确率
model.eval()
with torch.no_grad():
    X_test = normalize(X_test.to(device))
    outputs = model(X_test)
    _, predicted = torch.max(outputs.data, 1)
    total = y_test.size(0)
    correct = (predicted == y_test).sum().item()
    accuracy = correct / total
    print('完整测试集准确率:', accuracy)


(399, 210)
(399,)
Epoch 1/100, 测试集准确率: 0.708333
Epoch 2/100, 测试集准确率: 0.708333
Epoch 3/100, 测试集准确率: 0.708333
Epoch 4/100, 测试集准确率: 0.708333
Epoch 5/100, 测试集准确率: 0.708333
Epoch 6/100, 测试集准确率: 0.708333
Epoch 7/100, 测试集准确率: 0.700000
Epoch 8/100, 测试集准确率: 0.708333
Epoch 9/100, 测试集准确率: 0.708333
Epoch 10/100, 测试集准确率: 0.708333
Epoch 11/100, 测试集准确率: 0.708333
Epoch 12/100, 测试集准确率: 0.708333
Epoch 13/100, 测试集准确率: 0.708333
Epoch 14/100, 测试集准确率: 0.716667
Epoch 15/100, 测试集准确率: 0.708333
Epoch 16/100, 测试集准确率: 0.708333
Epoch 17/100, 测试集准确率: 0.708333
Epoch 18/100, 测试集准确率: 0.708333
Epoch 19/100, 测试集准确率: 0.708333
Epoch 20/100, 测试集准确率: 0.700000
Epoch 21/100, 测试集准确率: 0.708333
Epoch 22/100, 测试集准确率: 0.708333
Epoch 23/100, 测试集准确率: 0.708333
Epoch 24/100, 测试集准确率: 0.708333
Epoch 25/100, 测试集准确率: 0.708333
Epoch 26/100, 测试集准确率: 0.708333
Epoch 27/100, 测试集准确率: 0.708333
Epoch 28/100, 测试集准确率: 0.708333
Epoch 29/100, 测试集准确率: 0.708333
Epoch 30/100, 测试集准确率: 0.700000
Epoch 31/100, 测试集准确率: 0.708333
Epoch 32/100, 测试集准确率: 0.708333

In [8]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import StratifiedKFold
import torchvision.transforms as transforms
from sklearn.metrics import classification_report

# 定义EEG数据集类
class EEGDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        eeg = self.data[index]
        label = self.labels[index]
        return eeg, label

# 数据文件夹路径
data_folder = './EEGFeature4/'

# 读取并组合数据
data = None
labels = []

# 遍历文件夹中的文件
for filename in os.listdir(data_folder):
    file_path = os.path.join(data_folder, filename)
    
    # 仅处理CSV文件
    if filename.endswith('.csv'):
        # 加载数据
        file_data = pd.read_csv(file_path, header=0)
        
        # 提取特征和标签
        features = file_data.iloc[:, 0:210].values
        # print(features)
        file_labels = file_data.iloc[:, 210].values
        # print(file_labels)
        
        # 将数据添加到整体数据集中
        if data is None:
            data = features
        else:
            data = np.concatenate((data, features), axis=0)
        
        labels.extend(file_labels)

# 将标签转换为NumPy数组
labels = np.array(labels)
print(data.shape)
print(labels.shape)

# 数据预处理
data = np.expand_dims(data, axis=1)  # 添加通道维度

# 计算均值和标准差
mean = np.mean(data)
std = np.std(data)

# 数据标准化
normalize = transforms.Normalize(mean=[mean], std=[std])

# 转换为PyTorch张量并移动到GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
data = torch.tensor(data, dtype=torch.float32).to(device)
labels = torch.tensor(labels, dtype=torch.long).to(device)

# 定义交叉验证
k_folds = 10
skf = StratifiedKFold(n_splits=k_folds, shuffle=True, random_state=42)

# 构建LSTM模型
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)
        self.dropout = nn.Dropout(0.7)
        self.activation = nn.ReLU()
    
    def forward(self, x):
        h0 = torch.zeros(1, x.size(0), self.hidden_size)
        c0 = torch.zeros(1, x.size(0), self.hidden_size)
        out, _ = self.lstm(x, (h0, c0))
        out = self.activation(out)
        out = self.dropout(out) # 添加Dropout层
        out = self.fc(out[:, -1, :])  # 只使用最后一个时间步的输出
        return out

# 定义损失函数
criterion = nn.CrossEntropyLoss()

# 进行交叉验证
for fold, (train_index, test_index) in enumerate(skf.split(data, labels)):
    print(f"Fold {fold+1}/{k_folds}")
    
    # 划分训练集和测试集
    X_train, X_test = data[train_index], data[test_index]
    y_train, y_test = labels[train_index], labels[test_index]
    
    # 创建数据加载器
    batch_size = 32
    train_dataset = EEGDataset(X_train, y_train)
    test_dataset = EEGDataset(X_test, y_test)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)
    
    # 构建LSTM模型并移动到GPU
    input_size = X_train.shape[2]
    hidden_size = 128
    num_classes = 3
    model = LSTMModel(input_size, hidden_size, num_classes).to(device)
    
    # 定义优化器
    optimizer = optim.Adam(model.parameters(), lr=1)
    
    # 训练模型
    num_epochs = 50
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for batch_data, batch_labels in train_loader:
            # 数据标准化并移动到GPU
            batch_data = normalize(batch_data.to(device))
            
            # 前向传播
            outputs = model(batch_data)
            loss = criterion(outputs, batch_labels)
            total_loss += loss.item()
            
            # 反向传播和优化
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        
        # 在测试集上计算准确率
        model.eval()
        with torch.no_grad():
            correct = 0
            total = 0
            for batch_data, batch_labels in test_loader:
                # 数据标准化并移动到设备
                batch_data = normalize(batch_data.to(device))
                
                outputs = model(batch_data)
                _, predicted = torch.max(outputs.data, 1)
                total += batch_labels.size(0)
                correct += (predicted == batch_labels).sum().item()
            
            accuracy = correct / total
            average_loss = total_loss / len(train_loader)

            # 打印每个epoch的损失和准确率
            print(f'Epoch {epoch+1}/{num_epochs}, 训练损失: {average_loss:.6f}, 测试集准确率: {accuracy:.6f}')


            # print(f'Epoch {epoch+1}/{num_epochs}, loss:{}, 测试集准确率: {accuracy:.6f}')
    
    # 在完整测试集上计算准确率和分类指标报告
    model.eval()
    with torch.no_grad():
        X_test = normalize(X_test.to(device))
        outputs = model(X_test)
        _, predicted = torch.max(outputs.data, 1)
        total = y_test.size(0)
        correct = (predicted == y_test).sum().item()
        accuracy = correct / total
        print('完整测试集准确率:', accuracy)
        


(399, 210)
(399,)
Fold 1/10
Epoch 1/50, 训练损失: 17.749919, 测试集准确率: 0.750000
Epoch 2/50, 训练损失: 12.206748, 测试集准确率: 0.725000
Epoch 3/50, 训练损失: 11.332375, 测试集准确率: 0.725000
Epoch 4/50, 训练损失: 7.491485, 测试集准确率: 0.100000
Epoch 5/50, 训练损失: 14.507292, 测试集准确率: 0.725000
Epoch 6/50, 训练损失: 10.053841, 测试集准确率: 0.725000
Epoch 7/50, 训练损失: 9.561117, 测试集准确率: 0.725000
Epoch 8/50, 训练损失: 10.662595, 测试集准确率: 0.725000
Epoch 9/50, 训练损失: 7.745742, 测试集准确率: 0.175000
Epoch 10/50, 训练损失: 9.214078, 测试集准确率: 0.725000
Epoch 11/50, 训练损失: 8.575583, 测试集准确率: 0.725000
Epoch 12/50, 训练损失: 7.792105, 测试集准确率: 0.725000
Epoch 13/50, 训练损失: 7.446390, 测试集准确率: 0.725000
Epoch 14/50, 训练损失: 6.306402, 测试集准确率: 0.725000
Epoch 15/50, 训练损失: 6.866673, 测试集准确率: 0.725000
Epoch 16/50, 训练损失: 7.611208, 测试集准确率: 0.125000
Epoch 17/50, 训练损失: 8.643507, 测试集准确率: 0.750000
Epoch 18/50, 训练损失: 6.661799, 测试集准确率: 0.750000
Epoch 19/50, 训练损失: 5.694741, 测试集准确率: 0.750000
Epoch 20/50, 训练损失: 6.826247, 测试集准确率: 0.725000
Epoch 21/50, 训练损失: 8.217622, 测试集准确率: 0.725000
Epoch 22/