# EMG信号处理完整流程演示

## 教学大纲

### 1. 信号消噪
- 原始信号展示
- 带通滤波（20-500 Hz）
- 陷波滤波（去除50Hz工频干扰）
- 消噪前后对比

### 2. 特征提取（18种经典方法）
**时域特征（10种）：**
- MAV, RMS, VAR, WL, ZC, SSC, IEMG, DASDV, PEAK, MEAN

**频域特征（8种）：**
- MNF, MDF, Peak_Freq, Total_Power, SM1, SM2, SM3, Freq_Ratio

### 3. 不同群体差异分析
- 正常人 vs 轻度障碍 vs 重度障碍
- 统计检验（ANOVA）
- 可视化对比

### 4. 分类器对比
- SVM（支持向量机）
- KNN（K近邻）
- Random Forest（随机森林）

In [None]:
# 导入必要的库
import sys
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats

project_root = Path.cwd().parent
sys.path.insert(0, str(project_root))

from code.week06_preprocessing.filters import EMGFilters
from code.week07_feature_extraction.features import EMGFeatures
from code.week08_pattern_recognition.classifier import EMGClassifier

plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['figure.dpi'] = 100

print("✓ 环境准备完成！")

---
## 第一部分：信号消噪

### 为什么需要消噪？
- 原始EMG信号包含多种噪声
- 50Hz工频干扰
- 低频基线漂移
- 高频随机噪声
- 运动伪影

### 消噪方法
1. **带通滤波器**：保留20-500 Hz（EMG有效频率范围）
2. **陷波滤波器**：去除50/60 Hz工频干扰

In [None]:
# 生成带噪声的EMG信号
fs = 1000
duration = 5
t = np.linspace(0, duration, int(duration * fs))

# 纯净EMG（收缩期在1-4秒）
signal = np.random.normal(0, 0.02, len(t))
start = int(1 * fs)
end = int(4 * fs)

for freq in range(70, 140, 15):
    signal[start:end] += 0.4 * np.sin(2 * np.pi * freq * t[start:end])
signal[start:end] += np.random.normal(0, 0.15, end - start)

# 添加50Hz工频干扰（这是我们要去除的）
signal_noisy = signal + 0.08 * np.sin(2 * np.pi * 50 * t)

print("✓ 信号生成完成")

In [None]:
# 应用滤波器
filters = EMGFilters(fs=fs)

# 完整预处理：带通 + 陷波
signal_clean = filters.preprocess_emg(signal_noisy, remove_powerline=True, powerline_freq=50)

# 可视化对比
fig, axes = plt.subplots(3, 1, figsize=(14, 10))

axes[0].plot(t, signal_noisy, linewidth=0.5, color='red', alpha=0.7)
axes[0].set_title('❶ 原始信号（含噪声）', fontsize=13, fontweight='bold')
axes[0].set_ylabel('幅度 (mV)')
axes[0].grid(True, alpha=0.3)

axes[1].plot(t, signal_clean, linewidth=0.5, color='green')
axes[1].set_title('❷ 滤波后信号（干净）', fontsize=13, fontweight='bold')
axes[1].set_ylabel('幅度 (mV)')
axes[1].grid(True, alpha=0.3)

noise = signal_noisy - signal_clean
axes[2].plot(t, noise, linewidth=0.5, color='orange', alpha=0.7)
axes[2].set_title('❸ 被去除的噪声', fontsize=13, fontweight='bold')
axes[2].set_xlabel('时间 (秒)')
axes[2].set_ylabel('幅度 (mV)')
axes[2].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"\n噪声去除效果：")
print(f"  原始信号标准差: {np.std(signal_noisy):.4f} mV")
print(f"  滤波后标准差: {np.std(signal_clean):.4f} mV")
print(f"  噪声改善比: {np.std(signal_noisy)/np.std(signal_clean):.2f}x")

---
## 第二部分：特征提取

### 什么是特征提取？
- 从原始信号中提取**有意义的数值**
- 这些数值能够**区分不同的动作或状态**
- 用于后续的**分类和识别**

### 18种经典特征

**时域特征**（直接从波形计算）：
- **MAV**: 平均绝对值 → 反映肌肉激活强度
- **RMS**: 均方根 → 常用的幅度指标
- **VAR**: 方差 → 反映信号波动
- **WL**: 波形长度 → 反映信号复杂度
- **ZC**: 过零率 → 反映频率特性
- **SSC**: 斜率符号变化 → 反映频率
- **IEMG**: 积分EMG
- **DASDV**: 差分绝对标准差
- **PEAK**: 峰值
- **MEAN**: 均值

**频域特征**（通过傅里叶变换计算）：
- **MNF**: 平均频率
- **MDF**: 中值频率
- **Peak_Freq**: 峰值频率
- **Total_Power**: 总功率
- **SM1-3**: 频谱矩
- **Freq_Ratio**: 频率比

In [None]:
# 提取特征
time_features = EMGFeatures.extract_time_features(signal_clean)
freq_features = EMGFeatures.extract_freq_features(signal_clean, fs=fs)

print("时域特征（10种）：")
print("-" * 40)
for name, value in time_features.items():
    print(f"{name:10s}: {value:10.6f}")

print("\n频域特征（8种）：")
print("-" * 40)
for name, value in freq_features.items():
    print(f"{name:15s}: {value:10.6f}")

In [None]:
# 可视化所有18种特征
all_features = {**time_features, **freq_features}

fig, axes = plt.subplots(3, 6, figsize=(18, 12))
fig.suptitle('18种EMG经典特征', fontsize=16, fontweight='bold')

for idx, (name, value) in enumerate(all_features.items()):
    row = idx // 6
    col = idx % 6
    
    axes[row, col].bar([name], [value], color=f'C{idx % 10}', alpha=0.8)
    axes[row, col].set_title(name, fontsize=11, fontweight='bold')
    axes[row, col].tick_params(axis='x', labelsize=0)  # 隐藏x轴标签
    axes[row, col].grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

print(f"\n✓ 共提取 {len(all_features)} 种特征")

---
## 第三部分：不同群体的差异分析

### 研究问题
- 正常人的EMG信号有什么特点？
- 运动功能障碍患者的信号有何不同？
- 哪些特征最能区分不同群体？

### 三个群体
1. **正常人**: 肌肉收缩强有力，频率正常
2. **轻度障碍**: 收缩较弱，频率略低
3. **重度障碍**: 收缩很弱，频率明显降低

In [None]:
# 生成模拟数据函数
def generate_patient_data(patient_type, n_samples=20):
    """生成指定类型患者的EMG数据"""
    fs = 1000
    duration = 5
    filters = EMGFilters(fs=fs)
    
    samples = []
    
    for _ in range(n_samples):
        t = np.linspace(0, duration, int(duration * fs))
        signal = np.random.normal(0, 0.02, len(t))
        start = int(1 * fs)
        end = int(4 * fs)
        
        if patient_type == 'normal':
            # 正常人：强收缩，70-140 Hz
            for freq in range(70, 140, 15):
                signal[start:end] += 0.4 * np.sin(2 * np.pi * freq * t[start:end])
            signal[start:end] += np.random.normal(0, 0.15, end - start)
            
        elif patient_type == 'mild':
            # 轻度障碍：中等收缩，60-120 Hz
            for freq in range(60, 120, 15):
                signal[start:end] += 0.25 * np.sin(2 * np.pi * freq * t[start:end])
            signal[start:end] += np.random.normal(0, 0.12, end - start)
            
        elif patient_type == 'severe':
            # 重度障碍：弱收缩，50-100 Hz
            for freq in range(50, 100, 20):
                signal[start:end] += 0.15 * np.sin(2 * np.pi * freq * t[start:end])
            signal[start:end] += np.random.normal(0, 0.1, end - start)
        
        # 预处理
        signal_clean = filters.preprocess_emg(signal, remove_powerline=True)
        
        # 提取特征
        time_feat = EMGFeatures.extract_time_features(signal_clean)
        freq_feat = EMGFeatures.extract_freq_features(signal_clean, fs=fs)
        
        sample = {**time_feat, **freq_feat}
        samples.append(sample)
    
    return samples

print("开始生成三组数据...")

# 生成数据
data_normal = generate_patient_data('normal', n_samples=20)
data_mild = generate_patient_data('mild', n_samples=20)
data_severe = generate_patient_data('severe', n_samples=20)

# 添加标签
for sample in data_normal:
    sample['group'] = '正常人'
for sample in data_mild:
    sample['group'] = '轻度障碍'
for sample in data_severe:
    sample['group'] = '重度障碍'

# 合并
all_data = data_normal + data_mild + data_severe
df = pd.DataFrame(all_data)

print(f"✓ 数据生成完成：{len(df)}个样本")
print(f"  正常人: {len(data_normal)}个")
print(f"  轻度障碍: {len(data_mild)}个")
print(f"  重度障碍: {len(data_severe)}个")

In [None]:
# 关键特征对比
key_features = ['MAV', 'RMS', 'WL', 'MNF']

fig, axes = plt.subplots(2, 2, figsize=(14, 10))
fig.suptitle('不同群体的关键特征对比', fontsize=16, fontweight='bold')

for idx, feat in enumerate(key_features):
    row = idx // 2
    col = idx % 2
    
    df.boxplot(column=feat, by='group', ax=axes[row, col])
    axes[row, col].set_title(feat, fontsize=13, fontweight='bold')
    axes[row, col].set_xlabel('群体', fontsize=11)
    axes[row, col].set_ylabel('数值', fontsize=11)
    axes[row, col].get_figure().suptitle('')

plt.tight_layout()
plt.show()

print("\n观察：")
print("  - MAV/RMS: 正常人最高，重度障碍最低")
print("  - WL: 反映信号复杂度，正常人更高")
print("  - MNF: 平均频率，正常人频率更高")

In [None]:
# 统计检验
print("统计检验（ANOVA）：")
print("=" * 60)

for feat in key_features:
    normal_data = df[df['group'] == '正常人'][feat]
    mild_data = df[df['group'] == '轻度障碍'][feat]
    severe_data = df[df['group'] == '重度障碍'][feat]
    
    f_stat, p_value = stats.f_oneway(normal_data, mild_data, severe_data)
    
    sig = "***" if p_value < 0.001 else "**" if p_value < 0.01 else "*" if p_value < 0.05 else "ns"
    
    print(f"{feat:10s}: F={f_stat:8.3f}, p={p_value:.6f} {sig}")

print("\n*** 非常显著(p<0.001), ** 显著(p<0.01), * 较显著(p<0.05), ns 不显著")

---
## 第四部分：分类器对比

### 任务目标
根据EMG特征，自动判断一个人属于哪个群体

### 三种经典分类器

1. **SVM (Support Vector Machine)**
   - 支持向量机
   - 寻找最优分类边界
   - 适合中小规模数据

2. **KNN (K-Nearest Neighbors)**
   - K近邻算法
   - 根据最近的k个样本投票
   - 简单直观

3. **Random Forest**
   - 随机森林
   - 多个决策树投票
   - 性能稳定，抗过拟合

In [None]:
# 准备数据
feature_cols = [col for col in df.columns if col != 'group']
X = df[feature_cols].values
y = df['group'].values

# 标签映射
label_map = {'正常人': 0, '轻度障碍': 1, '重度障碍': 2}
y_numeric = np.array([label_map[label] for label in y])

print(f"特征矩阵: {X.shape}")
print(f"标签向量: {y_numeric.shape}")
print(f"特征数: {len(feature_cols)}")

In [None]:
# 训练三种分类器
classifiers = [
    ('random_forest', {'n_estimators': 100}, 'Random Forest'),
    ('svm', {'kernel': 'rbf', 'C': 1.0}, 'SVM'),
    ('knn', {'n_neighbors': 5}, 'KNN')
]

results = []

for clf_type, params, name in classifiers:
    print(f"\n训练 {name}...")
    
    clf = EMGClassifier(classifier_type=clf_type, **params)
    X_train, X_test, y_train, y_test = clf.prepare_data(X, y_numeric, test_size=0.3, random_state=42)
    clf.train(X_train, y_train, feature_names=feature_cols, gesture_names=list(label_map.keys()))
    accuracy = clf.evaluate(X_test, y_test)
    
    print(f"  准确率: {accuracy:.2%}")
    
    results.append({
        'classifier': name,
        'accuracy': accuracy * 100
    })

results_df = pd.DataFrame(results)

In [None]:
# 可视化对比
fig, ax = plt.subplots(figsize=(10, 6))

colors = ['#2ecc71', '#3498db', '#e74c3c']
bars = ax.bar(results_df['classifier'], results_df['accuracy'], color=colors, alpha=0.8)

ax.set_ylabel('准确率 (%)', fontsize=12)
ax.set_title('三种分类器性能对比', fontsize=14, fontweight='bold')
ax.set_ylim(0, 100)
ax.grid(True, alpha=0.3, axis='y')

# 标注数值
for bar in bars:
    height = bar.get_height()
    ax.text(bar.get_x() + bar.get_width()/2., height,
           f'{height:.1f}%',
           ha='center', va='bottom', fontsize=13, fontweight='bold')

plt.tight_layout()
plt.show()

print("\n分类器排名：")
results_sorted = results_df.sort_values('accuracy', ascending=False)
for idx, row in results_sorted.iterrows():
    print(f"  {row['classifier']:15s}: {row['accuracy']:.2f}%")

---
## 总结

### 完整流程回顾

```
原始EMG信号
    ↓
消噪/滤波（带通 + 陷波）
    ↓
特征提取（18种）
    ↓
统计分析（ANOVA）
    ↓
分类器训练（SVM/KNN/RF）
    ↓
自动识别
```

### 关键发现
1. **消噪很重要**：能显著提高信号质量
2. **特征有差异**：正常人和障碍患者的特征显著不同
3. **分类可行**：机器学习能够自动区分不同群体

### 实际应用
- 康复评估：量化评估患者恢复情况
- 辅助诊断：帮助医生判断功能障碍程度
- 假肢控制：根据EMG控制机械手
- 运动分析：评估运动员肌肉状态