In [4]:
# 安装 pyedflib 库
# 如果尚未安装，请运行以下命令：
# !pip install pyedflib

import pyedflib
import os

# 定义EDF文件路径
# 使用绝对路径以避免工作目录问题
edf_file_path = "SC4001E0-PSG.edf"

# 检查文件是否存在
if not os.path.exists(edf_file_path):
    print(f"错误：文件未找到于 '{edf_file_path}'")
else:
    try:
        # 打开EDF文件
        edf_reader = pyedflib.EdfReader(edf_file_path)

        # 打印文件头信息
        print("文件头信息:")
        print(edf_reader.getHeader())
        print("-" * 20)

        # 获取信号数量
        num_signals = edf_reader.signals_in_file
        print(f"信号数量: {num_signals}")

        # 获取信号标签
        signal_labels = edf_reader.getSignalLabels()
        print(f"信号标签: {signal_labels}")

        # 打印每个信号的头信息
        print("\n各信号头信息:")
        for i in range(num_signals):
            print(f"  信号 {i} ({signal_labels[i]}):")
            print(f"    {edf_reader.getSignalHeader(i)}")
        print("-" * 20)

        # 读取第一个信号的数据
        signal_data = edf_reader.readSignal(0)
        print(f"第一个信号的数据: {signal_data[:10]} (仅显示前10个样本)")

        # 关闭EDF文件
        edf_reader.close()
    except Exception as e:
        print(f"读取EDF文件时出错: {e}")

文件头信息:
{'technician': '', 'recording_additional': '', 'patientname': '', 'patient_additional': '', 'patientcode': '', 'equipment': '', 'admincode': '', 'sex': '', 'startdate': datetime.datetime(1989, 4, 24, 16, 13), 'birthdate': '', 'gender': ''}
--------------------
信号数量: 7
信号标签: ['EEG Fpz-Cz', 'EEG Pz-Oz', 'EOG horizontal', 'Resp oro-nasal', 'EMG submental', 'Temp rectal', 'Event marker']

各信号头信息:
  信号 0 (EEG Fpz-Cz):
    {'label': 'EEG Fpz-Cz', 'dimension': 'uV', 'sample_rate': 100.0, 'sample_frequency': 100.0, 'physical_max': 192.0, 'physical_min': -192.0, 'digital_max': 2047, 'digital_min': -2048, 'prefilter': 'HP:0.5Hz LP:100Hz [enhanced cassette BW]', 'transducer': 'Ag-AgCl electrodes'}
  信号 1 (EEG Pz-Oz):
    {'label': 'EEG Pz-Oz', 'dimension': 'uV', 'sample_rate': 100.0, 'sample_frequency': 100.0, 'physical_max': 196.0, 'physical_min': -197.0, 'digital_max': 2047, 'digital_min': -2048, 'prefilter': 'HP:0.5Hz LP:100Hz [enhanced cassette BW]', 'transducer': 'Ag-AgCl electrodes'}

In [5]:
# 测试不同通道的采样率差异
import pyedflib
import numpy as np

edf_file_path = "SC4001E0-PSG.edf"

if os.path.exists(edf_file_path):
    edf_reader = pyedflib.EdfReader(edf_file_path)
    
    # 我们关心的4个通道
    target_channels = ['EEG Fpz-Cz', 'EEG Pz-Oz', 'EOG horizontal', 'EMG submental']
    
    print("目标通道的采样率信息：")
    signal_labels = edf_reader.getSignalLabels()
    
    for i, label in enumerate(signal_labels):
        if label in target_channels:
            header = edf_reader.getSignalHeader(i)
            signal_data = edf_reader.readSignal(i)
            print(f"通道 {i}: {label}")
            print(f"  采样率: {header['sample_rate']} Hz")
            print(f"  信号长度: {len(signal_data)} 样本")
            print(f"  如果按30秒epochs和100Hz reshape: {len(signal_data)} / 3000 = {len(signal_data) / 3000}")
            print(f"  如果按30秒epochs和实际采样率reshape: {len(signal_data)} / {int(30 * header['sample_rate'])} = {len(signal_data) / (30 * header['sample_rate'])}")
            print()
    
    print("epoch_duration信息：")
    print(f"datarecord_duration: {edf_reader.datarecord_duration}")
    print(f"datarecords_in_file: {edf_reader.datarecords_in_file}")
    
    edf_reader.close()
else:
    print(f"文件不存在: {edf_file_path}")

目标通道的采样率信息：
通道 0: EEG Fpz-Cz
  采样率: 100.0 Hz
  信号长度: 7950000 样本
  如果按30秒epochs和100Hz reshape: 7950000 / 3000 = 2650.0
  如果按30秒epochs和实际采样率reshape: 7950000 / 3000 = 2650.0

通道 1: EEG Pz-Oz
  采样率: 100.0 Hz
  信号长度: 7950000 样本
  如果按30秒epochs和100Hz reshape: 7950000 / 3000 = 2650.0
  如果按30秒epochs和实际采样率reshape: 7950000 / 3000 = 2650.0

通道 2: EOG horizontal
  采样率: 100.0 Hz
  信号长度: 7950000 样本
  如果按30秒epochs和100Hz reshape: 7950000 / 3000 = 2650.0
  如果按30秒epochs和实际采样率reshape: 7950000 / 3000 = 2650.0

通道 4: EMG submental
  采样率: 1.0 Hz
  信号长度: 79500 样本
  如果按30秒epochs和100Hz reshape: 79500 / 3000 = 26.5
  如果按30秒epochs和实际采样率reshape: 79500 / 30 = 2650.0

epoch_duration信息：
datarecord_duration: 30.0
datarecords_in_file: 2650


In [6]:
# 演示重采样解决方案
from scipy import signal as scipy_signal
import numpy as np

def demonstrate_resampling():
    """演示如何将1Hz信号重采样到100Hz"""
    
    # 模拟1Hz信号（30个样本代表30秒）
    original_signal = np.random.randn(30)  # 1Hz, 30秒的数据
    print(f"原始信号 (1Hz): 长度 = {len(original_signal)}")
    
    # 重采样到100Hz（30秒应该有3000个样本）
    target_samples = 30 * 100  # 30秒 * 100Hz = 3000样本
    resampled_signal = scipy_signal.resample(original_signal, target_samples)
    print(f"重采样后信号 (100Hz): 长度 = {len(resampled_signal)}")
    
    # 现在可以reshape成epoch格式
    print(f"可以reshape成: ({len(resampled_signal) // 3000}, 3000)")
    
    return resampled_signal

# 运行演示
try:
    resampled = demonstrate_resampling()
    print("重采样成功！")
except ImportError:
    print("需要安装scipy: pip install scipy")
except Exception as e:
    print(f"演示失败: {e}")

原始信号 (1Hz): 长度 = 30
重采样后信号 (100Hz): 长度 = 3000
可以reshape成: (1, 3000)
重采样成功！


In [7]:
# 检查信号头字典的确切键名
import pyedflib

edf_file_path = "SC4001E0-PSG.edf"

if os.path.exists(edf_file_path):
    edf_reader = pyedflib.EdfReader(edf_file_path)
    
    # 检查第一个信号的头信息
    header = edf_reader.getSignalHeader(0)
    print("信号头的所有键:")
    for key in header.keys():
        print(f"  '{key}': {header[key]}")
    
    print("\n检查是否包含采样率相关的键:")
    possible_keys = ['sample_rate', 'sample_frequency', 'sampling_rate', 'fs', 'freq']
    for key in possible_keys:
        if key in header:
            print(f"  找到键 '{key}': {header[key]}")
    
    edf_reader.close()
else:
    print(f"文件不存在: {edf_file_path}")

信号头的所有键:
  'label': EEG Fpz-Cz
  'dimension': uV
  'sample_rate': 100.0
  'sample_frequency': 100.0
  'physical_max': 192.0
  'physical_min': -192.0
  'digital_max': 2047
  'digital_min': -2048
  'prefilter': HP:0.5Hz LP:100Hz [enhanced cassette BW]
  'transducer': Ag-AgCl electrodes

检查是否包含采样率相关的键:
  找到键 'sample_rate': 100.0
  找到键 'sample_frequency': 100.0


In [8]:
# 测试健壮的采样率检测代码
def get_sampling_rate_robust(signal_header):
    """健壮地获取采样率，尝试多个可能的键名"""
    possible_sr_keys = ['sample_rate', 'sample_frequency', 'sampling_rate', 'fs']
    for key in possible_sr_keys:
        if key in signal_header:
            return signal_header[key]
    return None

# 测试这个函数
import pyedflib

edf_file_path = "SC4001E0-PSG.edf"

if os.path.exists(edf_file_path):
    edf_reader = pyedflib.EdfReader(edf_file_path)
    
    # 测试我们关心的通道
    target_channels = ['EEG Fpz-Cz', 'EEG Pz-Oz', 'EOG horizontal', 'EMG submental']
    signal_labels = edf_reader.getSignalLabels()
    
    print("使用健壮的采样率检测:")
    for i, label in enumerate(signal_labels):
        if label in target_channels:
            header = edf_reader.getSignalHeader(i)
            sr = get_sampling_rate_robust(header)
            print(f"通道 {i} ({label}): 采样率 = {sr} Hz")
    
    edf_reader.close()
else:
    print(f"文件不存在: {edf_file_path}")

使用健壮的采样率检测:
通道 0 (EEG Fpz-Cz): 采样率 = 100.0 Hz
通道 1 (EEG Pz-Oz): 采样率 = 100.0 Hz
通道 2 (EOG horizontal): 采样率 = 100.0 Hz
通道 4 (EMG submental): 采样率 = 1.0 Hz


In [9]:
# 测试 getSampleFrequency 方法作为后备方案
import pyedflib

edf_file_path = "SC4001E0-PSG.edf"

if os.path.exists(edf_file_path):
    edf_reader = pyedflib.EdfReader(edf_file_path)
    
    target_channels = ['EEG Fpz-Cz', 'EEG Pz-Oz', 'EOG horizontal', 'EMG submental']
    signal_labels = edf_reader.getSignalLabels()
    
    print("测试不同的采样率获取方法:")
    for i, label in enumerate(signal_labels):
        if label in target_channels:
            print(f"\n通道 {i} ({label}):")
            
            # 方法1: getSignalHeader
            try:
                header = edf_reader.getSignalHeader(i)
                sr1 = header.get('sample_rate', None)
                print(f"  getSignalHeader['sample_rate']: {sr1}")
            except Exception as e:
                print(f"  getSignalHeader 错误: {e}")
            
            # 方法2: getSampleFrequency
            try:
                sr2 = edf_reader.getSampleFrequency(i)
                print(f"  getSampleFrequency(): {sr2}")
            except Exception as e:
                print(f"  getSampleFrequency 错误: {e}")
    
    edf_reader.close()
else:
    print(f"文件不存在: {edf_file_path}")

测试不同的采样率获取方法:

通道 0 (EEG Fpz-Cz):
  getSignalHeader['sample_rate']: 100.0
  getSampleFrequency(): 100.0

通道 1 (EEG Pz-Oz):
  getSignalHeader['sample_rate']: 100.0
  getSampleFrequency(): 100.0

通道 2 (EOG horizontal):
  getSignalHeader['sample_rate']: 100.0
  getSampleFrequency(): 100.0

通道 4 (EMG submental):
  getSignalHeader['sample_rate']: 1.0
  getSampleFrequency(): 1.0


In [10]:
# 测试切割函数功能
import numpy as np
from scipy.signal.windows import hamming

def split_segments(data, labels, segment_length, target_length, fs):
    """
    Split data and labels into smaller segments, and apply Hamming window to each segment.
    :param data: np.ndarray, shape (num_segments, channels, samples_per_segment)
    :param labels: np.ndarray, shape (num_segments, num_classes)
    :param segment_length: int, original segment length in seconds
    :param target_length: int, target segment length in seconds
    :param fs: int, sampling frequency
    :return: split_data, split_labels
    """
    samples_per_segment = segment_length * fs
    samples_per_target = target_length * fs
    num_targets = samples_per_segment // samples_per_target

    split_data = []
    split_labels = []
    
    # 创建汉明窗
    hamming_window = hamming(samples_per_target)

    for i in range(data.shape[0]):
        for j in range(num_targets):
            start = j * samples_per_target
            end = start + samples_per_target
            segment = data[i, :, start:end]
            
            # 对每个通道应用汉明窗
            for ch in range(segment.shape[0]):
                segment[ch, :] = segment[ch, :] * hamming_window
                
            split_data.append(segment)
            split_labels.append(labels[i])

    return np.array(split_data), np.array(split_labels)

# 测试切割功能
print("测试数据切割功能:")

# 模拟数据: 3个epochs, 4个channels, 每个epoch 3000个样本 (30秒 @ 100Hz)
test_data = np.random.randn(3, 4, 3000)  # (epochs, channels, samples)
test_labels = np.eye(5)[[0, 1, 2]]  # one-hot编码的标签

print(f"原始数据形状: {test_data.shape}")
print(f"原始标签形状: {test_labels.shape}")

# 测试不同的切割场景
test_cases = [
    (30, 10),  # 30秒切割成10秒 (1:3分割)
    (30, 15),  # 30秒切割成15秒 (1:2分割)
    (30, 5),   # 30秒切割成5秒 (1:6分割)
]

for segment_length, target_length in test_cases:
    print(f"\n切割测试: {segment_length}秒 -> {target_length}秒")
    
    split_data, split_labels = split_segments(test_data, test_labels, segment_length, target_length, 100)
    
    print(f"切割后数据形状: {split_data.shape}")
    print(f"切割后标签形状: {split_labels.shape}")
    print(f"预期切割比例: 1:{segment_length//target_length}")
    print(f"实际切割比例: 1:{split_data.shape[0]//test_data.shape[0]}")

测试数据切割功能:
原始数据形状: (3, 4, 3000)
原始标签形状: (3, 5)

切割测试: 30秒 -> 10秒
切割后数据形状: (9, 4, 1000)
切割后标签形状: (9, 5)
预期切割比例: 1:3
实际切割比例: 1:3

切割测试: 30秒 -> 15秒
切割后数据形状: (6, 4, 1500)
切割后标签形状: (6, 5)
预期切割比例: 1:2
实际切割比例: 1:2

切割测试: 30秒 -> 5秒
切割后数据形状: (18, 4, 500)
切割后标签形状: (18, 5)
预期切割比例: 1:6
实际切割比例: 1:6


# Enhanced Sleep-EDF Processing Script Usage Examples

修改后的 `prepare_sleep-edf-2013.py` 脚本现在支持数据切割功能，可以将30秒的原始数据切割成不同长度的片段。

## 使用方法

### 基本用法（不切割）
```bash
python prepare_sleep-edf-2013.py --data_dir ./edf --output_dir ./npz
```

### 切割成较短片段
```bash
# 切割成10秒片段
python prepare_sleep-edf-2013.py --data_dir ./edf --output_dir ./npz --segment_length 30 --target_length 10

# 切割成5秒片段  
python prepare_sleep-edf-2013.py --data_dir ./edf --output_dir ./npz --segment_length 30 --target_length 5

# 切割成15秒片段
python prepare_sleep-edf-2013.py --data_dir ./edf --output_dir ./npz --segment_length 30 --target_length 15
```

## 新增功能

1. **数据切割**: 可以将30秒的epochs切割成更短的片段
2. **汉明窗**: 自动为每个切割后的片段应用汉明窗
3. **智能文件命名**: 输出文件名包含切割信息
4. **保存切割参数**: 在.npz文件中保存segment_length和target_length参数

## 输出格式

- 不切割: `sleep_edf_processed_30s.npz`
- 切割: `sleep_edf_processed_30s_to_10s.npz`

## 数据格式

保存的.npz文件包含：
- `Fold_data`: 所有被试的信号数据
- `Fold_label`: 所有被试的标签（one-hot编码）
- `Fold_len`: 每个被试的数据长度
- `channels`: 通道名称列表
- `sampling_rate`: 采样率
- `segment_length`: 原始片段长度
- `target_length`: 目标片段长度