In [3]:
import gym
import numpy as np
from gym import spaces
from stable_baselines3 import PPO
import shap
from mlutils import *
from tqdm import tqdm

加载标准化数据和scaler模型


In [4]:
# 配置类
class Config:
    # 目标性能阈值（根据需求修改）
    TARGETS = np.array([134, 278, 22.5])  # 三个性能指标的最低要求
    # SHAP参数
    SAMPLE_RATIO = 0.1     # 背景数据采样比例
    N_SUMMARY = 100        # SHAP背景数据压缩量
    TOP_K = 5              # 每个样本选择的关键参数数量

    MAX_STEPS = 100  # 最大步数
    BUFFER_RATIO = 0.1  # 动态范围比例

In [5]:
# 识别不合格样本
def find_unqualified_samples(y,  tragets):
    """找到至少有一个性能指标不达标的样本"""
    unqualified_mask = np.any(y < tragets, axis=1)
    return np.where(unqualified_mask)[0]

# 判断力学性能是否达标（用户自定义）
def is_satisfied(y_pred,  tragets):
    # 示例：假设 y_pred 是 3 个力学性能值，targets 是达标指标
    return all(y >= t for y, t in zip(y_pred, tragets))

#反标准化函数
def inverse_normalize(sample,scaler):
    inverse_sample=scaler.inverse_transform(sample.reshape(1,-1))
    return inverse_sample[0]
    

In [None]:
Procedure_header=['化学元素含量', '化学元素含量', '化学元素含量', '化学元素含量', '化学元素含量', '化学元素含量', '化学元素含量', '化学元素含量', '化学元素含量', 
                  '热轧', '热轧', '热轧', '热轧', '热轧', '热轧', '热轧', '热轧', 
                  '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', 
                  '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', 
                  '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', '冷轧', '冷轧',
                  '镀锌', '镀锌', '镀锌', '镀锌', '镀锌', '镀锌', '镀锌', '镀锌', '镀锌', '镀锌', '镀锌', '镀锌', '镀锌', '镀锌']
Parameter_header = [
    "碳", "硅", "锰", "磷", "硫", "钛", "铌", "氧", "氮", "材料实际重量", "出口材料实际厚度",
    "出口材料实际宽度", "卷取温度平均值", "出炉温度", "在炉时间", "精轧入口平均温度", "精轧出口平均温度",
    "出口材料实际厚度公差", "出口材料实际宽度公差", "出口材料实际重量", "入口材料1厚度", "入口材料1宽度",
    "入口材料1重量", "S1机架压下率", "S2机架压下率", "S3机架压下率", "S4机架压下率", "S5机架压下率",
    "S1机架入口张力", "S1~S2机架间张力", "S2～S3机架间张力", "S3～S4机架间张力", "S4～S5机架间张力",
    "S5出口张力", "S1机架入口单位张力", "S1~S2机架间单位张力", "S2~S3机架间单位张力",
    "S3~S4机架间单位张力", "S4~S5机架间单位张力", "S5机架工作轧辊粗糙度(底)", "S5机架工作轧辊粗糙度(上)",
    "1#机架轧制力模型设定值", "2#机架轧制力模型设定值", "3#机架轧制力模型设定值", "4#机架轧制力模型设定值",
    "5#机架轧制力模型设定值", "拉矫率平均值", "1#酸槽温度", "2#酸槽温度", "3#酸槽温度", "酸洗工序速度平均值1",
    "上表面镀层重量", "下表面镀层重量", "平整率平均值", "上表面涂油量", "下表面涂油量", "工艺段速度平均值",
    "ES平均温度", "FCS平均温度", "IHS平均温度", "SCS平均温度", "SF平均温度", "RCS平均温度",
    "RTF平均温度", "JPF平均温度"
]


In [7]:
test_x,test_y=x_y_split(test_data_path, scaler=joblib.load(scaler_model_path))
train_x,train_y=x_y_split(train_data_path, scaler=joblib.load(scaler_model_path))

In [8]:
model_name='Random Forest'
model=models[model_name]
model=joblib.load(pre_model_path + model_name + '.pkl')

In [9]:
#随机提取train_x的0.1倍样本
train_x_sample=train_x[np.random.choice(train_x.shape[0], int(train_x.shape[0]*0.1), replace=False)]
train_x_sample_summary = shap.sample(train_x_sample, 100)
#使用shap的kernel explainer对混合模型
explainer = shap.KernelExplainer(model.predict, train_x_sample_summary)
def cal_shap_values(x):
    return explainer.shap_values(x)

In [10]:
def generate_param_bounds(sample, X_data, top_indices, buffer_ratio):
    """
    作用：根据历史数据和当前样本生成优化参数范围
    sample: 当前样本
    X_data: 历史数据
    top_indices: 优化参数索引
    buffer——ratio: 动态范围比例
    """
    bounds = {}
    for idx in top_indices:
        # 全局数据范围（考虑工艺限制）
        global_min = X_data[:, idx].min()
        global_max = X_data[:, idx].max()
        
        # 当前值
        current_val = sample[idx]
        
        # 动态范围：当前值±buffer_ratio范围的全局裁剪
        buffer_range = (global_max - global_min) * buffer_ratio
        min_val = max(global_min, current_val - buffer_range)
        max_val = min(global_max, current_val + buffer_range)
        
        bounds[f'x{idx}'] = (min_val, max_val)
    return bounds

In [None]:
# 自定义环境
class ParamOptimizationEnv(gym.Env):
    def __init__(self, sample, top_indices, bounds, model, is_satisfied):
        super(ParamOptimizationEnv, self).__init__()
        self.sample = sample.copy()  # 初始工艺参数
        self.top_indices = top_indices  # 需要优化的参数索引
        self.bounds = bounds  # 优化范围
        self.model = model  # 预测模型
        self.is_satisfied = is_satisfied  # 达标判断函数
        self.current_params = sample.copy()
        self.action_space = spaces.Box(low=-1, high=1, shape=(len(top_indices),), dtype=np.float32)
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(len(top_indices),), dtype=np.float32)
        self.max_steps = Config.MAX_STEPS
        self.current_step = 0

    def reset(self):
        self.current_params = self.sample.copy()
        self.current_step = 0
        return self._get_obs()

    def step(self, action):
        # 更新需要优化的参数
        for i, idx in enumerate(self.top_indices):
            delta = action[i] * (self.bounds[f'x{idx}'][1] - self.bounds[f'x{idx}'][0]) / 2
            self.current_params[idx] += delta
            self.current_params[idx] = np.clip(self.current_params[idx], self.bounds[f'x{idx}'][0], self.bounds[f'x{idx}'][1])

        # 预测力学性能
        y_pred = self.model.predict(self.current_params.reshape(1, -1))[0]
        satisfied = self.is_satisfied(y_pred, Config.TARGETS)
        
        # 奖励设计
        if satisfied:
            reward = 10  # 达标时高奖励
        else:
            # 未达标时奖励与目标距离负相关
            reward = -sum(abs(y - t) for y, t in zip(y_pred, [0.8, 0.9, 1.0])) / 3
        reward -= 0.1  # 步数惩罚
        
        self.current_step += 1
        done = satisfied or self.current_step >= self.max_steps
        return self._get_obs(), reward, done, {"y_pred": y_pred}

    def _get_obs(self):
        return self.current_params[self.top_indices]

# 主函数：优化单个样本
def optimize_sample(sample, X_data, model,ppo_model=None):
    # 计算 SHAP 值并选择 top_k 参数
    shap_values = cal_shap_values(sample)
    top_indices = np.argsort(np.abs(shap_values).mean(1))[::-1][:Config.TOP_K]
    bounds = generate_param_bounds(sample, X_data, top_indices, Config.BUFFER_RATIO)

    # 创建环境
    env = ParamOptimizationEnv(sample, top_indices, bounds, model, is_satisfied)

    # 训练 PPO 模型
    ppo_model = PPO("MlpPolicy", env, verbose=0, batch_size=256, n_epochs=10)
    ppo_model.learn(total_timesteps=100)

    # 测试优化
    obs = env.reset()
    for _ in range(Config.MAX_STEPS):
        action, _ = ppo_model.predict(obs)
        obs, reward, done, info = env.step(action)
        if done:
            # print(f"优化完成，调整后的力学性能: {info['y_pred']}")
            break
    if is_satisfied(info['y_pred'], Config.TARGETS):
        status='success'
        opt_pred=info['y_pred']
    else:
        status='fail'
        opt_pred=None
    return env.current_params, top_indices, bounds, status, opt_pred
# #示例调用
unqualified_samples = find_unqualified_samples(test_y, Config.TARGETS)
sample_idx = unqualified_samples[0]
sample = test_x[sample_idx]
print(f"待优化样本索引: {sample_idx}")
optimized_params, top_indices, bounds, status,opt_pred = optimize_sample(sample, train_x_sample, model)
print(f"优化结果: {status}")
if status=='success':
    print(f"优化前的力学性能: {test_y[sample_idx]}")
    print(f"优化后的力学性能: {opt_pred}")

    print('优化前的参数：')
    print(inverse_normalize(sample,joblib.load(scaler_model_path)))
    print('优化后的参数：')
    print(inverse_normalize(optimized_params,joblib.load(scaler_model_path)))

else:
    print(f"优化失败")
    print(f"优化前的力学性能: {test_y[sample_idx]}")
    print(f"优化后的力学性能: {opt_pred}")
    print('优化前的参数：')
    print(inverse_normalize(sample,joblib.load(scaler_model_path)))
    print('优化后的参数：')
    print(inverse_normalize(optimized_params,joblib.load(scaler_model_path)))


In [None]:
def optimize_many(test_x, test_y, X_data, model):
        #找到不合格样本
    unqualified_samples = find_unqualified_samples(test_y, Config.TARGETS)
    results = []
    # i=0
    for idx in tqdm(unqualified_samples):
        
        # i=i+1

        ori_sample = test_x[idx]
        ori_pred=test_y[idx]
        optimized_params, top_indices, bounds, status,opt_pred = optimize_sample(ori_sample, X_data, model)
        #对工艺参数进行反标准化
        ori_sample=inverse_normalize(ori_sample,scaler)
        optimized_params=inverse_normalize(optimized_params,scaler)
        #记录结果
        if status=='success':
            param_changes=[]
            for p_idx in top_indices:
                ori_val=ori_sample[p_idx]
                opt_val=optimized_params[p_idx]
                change_pct=(opt_val-ori_val)/ori_val*100
                param_changes.append({
                    'param_idx':int(p_idx),
                    'param_name':Procedure_header[int(p_idx)]+'过程的'+Parameter_header[int(p_idx)],
                    'ori_val':float(ori_val),
                    'opt_val':float(opt_val),
                    'change_pct':float(change_pct)
                
                })
            #把ori_pred和opt_pred转换为列表
            if hasattr(ori_pred,'tolist'):
                ori_pred=ori_pred.tolist()
            else:
                ori_pred=list(ori_pred)
            if hasattr(opt_pred,'tolist'):
                opt_pred=opt_pred.tolist()
            else:
                opt_pred=list(opt_pred)
            
            results.append({
                'sample_id':int(idx),
                'status':status,
                'original_performance':ori_pred,
                'optimized_performance':opt_pred,
                'param_changes':param_changes,
                'top_params_index':[int(p_idx) for p_idx in top_indices.tolist()],
                'top_paras':[Parameter_header[int(pidx)] for pidx in top_indices.tolist()]
            })
        
        else:
            results.append({
                'sample_id':int(idx),
                'status':status,
                'original_performance':ori_pred,
                'optimized_performance':None,
                'param_changes':None,
                'top_params_index':[int(p_idx) for p_idx in top_indices.tolist()],
                'top_paras':[Parameter_header[int(pidx)] for pidx in top_indices.tolist()]
            })

        # if i==4:
        #     break
        
    df_results=pd.DataFrame(results)
    success_rate=(df_results['status'] == 'success').mean()
    print(f"\n优化完成！成功率：{success_rate:.1%}")
    # 保存到CSV（示例），编码为UTF-8
    df_results.to_csv('results_PPO.csv', index=False, encoding='utf-8-sig')
    
    return df_results


In [None]:
optimize_many(test_x, test_y, train_x_sample, model)
"""
100%|██████████| 408/408 [3:54:54<00:00, 34.54s/it]  

优化完成！成功率：99.3%
"""

'\n100%|██████████| 408/408 [3:54:54<00:00, 34.54s/it]  \n\n优化完成！成功率：99.3%\n'

In [None]:
# from stable_baselines3.common.vec_env import DummyVecEnv
# from contextlib import redirect_stdout
# PPO_model_path='./model/pretrained_ppo.zip'
# def optimize_many(test_x, test_y, X_data, model):
#     # 找到不合格样本
#     unqualified_samples = find_unqualified_samples(test_y, Config.TARGETS)
    
#     #不存在预训练模型
#     if not os.path.exists(PPO_model_path):
#         # 1. 选择预训练样本（例如 10% 的未达标样本）
#         print("未找到预训练模型，开始训练...")
#         pretrain_size = max(1, int(0.1 * len(unqualified_samples)))  # 至少选择 1 个样本
#         pretrain_indices = np.random.choice(unqualified_samples, size=pretrain_size, replace=False)
        
#         # 2. 创建预训练环境 - 先计算 top_indices 和 bounds
#         pretrain_envs = []
#         for idx in pretrain_indices:
#             sample = test_x[idx]
#             shap_values = cal_shap_values(sample)
#             top_indices = np.argsort(np.abs(shap_values).mean(1))[::-1][:Config.TOP_K]
#             bounds = generate_param_bounds(sample, X_data, top_indices, Config.BUFFER_RATIO)
#             env = ParamOptimizationEnv(sample, top_indices, bounds, model, is_satisfied)
#             pretrain_envs.append(env)
        
#         vec_env = DummyVecEnv([lambda env=env: env for env in pretrain_envs])  # 使用参数捕获防止延迟绑定
        
#         # 3. 训练预训练模型
#         pretrained_ppo = PPO("MlpPolicy", vec_env, verbose=0, batch_size=256, n_epochs=10)
#         pretrained_ppo.learn(total_timesteps=1000)  # 训练通用模型
#         pretrained_ppo.save(PPO_model_path)     # 保存模型
    
#     print("开始优化未达标样本...")
#     # 4. 对所有未达标样本进行优化
#     results = []
#     for idx in tqdm(unqualified_samples):
#         ori_sample = test_x[idx]
#         ori_pred = test_y[idx]
        
#         # 计算 SHAP 值并选择 top_k 参数
#         shap_values = cal_shap_values(ori_sample)
#         top_indices = np.argsort(np.abs(shap_values).mean(1))[::-1][:Config.TOP_K]
#         bounds = generate_param_bounds(ori_sample, X_data, top_indices, Config.BUFFER_RATIO)
        
#         # 创建当前样本的环境
#         env = ParamOptimizationEnv(ori_sample, top_indices, bounds, model, is_satisfied)
#         # 加载预训练模型并微调，屏蔽输出
#         with open(os.devnull, 'w') as f:
#             with redirect_stdout(f):
#                 ppo_model = PPO.load(PPO_model_path, env=env, verbose=0)
#         ppo_model.learn(total_timesteps=1000)  # 微调，时间步远少于从头训练
        
#         # 测试优化
#         obs = env.reset()
#         for _ in range(Config.MAX_STEPS):
#             action, _ = ppo_model.predict(obs)
#             obs, reward, done, info = env.step(action)
#             if done:
#                 break
        
#         # 判断优化结果
#         if is_satisfied(info['y_pred'], Config.TARGETS):
#             status = 'success'
#             opt_pred = info['y_pred']
#         else:
#             status = 'fail'
#             opt_pred = None
#         optimized_params = env.current_params
        
#         # 反标准化工艺参数
#         ori_sample = inverse_normalize(ori_sample, scaler)
#         optimized_params = inverse_normalize(optimized_params, scaler)
        
#         # 记录结果
#         if status == 'success':
#             param_changes = []
#             for p_idx in top_indices:
#                 ori_val = ori_sample[p_idx]
#                 opt_val = optimized_params[p_idx]
#                 change_pct = (opt_val - ori_val) / ori_val * 100
#                 param_changes.append({
#                     'param_idx': int(p_idx),
#                     'param_name': Procedure_header[int(p_idx)] + '过程的' + Parameter_header[int(p_idx)],
#                     'ori_val': float(ori_val),
#                     'opt_val': float(opt_val),
#                     'change_pct': float(change_pct)
#                 })
            
#             # 转换为列表
#             ori_pred = ori_pred.tolist() if hasattr(ori_pred, 'tolist') else list(ori_pred)
#             opt_pred = opt_pred.tolist() if hasattr(opt_pred, 'tolist') else list(opt_pred)
            
#             results.append({
#                 'sample_id': int(idx),
#                 'status': status,
#                 'original_performance': ori_pred,
#                 'optimized_performance': opt_pred,
#                 'param_changes': param_changes,
#                 'top_params_index': [int(p_idx) for p_idx in top_indices.tolist()],
#                 'top_paras': [Parameter_header[int(p_idx)] for p_idx in top_indices.tolist()]
#             })
#         else:
#             results.append({
#                 'sample_id': int(idx),
#                 'status': status,
#                 'original_performance': ori_pred,
#                 'optimized_performance': None,
#                 'param_changes': None,
#                 'top_params_index': [int(p_idx) for p_idx in top_indices.tolist()],
#                 'top_paras': [Parameter_header[int(p_idx)] for p_idx in top_indices.tolist()]
#             })
    
#     # 5. 处理结果
#     df_results = pd.DataFrame(results)
#     success_rate = (df_results['status'] == 'success').mean()
#     print(f"\n优化完成！成功率：{success_rate:.1%}")
#     df_results.to_csv('results_PPO.csv', index=False, encoding='utf-8-sig')
    
#     return df_results

In [None]:
# optimize_many(test_x, test_y, train_x_sample, model)