In [1]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split
from torch import nn, optim
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import f1_score
import numpy as np
from scipy.io import loadmat
import os
import pandas as pd
from torchvision import transforms

In [2]:
# 自动检测 GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using GPU: {torch.cuda.is_available()}")

# 加载标签文件
labels_path = 'processed_data/processed_labels.csv'
if os.path.exists(labels_path):
    labels_df = pd.read_csv(labels_path)
    print("Loaded labels successfully.")
else:
    print(f"Labels file not found at {labels_path}")
    exit()


Using GPU: True
Loaded labels successfully.


In [3]:
# 自定义 ECG 数据集类
class ECGDataset(Dataset):
    def __init__(self, data_folder, labels_df, transform=None):
        self.data_folder = data_folder
        self.labels_df = labels_df
        self.transform = transform
        self.label_encoder = LabelEncoder()
        self.labels_df['label_encoded'] = self.label_encoder.fit_transform(self.labels_df.iloc[:, 1])

    def __len__(self):
        return len(self.labels_df)

    def __getitem__(self, idx):
        file_name = self.labels_df.iloc[idx, 0] + '.mat'
        label = self.labels_df.iloc[idx, 2]
        file_path = os.path.join(self.data_folder, file_name)
        if os.path.exists(file_path):
            data = loadmat(file_path)['val'].squeeze()
            data = (data - np.mean(data)) / np.std(data)  # 归一化
            data = torch.tensor(data, dtype=torch.float32).unsqueeze(0)
            if self.transform:
                data = self.transform(data)
            label = torch.tensor(label, dtype=torch.long)
            return data, label
        else:
            print(f"File not found: {file_path}")
            return None, None


In [4]:

# 数据增强配置
data_transform = transforms.Compose([
    transforms.Lambda(lambda x: x + 0.05 * torch.randn_like(x)),  # 添加噪声
    transforms.RandomApply([transforms.Lambda(lambda x: x * (1 + 0.1 * torch.randn_like(x)))], p=0.5),  # 随机缩放
])

# 数据集和 DataLoader
data_folder_path = 'processed_data'
ecg_dataset = ECGDataset(data_folder=data_folder_path, labels_df=labels_df, transform=data_transform)

# 检查数据集加载
if len(ecg_dataset) > 0:
    print(f"Loaded dataset with {len(ecg_dataset)} samples.")
else:
    print("Dataset is empty. Please check data paths.")
    exit()

# 将数据集分为训练集和测试集（70% 训练，30% 测试）
train_size = int(0.7 * len(ecg_dataset))
test_size = len(ecg_dataset) - train_size
train_dataset, test_dataset = random_split(ecg_dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)  # 减小 batch size
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)


Loaded dataset with 16761 samples.


In [5]:
# 改进后的 CNN 模型
class ECGClassifier(nn.Module):
    def __init__(self, num_classes):
        super(ECGClassifier, self).__init__()
        self.conv1 = nn.Conv1d(1, 64, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm1d(64)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm1d(128)
        self.conv3 = nn.Conv1d(128, 256, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm1d(256)
        self.conv4 = nn.Conv1d(256, 512, kernel_size=3, stride=1, padding=1)
        self.bn4 = nn.BatchNorm1d(512)

        # 增加池化层
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)

        self.global_pool = nn.AdaptiveAvgPool1d(1)

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)  # 提高 dropout 比例

        # 全连接层
        self.fc1 = nn.Linear(512, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, num_classes)

    def forward(self, x):
        x = self.pool(self.relu(self.bn1(self.conv1(x))))
        x = self.pool(self.relu(self.bn2(self.conv2(x))))
        x = self.pool(self.relu(self.bn3(self.conv3(x))))
        x = self.pool(self.relu(self.bn4(self.conv4(x))))
        x = self.global_pool(x).view(x.size(0), -1)
        x = self.dropout(self.relu(self.fc1(x)))
        x = self.dropout(self.relu(self.fc2(x)))
        x = self.fc3(x)
        return x


In [None]:
# 初始化模型、损失函数和优化器
num_classes = len(labels_df['label_encoded'].unique())
model = ECGClassifier(num_classes=num_classes).to(device)
print(f"Model initialized with {num_classes} output classes.")

class_weights = torch.ones(num_classes, device=device)
criterion = nn.CrossEntropyLoss(weight=class_weights)

optimizer = optim.AdamW(model.parameters(), lr=0.0001, weight_decay=1e-4)  # 调小学习率
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)

training_loss = []
test_accuracy = []
test_f1_scores = []

In [6]:
# 训练模型
num_epochs = 300
for epoch in range(num_epochs):
    print(f"\nStarting epoch {epoch + 1}/{num_epochs}...")
    model.train()
    running_loss = 0.0
    for batch_idx, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)

        # 前向传播
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if (batch_idx + 1) % 10 == 0:
            print(f"  Batch {batch_idx + 1}/{len(train_loader)}, Loss: {loss.item():.4f}")

    avg_loss = running_loss / len(train_loader)
    training_loss.append(avg_loss)
    print(f"Epoch {epoch + 1} Loss: {avg_loss:.4f}")

    # 测试集上评估模型
    model.eval()
    correct = 0
    total = 0
    all_labels = []
    all_predictions = []
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(predicted.cpu().numpy())

    accuracy = 100 * correct / total
    f1 = f1_score(all_labels, all_predictions, average='macro')
    test_accuracy.append(accuracy)
    test_f1_scores.append(f1)
    print(f"Test Accuracy: {accuracy:.2f}%, F1 Score: {f1:.4f}")

    scheduler.step()

Model initialized with 4 output classes.

Starting epoch 1/300...
  Batch 10/734, Loss: 1.3546
  Batch 20/734, Loss: 1.2554
  Batch 30/734, Loss: 1.1488
  Batch 40/734, Loss: 1.3001
  Batch 50/734, Loss: 1.0211
  Batch 60/734, Loss: 1.0665
  Batch 70/734, Loss: 1.0047
  Batch 80/734, Loss: 0.9413
  Batch 90/734, Loss: 0.9915
  Batch 100/734, Loss: 0.9202
  Batch 110/734, Loss: 0.8659
  Batch 120/734, Loss: 1.0698
  Batch 130/734, Loss: 1.0128
  Batch 140/734, Loss: 0.9960
  Batch 150/734, Loss: 1.3015
  Batch 160/734, Loss: 1.0964
  Batch 170/734, Loss: 1.1357
  Batch 180/734, Loss: 0.7503
  Batch 190/734, Loss: 0.8660
  Batch 200/734, Loss: 1.0326
  Batch 210/734, Loss: 1.1253
  Batch 220/734, Loss: 0.6606
  Batch 230/734, Loss: 0.9212
  Batch 240/734, Loss: 1.3858
  Batch 250/734, Loss: 0.9188
  Batch 260/734, Loss: 1.0378
  Batch 270/734, Loss: 0.9526
  Batch 280/734, Loss: 0.7089
  Batch 290/734, Loss: 0.7276
  Batch 300/734, Loss: 1.3198
  Batch 310/734, Loss: 0.9580
  Batch 320/7

In [7]:
# 保存每个 epoch 的训练损失、测试准确率和 F1 分数
results = {
    'training_loss': training_loss,
    'test_accuracy': test_accuracy,
    'test_f1_scores': test_f1_scores
}
results_path = "results/training_results.pth"
torch.save(results, results_path)
print(f"Training results saved to {results_path}")

# 保存完整模型
model_path = "model/ecg_classifier_model.pth"
torch.save(model.state_dict(), model_path)
print(f"Model saved to {model_path}")

Training results saved to results/training_results.pth
Model saved to model/ecg_classifier_model.pth


In [8]:
# 在测试集上评估最终准确率和 F1 分数
model.eval()
correct = 0
total = 0
all_labels = []
all_predictions = []
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        all_labels.extend(labels.cpu().numpy())
        all_predictions.extend(predicted.cpu().numpy())

final_accuracy = 100 * correct / total
final_f1 = f1_score(all_labels, all_predictions, average='macro')
print(f"Final Test Accuracy: {final_accuracy:.2f}%, F1 Score: {final_f1:.4f}")

Final Test Accuracy: 70.89%, F1 Score: 0.4710
