In [None]:
import numpy as np
import nolds

def multiscale_entropy(data, scales=20):
    """
    计算多尺度熵，将长度为 2500 的时间序列转化为 20 个熵值。
    """
    mse_values = []
    for tau in range(1, scales + 1):
        # 生成当前尺度下的降采样序列
        coarse_grained_series = np.array([np.mean(data[i:i + tau]) for i in range(0, len(data) - tau + 1, tau)])
        
        # 计算该尺度的样本熵
        if len(coarse_grained_series) > 2:  # 样本熵要求序列长度足够长
            mse_value = nolds.sampen(coarse_grained_series)
            mse_values.append(mse_value)
        else:
            mse_values.append(np.nan)  # 如果降采样序列太短，记录 NaN
    return mse_values
compressed_data = np.zeros((5643, 43, 20))

# 对每个样本和每个通道计算多尺度熵
for sample in range(5643):
    for channel in range(43):
        # 提取单个通道的时间序列
        time_series = original_data[sample, channel, :]
        # 计算多尺度熵并存储到新的矩阵中
        compressed_data[sample, channel, :] = multiscale_entropy(time_series, scales=20)
        print(f"计算完成第 {sample} 个样本的第 {channel} 个通道")

print("计算完成的多尺度熵数据形状：", compressed_data.shape)


In [None]:
import torch
import numpy as np
import nolds

def multiscale_entropy(data, scales=20):
    """
    计算多尺度熵，将长度为 2500 的时间序列转化为 20 个熵值。
    """
    mse_values = []
    for tau in range(1, scales + 1):
        # 使用 torch 的张量操作进行降采样
        coarse_grained_series = torch.stack([data[i:i + tau].mean() for i in range(0, len(data) - tau + 1, tau)])
        
        # 将结果转换为 numpy 以计算样本熵
        coarse_grained_series = coarse_grained_series.cpu().numpy()  # 确保数据在 CPU 上
        if len(coarse_grained_series) > 2:
            mse_value = nolds.sampen(coarse_grained_series)
            mse_values.append(mse_value)
        else:
            mse_values.append(np.nan)
    return mse_values

# 将数据转换为 torch tensor 并将其移到 GPU 上
original_data = torch.tensor(original_data, dtype=torch.float32).to("cuda")
compressed_data = torch.zeros((5643, 43, 20), device="cuda")

# 对每个样本和每个通道并行计算多尺度熵
for sample in range(5643):
    for channel in range(43):
        # 提取单个通道的时间序列
        time_series = original_data[sample, channel, :]
        # 将时间序列传入多尺度熵函数
        mse_values = multiscale_entropy(time_series, scales=20)
        # 将结果存储在新张量中
        compressed_data[sample, channel, :] = torch.tensor(mse_values, device="cuda")
        print(f"计算完成第 {sample} 个样本的第 {channel} 个通道")

# 将结果转回 CPU 并转换为 numpy 格式
compressed_data = compressed_data.cpu().numpy()
print("计算完成的多尺度熵数据形状：", compressed_data.shape)


In [None]:
total_folds = 10
train_indices, test_indices = dl.Split_Sets(total_folds, EEG_crop)
# Ensure output directories exist
ensure_dir("EEG_Augemnted_Data/TrainData")
ensure_dir("EEG_Augemnted_Data/ValidData")
ensure_dir("EEG_Augemnted_Data/TestData")
kf = KFold(n_splits=10)
seed = 34  # 设定随机种子

for fold, (train_index, test_index) in enumerate(kf.split(SampleEn_EEG)):
    train_data, test_data = SampleEn_EEG[train_index], SampleEn_EEG[test_index]
    train_labels, test_labels = labels[train_index], labels[test_index]
    
    # 进一步划分训练集和验证集
    train_data_split, valid_data_split, train_labels_split, valid_labels_split = train_test_split(
        train_data, train_labels, test_size=0.1, random_state=seed, stratify=train_labels
    )
    # print(train_data_split.shape,train_labels_split.shape,valid_data_split.shape,valid_labels_split.shape)

    # 转换为 PyTorch 张量
    train_data_split = torch.tensor(train_data_split, dtype=torch.float32)
    valid_data_split = torch.tensor(valid_data_split, dtype=torch.float32)
    train_labels_split = torch.tensor(train_labels_split, dtype=torch.long)
    valid_labels_split = torch.tensor(valid_labels_split, dtype=torch.long)

    test_data = torch.tensor(test_data, dtype=torch.float32)
    test_labels = torch.tensor(test_labels, dtype=torch.long)

    # 创建TensorDatasets
    train_dataset = TensorDataset(train_data_split, train_labels_split)
    valid_dataset = TensorDataset(valid_data_split, valid_labels_split)
    test_dataset = TensorDataset(test_data, test_labels)

    # 保存数据和标签
    torch.save(train_dataset, f"EEG_Augemnted_Data/TrainData/train_data_{fold + 1}_fold_with_seed_{seed}.pth")
    torch.save(valid_dataset, f"EEG_Augemnted_Data/ValidData/valid_data_{fold + 1}_fold_with_seed_{seed}.pth")
    torch.save(test_dataset, f"EEG_Augemnted_Data/TestData/test_data_{fold + 1}_fold_with_seed_{seed}.pth.pth")
    logging.info(f"Fold {fold + 1} data saved successfully.")


In [None]:
# 计算多样本熵 (MSE)
def multi_scale_entropy(x, max_scale, m=2, r=0.2):
    mse = []
    for scale in range(1, max_scale + 1):
        # 对信号进行下采样
        x_rescaled = x[::scale]
        # 计算样本熵
        mse.append(nolds.sampen(x_rescaled, m, r*np.std(x_rescaled)))
    return mse


def calculate_mse_features(data,scale=20,m=2, r=0.2):
    mse_all_channels = []
    n_channels = data.shape[0]
    
    # 计算每个通道的 PSD
    for i in range(n_channels):
        mse = multi_scale_entropy(data[i,:],scale,m,r)
        mse_all_channels.append(mse)

    mse_df = pd.DataFrame(mse_all_channels)
    mse_df.columns = [f'scale_{i+1}' for i in range(scale)]
    mse_df['Channel'] = np.arange(1, 20) 
    mse_df = mse_df.set_index('Channel')
