In [3]:
#### -*- coding: utf-8 -*-
"""
Created on Wed Jan  8 13:58:40 2025

@author: 29551
"""
!pip install ptflops
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import pandas as pd
import numpy as np
from tqdm import tqdm
import seaborn as sns
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score, confusion_matrix, recall_score, precision_score
import matplotlib.pyplot as plt
from ptflops import get_model_complexity_info
import time
# 读取数据
data = pd.read_csv('/kaggle/input/mitbih/ECG_data.csv')

# 提取信号和标签
signals = data['Signal'].apply(lambda x: np.fromstring(x[1:-1], sep=',')).values
labels = data['Label'].values

# 转换信号为 NumPy 数组
signals = np.array([np.array(signal) for signal in signals])

# 转换信号为 PyTorch 张量
signals = torch.tensor(signals, dtype=torch.float32).unsqueeze(1)  # 增加通道维度 (N, 1, 300)

# 标签编码
label_encoder = LabelEncoder()
labels_encoded = label_encoder.fit_transform(labels)


# 划分训练和测试集
X_train, X_temp, y_train, y_temp = train_test_split(
    signals, labels_encoded, test_size=0.3, random_state=42, stratify=labels_encoded)

# 第二步：将临时集中的 2/3 设为验证集（20%），1/3 设为测试集（10%）
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=1/3, random_state=42, stratify=y_temp)

# 转换为Tensor
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
y_val_tensor = torch.tensor(y_val, dtype=torch.long)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

# 创建DataLoader
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

# Focal Loss 定义
class FocalLoss(nn.Module):
    def __init__(self, gamma=2.0, alpha=0.25):
        super(FocalLoss, self).__init__()
        self.gamma = gamma
        self.alpha = alpha

    def forward(self, logits, targets):
        ce_loss = nn.CrossEntropyLoss(reduction='none')(logits, targets)
        pt = torch.exp(-ce_loss)  # 计算 p_t
        focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss
        return focal_loss.mean()



# Transformer 模块定义
class TransformerBlock(nn.Module):
    def __init__(self, embed_dim, num_heads, ff_dim, dropout=0.1):
        super(TransformerBlock, self).__init__()
        self.attention = nn.MultiheadAttention(embed_dim=embed_dim, num_heads=num_heads, dropout=dropout)
        self.feed_forward = nn.Sequential(
            nn.Linear(embed_dim, ff_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(ff_dim, embed_dim)
        )
        self.layernorm1 = nn.LayerNorm(embed_dim)
        self.layernorm2 = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Self-attention
        attn_output, _ = self.attention(x, x, x)
        x = self.layernorm1(x + self.dropout(attn_output))  # 残差连接 + LayerNorm

        # Feed-forward
        ff_output = self.feed_forward(x)
        x = self.layernorm2(x + self.dropout(ff_output))  # 残差连接 + LayerNorm
        return x

# 分类模型定义
class CNNTransformerModel(nn.Module):
    def __init__(self, input_length, num_classes, embed_dim=128, num_heads=4, ff_dim=256, num_layers=2, dropout=0.2):
        super(CNNTransformerModel, self).__init__()
        # CNN 特征提取模块
        self.conv1 = nn.Conv1d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(32, 128, kernel_size=3, padding=1)
        self.conv3 = nn.Conv1d(128, 64, kernel_size=3, padding=1)
        self.relu = nn.ReLU()
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)
        
        # Transformer 模块
        self.embedding = nn.Linear(64, embed_dim)  # 将 CNN 输出特征映射到 Transformer 输入
        self.transformer_blocks = nn.ModuleList([
            TransformerBlock(embed_dim, num_heads, ff_dim, dropout) for _ in range(num_layers)
        ])

        # 分类层
        self.fc = nn.Sequential(
            nn.Linear(embed_dim, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        # CNN 模块
        x = self.conv1(x)
        x = self.relu(x)
        x = self.pool(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.pool(x)  # 输出维度: (batch_size, channels, seq_length)
        x = self.conv3(x)
        x = self.relu(x)
        x = self.pool(x)
        # 转置以适配 Transformer 输入
        x = x.permute(0, 2, 1)  # 转换为 (batch_size, seq_length, features)
        x = self.embedding(x)   # 映射到 Transformer 嵌入维度 (batch_size, seq_length, embed_dim)

        # Transformer 模块
        for transformer in self.transformer_blocks:
            x = transformer(x)

        # 分类模块
        x = x.mean(dim=1)  # 全局平均池化 (batch_size, embed_dim)
        outputs = self.fc(x)  # 全连接层
        return outputs

# 模型、损失函数和优化器
input_length = 300
num_classes =5
model = CNNTransformerModel(input_length=input_length, num_classes=num_classes)
optimizer = optim.Adam(model.parameters(), lr=0.001)
# 选择设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# 定义损失函数
criterion = FocalLoss()  # 使用 Focal Loss

def compute_test_metrics(model, data_loader):
    model.eval()
    correct = 0
    total = 0
    all_targets = []
    all_predictions = []
    
    start_time = time.time()  # ✅ 开始计时
    with torch.no_grad():
        for data, target in data_loader:
            data = data.to(device)       # input to GPU
            target = target.to(device) 
            output = model(data)
            _, predicted = torch.max(output, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()
            all_targets.extend(target.cpu().numpy())
            all_predictions.extend(predicted.cpu().numpy())
            
    end_time = time.time()  # ✅ 结束计时
    elapsed_time = end_time - start_time
    accuracy = 100 * correct / total
    f1 = f1_score(all_targets, all_predictions, average='weighted')
    sensitivity = recall_score(all_targets, all_predictions, average='weighted')
    precision = precision_score(all_targets, all_predictions, average='weighted')

    conf_matrix = confusion_matrix(all_targets, all_predictions)
    tn = conf_matrix.sum() - conf_matrix.sum(axis=0) - conf_matrix.sum(axis=1) + np.diagonal(conf_matrix)
    fp = conf_matrix.sum(axis=0) - np.diagonal(conf_matrix)
    specificity_per_class = tn / (tn + fp + 1e-6)
    samples_per_class = conf_matrix.sum(axis=1)
    total_samples = np.sum(samples_per_class)
    specificity = np.sum((samples_per_class / total_samples) * specificity_per_class)

    return accuracy, f1, sensitivity, precision, specificity, conf_matrix,elapsed_time

def plot_acc_loss(history):
    epochs = range(1, len(history['train_acc']) + 1)

    fig, ax1 = plt.subplots(figsize=(10, 6))
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Accuracy (%)', color='tab:blue')
    ax1.plot(epochs, history['train_acc'], label='Train Accuracy', color='tab:blue', linestyle='-')
    ax1.plot(epochs, history['val_acc'], label='Val Accuracy', color='tab:blue', linestyle='--')
    ax1.tick_params(axis='y', labelcolor='tab:blue')
    ax1.set_ylim(90,100)
    ax1.grid(-True)
    
    ax2 = ax1.twinx()
    ax2.set_ylabel('Loss', color='tab:red')
    ax2.plot(epochs, history['train_loss'], label='Train Loss', color='tab:red', linestyle='-')
    ax2.plot(epochs, history['val_loss'], label='Val Loss', color='tab:red', linestyle='--')
    ax2.tick_params(axis='y', labelcolor='tab:red')
    ax2.set_ylim(0,0.03)
    lines1, labels1 = ax1.get_legend_handles_labels()
    lines2, labels2 = ax2.get_legend_handles_labels()
    ax1.legend(lines1 + lines2, labels1 + labels2, loc='center right')

    plt.title('Accuracy and Loss over Epochs')
    fig.tight_layout()
    plt.savefig("CNNs+Transformer Accuracy_Loss Dual Axis.png", dpi=300, bbox_inches='tight', pad_inches=0.1)
    plt.show()


def train_and_evaluate(model, train_loader, val_loader, test_loader, criterion, optimizer, num_epochs):
    history = {
        'train_acc': [], 'val_acc': [],
        'train_loss': [], 'val_loss': []
    }
    best_val_loss = float('inf')  
    best_model_path = 'CNNs+Transformer best_model.pth'
    for epoch in range(num_epochs):
        # === Training ===
        model.train()
        running_loss = 0.0
        correct_train = 0
        total_train = 0

        for data, target in train_loader:
            optimizer.zero_grad()
            data = data.to(device)       # input to GPU
            target = target.to(device) 
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(output, 1)
            total_train += target.size(0)
            correct_train += (predicted == target).sum().item()

        train_loss = running_loss / len(train_loader)
        train_acc = 100 * correct_train / total_train
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)

        # === Validation ===
        model.eval()
        val_loss = 0.0
        correct_val = 0
        total_val = 0

        with torch.no_grad():
            for data, target in val_loader:
                data = data.to(device)       # input to GPU
                target = target.to(device)   # label to GPU (if needed for loss)
                output = model(data)
                loss = criterion(output, target)
                val_loss += loss.item()
                _, predicted = torch.max(output, 1)
                total_val += target.size(0)
                correct_val += (predicted == target).sum().item()

        val_loss /= len(val_loader)
        val_acc = 100 * correct_val / total_val
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)



        print(f"Epoch [{epoch+1}/{num_epochs}]")
        print(f"  Train -> Acc: {train_acc:.2f}% | Loss: {train_loss:.4f}")
        print(f"  Val   -> Acc: {val_acc:.2f}% | Loss: {val_loss:.4f}")

        # === Save Best Model (based on validation loss) ===
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), best_model_path)
            print("  ✅ Best model saved (state_dict).")
        else:
            print("  No improvement.")

    # === 重新加载最优模型进行测试 ===
    model.load_state_dict(torch.load(best_model_path))
    model.eval()

    # === 测试集评估 ===
    start_time = time.time()
    test_acc, test_f1, test_sen, test_ppv, test_spe, conf_matrix, test_time = compute_test_metrics(model, test_loader)
    test_time = time.time() - start_time

    print("\n=== Final Test Set Performance ===")
    print(f"Test Acc: {test_acc:.2f}% | F1: {test_f1:.4f} | Sensitivity: {test_sen:.4f} | "
          f"PPV: {test_ppv:.4f} | Specificity: {test_spe:.4f}")
    print(f"🕒 Inference Time on Test Set: {test_time:.2f} seconds")

    # === 计算模型参数量和 FLOPs ===
    print("\n=== Model Complexity ===")
    macs, params = get_model_complexity_info(
        model, input_res=(1,300), as_strings=True,
        print_per_layer_stat=False, verbose=False
    )
    print(f"📊 Params: {params}")
    print(f"📊 FLOPs: {macs}")

    # === 保存模型结构复杂度信息到文件 ===
    with open("CNNs+Transformer_Model_Complexity.txt", "w") as f:
        f.write(f"Params: {params}\nFLOPs: {macs}\nTest Time: {test_time:.2f} sec")

    return model, history

train_and_evaluate(model, train_loader, val_loader, test_loader, criterion, optimizer, num_epochs=50)     



  X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
  X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
  X_test_tensor = torch.tensor(X_test, dtype=torch.float32)


Epoch [1/50]
  Train -> Acc: 87.97% | Loss: 0.0502
  Val   -> Acc: 93.11% | Loss: 0.0263
  ✅ Best model saved (state_dict).
Epoch [2/50]
  Train -> Acc: 93.81% | Loss: 0.0232
  Val   -> Acc: 94.48% | Loss: 0.0185
  ✅ Best model saved (state_dict).
Epoch [3/50]
  Train -> Acc: 94.76% | Loss: 0.0185
  Val   -> Acc: 94.94% | Loss: 0.0183
  ✅ Best model saved (state_dict).
Epoch [4/50]
  Train -> Acc: 95.38% | Loss: 0.0160
  Val   -> Acc: 96.05% | Loss: 0.0136
  ✅ Best model saved (state_dict).
Epoch [5/50]
  Train -> Acc: 95.67% | Loss: 0.0144
  Val   -> Acc: 96.18% | Loss: 0.0125
  ✅ Best model saved (state_dict).
Epoch [6/50]
  Train -> Acc: 95.97% | Loss: 0.0134
  Val   -> Acc: 96.91% | Loss: 0.0105
  ✅ Best model saved (state_dict).
Epoch [7/50]
  Train -> Acc: 96.14% | Loss: 0.0123
  Val   -> Acc: 96.69% | Loss: 0.0113
  No improvement.
Epoch [8/50]
  Train -> Acc: 96.34% | Loss: 0.0120
  Val   -> Acc: 96.74% | Loss: 0.0116
  No improvement.
Epoch [9/50]
  Train -> Acc: 96.32% | Loss

(CNNTransformerModel(
   (conv1): Conv1d(1, 32, kernel_size=(3,), stride=(1,), padding=(1,))
   (conv2): Conv1d(32, 128, kernel_size=(3,), stride=(1,), padding=(1,))
   (conv3): Conv1d(128, 64, kernel_size=(3,), stride=(1,), padding=(1,))
   (relu): ReLU()
   (pool): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
   (embedding): Linear(in_features=64, out_features=128, bias=True)
   (transformer_blocks): ModuleList(
     (0-1): 2 x TransformerBlock(
       (attention): MultiheadAttention(
         (out_proj): NonDynamicallyQuantizableLinear(in_features=128, out_features=128, bias=True)
       )
       (feed_forward): Sequential(
         (0): Linear(in_features=128, out_features=256, bias=True)
         (1): ReLU()
         (2): Dropout(p=0.2, inplace=False)
         (3): Linear(in_features=256, out_features=128, bias=True)
       )
       (layernorm1): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
       (layernorm2): LayerNorm((128,), eps=1e-05, el