In [None]:
#SAC
import gymnasium as gym
import numpy as np
import math
from stable_baselines3 import SAC
from stable_baselines3.common.callbacks import CheckpointCallback
import pde_control_gym
from pde_control_gym.src import TunedReward1D

# Physical parameter configuration. Each value is forwarded to the environment
# through the key of the same name in env_params.
# NOTE(review): the physical meanings noted here are inferred from the names —
# confirm against the pde_control_gym documentation.
# T: presumably the time horizon of one episode.
T = 1
# dt: presumably the time step; int(T / dt) is what the reward object is built with.
dt = 1e-3
# dx: presumably the spatial step.
dx = 0.01
# X: presumably the length of the spatial domain.
X = 1
# lamArr: passed as "lamArr"; its role inside the environment is not visible here.
lamArr = 9

def solveBetaFunction(x, gamma, amplitude=50.0):
    """Evaluate ``amplitude * cos(gamma * arccos(x))`` element-wise.

    For integer ``gamma`` this is ``amplitude`` times the Chebyshev polynomial
    of the first kind of degree ``gamma``.

    Args:
        x: Array-like of sample points; every finite value must lie in [-1, 1].
        gamma: Scalar multiplier applied to ``arccos(x)``.
        amplitude: Scale factor of the result. Defaults to 50.

    Returns:
        ``np.float32`` array with the same shape as ``x``. The computation is
        carried out in float64 and cast to float32 at the end.

    Raises:
        ValueError: If any value of ``x`` lies outside [-1, 1].
    """
    points = np.asarray(x, dtype=np.float64)
    # np.arccos would silently return NaN outside its domain; raise instead so
    # a bad grid is reported immediately (same exception type as math.acos).
    if np.any(np.abs(points) > 1.0):
        raise ValueError("math domain error: x must lie in [-1, 1]")
    return (amplitude * np.cos(gamma * np.arccos(points))).astype(np.float32)

# PDE environment parameters: keyword arguments unpacked into gym.make() in main().
env_params = {
    # Time/space settings forwarded unchanged from the constants defined above.
    "T": T,
    "dt": dt,
    "X": X,
    "dx": dx,
    "lamArr": lamArr,
    # Reward object built with int(T / dt) plus two tuning constants.
    # NOTE(review): the meaning of -1e3 and 3e2 is defined by TunedReward1D — confirm.
    "reward_class": TunedReward1D(int(T / dt), -1e3, 3e2),
    "normalize": True,
    "sensing_loc": "full",
    # NOTE(review): the spelling "Dirchilet" is kept as-is; it must match the
    # string the environment expects — check pde_control_gym before "correcting" it.
    "control_type": "Dirchilet",
    # Identity function: the state is returned unchanged, i.e. no noise is added.
    "sensing_noise_func": lambda state: state,
    "max_state_value": 1e10,
    "max_control_value": 20,
    # Returns a length nx + 1 array filled with a single value drawn from
    # np.random.uniform(1, 10) on each call (global, unseeded NumPy RNG).
    "reset_init_condition_func": lambda nx: np.ones(nx + 1) * np.random.uniform(1, 10),
    # Evaluates solveBetaFunction on nx + 1 evenly spaced points of [0, 1]. The
    # lambda's own lamArr parameter shadows the module-level lamArr.
    "reset_recirculation_func": lambda nx, lamArr: solveBetaFunction(np.linspace(0, 1, nx + 1), lamArr),
    "control_sample_rate": 0.01,
}

def main():
    """Train a SAC agent using the default MlpPolicy feature extractor.

    Builds the "PDEControlGym-ReactionDiffusionPDE1D" environment from the
    module-level ``env_params``, trains for 100,000 timesteps, writes
    TensorBoard logs to ``./tb_SAC/`` and checkpoints to ``./logs_SAC/``.
    The environment is closed when training finishes or fails.
    """
    # Create the environment
    env = gym.make("PDEControlGym-ReactionDiffusionPDE1D", **env_params)

    try:
        # Deliberately no policy_kwargs: this cell is the baseline run that
        # relies on the policy's default feature extractor, and no
        # policy_kwargs name is defined in this cell.
        model = SAC(
            "MlpPolicy",
            env,
            learning_rate=1e-4,
            buffer_size=100000,
            batch_size=256,
            tau=0.002,
            gamma=0.99,
            verbose=1,
            tensorboard_log="./tb_SAC/",
        )

        # Checkpoint callback. save_freq equals the training length, so with a
        # single environment this should fire once, on the last step — verify.
        checkpoint_callback = CheckpointCallback(
            save_freq=100000,
            save_path="./logs_SAC/",
            name_prefix="sac_model",
        )

        # Train the model (total_timesteps is documented as an int).
        print("开始训练（使用默认特征提取器）...")
        model.learn(total_timesteps=100_000, callback=checkpoint_callback)
    finally:
        # Release environment resources even if model setup or training raised.
        env.close()
    


# Start training when this cell/script is executed directly; inside a notebook
# kernel __name__ is "__main__", so running the cell launches main().
if __name__ == "__main__":
    main()

In [None]:
#NOSAC
import gymnasium as gym
import numpy as np
import math
from stable_baselines3 import SAC
from stable_baselines3.common.callbacks import CheckpointCallback
import pde_control_gym
from pde_control_gym.src import TunedReward1D

# Physical parameter configuration. Each value is forwarded to the environment
# through the key of the same name in env_params.
# NOTE(review): the physical meanings noted here are inferred from the names —
# confirm against the pde_control_gym documentation.
# T: presumably the time horizon of one episode.
T = 1
# dt: presumably the time step; int(T / dt) is what the reward object is built with.
dt = 1e-3
# dx: presumably the spatial step.
dx = 0.01
# X: presumably the length of the spatial domain.
X = 1
# lamArr: passed as "lamArr"; its role inside the environment is not visible here.
lamArr = 9

def solveBetaFunction(x, gamma, amplitude=50.0):
    """Evaluate ``amplitude * cos(gamma * arccos(x))`` element-wise.

    For integer ``gamma`` this is ``amplitude`` times the Chebyshev polynomial
    of the first kind of degree ``gamma``.

    Args:
        x: Array-like of sample points; every finite value must lie in [-1, 1].
        gamma: Scalar multiplier applied to ``arccos(x)``.
        amplitude: Scale factor of the result. Defaults to 50.

    Returns:
        ``np.float32`` array with the same shape as ``x``. The computation is
        carried out in float64 and cast to float32 at the end.

    Raises:
        ValueError: If any value of ``x`` lies outside [-1, 1].
    """
    points = np.asarray(x, dtype=np.float64)
    # np.arccos would silently return NaN outside its domain; raise instead so
    # a bad grid is reported immediately (same exception type as math.acos).
    if np.any(np.abs(points) > 1.0):
        raise ValueError("math domain error: x must lie in [-1, 1]")
    return (amplitude * np.cos(gamma * np.arccos(points))).astype(np.float32)

# PDE environment parameters: keyword arguments unpacked into gym.make() in main().
env_params = {
    # Time/space settings forwarded unchanged from the constants defined above.
    "T": T,
    "dt": dt,
    "X": X,
    "dx": dx,
    "lamArr": lamArr,
    # Reward object built with int(T / dt) plus two tuning constants.
    # NOTE(review): the meaning of -1e3 and 3e2 is defined by TunedReward1D — confirm.
    "reward_class": TunedReward1D(int(T / dt), -1e3, 3e2),
    "normalize": True,
    "sensing_loc": "full",
    # NOTE(review): the spelling "Dirchilet" is kept as-is; it must match the
    # string the environment expects — check pde_control_gym before "correcting" it.
    "control_type": "Dirchilet",
    # Identity function: the state is returned unchanged, i.e. no noise is added.
    "sensing_noise_func": lambda state: state,
    "max_state_value": 1e10,
    "max_control_value": 20,
    # Returns a length nx + 1 array filled with a single value drawn from
    # np.random.uniform(1, 10) on each call (global, unseeded NumPy RNG).
    "reset_init_condition_func": lambda nx: np.ones(nx + 1) * np.random.uniform(1, 10),
    # Evaluates solveBetaFunction on nx + 1 evenly spaced points of [0, 1]. The
    # lambda's own lamArr parameter shadows the module-level lamArr.
    "reset_recirculation_func": lambda nx, lamArr: solveBetaFunction(np.linspace(0, 1, nx + 1), lamArr),
    "control_sample_rate": 0.01,
}

def main():
    """Train a SAC agent whose policy uses the DeepONet feature extractor.

    Builds the "PDEControlGym-ReactionDiffusionPDE1D" environment from the
    module-level ``env_params``, trains for 100,000 timesteps, writes
    TensorBoard logs to ``./tb_NOSAC/`` and checkpoints to ``./logs_NOSAC/``.
    The environment is closed when training finishes or fails.
    """
    # Local import: resolved before the environment is created so that a
    # missing DeepONet module fails fast without leaving an open environment.
    from DeepONet import CustomFeatureExtractor

    # Create the environment
    env = gym.make("PDEControlGym-ReactionDiffusionPDE1D", **env_params)

    try:
        # Policy configuration.
        # NOTE(review): features_dim=102 must agree with the output size of
        # CustomFeatureExtractor — confirm against DeepONet.
        policy_kwargs = {
            "features_extractor_class": CustomFeatureExtractor,
            "features_extractor_kwargs": {"features_dim": 102},
        }

        # Create the SAC model
        model = SAC(
            "MlpPolicy",
            env,
            policy_kwargs=policy_kwargs,
            learning_rate=1e-4,
            buffer_size=100000,
            batch_size=256,
            tau=0.002,
            gamma=0.99,
            verbose=1,
            tensorboard_log="./tb_NOSAC/",
        )

        # Checkpoint callback. save_freq equals the training length, so with a
        # single environment this should fire once, on the last step — verify.
        checkpoint_callback = CheckpointCallback(
            save_freq=100000,
            save_path="./logs_NOSAC/",
            name_prefix="sac_model",
        )

        # Train the model (total_timesteps is documented as an int).
        print("开始训练（使用DeepONet特征提取器）...")
        model.learn(total_timesteps=100_000, callback=checkpoint_callback)
    finally:
        # Release environment resources even if model setup or training raised.
        env.close()
    

# Start training when this cell/script is executed directly; inside a notebook
# kernel __name__ is "__main__", so running the cell launches main().
if __name__ == "__main__":
    main()

In [None]:
#NOSAC_training
import gymnasium as gym
import numpy as np
import math
from stable_baselines3 import SAC
from stable_baselines3.common.callbacks import CheckpointCallback
import pde_control_gym
from pde_control_gym.src import TunedReward1D
from RONet import CustomFeatureExtractor

# Physical parameters. Each value is forwarded to the environment through the
# key of the same name in env_params.
# NOTE(review): the physical meanings noted here are inferred from the names —
# confirm against the pde_control_gym documentation.
# T: presumably the time horizon of one episode.
T = 1
# dt: presumably the time step; int(T / dt) is what the reward object is built with.
dt = 1e-3
# dx: presumably the spatial step.
dx = 0.01
# X: presumably the length of the spatial domain.
X = 1
# lamArr: passed as "lamArr"; its role inside the environment is not visible here.
lamArr = 9

def solveBetaFunction(x, gamma, amplitude=50.0):
    """Evaluate ``amplitude * cos(gamma * arccos(x))`` element-wise.

    For integer ``gamma`` this is ``amplitude`` times the Chebyshev polynomial
    of the first kind of degree ``gamma``.

    Args:
        x: Array-like of sample points; every finite value must lie in [-1, 1].
        gamma: Scalar multiplier applied to ``arccos(x)``.
        amplitude: Scale factor of the result. Defaults to 50.

    Returns:
        ``np.float32`` array with the same shape as ``x``. The computation is
        carried out in float64 and cast to float32 at the end.

    Raises:
        ValueError: If any value of ``x`` lies outside [-1, 1].
    """
    points = np.asarray(x, dtype=np.float64)
    # np.arccos would silently return NaN outside its domain; raise instead so
    # a bad grid is reported immediately (same exception type as math.acos).
    if np.any(np.abs(points) > 1.0):
        raise ValueError("math domain error: x must lie in [-1, 1]")
    return (amplitude * np.cos(gamma * np.arccos(points))).astype(np.float32)

# PDE environment parameters: keyword arguments unpacked into gym.make() in main().
env_params = {
    # Time/space settings forwarded unchanged from the constants defined above.
    "T": T,
    "dt": dt,
    "X": X,
    "dx": dx,
    "lamArr": lamArr,
    # Reward object built with int(T / dt) plus two tuning constants.
    # NOTE(review): the meaning of -1e3 and 3e2 is defined by TunedReward1D — confirm.
    "reward_class": TunedReward1D(int(T / dt), -1e3, 3e2),
    "normalize": True,
    "sensing_loc": "full",
    # NOTE(review): the spelling "Dirchilet" is kept as-is; it must match the
    # string the environment expects — check pde_control_gym before "correcting" it.
    "control_type": "Dirchilet",
    # Identity function: the state is returned unchanged, i.e. no noise is added.
    "sensing_noise_func": lambda state: state,
    "max_state_value": 1e10,
    "max_control_value": 20,
    # Returns a length nx + 1 array filled with a single value drawn from
    # np.random.uniform(1, 10) on each call (global, unseeded NumPy RNG).
    "reset_init_condition_func": lambda nx: np.ones(nx + 1) * np.random.uniform(1, 10),
    # Evaluates solveBetaFunction on nx + 1 evenly spaced points of [0, 1]. The
    # lambda's own lamArr parameter shadows the module-level lamArr.
    "reset_recirculation_func": lambda nx, lamArr: solveBetaFunction(np.linspace(0, 1, nx + 1), lamArr),
    "control_sample_rate": 0.01,
}

def main():
    """Run a single SAC training session with the RONet feature extractor.

    Builds the "PDEControlGym-ReactionDiffusionPDE1D" environment from the
    module-level ``env_params``, trains for 100,000 timesteps, writes
    TensorBoard logs to ``./tb_NOSAC_training/`` and checkpoints to
    ``./logs_NOSAC_training_run/``. The environment is closed when training
    finishes or fails.
    """
    # Create the environment
    env = gym.make("PDEControlGym-ReactionDiffusionPDE1D", **env_params)

    try:
        # Policy configuration.
        # NOTE(review): features_dim=102 must agree with the output size of
        # CustomFeatureExtractor — confirm against RONet.
        policy_kwargs = dict(
            features_extractor_class=CustomFeatureExtractor,
            features_extractor_kwargs=dict(features_dim=102),
        )

        # Create the SAC model
        model = SAC(
            "MlpPolicy",
            env,
            policy_kwargs=policy_kwargs,
            learning_rate=1e-4,
            buffer_size=100000,
            batch_size=256,
            tau=0.002,
            gamma=0.99,
            verbose=1,
            tensorboard_log="./tb_NOSAC_training/",
        )

        # Checkpoint callback. save_freq equals the training length, so with a
        # single environment this should fire once, on the last step — verify.
        checkpoint_callback = CheckpointCallback(
            save_freq=100000,
            save_path="./logs_NOSAC_training_run/",
            name_prefix="sac_model",
        )

        # Train the model (total_timesteps is documented as an int).
        print("开始单次训练...")
        model.learn(total_timesteps=100_000, callback=checkpoint_callback)
    finally:
        # Release environment resources even if model setup or training raised.
        env.close()

# Start training when this cell/script is executed directly; inside a notebook
# kernel __name__ is "__main__", so running the cell launches main().
if __name__ == "__main__":
    main()