In [1]:
import matplotlib.pyplot as plt
import torch
import numpy as np
import torch.nn as nn
import neurokit2 as nk
import sys
import os

In [2]:
import argparse
import sys
from main import parse_args as parse_args_main

def main(args):
    # 在这里使用 args 进行训练或其他操作
    args=parse_args_main()
    print(f"Task: {args.task}")
    print(f"Dataset Name: {args.dataset_name}")
    print(f"Train Data Path: {args.train_data_path}")
    print(f"Model Name: {args.model_name}")
    print(f"Mask Type: {args.mask_type}")
    return args
    # 其他代码...

if __name__ == "__main__":
    # 模拟命令行参数
    sys.argv = [
        'main.py',  # 通常是脚本名
        '--task', 'finetune',
        '--dataset_name', 'ptb-xl',
        '--train_data_path', '/root/data/ptb-xl/train.txt',
        '--val_data_path', '/root/data/ptb-xl/val.txt',
        '--model_name', 'FocusMae',
        '--mask_type', 'period',
        # 添加其他必要的参数
    ]

    parser = argparse.ArgumentParser(description='Pretrain or finetune model')
    # 添加参数
    parser.add_argument('--task', type=str, required=True, help='Task to perform')
    parser.add_argument('--dataset_name', type=str, required=True, help='Name of the dataset')
    parser.add_argument('--train_data_path', type=str, required=True, help='Path to the training data')
    parser.add_argument('--val_data_path', type=str, required=True, help='Path to the validation data')
    parser.add_argument('--model_name', type=str, required=True, default='FocusMae',help='Name of the model to use')
    parser.add_argument('--mask_type', type=str, default='period', help='Type of mask to use')  
    parser.add_argument('--ckpt_path', type=str, default='/root/ecg_ai/FocusECG/FocusECG/min_val_loss=34.49940490722656.pth', help='Type of mask to use')  
    parsed_args=parse_args_main()
    # 解析参数
    args = parser.parse_args()

    for key, value in vars(parsed_args).items():
        if value is not None:
            setattr(args, key, value)

    # 调用主函数
    main(args)

  from .autonotebook import tqdm as notebook_tqdm


cuda
cuda
cuda
Task: finetune
Dataset Name: ptb-xl
Train Data Path: /root/data/ptb-xl/train.txt
Model Name: FocusMae
Mask Type: period


In [3]:
def calculate_r_peaks_ratio(mask,all_r_peaks):
    # 定义参数
    num_samples = 512
    num_patches = 30
    patch_size = 75

    # 初始化一个数组来存储每个样本的 R 波在 mask 为 0 的比率
    r_peaks_ratio = np.zeros(num_samples)

    # 计算每个样本的 R 波比率
    for sample_idx, r_peaks in enumerate(all_r_peaks):
        # 计算每个 R 波位置所属的 patch 索引
        patch_indices = r_peaks
        
        # 计算 R 波在 mask 为 0 的数量
        count = 0
        for patch_idx in patch_indices:
            if patch_idx < num_patches and mask[sample_idx, patch_idx] == 0:  # 确保索引不超出范围且 mask 为 0
                count += 1
        
            # 计算比率
        r_peaks_ratio[sample_idx] = count / num_patches if len(r_peaks) > 0 else 0

    # 转换为 torch.Tensor 并调整形状
    r_peaks_ratio_tensor = torch.tensor(r_peaks_ratio).view(num_samples, 1)
    overall_average = r_peaks_ratio_tensor.mean()
    return overall_average


In [4]:
def check_r_peaks_ratio(x):
    # 假设 x 是你的 ECG 信号，形状为 (512, 1, 2250)
    all_r_peaks = []

    for sample_idx in range(x.shape[0]):  # 遍历每个样本
        ecg_signal = x[sample_idx, 0].cpu().numpy()  # 将信号展平成一维数组

        # 预处理信号
        ecg_cleaned = nk.ecg_clean(ecg_signal, sampling_rate=250)

        # 检测 R 波
        _, r_peaks = nk.ecg_peaks(ecg_cleaned, sampling_rate=250)

        # 获取 R 波位置
        patch_size=75
        r_peak_indices = r_peaks['ECG_R_Peaks']// patch_size
        all_r_peaks.append(r_peak_indices)
    return all_r_peaks


In [3]:
from finetune_test import get_model
model = get_model(args)

In [5]:
model.pre_train_model.blocks

ModuleList(
  (0-11): 12 x Block(
    (norm1): LayerNorm((768,), eps=1e-06, elementwise_affine=True)
    (attn): Attention(
      (qkv): Linear(in_features=768, out_features=2304, bias=True)
      (q_norm): Identity()
      (k_norm): Identity()
      (attn_drop): Dropout(p=0.0, inplace=False)
      (proj): Linear(in_features=768, out_features=768, bias=True)
      (proj_drop): Dropout(p=0.0, inplace=False)
    )
    (ls1): Identity()
    (drop_path1): Identity()
    (norm2): LayerNorm((768,), eps=1e-06, elementwise_affine=True)
    (mlp): Mlp(
      (fc1): Linear(in_features=768, out_features=3072, bias=True)
      (act): GELU(approximate='none')
      (drop1): Dropout(p=0.0, inplace=False)
      (norm): Identity()
      (fc2): Linear(in_features=3072, out_features=768, bias=True)
      (drop2): Dropout(p=0.0, inplace=False)
    )
    (ls2): Identity()
    (drop_path2): Identity()
  )
)

In [6]:
args.mask_type

'period'

In [11]:
from dataset import PretrainDataset
from torch.utils.data import DataLoader
from config import FocusMaePreTrainConfig as PreTrainConfig
from finetune_test import get_model
import signal
from tqdm import tqdm
def dataloader_signal_handle(worker_id):
    def signal_handler(sig, frame):
        pass
    signal.signal(signal.SIGTERM, signal_handler)
total_sum = 0
total_count = 0
train_data_path="/root/data/FocusMAE/train.txt"
train_dataset = PretrainDataset(train_data_path)
train_dataloader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True, num_workers=4, worker_init_fn=dataloader_signal_handle,pin_memory=False)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(41)
torch.cuda.manual_seed(41)
print(device)
for batch in tqdm(train_dataloader, desc="Processing Batches"):
    model = get_model(args)
    batch = batch.to(device)
    all_r_peaks=check_r_peaks_ratio(batch)
    pred_img,mask,info_scores,mask_probs= model.pre_train_model.forward_info_score(batch)
    # mask=generate_mask(batch)
    r_peaks_ratio_tensor=calculate_r_peaks_ratio(mask,all_r_peaks)
    # 累加当前批次的总和和数量
    total_sum += r_peaks_ratio_tensor
    total_count += 1

# 计算所有批次的平均值
print(total_sum)
print(total_count)
overall_average = total_sum / total_count

print("所有批次的 R 波比率的平均值:")
print(overall_average)

cuda


Processing Batches: 100%|██████████| 247/247 [45:27<00:00, 11.04s/it] 

tensor(20.5826, dtype=torch.float64)
247
所有批次的 R 波比率的平均值:
tensor(0.0833, dtype=torch.float64)





In [None]:
import argparse
import sys
from main import parse_args as parse_args_main

def main(args):
    # 在这里使用 args 进行训练或其他操作
    args=parse_args_main()
    print(f"Task: {args.task}")
    print(f"Dataset Name: {args.dataset_name}")
    print(f"Train Data Path: {args.train_data_path}")
    print(f"Model Name: {args.model_name}")
    print(f"Mask Type: {args.mask_type}")
    return args
    # 其他代码...

if __name__ == "__main__":
    # 模拟命令行参数
    sys.argv = [
        'main.py',  # 通常是脚本名
        '--task', 'finetune',
        '--dataset_name', 'ptb-xl',
        '--train_data_path', '/root/data/ptb-xl/train.txt',
        '--val_data_path', '/root/data/ptb-xl/val.txt',
        '--model_name', 'FocusMae',
        '--mask_type', 'period',
        # 添加其他必要的参数
    ]

    parser = argparse.ArgumentParser(description='Pretrain or finetune model')
    # 添加参数
    parser.add_argument('--task', type=str, required=True, help='Task to perform')
    parser.add_argument('--dataset_name', type=str, required=True, help='Name of the dataset')
    parser.add_argument('--train_data_path', type=str, required=True, help='Path to the training data')
    parser.add_argument('--val_data_path', type=str, required=True, help='Path to the validation data')
    parser.add_argument('--model_name', type=str, required=True, default='FocusMae',help='Name of the model to use')
    parser.add_argument('--mask_type', type=str, default='period', help='Type of mask to use')  
    parsed_args=parse_args_main()
    # 解析参数
    args = parser.parse_args()

    for key, value in vars(parsed_args).items():
        if value is not None:
            setattr(args, key, value)

    # 调用主函数
    main(args)

In [4]:
from finetune_test import get_model
if __name__ == '__main__':
    model = get_model(args)
   

In [9]:
model

Classifier(
  (pre_train_model): MaskedAutoencoderViT(
    (patch_embed): patchEmbed(
      (norm): LayerNorm((75,), eps=1e-06, elementwise_affine=True)
    )
    (timeweight): TimeSeriesWeighting(
      (norm): LayerNorm((75,), eps=1e-06, elementwise_affine=True)
    )
    (fc): Linear(in_features=75, out_features=768, bias=True)
    (blocks): ModuleList(
      (0-11): 12 x Block(
        (norm1): LayerNorm((768,), eps=1e-06, elementwise_affine=True)
        (attn): Attention(
          (qkv): Linear(in_features=768, out_features=2304, bias=True)
          (q_norm): Identity()
          (k_norm): Identity()
          (attn_drop): Dropout(p=0.0, inplace=False)
          (proj): Linear(in_features=768, out_features=768, bias=True)
          (proj_drop): Dropout(p=0.0, inplace=False)
        )
        (ls1): Identity()
        (drop_path1): Identity()
        (norm2): LayerNorm((768,), eps=1e-06, elementwise_affine=True)
        (mlp): Mlp(
          (fc1): Linear(in_features=768, out_fe

In [8]:
pred_img,mask,info_scores,mask_probs= model.pre_train_model.forward_info_score(batch)


In [10]:
mask_probs

tensor([[0.2130, 0.1104, 0.9536,  ..., 0.0835, 0.8127, 0.6543],
        [0.3799, 0.6124, 0.7904,  ..., 0.3525, 0.1819, 0.5738],
        [0.6264, 0.4535, 0.0104,  ..., 0.7946, 0.9982, 0.6876],
        ...,
        [0.9192, 0.6501, 0.2702,  ..., 0.5882, 0.8731, 0.4438],
        [0.2699, 0.0437, 0.7667,  ..., 0.4377, 0.9402, 0.1887],
        [0.9724, 0.2704, 0.8739,  ..., 0.4435, 0.7381, 0.1922]],
       device='cuda:0')

In [5]:
import numpy as np

def generate_mask(x):
        # 定义参数
    num_samples = 512
    num_patches = 30
    num_zeros = 7

    # 初始化 mask
    mask_random = np.ones((num_samples, num_patches))

    # 为每个样本随机选择 7 个位置设置为 0
    for i in range(num_samples):
        zero_indices = np.random.choice(num_patches, num_zeros, replace=False)
        mask_random[i, zero_indices] = 0
    return mask_random

In [11]:
import neurokit2 as nk
import numpy as np

# 假设 x 是你的 ECG 信号，形状为 (512, 1, 2250)
all_r_peaks = []

for sample_idx in range(x.shape[0]):  # 遍历每个样本
    ecg_signal = x[sample_idx, 0].cpu().numpy()  # 将信号展平成一维数组

    # 预处理信号
    ecg_cleaned = nk.ecg_clean(ecg_signal, sampling_rate=250)

    # 检测 R 波
    _, r_peaks = nk.ecg_peaks(ecg_cleaned, sampling_rate=250)

    # 获取 R 波位置
    patch_size=75
    r_peak_indices = r_peaks['ECG_R_Peaks']// patch_size
    all_r_peaks.append(r_peak_indices)

# # 打印所有样本的 R 波位置
# for idx, r_peaks in enumerate(all_r_peaks):
#     print(f"样本 {idx} 的 R 波位置: {r_peaks}")

In [None]:
import torch
import numpy as np

# 假设 all_r_peaks 是一个包含每个样本 R 波位置的列表
# 假设 mask 是一个形状为 (512, 30) 的数组

# 定义参数
num_samples = 512
num_patches = 30
patch_size = 75

# 初始化一个数组来存储每个样本的 R 波在 mask 为 0 的比率
r_peaks_ratio = np.zeros(num_samples)

# 计算每个样本的 R 波比率
for sample_idx, r_peaks in enumerate(all_r_peaks):
    # 计算每个 R 波位置所属的 patch 索引
    patch_indices = r_peaks
    
    # 计算 R 波在 mask 为 0 的数量
    count = 0
    for patch_idx in patch_indices:
        if patch_idx < num_patches and mask_random[sample_idx, patch_idx] == 0:  # 确保索引不超出范围且 mask 为 0
            count += 1
    
    # 计算比率
    r_peaks_ratio[sample_idx] = count / num_patches if len(r_peaks) > 0 else 0

# 转换为 torch.Tensor 并调整形状
r_peaks_ratio_tensor = torch.tensor(r_peaks_ratio).view(num_samples, 1)
overall_average = r_peaks_ratio_tensor.mean()
# 打印结果
print("R 波在 mask 为 0 的总体比率:")
print(overall_average)

In [None]:
r_peaks_ratio_tensor.shape

In [None]:
def plot_signal_with_mask_probs(signal, mask_probs, patch_size=30):
    plt.clf()
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 8))
    fig.patch.set_facecolor('white')  # 设置白色背景
    
    # 绘制原始信号
    ax1.plot(range(len(signal)), signal, color='#1f77b4', linewidth=1.5, label='Signal')
    ax1.set_title('Signal with Information Scores', fontsize=12, pad=10)
    ax1.set_ylabel('Amplitude', fontsize=10)
    ax1.grid(True, linestyle='--', alpha=0.3)
    
    # 添加patch分割和概率值
    for i in range(len(mask_probs)):
        # patch边界
        x_pos = i * patch_size
        ax1.axvline(x=x_pos, color='#ff7f0e', linestyle='--', alpha=0.2)
        
        # 半透明颜色块
        ax1.axvspan(i*patch_size, (i+1)*patch_size, 
                   alpha=float(mask_probs[i])*0.3,  # 降低透明度
                   color='#ff7f0e',
                   label=f'Patch {i}' if i == 0 else "")
        
        # 概率值标注
        patch_center = i * patch_size + patch_size/2
        score = float(mask_probs[i])
        if score > 0.2:  # 只显示重要性较高的分数
            ax1.text(patch_center, ax1.get_ylim()[1], 
                    f'{score:.2f}', 
                    horizontalalignment='center',
                    verticalalignment='bottom',
                    rotation=90,
                    fontsize=8,
                    color='#2f4f4f')
    
    # 绘制概率条形图
    patch_positions = np.arange(len(mask_probs))
    bars = ax2.bar(patch_positions, mask_probs.cpu().detach().numpy(), 
                  alpha=0.6, color='#ff7f0e', width=0.8)
    
    # 在柱状图上添加数值标签
    for bar in bars:
        height = bar.get_height()
        if height > 0.5:  # 只显示重要性较高的分数
            ax2.text(bar.get_x() + bar.get_width()/2., height,
                    f'{height:.2f}',
                    ha='center', va='bottom', fontsize=8)
    
    ax2.set_title('Information Scores per Patch', fontsize=12, pad=10)
    ax2.set_xlabel('Patch Index', fontsize=10)
    ax2.set_ylabel('Score', fontsize=10)
    ax2.grid(True, linestyle='--', alpha=0.3)
    
    # 调整布局
    plt.tight_layout()
    plt.show()

# 使用示例：
sample_idx = 110  # 选择第一个样本
signal = x[sample_idx, 0].cpu().numpy()  # [1200]
probs = info_scores[sample_idx].detach()  # [40]
plot_signal_with_mask_probs(signal, probs, patch_size=75)

In [None]:
def plot_signal_with_masks_and_pred(signal, mask_probs, mask, pred_signal, probs,patch_size=75):
    plt.clf()
    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(15, 12))
    fig.patch.set_facecolor('white')
    
    # 1. 绘制原始信号和mask信息
    ax1.plot(range(len(signal)), signal, color='#1f77b4', linewidth=1.5, label='Signal')
    ax1.set_title('Original Signal with Mask Information', fontsize=12, pad=10)
    ax1.set_ylabel('Amplitude', fontsize=10)
    
    # 添加patch分割和遮盖信息
    for i in range(len(probs)):
        x_pos = i * patch_size
        ax1.axvline(x=x_pos, color='gray', linestyle='--', alpha=0.2)
        
        # 为每个patch添加颜色块
        if mask[i] == 1:  # 保留的patch
            ax1.axvspan(i*patch_size, (i+1)*patch_size, 
                       alpha=0.2,
                       color='green',
                       label='Kept' if i == 0 else "")
        else:  # 被遮盖的patch
            ax1.axvspan(i*patch_size, (i+1)*patch_size, 
                       alpha=0.2,
                       color='red',
                       label='Masked' if i == 0 else "")
        
        # 添加概率值
        patch_center = i * patch_size + patch_size/2
        score = float(probs[i])
        ax1.text(patch_center, ax1.get_ylim()[1], 
                f'{score:.2f}', 
                horizontalalignment='center',
                verticalalignment='bottom',
                rotation=90,
                fontsize=8)
    
    ax1.legend(loc='upper right')
    ax1.grid(True, linestyle='--', alpha=0.3)
    
    # 2. 绘制概率条形图
    patch_positions = np.arange(len(mask_probs))
    bars = ax2.bar(patch_positions, mask_probs.cpu().detach().numpy(), 
                  alpha=0.6,
                  color=['green' if m == 1 else 'red' for m in mask],
                  width=0.8)
    
    ax2.set_title('Mask Probabilities per Patch', fontsize=12, pad=10)
    ax2.set_xlabel('Patch Index', fontsize=10)
    ax2.set_ylabel('Probability', fontsize=10)
    ax2.grid(True, linestyle='--', alpha=0.3)
    
    # 3. 绘制重建信号
    ax3.plot(range(len(pred_signal)), pred_signal, color='#2ca02c', linewidth=1.5, label='Reconstructed')
    ax3.set_title('Reconstructed Signal', fontsize=12, pad=10)
    ax3.set_xlabel('Time', fontsize=10)
    ax3.set_ylabel('Amplitude', fontsize=10)
    ax3.grid(True, linestyle='--', alpha=0.3)
    
    plt.tight_layout()
    plt.show()

# 使用示例：
sample_idx = 50  # 选择样本
signal = x[sample_idx, 0].cpu().numpy()  #
mask_probs_noise = mask_probs[sample_idx].detach()  
mask_sample = mask[sample_idx]  # [40]
pred = pred_img[sample_idx, 0].cpu().detach().numpy()  
probs = info_scores[sample_idx].detach()  # [40]
plot_signal_with_masks_and_pred(signal, mask_probs_noise, mask_sample, pred, probs,patch_size=75)

In [None]:
%history