In [None]:
import json
import argparse

# 命令行参数解析
parser = argparse.ArgumentParser()
parser.add_argument('--train_xj', type=str, default=None, help='训练轴承列表, 逗号分隔')
parser.add_argument('--test_xj', type=str, default=None, help='测试轴承列表, 逗号分隔')
parser.add_argument('--context_xj', type=str, default=None, help='上下文轴承列表, 逗号分隔')
parser.add_argument('--validation_xj', type=str, default=None, help='验证轴承列表, 逗号分隔')
args, unknown = parser.parse_known_args()

with open('../../config/cnnlstm_config.json', 'r') as f:
    config = json.load(f)

# 解析命令行参数，优先级高于config
if args.train_xj is not None:
    TRAIN_xj = [x.strip() for x in args.train_xj.split(',') if x.strip()]
else:
    TRAIN_xj = config['train_bearings']
if args.test_xj is not None:
    TEST_xj = [x.strip() for x in args.test_xj.split(',') if x.strip()]
else:
    TEST_xj = config['test_bearings']
if args.context_xj is not None:
    CONTEXT_xj = [x.strip() for x in args.context_xj.split(',') if x.strip()]
else:
    CONTEXT_xj = config['context_bearings']
if args.validation_xj is not None:
    VALIDATION_xj = [x.strip() for x in args.validation_xj.split(',') if x.strip()]
else:
    VALIDATION_xj = config['validation_bearings']

batch_size = config['batch_size']
test_batch_size = config['test_batch_size']
window_size = config['window_size']
input_dim = config['input_dim']
output_dim = config['output_dim']
conv_archs = config['conv_archs']
conv_archs = [(1, arch) for arch in conv_archs]  # 将[32, 64]转换为[(1, 32), (1, 64)]
hidden_layer_sizes = config['hidden_layer_sizes']
epochs = config['epochs']
learn_rate = config['learn_rate']
seed = config['seed']
patience = config['patience']
opt = config['opt']

if patience == "inf":
    patience = epochs
else:
    patience = int(patience)

# 加载数据
import torch
from joblib import dump, load
import torch.utils.data as Data
import numpy as np
import pandas as pd
import torch.nn as nn
import os
torch.autograd.set_detect_anomaly(True)
# 参数与配置
torch.manual_seed(seed)  # 设置随机种子，以使实验结果具有可重复性
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"当前运行设备: {device}")
from torch.optim.lr_scheduler import CosineAnnealingLR
FPT_dict_xj = {'Bearing1_1': 76, 'Bearing1_2': 44, 'Bearing1_3': 60, 'Bearing1_4': 0, 'Bearing1_5': 39,
            'Bearing2_1': 455, 'Bearing2_2': 48, 'Bearing2_3': 327, 'Bearing2_4': 32, 'Bearing2_5': 141,
            'Bearing3_1': 2344, 'Bearing3_2': 0, 'Bearing3_3': 340, 'Bearing3_4': 1418, 'Bearing3_5': 9}
# 加载数据集
def dataloader(batch_size, test_batch_size, workers=os.cpu_count(), window_size=7):
    # 根据TRAIN_xj，TEST_xj从datasetresult/xjtu_made目录下加载相应训练集和测试集的数据和标签
    def get_data_and_labels(bearing_list, data_dir='../../datasetresult/xjtu_made', window_size=window_size, is_test=False):
        data_list = []
        label_list = []
        for bearing in bearing_list:
            # 文件名格式: c*_Bearing*_*_data, c*_Bearing*_*_label
            # 只要包含该bearing名即可
            data_files = [f for f in os.listdir(data_dir) if bearing in f and f.endswith('_data')]
            label_files = [f for f in os.listdir(data_dir) if bearing in f and f.endswith('_label')]
            # 按文件名排序，确保一一对应
            data_files.sort()
            label_files.sort()
            for data_file, label_file in zip(data_files, label_files):
                data = load(os.path.join(data_dir, data_file))
                label = load(os.path.join(data_dir, label_file))
                # 如果是测试集，且FPT_dict_xj中有该bearing，截取FPT值往后的数据
                if is_test and bearing in FPT_dict_xj:
                    fpt = FPT_dict_xj[bearing]
                    # FPT为0时，保留全部数据
                    if fpt > 0:
                        data = data[fpt:]
                        label = label[fpt:]
                data_list.append(data)
                label_list.append(label)
        # 拼接所有轴承的数据
        if len(data_list) > 0:
            data_all = torch.cat([torch.tensor(d) if not isinstance(d, torch.Tensor) else d for d in data_list], dim=0)
            label_all = torch.cat([torch.tensor(l) if not isinstance(l, torch.Tensor) else l for l in label_list], dim=0)
        else:
            data_all = torch.empty(0)
            label_all = torch.empty(0)
        return data_all, label_all

    # 加载训练集和测试集
    train_set, train_label = get_data_and_labels(TRAIN_xj)
    context_set, context_label = get_data_and_labels(CONTEXT_xj)
    test_set, test_label = get_data_and_labels(TEST_xj, is_test=True)
    validation_set, validation_label = get_data_and_labels(VALIDATION_xj)
    print(train_set.shape, train_label.shape)
    print(test_set.shape, test_label.shape)
    
    # 加载数据
    train_loader = Data.DataLoader(dataset=Data.TensorDataset(train_set, train_label),
                                   batch_size=batch_size, num_workers=workers, drop_last=False)
    context_loader = Data.DataLoader(dataset=Data.TensorDataset(context_set, context_label),
                                   batch_size=batch_size, num_workers=workers, drop_last=False)
    test_loader = Data.DataLoader(dataset=Data.TensorDataset(test_set, test_label),
                                  batch_size=test_batch_size, num_workers=workers, drop_last=True)
    validation_loader = Data.DataLoader(dataset=Data.TensorDataset(validation_set, validation_label),
                                   batch_size=test_batch_size, num_workers=workers, drop_last=True)
    return train_loader, context_loader, test_loader, validation_loader

# 加载数据
train_loader, context_loader, test_loader, validation_loader = dataloader(batch_size, test_batch_size, window_size=window_size)

print(len(train_loader.dataset))
print(len(context_loader.dataset))
print(len(test_loader.dataset))
print(len(validation_loader.dataset))

In [2]:
from torch import nn

# 定义 CNNLSTMModel 模型
class CNNLSTMModel(nn.Module):
    def __init__(self, input_dim, conv_archs, hidden_layer_sizes, output_dim):
        """
        params:
        input_dim          : 输入数据的维度
        conv_archs         : cnn 网络结构
        hidden_layer_sizes : lstm 隐层的数目和维度
        output_dim         : 输出维度数
        """
        super().__init__()
        # 参数
        # CNN参数
        self.conv_arch = conv_archs # cnn网络结构
        self.input_channels = input_dim # 输入通道数
        self.cnn_features = self.make_layers()

        # LSTM参数
        self.num_layers = len(hidden_layer_sizes)  # lstm层数
        self.lstm_layers = nn.ModuleList()  # 用于保存LSTM层的列表
        # 定义第一层LSTM   
        self.lstm_layers.append(nn.LSTM(conv_archs[-1][-1], hidden_layer_sizes[0], batch_first=True))
        # 定义后续的LSTM层
        for i in range(1, self.num_layers):
                self.lstm_layers.append(nn.LSTM(hidden_layer_sizes[i-1], hidden_layer_sizes[i], batch_first=True))

        # 定义线性层
        self.mu  = nn.Linear(hidden_layer_sizes[-1], output_dim)
        self.sigma = nn.Linear(hidden_layer_sizes[-1], output_dim)

    # CNN卷积池化结构
    def make_layers(self):
        layers = []
        for (num_convs, out_channels) in self.conv_arch:
            for _ in range(num_convs):
                layers.append(nn.Conv1d(self.input_channels, out_channels, kernel_size=3, padding=1))
                layers.append(nn.ReLU(inplace=True))
                self.input_channels = out_channels
            # layers.append(nn.MaxPool1d(kernel_size=2, stride=2))
        return nn.Sequential(*layers)

    def forward(self, input_seq): 
        # CNN 卷积池化
        # CNN 网络输入[batch, dim, seq_length]
        #改变输入形状
        input_seq = input_seq.permute(0, 2, 1)
        cnn_features = self.cnn_features(input_seq)
        # print(cnn_features.size()) # torch.Size([32, 64, 1])

        # 送入 LSTM 层
        #改变输入形状，lstm 适应网络输入[batch, seq_length, dim]
        lstm_out = cnn_features.permute(0, 2, 1)
        for lstm in self.lstm_layers:
            lstm_out, _= lstm(lstm_out)  ## 进行一次LSTM层的前向传播  # torch.Size([32, 1, 128])
        mu = self.mu(lstm_out)
        sigma = self.sigma(lstm_out)
        return mu, sigma

In [None]:
import sys
sys.path.append('..')
from loss_function import compute_au_nll, compute_au_nll_with_pos

model = CNNLSTMModel(input_dim, conv_archs, hidden_layer_sizes, output_dim)  

loss_function = compute_au_nll_with_pos  # loss

if config['opt'] == 'Adam':
    optimizer = torch.optim.Adam(model.parameters(), learn_rate)  # 优化器
elif config['opt'] == 'AdamW':
    optimizer = torch.optim.AdamW(model.parameters(), learn_rate)  # 使用AdamW优化器
elif config['opt'] == 'SGD':
    optimizer = torch.optim.SGD(model.parameters(), learn_rate)  # 使用SGD优化器
else:
    raise ValueError(f"Invalid optimizer: {config['opt']}")

# 看下这个网络结构总共有多少个参数
def count_parameters(model):
    params = [p.numel() for p in model.parameters() if p.requires_grad]
    for item in params:
        print(f'{item:>6}')
    print(f'______\n{sum(params):>6}')

count_parameters(model)

In [None]:
print(model)

In [None]:
# 训练模型
import time
import torch.nn.functional as F
import matplotlib
import matplotlib.pyplot as plt
matplotlib.rc("font", family='Microsoft YaHei')

def model_train(epochs, model, optimizer, loss_function, train_loader, device):
    model = model.to(device)

    # 最低MSE  
    minimum_mse = 1000.
    # 最佳模型
    best_model = model

    train_mse = []     # 记录在训练集上每个epoch的 MSE 指标的变化情况   平均值
  
    # 计算模型运行时间
    start_time = time.time()
    for epoch in range(epochs):
         # 训练
        model.train()
        train_mse_loss = []    #保存当前epoch的MSE loss和
        for seq, labels in train_loader: 
            seq, labels = seq.to(device), labels.to(device)
            # 每次更新参数前都梯度归零和初始化
            optimizer.zero_grad()
            # 前向传播
            mu, sigma = model(seq)  #   torch.Size([16, 10])
            # 损失计算
            loss = loss_function(labels, mu, sigma)
            train_mse_loss.append(loss.item()) # 计算 MSE 损失
            # 反向传播和参数更新
            loss.backward()
            optimizer.step()
        #     break
        # break
        # 计算总损失
        train_av_mseloss = np.average(train_mse_loss) # 平均
        train_mse.append(train_av_mseloss)
        print(f'Epoch: {epoch+1:2} train_MSE-Loss: {train_av_mseloss:10.8f}')
       
        # 如果当前模型的 MSE 低于于之前的最佳准确率，则更新最佳模型
        #保存当前最优模型参数
        if train_av_mseloss < minimum_mse:
            minimum_mse = train_av_mseloss
            best_model = model# 更新最佳模型的参数
         
    # 保存最后的参数
    # torch.save(model, 'final_model_cnn_lstm.pt')
    # 保存最好的参数
    torch.save(best_model, 'best_model_cnnlstm.pt')
    print(f'\nDuration: {time.time() - start_time:.0f} seconds')
    
    # 可视化
    plt.plot(range(epochs), train_mse, color = 'b',label = 'train_MSE-loss')
    plt.legend()
    plt.show()   #显示 lable 
    print(f'min_MSE: {minimum_mse}')

#  模型训练
model_train(epochs, model, optimizer, loss_function, train_loader, device)

# 测试集评估

In [6]:
# 模型 
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 加载模型的状态字典
model = torch.load('best_model_cnnlstm.pt', weights_only=False)
model = model.to(device)

# 预测数据
test_origin_data = []
test_pre_data = []
with torch.no_grad():
    for data, label in test_loader:
        # 原始标签
        origin_lable = label.tolist()
        test_origin_data += origin_lable
        model.eval()  # 将模型设置为评估模式
        
        # 预测
        data, label = data.to(device), label.to(device)
        mu, sigma = model(data)  # 对测试集进行预测
        pred = mu.tolist()
        sigma = sigma.tolist()
        test_pre_data += pred        

In [None]:
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score


# 反归一化处理
# 使用相同的均值和标准差对预测结果进行反归一化处理
# 反标准化
scaler_dir = '../../datasetresult/xjtu_made'
scaler_filename = f'{TEST_xj[0]}_labeled_scaler'
all_files = os.listdir(scaler_dir)
matched_files = [f for f in all_files if f.endswith(scaler_filename)]
if not matched_files:
    raise FileNotFoundError(f"未找到匹配 {TEST_xj[0]} 的 scaler 文件, 目录: {scaler_dir}")
scaler_path = os.path.join(scaler_dir, matched_files[0])
scaler = load(scaler_path)

test_origin_data = scaler.inverse_transform(np.array(test_origin_data).reshape(-1, 1)).reshape(-1)
test_pre_data = scaler.inverse_transform(np.array(test_pre_data).reshape(-1, 1)).reshape(-1)


# 模型分数
score = r2_score(test_origin_data, test_pre_data)
print('测试集上 模型分数-R^2:',score)

print('*'*50)
# 训练集上的预测误差
test_mse = mean_squared_error(test_origin_data, test_pre_data)
test_rmse = np.sqrt(test_mse)
test_mae = mean_absolute_error(test_origin_data, test_pre_data)
print('测试数据集上的均方误差-MSE: ',test_mse)
print('测试数据集上的均方根误差-RMSE: ',test_rmse)
print('测试数据集上的平均绝对误差-MAE: ',test_mae)

In [None]:
import matplotlib
import matplotlib.pyplot as plt
matplotlib.rc("font", family='Microsoft YaHei')

# 可视化结果
plt.figure(figsize=(12, 6), dpi=100)
plt.plot(test_origin_data, label='真实寿命',color='orange')  # 真实值
plt.plot(test_pre_data, label='CNN-LSTM 预测值',color='green')  # 预测值

# plt.plot([-1,170],[2.0*0.7,2.0*0.7],c='black',lw=1,ls='--')  # 临界点直线  可自己调整位置

plt.xlabel('运行周期/10s', fontsize=12)
plt.ylabel('寿命百分比', fontsize=12)
plt.title(f'{TEST_xj[0]} 预测结果', fontsize=16)
plt.legend()
plt.show()

# 保存数据
dump(test_origin_data, '../画图对比/cnn_lstm_origin') 
dump(test_pre_data, '../画图对比/cnn_lstm_pre') 