In [1]:
import scipy.io as sio
import numpy as np

In [5]:
import scipy.io
import pandas as pd
import os
import re

def _sorted_subdirs(path):
    """Return the names of the sub-directories of *path*, sorted for a deterministic order."""
    return sorted(
        name for name in os.listdir(path)
        if os.path.isdir(os.path.join(path, name))
    )


def _first_x_de_var(mat_data):
    """Return the first key of *mat_data* matching ``X<digits>_DE_time``, or None."""
    return next((var for var in mat_data.keys() if re.match(r'X\d+_DE_time', var)), None)


def _load_or_file(file_path, file_name, position_num):
    """Build the DataFrame for one outer-race (OR) MAT file; return None to skip the file."""
    mat_data = scipy.io.loadmat(file_path)

    de_time_var = _first_x_de_var(mat_data)
    if de_time_var is None:
        print(f"文件 {file_name} 中未找到符合模式的DE_time变量，跳过")
        return None

    id_prefix = de_time_var.split('_')[0]
    fe_time_var = f"{id_prefix}_FE_time"
    rpm_var = f"{id_prefix}RPM"
    missing_vars = [var for var in (fe_time_var, rpm_var) if var not in mat_data]
    if missing_vars:
        print(f"文件 {file_name} 缺少变量: {', '.join(missing_vars)}，跳过")
        return None

    # File names look like OR<size>@<position>_<load>.mat
    match = re.match(r'OR(\d+)@(\d+)_(\d+)\.mat', file_name)
    if not match:
        print(f"文件 {file_name} 命名不符合规则，跳过")
        return None

    return pd.DataFrame({
        'DE_time': mat_data[de_time_var].flatten(),
        'FE_time': mat_data[fe_time_var].flatten(),
        'RPM': mat_data[rpm_var][0, 0],
        'fault_type': 'OR',
        'fault_size': int(match.group(1)),
        'position': position_num,
        'horsepower': int(match.group(3)),
        'idname': id_prefix,
    })


def _load_0028_frame(mat_data, file_name, fault_type, folder_name):
    """Build the DataFrame for a file in the '0028' size folder (DE signal only; HP/RPM from the name)."""
    de_vars = [var for var in mat_data.keys() if 'DE' in var and not var.startswith('__')]
    if not de_vars:
        de_vars = [var for var in mat_data.keys() if 'de' in var and not var.startswith('__')]
    if not de_vars:
        print(f"028文件夹中文件 {file_name} 未找到包含DE的变量，跳过")
        return None

    de_var = de_vars[0]
    horsepower_match = re.search(r'_(\d+)_', file_name)
    rpm_match = re.search(r'\((\d+)rpm\)', file_name)
    if not (horsepower_match and rpm_match):
        print(f"028文件夹中文件 {file_name} 命名不符合规则，跳过")
        return None

    return pd.DataFrame({
        'DE_time': mat_data[de_var].flatten(),
        'fault_type': fault_type,
        'fault_size': int(folder_name),
        'horsepower': int(horsepower_match.group(1)),
        'RPM': int(rpm_match.group(1)),
        'idname': de_var.split('_')[0],
    })


def _load_standard_frame(mat_data, file_path, file_name, fault_type, folder_name):
    """Build the DataFrame for a regular B/IR file (DE+FE+RPM required, BA optional)."""
    de_time_var = _first_x_de_var(mat_data)
    if de_time_var is None:
        print(f"文件 {file_path} 中未找到符合模式的变量，跳过处理")
        return None

    id_prefix = de_time_var.split('_')[0]
    fe_time_var = f"{id_prefix}_FE_time"
    ba_time_var = f"{id_prefix}_BA_time"
    rpm_var = f"{id_prefix}RPM"

    # Skip explicitly when a required variable is missing; falling through
    # would reuse signal/RPM values left over from a previously processed file.
    missing_vars = [var for var in (fe_time_var, rpm_var) if var not in mat_data]
    if missing_vars:
        print(f"文件 {file_path} 缺少变量: {', '.join(missing_vars)}，跳过处理")
        return None

    # File names look like B007_<hp>.mat / IR014_<hp>.mat
    horsepower = int(file_name.split('_')[1].split('.')[0])

    columns = {
        'DE_time': mat_data[de_time_var].flatten(),
        'FE_time': mat_data[fe_time_var].flatten(),
    }
    if ba_time_var in mat_data:  # BA_time is not present in every file
        columns['BA_time'] = mat_data[ba_time_var].flatten()
    columns.update({
        'RPM': mat_data[rpm_var][0, 0],
        'fault_type': fault_type,
        'fault_size': int(folder_name),
        'horsepower': horsepower,
        'idname': id_prefix,
    })
    return pd.DataFrame(columns)


def _collect_or_frames(fault_dir, position_mapping):
    """Walk OR/<position>/<size>/OR*.mat and return one DataFrame per readable file."""
    frames = []
    for position_folder in _sorted_subdirs(fault_dir):
        position_path = os.path.join(fault_dir, position_folder)
        position_num = position_mapping.get(position_folder, position_folder)
        print(f"处理点位: {position_folder} ({position_num})")

        for size_folder in _sorted_subdirs(position_path):
            size_path = os.path.join(position_path, size_folder)
            for file_name in sorted(os.listdir(size_path)):
                if not (file_name.startswith('OR') and file_name.endswith('.mat')):
                    continue
                file_path = os.path.join(size_path, file_name)
                try:
                    df = _load_or_file(file_path, file_name, position_num)
                except Exception as e:  # best effort: report and continue with the next file
                    print(f"处理文件 {file_path} 时出错: {str(e)}")
                    continue
                if df is not None:
                    frames.append(df)
                    print(f"成功处理: {file_path}")
    return frames


def _collect_b_ir_frames(fault_dir, fault_type):
    """Walk <fault_type>/<size>/<fault_type>*.mat and return one DataFrame per readable file."""
    frames = []
    for folder_name in _sorted_subdirs(fault_dir):
        folder_path = os.path.join(fault_dir, folder_name)
        for file_name in sorted(os.listdir(folder_path)):
            if not (file_name.startswith(fault_type) and file_name.endswith('.mat')):
                continue
            file_path = os.path.join(folder_path, file_name)
            try:
                mat_data = scipy.io.loadmat(file_path)
                if folder_name == '0028':
                    df = _load_0028_frame(mat_data, file_name, fault_type, folder_name)
                else:
                    df = _load_standard_frame(mat_data, file_path, file_name, fault_type, folder_name)
            except Exception as e:  # best effort: report and continue with the next file
                print(f"处理文件 {file_path} 时出错: {str(e)}")
                continue
            if df is not None:
                frames.append(df)
                print(f"成功处理: {file_path}")
    return frames


def process_single_data_dir(data_dir):
    """
    Load every B / IR / OR MAT file below one data directory (e.g. .../12kHz_FE_data).

    Expected layout:
        <data_dir>/B/<size>/B*.mat
        <data_dir>/IR/<size>/IR*.mat
        <data_dir>/OR/<position>/<size>/OR*.mat

    For each fault type the per-file frames are concatenated, written to
    ``<data_dir>/<fault_type>/combined_<fault_type>_data.csv`` and returned.
    Files that cannot be read, are misnamed or lack DE/FE/RPM variables are
    reported and skipped.

    Parameters:
        data_dir: path of the data directory.

    Returns:
        dict with keys 'B', 'IR', 'OR'; each value is the combined DataFrame
        (an empty DataFrame when nothing was loaded for that fault type).
    """
    result_dfs = {
        'B': pd.DataFrame(),
        'IR': pd.DataFrame(),
        'OR': pd.DataFrame()
    }

    # OR position folder -> clock position of the outer-race defect
    position_mapping = {
        "Orthogonal": 3,
        "Centered": 6,
        "Opposite": 12
    }

    for fault_type in ['B', 'IR', 'OR']:
        fault_dir = os.path.join(data_dir, fault_type)
        if not os.path.isdir(fault_dir):
            print(f"警告：{fault_type}故障类型文件夹不存在或不是目录 - {fault_dir}")
            continue

        print(f"\n开始处理 {data_dir} 下的 {fault_type} 类型数据...")
        if fault_type == 'OR':
            all_data = _collect_or_frames(fault_dir, position_mapping)
        else:
            all_data = _collect_b_ir_frames(fault_dir, fault_type)

        if not all_data:
            print(f"\n没有成功处理任何{fault_type}类型数据文件")
            continue

        combined_df = pd.concat(all_data, ignore_index=True)
        result_dfs[fault_type] = combined_df

        output_path = os.path.join(fault_dir, f'combined_{fault_type}_data.csv')
        combined_df.to_csv(output_path, index=False)
        print(f"\n{fault_type}类型数据处理完成！")
        print(f"合并后的数据形状: {combined_df.shape}")
        print(f"数据已保存至: {output_path}")

    return result_dfs

# 使用示例
    


In [6]:
# Data directories to load, each with a short tag used in generated variable names
# (e.g. tag "12fe" -> df_12fe_b / df_12fe_ir / df_12fe_or).
data_dirs = [
    {
        "path": r"D:\360极速浏览器下载\data\original\12kHz_FE_data",
        "tag": "12fe"  # lower-case so it can be embedded in variable names
    },
    {
        "path": r"D:\360极速浏览器下载\data\original\48kHz_DE_data",
        "tag": "48de"
    },
    {
        "path": r"D:\360极速浏览器下载\data\original\12kHz_DE_data",
        "tag": "12de"
    }
]

# variable name -> DataFrame for every (directory, fault type) pair that was loaded
all_dfs = {}

for dir_info in data_dirs:
    data_path = dir_info["path"]
    data_tag = dir_info["tag"]

    if not os.path.exists(data_path) or not os.path.isdir(data_path):
        print(f"跳过不存在的目录: {data_path}")
        continue

    fault_dfs = process_single_data_dir(data_path)

    for fault_type in ['B', 'IR', 'OR']:
        var_name = f"df_{data_tag}_{fault_type.lower()}"  # e.g. df_12fe_b
        all_dfs[var_name] = fault_dfs[fault_type]
        # Also expose as a global so later cells can use df_12fe_b etc. directly
        globals()[var_name] = fault_dfs[fault_type]
        print(f"生成变量: {var_name}，数据形状: {fault_dfs[fault_type].shape}")

# ----------------------
# Usage examples (guarded: a directory may have been skipped above)
# ----------------------
if "df_12fe_b" in all_dfs:
    print("\n12kHz_FE_data的B类型数据前5行:")
    print(all_dfs["df_12fe_b"].head())

if "df_48de_ir" in all_dfs:
    print(f"\n48kHz_DE_data的IR类型数据形状: {all_dfs['df_48de_ir'].shape}")

# Combine the B-type data of all directories; pd.concat([]) would raise, so guard it
all_b_dfs = [df for name, df in all_dfs.items() if name.endswith('_b') and not df.empty]
if all_b_dfs:
    combined_b = pd.concat(all_b_dfs, ignore_index=True)
else:
    combined_b = pd.DataFrame()
print(f"\n合并所有B类型数据后的形状: {combined_b.shape}")


开始处理 D:\360极速浏览器下载\data\original\12kHz_FE_data 下的 B 类型数据...
成功处理: D:\360极速浏览器下载\data\original\12kHz_FE_data\B\0007\B007_0.mat
成功处理: D:\360极速浏览器下载\data\original\12kHz_FE_data\B\0007\B007_1.mat
成功处理: D:\360极速浏览器下载\data\original\12kHz_FE_data\B\0007\B007_2.mat
成功处理: D:\360极速浏览器下载\data\original\12kHz_FE_data\B\0007\B007_3.mat
成功处理: D:\360极速浏览器下载\data\original\12kHz_FE_data\B\0014\B014_0.mat
成功处理: D:\360极速浏览器下载\data\original\12kHz_FE_data\B\0014\B014_1.mat
成功处理: D:\360极速浏览器下载\data\original\12kHz_FE_data\B\0014\B014_2.mat
成功处理: D:\360极速浏览器下载\data\original\12kHz_FE_data\B\0014\B014_3.mat
成功处理: D:\360极速浏览器下载\data\original\12kHz_FE_data\B\0021\B021_0.mat
成功处理: D:\360极速浏览器下载\data\original\12kHz_FE_data\B\0021\B021_1.mat
成功处理: D:\360极速浏览器下载\data\original\12kHz_FE_data\B\0021\B021_2.mat
成功处理: D:\360极速浏览器下载\data\original\12kHz_FE_data\B\0021\B021_3.mat

B类型数据处理完成！
合并后的数据形状: (1152000, 8)
数据已保存至: D:\360极速浏览器下载\data\original\12kHz_FE_data\B\combined_B_data.csv

开始处理 D:\360极速浏览器下载\data\original\12kHz_

In [10]:
df_12fe_b.idname.value_counts()  # number of samples contributed by each MAT-file id

idname
X282    96000
X283    96000
X284    96000
X285    96000
X286    96000
X287    96000
X288    96000
X289    96000
X290    96000
X291    96000
X292    96000
X293    96000
Name: count, dtype: int64

In [None]:
import numpy as np
import pandas as pd
from scipy.signal import medfilt, butter, filtfilt, hilbert
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.utils import shuffle
from scipy.stats import kurtosis
from scipy.ndimage import median_filter
# ----------------------
# 1. 智能预处理核心函数（脉冲检测+选择性滤波+故障频率带通）
# ----------------------
def add_sequences(de_seq, fe_seq, ba_seq=None):
    """Fuse sensor signals by element-wise averaging.

    All sequences are truncated to the shortest one before averaging.

    Parameters:
        de_seq, fe_seq: required signals (array-like).
        ba_seq: optional third signal; included in the average when given and non-empty.

    Returns:
        numpy array with the element-wise mean, or None when ``de_seq`` or
        ``fe_seq`` is missing (None) or empty.
    """
    def _present(seq):
        # Explicit checks: the truth value of a numpy array/Series is ambiguous
        return seq is not None and len(seq) > 0

    if not (_present(de_seq) and _present(fe_seq)):
        return None

    # np.asarray so '+' is element-wise even for plain lists (lists would concatenate)
    parts = [np.asarray(de_seq, dtype=float), np.asarray(fe_seq, dtype=float)]
    if _present(ba_seq):
        parts.append(np.asarray(ba_seq, dtype=float))

    min_len = min(len(part) for part in parts)
    return sum(part[:min_len] for part in parts) / len(parts)
            
def moving_average(signal, window_size=5):
    """Smooth ``signal`` with a uniform window of ``window_size`` samples (output same length)."""
    kernel = np.full(window_size, 1.0 / window_size)
    return np.convolve(signal, kernel, mode='same')
def weighted_moving_average(signal, window_size=5):
    """Smooth ``signal`` with a normalised Gaussian window.

    The weights peak at the window centre and use sigma = window_size / 4;
    they sum to 1. The output has the same length as the input.
    """
    offsets = np.arange(window_size) - window_size // 2
    sigma = window_size / 4
    gaussian = np.exp(-0.5 * (offsets / sigma) ** 2)
    gaussian /= np.sum(gaussian)
    return np.convolve(signal, gaussian, mode='same')
def median_filter(signal, size=5):
    """Median-filter ``signal`` with a window of ``size`` samples.

    This definition shadows ``scipy.ndimage.median_filter`` imported at the top
    of the cell, so the scipy function is reached through the module. Calling
    the bare name here would recurse into this function forever.
    """
    from scipy import ndimage
    return ndimage.median_filter(signal, size=size)
def wavelet_filter(signal, wavelet='db4', level=4):
    """Wavelet soft-threshold denoising.

    Decomposes ``signal`` into ``level`` levels with ``wavelet``, soft-thresholds
    all detail coefficients and reconstructs. The threshold is
    0.5 * median(|finest detail coefficients|). This is a heuristic, not the
    universal (VisuShrink) threshold. Requires the PyWavelets package (``pywt``).

    Returns:
        Denoised signal with the same length as ``signal``.
    """
    import pywt  # pywt was used here but never imported anywhere in the file

    coeffs = pywt.wavedec(signal, wavelet, level=level)
    threshold = 0.5 * np.median(np.abs(coeffs[-1]))
    coeffs[1:] = [pywt.threshold(c, threshold, 'soft') for c in coeffs[1:]]
    # waverec may return one extra sample (e.g. for odd-length input); trim to the input length
    return pywt.waverec(coeffs, wavelet)[:len(signal)]
def butter_bandpass_filter(signal, fs, lowcut, highcut, order=4):
    """Zero-phase Butterworth band-pass of ``signal`` between ``lowcut`` and ``highcut`` Hz.

    The cut-offs are normalised by the Nyquist frequency (fs / 2). The filter is
    applied forward and backward with ``filtfilt``, so there is no phase shift.
    """
    half_fs = fs * 0.5
    b, a = butter(order, [lowcut / half_fs, highcut / half_fs], btype='band')
    filtered = filtfilt(b, a, signal)
    return filtered
def calculate_ffer(signal, fs, fault_freq, harmonic_order=3):
    """Fault-frequency energy ratio (FFER).

    Sums the single-sided spectral energy within ±1 Hz of ``fault_freq`` and its
    harmonics 2..``harmonic_order``, then divides by the total single-sided energy.
    Returns 0.0 when the total energy is below 1e-12.
    """
    n = len(signal)
    half = n // 2
    power = np.abs(np.fft.fft(signal))[:half] ** 2
    freqs = np.fft.fftfreq(n, 1/fs)[:half]

    total_energy = np.sum(power)
    if total_energy < 1e-12:
        return 0.0

    # ±1 Hz tolerance absorbs small spectral leakage around each harmonic
    fault_energy = sum(
        np.sum(power[np.abs(freqs - fault_freq * k) <= 1.0])
        for k in range(1, harmonic_order + 1)
    )
    return fault_energy / total_energy
def calculate_sample_entropy(signal, m=2, r=0.2):
    """Sample entropy, SampEn(m, r) = -ln(A / B).

    B counts pairs of length-m templates, and A pairs of length-(m+1) templates,
    whose Chebyshev distance is <= the tolerance. Both counts use the same first
    N - m start positions, so a perfectly periodic signal gives 0. Self-matches
    are excluded.

    Parameters:
        signal: 1-D array-like.
        m: embedding dimension (default 2).
        r: tolerance; values < 1 are treated as a fraction of the signal's
           standard deviation, values >= 1 as an absolute tolerance.

    Returns:
        SampEn, or 0.0 when the signal is too short (fewer than m + 2 samples)
        or no matches are found.

    Note: the pairwise comparison is O(N^2) and is slow for long signals.
    """
    signal = np.asarray(signal, dtype=float)
    n = len(signal)
    if n < m + 2:  # need at least two (m+1)-length templates
        return 0.0

    tol = r * np.std(signal) if r < 1 else r

    windows = np.lib.stride_tricks.sliding_window_view
    templates_m = windows(signal, m)[:n - m]  # same start positions as the m+1 templates
    templates_m1 = windows(signal, m + 1)     # n - m templates

    def _count_matches(templates):
        # Count unordered pairs (i < j) whose Chebyshev distance is within tolerance
        matches = 0
        for i in range(len(templates) - 1):
            dist = np.max(np.abs(templates[i + 1:] - templates[i]), axis=1)
            matches += int(np.count_nonzero(dist <= tol))
        return matches

    b = _count_matches(templates_m)
    a = _count_matches(templates_m1)
    if a == 0 or b == 0:  # avoid log(0)
        return 0.0
    return -np.log(a / b)
def calculate_permutation_entropy(signal, m=3, tau=1):
    """Permutation entropy (in bits) of ordinal patterns.

    Each embedding vector takes m samples spaced ``tau`` apart, so it spans
    (m - 1) * tau + 1 samples. There are N - (m - 1) * tau such vectors.

    Parameters:
        signal: 1-D array-like.
        m: embedding dimension (default 3).
        tau: delay (default 1).

    Returns:
        Shannon entropy (log2) of the ordinal-pattern distribution; 0.0 when the
        signal is shorter than one embedding span.
    """
    signal = np.asarray(signal)
    n = len(signal)
    span = (m - 1) * tau + 1
    if n < span:
        return 0.0

    # Every valid start position (the old range dropped tau-1 vectors when tau > 1)
    mat = np.array([signal[i:i + span:tau] for i in range(n - span + 1)])
    permutations = np.argsort(mat, axis=1)  # ordinal pattern of each vector
    _, counts = np.unique(permutations, axis=0, return_counts=True)
    probs = counts / len(permutations)

    return -np.sum(probs * np.log2(probs + 1e-12))
def calculate_harmonic_energy_ratio(signal, fs, fault_freq, max_harmonic=5):
    """Energy share of each fault-frequency harmonic.

    For k = 1..max_harmonic, sums the single-sided spectral energy within
    ±0.5 Hz of k * fault_freq and divides by the total single-sided energy. A
    harmonic whose ±0.5 Hz band contains no FFT bin (coarse resolution) gets 0.0.

    Returns:
        list of ``max_harmonic`` floats. All zeros when the total energy is
        below 1e-12; the return type is a list in every case.
    """
    n = len(signal)
    fft_mag = np.abs(np.fft.fft(signal))[:n // 2]
    freq_axis = np.fft.fftfreq(n, 1/fs)[:n // 2]
    power = fft_mag ** 2
    total_energy = np.sum(power)
    if total_energy < 1e-12:
        return [0.0] * max_harmonic

    harmonic_ratios = []
    for k in range(1, max_harmonic + 1):
        in_band = np.abs(freq_axis - fault_freq * k) <= 0.5
        if in_band.any():
            harmonic_ratios.append(np.sum(power[in_band]) / total_energy)
        else:
            harmonic_ratios.append(0.0)
    return harmonic_ratios
def calculate_envelope_features(signal, fs):
    """Envelope-domain features: (envelope spectrum peak, envelope kurtosis).

    The envelope is |hilbert(signal)|. Kurtosis is Pearson (fisher=False) of the
    mean-removed envelope; removing the mean does not change kurtosis. The
    peak is the largest single-sided envelope-spectrum magnitude above 1 Hz,
    or 0.0 if no bin lies above 1 Hz.
    """
    env = np.abs(hilbert(signal))
    kurt = kurtosis(env - np.mean(env), fisher=False)

    n = len(env)
    half = n // 2
    spectrum = np.abs(np.fft.fft(env))[:half]
    freqs = np.fft.fftfreq(n, 1/fs)[:half]
    above_1hz = freqs > 1.0  # skip DC and very low frequencies
    peak = np.max(spectrum[above_1hz]) if above_1hz.any() else 0.0

    return peak, kurt
def smart_preprocess(signal, fs, rpm, fault_type, bearing_type, threshold=3.0, kernel_size=5):
    """Pulse-aware preprocessing of one vibration signal (numpy array).

    1. If any sample lies more than ``threshold`` standard deviations from the
       mean (``detect_pulses``), apply a median filter of width ``kernel_size``.
       NOTE(review): a median filter tends to suppress short isolated spikes,
       which may include the fault impulses; confirm this is intended.
    2. Band-pass filter to [max(10, f_fault - 200), min(fs/2 - 10, f_fault + 200)] Hz,
       where f_fault comes from ``calculate_fault_frequency``.

    Returns:
        The filtered signal; the input array is not modified.
    """
    cleaned = signal.copy()
    if detect_pulses(cleaned, threshold):
        cleaned = light_median_filter(cleaned, kernel_size)

    fault_freq = calculate_fault_frequency(rpm, fault_type, bearing_type)
    nyquist = 0.5 * fs
    band_low = max(10, fault_freq - 200)
    band_high = min(nyquist - 10, fault_freq + 200)  # stay below Nyquist
    return bandpass_filter(cleaned, fs, band_low, band_high)

# ----------------------
# 2. 辅助函数（工具类）
# ----------------------
def detect_pulses(signal, threshold=3.0):
    """Return True if any sample deviates from the mean by more than ``threshold`` standard deviations."""
    centered = np.asarray(signal) - np.mean(signal)
    limit = threshold * np.std(signal)
    return np.any(np.abs(centered) > limit)

def light_median_filter(signal, kernel_size=5):
    """Apply ``scipy.signal.medfilt`` with an odd ``kernel_size`` (typically 5 or 7; edges zero-padded)."""
    filtered = medfilt(signal, kernel_size)
    return filtered


def calculate_fault_frequency(rpm, fault_type, bearing_type):
    """
    Characteristic bearing fault frequency in Hz.

    Parameters:
        rpm: shaft speed in rev/min.
        fault_type: 'OR' (outer race, BPFO), 'IR' (inner race, BPFI) or
            'B' (rolling element); any other value returns 1000.
        bearing_type: 'DE' selects the drive-end SKF 6205 geometry; any other
            value (e.g. 'FE', but also 'BA') selects the fan-end SKF 6203 geometry.

    For 'B' the formula (D/d) * f_r * (1 - (d/D)^2) equals twice the ball spin
    frequency, matching the "rolling element" column of the CWRU bearing
    tables. With these geometries the multiples of shaft speed are
    DE: IR 5.4152 / OR 3.5848 / B 4.7135 and FE: IR 4.9469 / OR 3.0530 / B 3.9874.
    """
    # n_balls = number of rolling elements, d = ball diameter, D = pitch diameter (inches)
    if bearing_type == 'DE':
        n_balls, d, D = 9, 0.3126, 1.537   # SKF 6205-2RS JEM
    else:
        n_balls, d, D = 8, 0.2656, 1.122   # SKF 6203-2RS JEM: 8 balls (not 9)

    f_r = rpm / 60  # shaft rotation frequency (Hz)
    ratio = d / D

    if fault_type == 'OR':
        return (n_balls / 2) * f_r * (1 - ratio)
    if fault_type == 'IR':
        return (n_balls / 2) * f_r * (1 + ratio)
    if fault_type == 'B':
        return (D / d) * f_r * (1 - ratio ** 2)
    return 1000  # unknown fault type: default centre frequency

def bandpass_filter(signal, fs, lowcut, highcut, order=4):
    """Zero-phase Butterworth band-pass filter.

    ``lowcut`` and ``highcut`` are in Hz and are normalised by the Nyquist
    frequency before the filter is designed. ``filtfilt`` runs it forward and
    backward, so the output has no phase shift.
    """
    nyquist = 0.5 * fs
    normalized_band = [lowcut / nyquist, highcut / nyquist]
    numerator, denominator = butter(order, normalized_band, btype='band')
    return filtfilt(numerator, denominator, signal)

def extract_time_features(signal):
    """Time-domain features of one signal.

    Returns a Series with:
        mean, var, std; kurtosis (pandas, excess/Fisher, bias-corrected);
        peak = max|x|; rms; peak_factor = peak / rms;
        Margin_Factor = peak / (mean(sqrt|x|))^2 (clearance factor).
    """
    signal = np.asarray(signal, dtype=float)
    signal_abs = np.abs(signal)
    peak = np.max(signal_abs)
    rms = np.sqrt(np.mean(signal ** 2))
    return pd.Series({
        "mean": np.mean(signal),
        "var": np.var(signal),
        "std": np.std(signal),
        "kurtosis": pd.Series(signal).kurtosis(),  # sensitive to impacts
        "peak": peak,
        "rms": rms,
        "peak_factor": peak / rms,
        # Clearance factor uses mean(sqrt|x|)^2; the previous sqrt(mean(x))**2
        # collapsed to mean(x), which is ~0 or NaN for zero-mean vibration signals.
        "Margin_Factor": peak / (np.mean(np.sqrt(signal_abs)) ** 2 + 1e-8),
    })
def butter_lowpass_filter(data, cutoff, fs, order=4):
    """Zero-phase Butterworth low-pass of ``data`` at ``cutoff`` Hz (e.g. to keep the low-frequency part of BA signals)."""
    wn = cutoff / (0.5 * fs)  # cutoff normalised by the Nyquist frequency
    b, a = butter(order, wn, btype='low', analog=False)
    return filtfilt(b, a, data)
def extract_envelope_features(signal, fs):
    """Return the envelope features of ``signal`` as a Series (envelope_peak, envelope_kurtosis)."""
    peak, kurt = calculate_envelope_features(signal, fs)
    return pd.Series({"envelope_peak": peak, "envelope_kurtosis": kurt})
def extract_sample_entropy(signal):
    """Return sample entropy and permutation entropy of ``signal`` as a Series."""
    entropies = {}
    entropies["sample_entropy"] = calculate_sample_entropy(signal)
    entropies["permutation_entropy"] = calculate_permutation_entropy(signal)
    return pd.Series(entropies)

def extract_physical_features(signal, fs, rpm, fault_type, bearing_type):
    """Harmonic energy ratios of the bearing fault frequency as a Series (ffer_harmonic_1, ffer_harmonic_2, ...)."""
    fault_freq = calculate_fault_frequency(rpm, fault_type, bearing_type)
    ratios = calculate_harmonic_energy_ratio(signal, fs, fault_freq)
    return pd.Series({f"ffer_harmonic_{k}": ratio for k, ratio in enumerate(ratios, start=1)})
    
def extract_frequency_features(signal, fs, rpm):
    """Frequency-domain features, focused on shaft-speed components and low frequencies.

    Uses the single-sided FFT magnitude. Features:
        fft_max, fft_mean;
        rotation/double/triple_rotation_amp: the largest magnitude within ±10 % of
            1x/2x/3x the shaft frequency (rpm / 60); 0.0 if no FFT bin lies in the band;
        low_freq_energy: share of summed magnitude (not squared) at <= 100 Hz;
        mid_low_freq_energy: same share for 100-500 Hz.
    """
    n = len(signal)
    freq = np.fft.fftfreq(n, 1/fs)[:n // 2]
    fft_values = np.abs(np.fft.fft(signal))[:n // 2]
    total = np.sum(fft_values)
    rotation_freq = rpm / 60

    def _band_peak(low, high):
        # An empty band (short signal / coarse resolution) used to make np.max raise
        band = fft_values[(freq >= low) & (freq <= high)]
        return np.max(band) if band.size else 0.0

    def _band_share(mask):
        return np.sum(fft_values[mask]) / total if total > 0 else 0

    return pd.Series({
        'fft_max': np.max(fft_values) if fft_values.size else 0.0,
        'fft_mean': np.mean(fft_values) if fft_values.size else 0.0,
        'rotation_freq_amp': _band_peak(rotation_freq * 0.9, rotation_freq * 1.1),
        'double_rotation_amp': _band_peak(rotation_freq * 1.9, rotation_freq * 2.1),
        'triple_rotation_amp': _band_peak(rotation_freq * 2.9, rotation_freq * 3.1),
        'low_freq_energy': _band_share(freq <= 100),
        'mid_low_freq_energy': _band_share((freq > 100) & (freq <= 500)),
    })
# ----------------------
# 3. 核心：按idname分组+智能预处理（适配动态信号列）
# ----------------------
def _mean_of_aligned(signals):
    """Return the element-wise mean of 1-D signals after truncating them to the shortest one."""
    arrays = [np.asarray(sig, dtype=float) for sig in signals]
    min_len = min(len(arr) for arr in arrays)
    return np.mean([arr[:min_len] for arr in arrays], axis=0)


def preprocess_df(df, data_tag, fs, bearing_type=None):
    """
    Smart preprocessing and feature extraction for one DataFrame (e.g. df_12fe_b).

    For every ``idname`` group:
      1. drop rows with missing signal / RPM / fault_type / idname values;
      2. run ``smart_preprocess`` on each available signal column
         (DE_time / FE_time / BA_time) -> ``<col>_processed``. BA_time is also
         low-pass filtered at 100 Hz -> ``BA_time_processed_lowfiltered``;
      3. fuse the processed signals (element-wise mean, truncated to the shortest):
         ``FUSED_time_added_21`` = first + second; when three signals exist, also
         ``FUSED_time_added_3`` = all three and ``FUSED_time_added_22`` = first + third;
      4. extract time / frequency / physical / entropy / envelope features
         (time + frequency only for the low-pass BA signal). Columns are
         prefixed with ``<idname>_<signal tag>``;
      5. standardise RPM, horsepower and all feature columns with a
         StandardScaler fitted on that group.
    The groups are then concatenated, ``fault_type`` is encoded with a fixed
    mapping (B -> 0, IR -> 1, OR -> 2) and ``data_type`` = ``data_tag`` is added.

    NOTE(review): each cell of the signal columns is assumed to hold a 1-D numpy
    array (one signal window per row). The frames built by
    ``process_single_data_dir`` hold one scalar sample per row and need to be
    segmented into windows first — TODO confirm the intended layout.

    Parameters:
        df: input DataFrame with ``idname``, ``RPM``, ``fault_type`` and at least
            one of DE_time / FE_time / BA_time.
        data_tag: dataset tag (e.g. '12fe'), used in log messages and ``data_type``.
        fs: sampling rate in Hz.
        bearing_type: accepted for compatibility with callers that pass it;
            currently unused. The bearing geometry for fault frequencies is taken
            from each signal column's prefix (DE / FE / BA).

    Returns:
        The processed DataFrame, or an empty DataFrame if nothing could be processed.
    """
    signal_cols = [c for c in ("DE_time", "FE_time", "BA_time") if c in df.columns]
    if not signal_cols:
        print(f"警告：{data_tag} 无任何信号列，返回空DataFrame")
        return pd.DataFrame()

    grouped_results = []
    id_scaler_dict = {}  # one scaler per idname

    for idname, group_df in df.groupby("idname"):
        print(f"处理 {data_tag} - idname: {idname}（数据量: {group_df.shape[0]}）")

        drop_na_cols = signal_cols + ["RPM", "fault_type", "idname"]
        # .copy(): new columns go into an independent frame, not a view of df
        group_df = group_df.dropna(subset=drop_na_cols).copy()
        if group_df.empty:
            print(f"idname {idname} 无有效数据，跳过")
            continue

        # (column, feature prefix, bearing type for fault frequencies, full feature set?)
        feature_sources = []
        base_processed = []

        for col in signal_cols:
            sig_type = col.split("_")[0]  # DE / FE / BA
            proc_col = f"{col}_processed"
            group_df[proc_col] = group_df.apply(
                lambda row, c=col, bt=sig_type: smart_preprocess(
                    signal=row[c],
                    fs=fs,
                    rpm=row["RPM"],
                    fault_type=row["fault_type"],
                    bearing_type=bt,
                ),
                axis=1,
            )
            base_processed.append(proc_col)
            feature_sources.append((proc_col, f"{idname}_{sig_type}_processed", sig_type, True))

            if col == "BA_time":
                low_col = f"{proc_col}_lowfiltered"
                group_df[low_col] = group_df.apply(
                    lambda row, c=col: butter_lowpass_filter(data=row[c], cutoff=100, fs=fs, order=4),
                    axis=1,
                )
                feature_sources.append((low_col, f"{idname}_{sig_type}_lowfiltered", sig_type, False))

        # Fusion of the processed signals (the low-pass BA copy is not fused)
        if len(base_processed) == 2:
            fusion_plan = [("FUSED_time_added_21", base_processed)]
        elif len(base_processed) == 3:
            fusion_plan = [
                ("FUSED_time_added_3", base_processed),
                ("FUSED_time_added_21", base_processed[:2]),
                ("FUSED_time_added_22", [base_processed[0], base_processed[2]]),
            ]
        else:
            fusion_plan = []

        for fused_col, sources in fusion_plan:
            group_df[fused_col] = group_df.apply(
                lambda row, srcs=tuple(sources): _mean_of_aligned([row[c] for c in srcs]),
                axis=1,
            )
            # Distinct tag per fused column (FUSED3 / FUSED21 / FUSED22) avoids name clashes;
            # 'FUSED' is not 'DE', so calculate_fault_frequency uses the fan-end geometry
            fused_tag = "FUSED" + fused_col.rsplit("_", 1)[-1]
            feature_sources.append((fused_col, f"{idname}_{fused_tag}_processed", "FUSED", True))

        feature_dfs = []
        for source_col, prefix, feat_bearing, full_set in feature_sources:
            time_features = group_df.apply(
                lambda row, c=source_col: extract_time_features(row[c]), axis=1
            )
            freq_features = group_df.apply(
                lambda row, c=source_col: extract_frequency_features(
                    signal=row[c], fs=fs, rpm=row["RPM"]
                ),
                axis=1,
            )
            if not full_set:
                # Low-pass BA signal: time + frequency features under one shared prefix
                feature_dfs.append(
                    pd.concat([time_features, freq_features], axis=1).add_prefix(prefix)
                )
                continue

            physical_features = group_df.apply(
                lambda row, c=source_col, bt=feat_bearing: extract_physical_features(
                    signal=row[c],
                    fs=fs,
                    rpm=row["RPM"],
                    fault_type=row["fault_type"],
                    bearing_type=bt,
                ),
                axis=1,
            )
            # NOTE: sample entropy is O(N^2) in the window length
            sample_entropy = group_df.apply(
                lambda row, c=source_col: extract_sample_entropy(row[c]), axis=1
            )
            envelope_features = group_df.apply(
                lambda row, c=source_col: extract_envelope_features(signal=row[c], fs=fs),
                axis=1,
            )
            feature_dfs.extend([
                time_features.add_prefix(prefix),
                freq_features.add_prefix(f"{prefix}_fft"),
                physical_features.add_prefix(f"{prefix}_phy"),
                sample_entropy.add_prefix(f"{prefix}_sampen"),
                envelope_features.add_prefix(f"{prefix}_envelop"),
            ])

        # Keep raw and processed signal columns alongside the features
        group_df = pd.concat([group_df] + feature_dfs, axis=1)

        feature_cols = [c for fdf in feature_dfs for c in fdf.columns]
        num_cols = [c for c in ("RPM", "horsepower") if c in group_df.columns] + feature_cols
        scaler = id_scaler_dict.setdefault(idname, StandardScaler())
        group_df[num_cols] = scaler.fit_transform(group_df[num_cols])

        grouped_results.append(group_df)

    if not grouped_results:
        return pd.DataFrame()

    final_df = pd.concat(grouped_results, ignore_index=True)
    # Fixed mapping: a LabelEncoder fitted per DataFrame maps every single-fault-type
    # input (df_*_b, df_*_ir, ...) to 0, making codes inconsistent across datasets
    fault_codes = {"B": 0, "IR": 1, "OR": 2}
    final_df["fault_type_encoded"] = final_df["fault_type"].map(fault_codes)
    final_df["data_type"] = data_tag

    return final_df

# ----------------------
# 4. 批量预处理所有DataFrame
# ----------------------
def batch_smart_preprocess(save_path=r"D:\360极速浏览器下载\data\preprocessed\combined_smart_preprocessed.csv"):
    """Preprocess every configured raw DataFrame found in globals() and merge them.

    Each key of the internal map names a module-level DataFrame created by
    earlier cells; names that are not defined are skipped. Every frame is run
    through ``preprocess_df`` with its sampling rate and sensor label, the
    non-empty results are concatenated, shuffled with a fixed seed and the
    merged frame is optionally written to CSV.

    Args:
        save_path: CSV destination for the merged frame. Its parent directory
            is created when it does not exist yet (previously ``to_csv`` failed
            in that case). Pass ``None`` to skip writing.

    Returns:
        pd.DataFrame: the merged, shuffled frame, or an empty DataFrame when no
        input produced data.
    """
    import os  # local import so this cell does not depend on the loader cell

    # Sampling rate (Hz) and sensor label for each known DataFrame name
    df_fs_bearing_map = {
        "df_12fe_b": {"fs": 12000, "bearing": "FE"},
        "df_12fe_ir": {"fs": 12000, "bearing": "FE"},
        "df_12fe_or": {"fs": 12000, "bearing": "FE"},
        "df_48de_b": {"fs": 48000, "bearing": "DE"},
        "df_48de_ir": {"fs": 48000, "bearing": "DE"},
        "df_48de_or": {"fs": 48000, "bearing": "DE"},
        # further DataFrames can be added here
    }

    preprocessed_all = []

    for df_name, info in df_fs_bearing_map.items():
        fs = info["fs"]
        bearing_type = info["bearing"]
        # The raw frames are notebook globals created by earlier cells
        if df_name not in globals():
            print(f"跳过不存在的DataFrame: {df_name}")
            continue

        raw_df = globals()[df_name]
        data_tag = df_name.split("_")[1]  # e.g. "df_12fe_b" -> "12fe"
        print(f"\n===== 开始预处理 {df_name}（采样率: {fs}Hz） =====")
        print(f"原始数据形状: {raw_df.shape}，包含信号列: {[c for c in ['DE_time','FE_time','BA_time'] if c in raw_df.columns]}")

        processed_df = preprocess_df(raw_df, data_tag, fs, bearing_type)
        if not processed_df.empty:
            preprocessed_all.append(processed_df)
            print(f"{df_name} 预处理完成，形状: {processed_df.shape}")
        else:
            print(f"{df_name} 预处理后无有效数据")

    if not preprocessed_all:
        print("\n无任何有效预处理数据")
        return pd.DataFrame()

    combined_df = pd.concat(preprocessed_all, ignore_index=True)
    # Shuffle so rows of one class are not stored contiguously.
    # NOTE(review): `shuffle` is assumed to be sklearn.utils.shuffle imported
    # in an earlier cell — confirm.
    combined_df = shuffle(combined_df, random_state=42)

    if save_path is not None:
        save_dir = os.path.dirname(save_path)
        if save_dir:
            os.makedirs(save_dir, exist_ok=True)
        combined_df.to_csv(save_path, index=False)

    print("\n===== 批量预处理完成 =====")
    print(f"合并后总数据形状: {combined_df.shape}")
    if save_path is not None:
        print(f"合并数据保存至: {save_path}")
    print(f"故障类型分布:\n{combined_df['fault_type'].value_counts()}")
    print(f"数据类型分布:\n{combined_df['data_type'].value_counts()}")

    return combined_df

# ----------------------
# 5. Run preprocessing (direct execution)
# ----------------------
if __name__ == "__main__":
    # Build the merged preprocessed dataset once; kept as a global for later cells
    final_preprocessed_df = batch_smart_preprocess()

In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, LabelEncoder
import logging
from scipy.signal import butter, filtfilt
from typing import Dict, List, Optional, Union

# Configure logging: INFO level, records formatted as
# "<timestamp> - <LEVEL> - <message>".
# NOTE: logging.basicConfig does nothing if the root logger already has
# handlers (e.g. configured by an earlier notebook cell) unless force=True.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by the preprocessing functions below.
logger = logging.getLogger(__name__)

def butter_lowpass_filter(data, cutoff, fs, order=4):
    """Zero-phase Butterworth low-pass filter.

    Used in this file to isolate low-frequency content (e.g. the BA signal at
    100 Hz) and as the base denoising step of ``smart_preprocess``.

    Args:
        data: 1-D signal (array-like).
        cutoff: cutoff frequency in Hz; must satisfy 0 < cutoff < fs / 2.
        fs: sampling rate in Hz.
        order: Butterworth order. Forward-backward filtering doubles the
            effective order of the combined response.

    Returns:
        np.ndarray: the filtered signal, same length as ``data``.

    Raises:
        ValueError: if ``cutoff`` is not strictly between 0 and the Nyquist
            frequency, or if the signal is too short for the edge padding.
    """
    from scipy.signal import sosfiltfilt  # only butter/filtfilt are imported at file level

    nyq = 0.5 * fs
    if not 0 < cutoff < nyq:
        raise ValueError(f"cutoff={cutoff} Hz must lie in (0, {nyq}) Hz for fs={fs} Hz")
    # Second-order sections stay numerically stable when cutoff/nyq is tiny
    # (e.g. 100 Hz at 48 kHz), where the (b, a) polynomial form loses precision.
    sos = butter(order, cutoff / nyq, btype='low', analog=False, output='sos')
    return sosfiltfilt(sos, np.asarray(data))

def extract_time_features(signal):
    """Compute time-domain statistics of one signal.

    Returns a Series with, in order: rms, peak (max absolute value), mean,
    std (population), kurtosis and skewness (pandas' bias-corrected
    estimators) and crest_factor = peak / rms (1e-8 added to avoid division
    by zero). An empty signal yields all zeros.
    """
    feature_names = ('rms', 'peak', 'mean', 'std', 'kurtosis', 'skewness', 'crest_factor')
    if len(signal) == 0:
        return pd.Series(dict.fromkeys(feature_names, 0))

    as_series = pd.Series(signal)
    root_mean_square = np.sqrt(np.mean(np.square(signal)))
    abs_peak = np.max(np.abs(signal))
    values = (
        root_mean_square,
        abs_peak,
        np.mean(signal),
        np.std(signal),
        as_series.kurtosis(),
        as_series.skew(),
        abs_peak / (root_mean_square + 1e-8),  # epsilon guards an all-zero signal
    )
    return pd.Series(dict(zip(feature_names, values)))

def smart_preprocess(signal, fs, rpm, fault_type, bearing_type):
    """Denoise one vibration signal with sensor-dependent low-pass filtering.

    Steps:
      1. Base low-pass at 1000 Hz (order 3).
      2. DE/FE sensors: if ``fault_type`` contains "inner" or "outer"
         (case-insensitive), an additional low-pass at 2000 Hz (order 3).
      3. BA sensor: an additional low-pass at 500 Hz (order 4).
    Every cutoff is clamped to 0.9 * Nyquist. Previously only the 2000 Hz
    stage was clamped, so low sampling rates made the other stages raise.

    Args:
        signal: 1-D signal (array-like).
        fs: sampling rate in Hz.
        rpm: shaft speed; accepted for interface compatibility, currently unused.
        fault_type: fault label of the row (converted with ``str``).
        bearing_type: "DE", "FE" or "BA"; any other value only gets step 1.

    Returns:
        np.ndarray: the filtered signal.
    """
    max_cutoff = fs / 2 * 0.9  # stay safely below the Nyquist frequency
    filtered = butter_lowpass_filter(signal, cutoff=min(1000, max_cutoff), fs=fs, order=3)

    if bearing_type in ["DE", "FE"]:
        fault_name = str(fault_type).lower()
        # NOTE(review): labels in this pipeline appear to be "B"/"IR"/"OR"
        # (see the B->0, IR->1, OR->2 encoding), which never contain
        # "inner"/"outer", so this branch does not fire for them. Do not just
        # widen the match: conditioning preprocessing on the fault label leaks
        # the target into the features and cannot be reproduced at inference.
        # Also note a 2000 Hz low-pass after a 1000 Hz low-pass does not
        # "enhance" high-frequency content; it only attenuates further.
        if "inner" in fault_name or "outer" in fault_name:
            filtered = butter_lowpass_filter(filtered, cutoff=min(2000, max_cutoff), fs=fs, order=3)
    elif bearing_type == "BA":
        # Base (BA) signal: keep only low-frequency content
        filtered = butter_lowpass_filter(filtered, cutoff=min(500, max_cutoff), fs=fs, order=4)

    return filtered

def extract_frequency_features(signal, fs, rpm):
    """Compute FFT-magnitude features of one signal.

    Uses the first ``n // 2`` bins of ``|FFT(signal)|``. Returns a Series with:
      fft_max / fft_mean      max and mean spectral magnitude;
      rotation_freq_amp,
      double_rotation_amp,
      triple_rotation_amp     peak magnitude within [0.9, 1.1], [1.9, 2.1] and
                              [2.9, 3.1] times the shaft frequency rpm / 60
                              (0 when no bin falls in the band);
      low_freq_energy         share of the summed magnitude at <= 100 Hz;
      mid_low_freq_energy     share in (100, 500] Hz.
    The "energy" shares are ratios of summed magnitudes (not squared). Signals
    with fewer than two samples yield all zeros.
    """
    n_samples = len(signal)
    if n_samples < 2:
        return pd.Series(dict.fromkeys(
            ('fft_max', 'fft_mean', 'rotation_freq_amp', 'double_rotation_amp',
             'triple_rotation_amp', 'low_freq_energy', 'mid_low_freq_energy'),
            0,
        ))

    half = n_samples // 2
    bins = np.fft.fftfreq(n_samples, 1/fs)[:half]
    magnitude = np.abs(np.fft.fft(signal))[:half]
    shaft_hz = rpm / 60  # 1x rotation frequency in Hz

    def band_peak(lo, hi):
        """Largest magnitude with lo <= f <= hi, or 0 when the band is empty."""
        in_band = (bins >= lo) & (bins <= hi)
        if np.any(in_band):
            return np.max(magnitude[in_band])
        return 0

    summed = np.sum(magnitude)

    def band_share(mask):
        """Fraction of the summed magnitude selected by ``mask``."""
        if summed > 0:
            return np.sum(magnitude[mask]) / (summed + 1e-8)
        return 0

    features = {
        'fft_max': np.max(magnitude),
        'fft_mean': np.mean(magnitude),
    }
    harmonic_bands = (
        ('rotation_freq_amp', 0.9, 1.1),
        ('double_rotation_amp', 1.9, 2.1),
        ('triple_rotation_amp', 2.9, 3.1),
    )
    for name, lo_mult, hi_mult in harmonic_bands:
        features[name] = band_peak(shaft_hz * lo_mult, shaft_hz * hi_mult)
    features['low_freq_energy'] = band_share(bins <= 100)
    features['mid_low_freq_energy'] = band_share((bins > 100) & (bins <= 500))
    return pd.Series(features)

def _coerce_signal_columns(group_df, signal_cols):
    """Parse string-serialised signal cells of ``group_df`` into arrays, in place.

    For each column only the first cell is inspected: if it is already a list
    or ndarray the column is left untouched; otherwise every string cell is
    parsed as comma-separated numbers (surrounding "[]" stripped) and
    non-string cells are kept as they are.
    NOTE(review): numpy's own repr ("[1. 2.]") is space-separated and would
    not parse with sep=',' — confirm how the signals were serialised.

    Returns:
        bool: True on success, False when parsing raised (the error is logged).
    """
    for col in signal_cols:
        try:
            if not isinstance(group_df[col].iloc[0], (list, np.ndarray)):
                group_df[col] = group_df[col].apply(
                    lambda x: np.fromstring(x.strip('[]'), sep=',') if isinstance(x, str) else x
                )
        except Exception as e:
            logger.error(f"转换{col}列数据时出错: {str(e)}")
            return False
    return True

def _extract_ba_features(group_df, fs):
    """Return ``BA_time_*``/``BA_freq_*`` features of the 100 Hz low-passed BA signal.

    The result is indexed like ``group_df``.
    """
    low_passed = group_df.apply(
        lambda row: butter_lowpass_filter(row["BA_time"], cutoff=100, fs=fs), axis=1
    )
    time_features = low_passed.apply(extract_time_features).add_prefix("BA_time_")
    freq_features = pd.DataFrame(
        [extract_frequency_features(sig, fs, rpm) for sig, rpm in zip(low_passed, group_df["RPM"])],
        index=group_df.index,
    ).add_prefix("BA_freq_")
    return pd.concat([time_features, freq_features], axis=1)

def _extract_bearing_features(group_df, sensor, fs):
    """Return ``{sensor}_time_*``/``{sensor}_freq_*`` features for "DE" or "FE".

    Each ``{sensor}_time`` signal is first passed through ``smart_preprocess``
    with ``bearing_type=sensor``. The result is indexed like ``group_df``.
    """
    signal_col = f"{sensor}_time"
    processed = group_df.apply(
        lambda row: smart_preprocess(
            signal=row[signal_col],
            fs=fs,
            rpm=row["RPM"],
            fault_type=row["fault_type"],
            bearing_type=sensor,
        ),
        axis=1,
    )
    time_features = processed.apply(extract_time_features).add_prefix(f"{sensor}_time_")
    freq_features = pd.DataFrame(
        [extract_frequency_features(sig, fs, rpm) for sig, rpm in zip(processed, group_df["RPM"])],
        index=group_df.index,
    ).add_prefix(f"{sensor}_freq_")
    return pd.concat([time_features, freq_features], axis=1)

def _add_fusion_features(combined):
    """Add BA-vs-bearing ratio features and ``system_health_index`` to ``combined``.

    Expects the BA feature columns to be present; DE/FE ratios are added only
    for sensors whose features exist. Modifies and returns ``combined``.
    """
    eps = 1e-8  # avoids division by zero
    for sensor in ("DE", "FE"):
        if f"{sensor}_time_rms" in combined.columns:
            # Amplitude / 1x-component ratios between base and bearing sensor
            combined[f"BA_{sensor}_amp_ratio"] = combined['BA_time_rms'] / (combined[f"{sensor}_time_rms"] + eps)
            combined[f"BA_{sensor}_1x_ratio"] = (
                combined['BA_freq_rotation_freq_amp'] / (combined[f"{sensor}_freq_rotation_freq_amp"] + eps)
            )

    health_inputs = ['BA_freq_low_freq_energy', 'BA_time_rms', 'BA_freq_rotation_freq_amp']
    if all(col in combined.columns for col in health_inputs):
        # Heuristic weighting of BA low-frequency features
        combined['system_health_index'] = (
            0.4 * combined['BA_freq_low_freq_energy'] +
            0.3 * (1 / (combined['BA_time_rms'] + eps)) +
            0.3 * (1 / (combined['BA_freq_rotation_freq_amp'] + eps))
        )
    return combined

def _encode_fault_types(fault_types, fault_type_classes):
    """Return ``(codes, classes)`` for a Series of fault-type labels.

    With ``fault_type_classes`` given, those labels receive codes 0..k-1 in
    the listed order and any other observed labels follow in sorted order, so
    the listed labels get identical codes in every call — even when a call
    only sees one fault type. With ``None``, a fresh LabelEncoder is fitted
    (codes then depend on which labels this call happens to see).
    """
    if fault_type_classes is None:
        encoder = LabelEncoder()
        codes = encoder.fit_transform(fault_types)
        return codes, [str(c) for c in encoder.classes_]
    labels = fault_types.astype(str)
    classes = list(dict.fromkeys(str(c) for c in fault_type_classes))
    classes += sorted(set(labels.unique()) - set(classes))
    lookup = {label: code for code, label in enumerate(classes)}
    return labels.map(lookup).to_numpy(), classes

def preprocess_df(df, data_tag, fs, bearing_type=None, fault_type_classes=("B", "IR", "OR")):
    """
    Extract features from DE/FE/BA vibration signals and standardize them per idname.

    For each ``idname`` group:
      1. String-serialised signal cells are parsed into arrays, and rows with a
         missing RPM / fault_type / idname / signal value are dropped.
      2. BA: 100 Hz low-pass, then ``BA_time_*`` / ``BA_freq_*`` features.
         DE/FE: ``smart_preprocess``, then ``{DE,FE}_time_*`` / ``{DE,FE}_freq_*``.
      3. When BA features exist alongside DE and/or FE features, BA/bearing
         ratio features and ``system_health_index`` are added.
      4. Numeric RPM / horsepower / feature columns are z-scored with a
         StandardScaler fitted on that group alone.
    The groups are then concatenated, ``fault_type_encoded`` and ``data_type``
    (plus ``bearing_type`` when given) are added, and the raw signal columns
    are dropped.

    Args:
        df (pd.DataFrame): must contain "idname", "RPM", "fault_type" and at
            least one of "DE_time", "FE_time", "BA_time" (one signal array, or
            its string form, per cell).
        data_tag (str): value written to the "data_type" column.
        fs (int/float): sampling rate in Hz.
        bearing_type (str, optional): written to the "bearing_type" column when
            truthy. It does not alter processing: DE/FE columns are always
            processed as "DE"/"FE".
        fault_type_classes (tuple or None): labels given fixed codes in this
            order (default B->0, IR->1, OR->2) so that codes agree across calls
            that each see a single fault type; other labels get the following
            codes in sorted order. ``None`` restores the per-call LabelEncoder.

    Returns:
        pd.DataFrame: the processed feature frame; empty on invalid input or
        when no group could be processed.

    NOTE(review): scaling is per idname and the fitted scalers are discarded,
    so unseen data cannot be transformed consistently; a group with a single
    row standardizes to all zeros (StandardScaler uses scale 1 for
    zero-variance columns). Confirm this is intended.
    """
    # Input validation
    if not isinstance(df, pd.DataFrame):
        logger.error("输入不是有效的DataFrame对象")
        return pd.DataFrame()

    required_base_cols = ["idname", "RPM", "fault_type"]
    missing_cols = [col for col in required_base_cols if col not in df.columns]
    if missing_cols:
        logger.error(f"缺少必要的基础列: {missing_cols}")
        return pd.DataFrame()

    # Which sensor signal columns are available (BA = base/motor-mount signal)
    signal_cols = [col for col in ("DE_time", "FE_time", "BA_time") if col in df.columns]
    has_de = "DE_time" in signal_cols
    has_fe = "FE_time" in signal_cols
    has_ba = "BA_time" in signal_cols

    if not signal_cols:
        logger.warning("无DE、FE或BA信号列，返回空DataFrame")
        return pd.DataFrame()

    logger.info(
        f"开始预处理数据，DE信号: {has_de}, FE信号: {has_fe}, BA信号: {has_ba}, "
        f"数据标签: {data_tag}, 轴承类型: {bearing_type}"
    )

    grouped_results = []
    total_groups = df["idname"].nunique()

    for processed_groups, (idname, group_df) in enumerate(df.groupby("idname"), start=1):
        logger.info(f"处理组 {idname} ({processed_groups}/{total_groups})")
        # groupby yields slices; work on a copy so column assignments are safe
        group_df = group_df.copy()
        required_cols = ["RPM", "fault_type", "idname"] + signal_cols

        if not _coerce_signal_columns(group_df, signal_cols):
            logger.warning(f"组 {idname} 数据转换失败，跳过处理")
            continue

        # Drop rows with a missing value in any required column
        initial_count = len(group_df)
        group_df = group_df.dropna(subset=required_cols)
        if len(group_df) < initial_count:
            logger.warning(f"组 {idname} 移除了 {initial_count - len(group_df)} 行缺失值数据")

        if group_df.empty:
            logger.warning(f"组 {idname} 处理后为空，跳过")
            continue

        feature_dfs = []
        ba_extracted = False

        if has_ba:
            try:
                ba_features = _extract_ba_features(group_df, fs)
            except Exception as e:
                logger.error(f"处理组 {idname} 的BA信号时出错: {str(e)}")
            else:
                feature_dfs.append(ba_features)
                ba_extracted = True
                logger.info(f"组 {idname} 成功提取BA特征 {len(ba_features.columns)} 个")

        for sensor, present in (("DE", has_de), ("FE", has_fe)):
            if not present:
                continue
            try:
                sensor_features = _extract_bearing_features(group_df, sensor, fs)
            except Exception as e:
                logger.error(f"处理组 {idname} 的{sensor}信号时出错: {str(e)}")
            else:
                feature_dfs.append(sensor_features)
                logger.info(f"组 {idname} 成功提取{sensor}特征 {len(sensor_features.columns)} 个")

        # Multi-sensor fusion needs BA features plus at least one bearing sensor
        if ba_extracted and len(feature_dfs) >= 2:
            try:
                feature_dfs = [_add_fusion_features(pd.concat(feature_dfs, axis=1))]
                logger.info(f"组 {idname} 成功生成多测点融合特征")
            except Exception as e:
                logger.error(f"计算组 {idname} 的多测点融合特征时出错: {str(e)}")

        if not feature_dfs:
            logger.warning(f"组 {idname} 未能提取任何特征，跳过")
            continue

        # Attach the features row-by-row (both sides share the same row order)
        try:
            features_combined = pd.concat(feature_dfs, axis=1)
            group_df = pd.concat([group_df.reset_index(drop=True), features_combined.reset_index(drop=True)], axis=1)
            group_df = group_df.drop(
                columns=["DE_time_processed", "FE_time_processed", "BA_time_filtered_low"],
                errors='ignore'
            )
        except Exception as e:
            logger.error(f"组合组 {idname} 的特征时出错: {str(e)}")
            continue

        # Standardize numeric features
        try:
            prefixes = ("DE_", "FE_", "BA_", "system_")
            # Numeric dtypes only: the raw "DE_time"/"FE_time"/"BA_time" columns
            # also match the prefixes but hold arrays, which previously made the
            # scaler raise and caused every group to be skipped.
            num_cols = [
                col for col, dtype in group_df.dtypes.items()
                if isinstance(col, str)
                and (col in ("RPM", "horsepower") or col.startswith(prefixes))
                and pd.api.types.is_numeric_dtype(dtype)
            ]

            if num_cols:
                numeric = group_df[num_cols].replace([np.inf, -np.inf], np.nan)
                # Median imputation is robust to outliers
                numeric = numeric.fillna(numeric.median())
                group_df[num_cols] = StandardScaler().fit_transform(numeric)
                logger.info(f"组 {idname} 标准化了 {len(num_cols)} 个特征列")
        except Exception as e:
            logger.error(f"标准化组 {idname} 的特征时出错: {str(e)}")
            continue

        grouped_results.append(group_df)

    if not grouped_results:
        logger.warning("没有成功处理的组，返回空DataFrame")
        return pd.DataFrame()

    try:
        final_df = pd.concat(grouped_results, ignore_index=True)
        logger.info(f"所有组处理完成，总记录数: {len(final_df)}")

        codes, classes = _encode_fault_types(final_df["fault_type"], fault_type_classes)
        final_df["fault_type_encoded"] = codes
        logger.info(f"已编码故障类型，共 {len(classes)} 种类型: {', '.join(classes)}")

        final_df["data_type"] = data_tag
        if bearing_type:
            final_df["bearing_type"] = bearing_type

        # Raw signal arrays are no longer needed once features are extracted
        final_df = final_df.drop(columns=signal_cols, errors='ignore')

        return final_df
    except Exception as e:
        logger.error(f"合并结果时出错: {str(e)}")
        return pd.DataFrame()

def batch_smart_preprocess(
    df_fs_bearing_map: Optional[Dict] = None,
    dataframes: Optional[Dict[str, pd.DataFrame]] = None,
) -> pd.DataFrame:
    """
    Preprocess several raw bearing-vibration DataFrames and concatenate the results.

    Args:
        df_fs_bearing_map: maps a DataFrame name to a config dict with keys
            "fs" (sampling rate, Hz) and "bearing" (sensor label passed on to
            ``preprocess_df``). Defaults to the six 12 kHz FE / 48 kHz DE frames.
        dataframes: mapping used to resolve those names. Defaults to this
            module's ``globals()`` (the notebook variables), as before; passing
            it explicitly lets the function run outside the notebook namespace.

    Returns:
        pd.DataFrame: concatenation of every non-empty ``preprocess_df`` result,
        or an empty DataFrame when nothing could be processed.
    """
    default_map = {
        "df_12fe_b": {"fs": 12000, "bearing": "FE"},
        "df_12fe_ir": {"fs": 12000, "bearing": "FE"},
        "df_12fe_or": {"fs": 12000, "bearing": "FE"},
        "df_48de_b": {"fs": 48000, "bearing": "DE"},
        "df_48de_ir": {"fs": 48000, "bearing": "DE"},
        "df_48de_or": {"fs": 48000, "bearing": "DE"},
        # further DataFrame configurations can be added here
    }

    processing_map = df_fs_bearing_map if df_fs_bearing_map is not None else default_map
    lookup = dataframes if dataframes is not None else globals()
    preprocessed_all = []

    for df_name, info in processing_map.items():
        # Only configuration problems are reported as such; errors raised while
        # processing are handled separately below.
        try:
            fs = info["fs"]
            bearing_type = info["bearing"]
        except KeyError as e:
            logger.error(f"{df_name} 配置信息不完整，缺少键: {str(e)}")
            continue
        except TypeError as e:  # config entry is not a mapping
            logger.error(f"处理 {df_name} 时发生错误: {str(e)}")
            continue

        if df_name not in lookup:
            logger.warning(f"DataFrame '{df_name}' 不存在于全局变量中，跳过处理")
            continue

        raw_df = lookup[df_name]
        if not isinstance(raw_df, pd.DataFrame):
            logger.warning(f"全局变量 '{df_name}' 不是有效的DataFrame，跳过处理")
            continue

        # Data tag is the middle part of the name, e.g. "df_12fe_b" -> "12fe"
        name_parts = df_name.split("_")
        data_tag = "_".join(name_parts[1:-1]) if len(name_parts) >= 3 else df_name

        logger.info(f"开始处理 {df_name}，采样频率: {fs}, 轴承类型: {bearing_type}")

        try:
            processed_df = preprocess_df(raw_df, data_tag, fs, bearing_type)
        except Exception as e:
            # Continue with the remaining frames but keep the traceback
            logger.exception(f"处理 {df_name} 时发生错误: {str(e)}")
            continue

        if processed_df.empty:
            logger.warning(f"{df_name} 处理后为空，未添加到结果集中")
        else:
            preprocessed_all.append(processed_df)
            logger.info(f"完成 {df_name} 处理，生成 {len(processed_df)} 条记录")

    if not preprocessed_all:
        logger.warning("没有成功预处理的DataFrame，返回空DataFrame")
        return pd.DataFrame()

    try:
        combined_df = pd.concat(preprocessed_all, ignore_index=True)
        logger.info(f"所有DataFrame处理完成，总记录数: {len(combined_df)}")
        return combined_df
    except Exception as e:
        logger.error(f"合并所有预处理数据时出错: {str(e)}")
        return pd.DataFrame()
