# 基于TD3算法的机械臂位置跟踪控制

本笔记本实现了使用TD3(Twin Delayed Deep Deterministic Policy Gradient)算法控制机械臂进行位置跟踪任务。我们将使用MuJoCo作为物理仿真环境。

## 导入必要的库

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import mujoco
import time
import matplotlib.pyplot as plt
from collections import deque
import os
import copy

# 尝试导入MuJoCo可视化器
try:
    import mujoco.viewer
    MUJOCO_VIEWER_AVAILABLE = True
except ImportError:
    MUJOCO_VIEWER_AVAILABLE = False
    print("警告: 无法导入mujoco.viewer，可视化功能将不可用")

# 添加Z-Score标准化类
class RunningMeanStd:
    """
    动态计算均值和标准差用于状态归一化
    使用Welford在线算法计算均值和标准差
    """
    def __init__(self, shape):
        self.n = 0
        self.mean = np.zeros(shape)
        self.S = np.zeros(shape)
        self.std = np.sqrt(self.S)

    def update(self, x):
        """
        更新均值和标准差
        
        参数:
        - x: 新的状态数据
        """
        batch_mean = np.mean(x, axis=0)
        batch_S = np.sum((x - batch_mean)**2, axis=0)
        batch_n = x.shape[0]
        
        # 更新总样本数
        total_n = self.n + batch_n
        
        # 更新均值
        delta = batch_mean - self.mean
        new_mean = self.mean + delta * batch_n / total_n
        
        # 更新方差
        delta2 = batch_mean - new_mean
        new_S = self.S + batch_S + delta**2 * self.n * batch_n / total_n
        
        # 更新参数
        self.n = total_n
        self.mean = new_mean
        self.S = new_S
        self.std = np.sqrt(self.S / (self.n - 1)) if self.n > 1 else np.sqrt(self.S)
        
    def normalize(self, x):
        """
        对输入数据进行归一化
        
        参数:
        - x: 待归一化的状态数据
        
        返回:
        - 归一化后的数据
        """
        return (x - self.mean) / (self.std + 1e-8)  # 添加小常数防止除零错误




## 检查设备

In [None]:
# 检查CUDA可用性
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"使用设备: {device}")

## 定义机械臂环境类

In [None]:
class RobotArmEnv:
    def __init__(self, model_path="robot_arm_mujoco.xml", normalize_states=True):
        # 加载MuJoCo模型
        self.model = mujoco.MjModel.from_xml_path(model_path)
        self.data = mujoco.MjData(self.model)
        
        # 获取关节数量
        self.nu = self.model.nu
        self.nq = self.model.nq
        
        # 设置动作空间
        self.action_low = -20.0  # 扭矩限制
        self.action_high = 20.0
        
        # 设置目标位置 (末端执行器)
        self.target_pos = np.array([0.2, 0.2, 0.2])  # 初始目标位置
        
        # 初始化可视化器为None
        self.viewer = None
        
        # 状态归一化参数
        self.normalize_states = normalize_states
        if self.normalize_states:
            # 状态维度: 关节角度(6) + 关节速度(6) + 关节扭矩(6) + 末端位置(3) + 末端速度(3) + 目标位置(3)
            state_dim = 6 + 6 + 6 + 3 + 3 + 3  # 27维状态
            self.state_rms = RunningMeanStd(shape=(state_dim,))
        
        # 重置环境
        self.reset()
        
    def render(self):
        # 检查可视化器是否可用
        if not MUJOCO_VIEWER_AVAILABLE:
            return
        
        # 如果可视化器尚未创建，则创建一个
        if self.viewer is None:
            try:
                # 尝试创建被动式可视化器
                self.viewer = mujoco.viewer.launch_passive(self.model, self.data)
                # 初始化目标可视化添加标记
                self._target_viz_added = False
            except Exception as e:
                print(f"无法启动可视化器: {e}")
                return
        
        # 如果可视化器已经创建，同步更新场景
        if self.viewer:
            try:
                # 添加目标位置的可视化（绿色小球）
                self._add_target_visualization()
                self.viewer.sync()
            except Exception as e:
                print(f"可视化器同步失败: {e}")
                self.viewer = None
    
    def _add_target_visualization(self):
        """
        在目标位置添加一个绿色小球用于可视化
        """
        if self.viewer and not getattr(self, '_target_viz_added', False):
            # 获取用户场景对象
            scn = self.viewer.user_scn
            # 增加一个几何体（小球）
            if scn.ngeom < scn.maxgeom:
                # 获取新增几何体的引用
                geom = scn.geoms[scn.ngeom]
                scn.ngeom += 1
                # 初始化几何体为球体
                mujoco.mjv_initGeom(
                    geom,
                    mujoco.mjtGeom.mjGEOM_SPHERE,
                    np.array([0.02, 0.02, 0.02]),  # 球体半径
                    self.target_pos,  # 球体位置（目标位置）
                    np.array([1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0]),  # 单位矩阵
                    np.array([0.0, 1.0, 0.0, 0.8])  # RGBA: 绿色，半透明
                )
                # 设置几何体类别为仅可视化，不参与物理碰撞
                geom.category = mujoco.mjtCatBit.mjCAT_DECOR
                # 标记已添加目标可视化
                self._target_viz_added = True
    
    def reset(self):
        # 重置MuJoCo数据
        mujoco.mj_resetData(self.model, self.data)
        
        # 设置随机目标位置
        self.target_pos = np.random.uniform(-0.3, 0.3, size=3)
        
        # 确保目标位置为合法值，x\y不能在(-0.1, 0.1)范围内，z轴要大于0.05
        if self.target_pos[0] >= 0:
            self.target_pos[0] = max(self.target_pos[0], 0.1)
        else:
            self.target_pos[0] = min(self.target_pos[0], -0.1)

        if self.target_pos[1] >= 0:
            self.target_pos[1] = max(self.target_pos[1], 0.1)
        else:
            self.target_pos[1] = min(self.target_pos[1], -0.1)

        self.target_pos[2] = max(self.target_pos[2], 0.05)

        # 重置停留时间计数器
        self.stay_time = 0.0
        self.required_stay_time = 0.5  # 需要停留0.5秒
        
        # 获取模型的时间步长
        self.time_step = self.model.opt.timestep

        self.success_threshold = 0.001  # 1mm阈值
        
        # 重置步数计数器
        self.step_count = 0
        
        # 重置目标可视化标记
        if hasattr(self, '_target_viz_added'):
            self._target_viz_added = False
        
        # 清理之前添加的可视化几何体
        if self.viewer is not None:
            # 重置场景中的几何体数量为0， effectively removing all added geometries
            self.viewer.user_scn.ngeom = 0
        
        # 获取初始状态
        state = self._get_state()
        return state
    
    def _get_state(self):
        # 获取关节角度
        joint_angles = self.data.qpos[:self.nu].copy()
        
        # 获取关节速度
        joint_velocities = self.data.qvel[:self.nu].copy()
        
        # 获取关节扭矩
        joint_torques = self.data.qfrc_actuator[:self.nu].copy()
        
        # 获取末端执行器位置（通过名称查找）
        ee_site_id = mujoco.mj_name2id(self.model, mujoco.mjtObj.mjOBJ_SITE, "ee_site")
        
        # 获取末端执行器位置
        ee_pos = self.data.site_xpos[ee_site_id].copy()
        
        # 使用body速度获取末端执行器速度
        ee_body_id = mujoco.mj_name2id(self.model, mujoco.mjtObj.mjOBJ_BODY, "ee_link")
        ee_vel = self.data.cvel[ee_body_id][:3].copy()
        
        # 拼接状态向量
        state = np.concatenate([
            joint_angles,
            joint_velocities,
            joint_torques,
            ee_pos,
            ee_vel,
            self.target_pos
        ])
        
        # 如果启用状态归一化，则对状态进行归一化
        if self.normalize_states:
            state = self.state_rms.normalize(state)
        
        return state
    
    def step(self, action):
        # 应用动作（扭矩）
        action = np.clip(action, self.action_low, self.action_high)
        self.data.ctrl[:self.nu] = action
        
        # 仿真一步
        mujoco.mj_step(self.model, self.data)
        
        # 更新步数计数器
        self.step_count += 1
        
        # 获取新状态
        state = self._get_state()
        
        # 计算奖励
        ee_pos = self.data.site_xpos[0].copy()
        distance = np.linalg.norm(ee_pos - self.target_pos)
        
        # 重新设计的奖励函数 - 包含密度奖励和分布奖励
        
        # 1. 密度奖励 - 鼓励持续向目标靠近
        # 基础距离奖励，使用指数函数使接近目标时奖励增长更快
        density_reward = 1.2 * (1 - np.exp(-2.0 * (0.6 - distance)))
        
        # 添加分段的密集距离奖励，鼓励精确控制
        if distance < 0.001:
            # 当距离非常近时(1mm内)，给予最大奖励以鼓励精确控制
            density_reward += (0.001 - distance) * 300.0
        elif distance < 0.005:
            # 当距离很近时(5mm内)，给予较大奖励
            density_reward += (0.005 - distance) * 100.0
        elif distance < 0.01:
            # 当距离较近时(1cm内)，给予适度奖励
            density_reward += (0.01 - distance) * 50.0
        elif distance < 0.05:
            # 当距离中等时(5cm内)，给予适度奖励
            density_reward += (0.05 - distance) * 10.0
        elif distance < 0.1:
            # 当距离稍远时(10cm内)，给予较小奖励
            density_reward += (0.1 - distance) * 2.0
        elif distance < 0.3:
            # 当距离较远时(30cm内)，给予最小奖励
            density_reward += (0.3 - distance) * 0.5
            
        # 2. 分布奖励 - 鼓励在整个轨迹中保持良好表现
        distribution_reward = 0.0
        
        # 速度分布奖励 - 鼓励在整个过程中保持适中的速度
        ee_body_id = mujoco.mj_name2id(self.model, mujoco.mjtObj.mjOBJ_BODY, "ee_link")
        ee_vel = self.data.cvel[ee_body_id][:3].copy()
        ee_speed = np.linalg.norm(ee_vel)
        
        # 鼓励保持适中的末端执行器速度 (0.01m/s - 0.1m/s)
        if 0.01 <= ee_speed <= 0.1:
            distribution_reward += 0.5
        elif 0.005 <= ee_speed < 0.01:
            distribution_reward += 0.3
        elif 0.1 < ee_speed <= 0.2:
            distribution_reward += 0.2
        elif ee_speed < 0.005:
            # 几乎静止时给予较小奖励，鼓励继续移动
            distribution_reward += 0.1
            
        # 关节速度分布奖励 - 鼓励各关节速度分布均匀，避免某些关节过快
        joint_velocities = self.data.qvel[:self.nu].copy()
        joint_speeds = np.abs(joint_velocities)
        avg_joint_speed = np.mean(joint_speeds)
        
        # 如果各关节速度相对均匀（标准差小），给予奖励
        if len(joint_speeds) > 1:
            speed_std = np.std(joint_speeds)
            if speed_std < avg_joint_speed * 0.5:  # 相对标准差小于50%
                distribution_reward += 0.3
            elif speed_std < avg_joint_speed:  # 相对标准差小于100%
                distribution_reward += 0.1
                
        # 关节加速度分布奖励 - 鼓励加速度变化平滑，减少抖动
        if not hasattr(self, 'prev_joint_velocities'):
            self.prev_joint_velocities = np.zeros(self.nu)
            
        joint_accelerations = (joint_velocities - self.prev_joint_velocities) / self.model.opt.timestep
        self.prev_joint_velocities = joint_velocities.copy()
        
        # 惩罚过大的关节加速度（减少抖动）
        joint_acc_magnitude = np.linalg.norm(joint_accelerations)
        if joint_acc_magnitude < 10.0:  # 低加速度给予奖励
            distribution_reward += 0.2
        elif joint_acc_magnitude < 20.0:  # 中等加速度给予较小奖励
            distribution_reward += 0.1
            
        # 3. 平稳性奖励 - 鼓励运动平稳、减少抖动
        smoothness_reward = 0.0
        
        # 动作变化惩罚 - 鼓励动作变化平滑
        if not hasattr(self, 'prev_action'):
            self.prev_action = np.zeros(self.nu)
            
        action_change = np.linalg.norm(action - self.prev_action)
        self.prev_action = action.copy()
        
        # 惩罚过大的动作变化
        if action_change < 2.0:  # 小动作变化给予奖励
            smoothness_reward += 0.3
        elif action_change < 5.0:  # 中等动作变化给予较小奖励
            smoothness_reward += 0.1
            
        # 关节速度惩罚 - 惩罚过高的关节速度，鼓励平稳运动
        joint_speed_penalty = -0.005 * np.sum(np.square(joint_velocities))
        
        # 4. 其他奖励和惩罚项
        # 动作惩罚（鼓励使用较小的力）
        action_penalty = -0.001 * np.sum(np.square(action))
        
        # 碰撞惩罚 - 检查是否有接触
        collision_penalty = 0.0
        collision_detected = False
        for i in range(self.data.ncon):
            contact = self.data.contact[i]
            # 如果检测到碰撞，给予惩罚
            collision_penalty -= 100.0
            collision_detected = True
        
        # 高度惩罚 - 防止机械臂触碰地面
        height_penalty = 0.0
        if ee_pos[2] < 0.05:  # 如果末端执行器高度低于5cm
            height_penalty = -5.0 * (0.05 - ee_pos[2])
        
        # 垂直方向奖励 - 鼓励机械臂保持在合理高度
        vertical_reward = 0.0
        if 0.05 <= ee_pos[2] <= 0.4:  # 如果末端执行器在合理高度范围内
            vertical_reward = 0.1
            
        # 仿真步数惩罚 - 鼓励尽快完成任务
        step_penalty = -0.001 * self.step_count
        
        # 计算总奖励
        reward = (density_reward + 
                 distribution_reward + 
                 smoothness_reward + 
                 joint_speed_penalty + 
                 action_penalty + 
                 collision_penalty + 
                 height_penalty + 
                 vertical_reward + 
                 step_penalty)
        
        # 检查是否完成
        distance_to_target = np.linalg.norm(ee_pos - self.target_pos)
        
        done = False
        success = False
        # 新的成功条件：只要碰到目标位置就算成功（距离小于1mm）
        if distance_to_target < self.success_threshold:
            done = True
            success = True
            
            # 成功奖励
            success_reward = 200.0
            
            # 速度奖励：成功时速度越慢奖励越高
            if ee_speed < 0.01:
                speed_reward_on_success = 100.0
            elif ee_speed < 0.05:
                speed_reward_on_success = 50.0
            elif ee_speed < 0.1:
                speed_reward_on_success = 20.0
            else:
                speed_reward_on_success = 0.0
                
            reward += success_reward + speed_reward_on_success
        
        # 如果发生碰撞，也结束episode
        if collision_detected:
            done = True
        
        # 返回奖励成分信息，用于分析和可视化
        reward_info = {
            'density_reward': density_reward,
            'distribution_reward': distribution_reward,
            'smoothness_reward': smoothness_reward,
            'joint_speed_penalty': joint_speed_penalty,
            'action_penalty': action_penalty,
            'collision_penalty': collision_penalty,
            'height_penalty': height_penalty,
            'vertical_reward': vertical_reward,
            'step_penalty': step_penalty
        }
            
        return state, reward, done, {'reward_info': reward_info}
    
    def close(self):
        """
        关闭可视化器
        """
        if self.viewer:
            self.viewer.close()
            self.viewer = None

## 定义Actor和Critic网络

In [None]:
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=256, action_high=1.0):
        super(Actor, self).__init__()
        
        self.actor = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Tanh()
        )
        
        # 确保action_high是一个序列，如果不是则转换为序列
        if isinstance(action_high, (int, float)):
            self.action_scale = torch.FloatTensor([action_high] * action_dim)
        else:
            self.action_scale = torch.FloatTensor(action_high)
        
    def forward(self, state):
        action = self.actor(state)
        # 确保action_scale与state在同一设备上
        if self.action_scale.device != state.device:
            self.action_scale = self.action_scale.to(state.device)
        action = action * self.action_scale
        return action

class Critic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super(Critic, self).__init__()
        
        # 第一个Q网络
        self.q1 = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
        
        # 第二个Q网络
        self.q2 = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
        
    def forward(self, state, action):
        sa = torch.cat([state, action], dim=1)
        q1 = self.q1(sa)
        q2 = self.q2(sa)
        return q1, q2
    
    def Q1(self, state, action):
        sa = torch.cat([state, action], dim=1)
        q1 = self.q1(sa)
        return q1

## 定义TD3智能体和经验回放缓冲区

In [None]:
class TD3Agent:
    def __init__(self, state_dim, action_dim, action_high=20.0, 
                 lr_actor=3e-4, lr_critic=3e-4, gamma=0.99, tau=0.005, 
                 policy_noise=0.2, noise_clip=0.5, policy_freq=2, exploration_noise_std=0.1):
        """
        TD3算法智能体初始化
        
        超参数说明:
        - lr_actor (float): Actor网络学习率
          作用: 控制Actor网络参数更新的步长
          调整建议: 通常在[1e-4, 1e-3]范围内，较小值训练稳定但收敛慢，较大值收敛快但可能不稳定
          
        - lr_critic (float): Critic网络学习率
          作用: 控制Critic网络参数更新的步长
          调整建议: 通常与lr_actor相同或稍大，一般在[1e-4, 1e-3]范围内
          
        - gamma (float): 折扣因子
          作用: 衡量未来奖励的重要性，值越接近1表示越重视长期奖励
          调整建议: 通常设置为0.99左右，对于更注重即时奖励的任务可适当降低
          
        - tau (float): 目标网络软更新系数
          作用: 控制目标网络向主网络更新的速度，值越小更新越慢
          调整建议: 通常设置为0.001-0.01，过大会导致训练不稳定，过小会导致收敛缓慢
          
        - policy_noise (float): 策略噪声标准差
          作用: 在目标策略中添加噪声以平滑Q函数，减少过估计
          调整建议: 通常设置为动作空间范围的10-20%，需根据具体动作范围调整
          
        - noise_clip (float): 噪声裁剪范围
          作用: 限制添加到目标策略中的噪声范围，保持目标策略的合理性
          调整建议: 通常是policy_noise的2-3倍，防止噪声过大
          
        - policy_freq (int): 策略更新频率
          作用: 控制Actor网络更新频率，相对于Critic网络更新的倍数
          调整建议: 通常设置为2，表示Critic更新2次后Actor更新1次，增加训练稳定性
          
        - exploration_noise_std (float): 探索噪声标准差
          作用: 在动作选择时添加噪声以促进探索
          调整建议: 根据动作空间范围调整，通常为动作范围的5-15%，过大会导致行动不稳定，过小探索不足
        """
        
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.action_dim = action_dim
        self.gamma = gamma  # 折扣因子，控制未来奖励的重要性
        self.tau = tau      # 目标网络软更新系数
        self.policy_noise = policy_noise  # 目标策略平滑噪声标准差
        self.noise_clip = noise_clip      # 目标策略平滑噪声裁剪范围
        self.policy_freq = policy_freq    # 策略更新频率（相对于Critic更新）
        self.action_high = action_high    # 动作上限
        self.exploration_noise_std = exploration_noise_std  # 探索噪声标准差
        
        self.total_it = 0
        
        # 创建网络
        self.actor = Actor(state_dim, action_dim, action_high=action_high).to(self.device)
        self.actor_target = copy.deepcopy(self.actor)
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr_actor)
        
        self.critic = Critic(state_dim, action_dim).to(self.device)
        self.critic_target = copy.deepcopy(self.critic)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr_critic)
    
    def select_action(self, state, noise_std=None):
        """
        选择动作
        
        参数:
        - state: 当前状态
        - noise_std (float): 探索噪声标准差
          作用: 控制添加到动作中的噪声强度，用于探索
          调整建议: 训练时使用exploration_noise_std，测试时设为0或很小的值
        """
        # 如果没有指定noise_std，则使用初始化时设置的默认值
        # exploration_noise_std: 探索噪声标准差，控制探索强度
        if noise_std is None:
            noise_std = self.exploration_noise_std
            
        state = torch.FloatTensor(state.reshape(1, -1)).to(self.device)
        action = self.actor(state).cpu().data.numpy().flatten()
        
        # 添加探索噪声
        if noise_std > 0:
            noise = np.random.normal(0, noise_std, size=self.action_dim)
            action = action + noise
        
        # 限制动作范围
        action = np.clip(action, -self.action_high, self.action_high)
        return action
    
    def train(self, replay_buffer, batch_size=256):
        """
        训练智能体
        
        参数:
        - replay_buffer: 经验回放缓冲区
        - batch_size (int): 批处理大小
          作用: 每次训练使用的样本数量
          调整建议: 通常设置为128-512，较大的batch_size训练更稳定但需要更多计算资源
        """
        self.total_it += 1
        
        # 从经验回放缓冲区采样
        state, action, next_state, reward, not_done = replay_buffer.sample(batch_size)
        
        with torch.no_grad():
            # 添加目标策略平滑
            # policy_noise: 控制目标策略平滑的噪声强度
            # noise_clip: 限制噪声范围，防止噪声过大
            noise = torch.randn_like(action) * self.policy_noise
            noise = noise.clamp(-self.noise_clip, self.noise_clip)
            
            next_action = self.actor_target(next_state) + noise
            next_action = next_action.clamp(-self.action_high, self.action_high)
            
            # 计算目标Q值
            # gamma: 折扣因子，平衡即时奖励和未来奖励
            target_Q1, target_Q2 = self.critic_target(next_state, next_action)
            target_Q = torch.min(target_Q1, target_Q2)
            target_Q = reward + not_done * self.gamma * target_Q
        
        # 获取当前Q值
        current_Q1, current_Q2 = self.critic(state, action)
        
        # 计算critic损失
        critic_loss = F.mse_loss(current_Q1, target_Q) + F.mse_loss(current_Q2, target_Q)
        
        # 优化critic
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        
        # 延迟策略更新
        # policy_freq: 控制策略更新频率，减少策略更新次数提高稳定性
        if self.total_it % self.policy_freq == 0:
            
            # 计算actor损失
            actor_loss = -self.critic.Q1(state, self.actor(state)).mean()
            
            # 优化actor
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()
            
            # 更新目标网络
            # tau: 目标网络软更新系数，控制更新速度
            for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):
                target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
                
            for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
                target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
                
        return critic_loss.item()
    
    def save(self, filename, state_rms=None):
        """
        保存模型参数
        """
        save_dict = {
            'actor_state_dict': self.actor.state_dict(),
            'critic_state_dict': self.critic.state_dict(),
            'actor_target_state_dict': self.actor_target.state_dict(),
            'critic_target_state_dict': self.critic_target.state_dict(),
            'actor_optimizer_state_dict': self.actor_optimizer.state_dict(),
            'critic_optimizer_state_dict': self.critic_optimizer.state_dict(),
            'total_it': self.total_it
        }
        
        # 如果提供了状态归一化参数，则一并保存
        if state_rms is not None:
            save_dict['state_rms_mean'] = state_rms.mean
            save_dict['state_rms_std'] = state_rms.std
            save_dict['state_rms_n'] = state_rms.n
            save_dict['state_rms_S'] = state_rms.S
        
        torch.save(save_dict, filename)
        print(f"模型已保存至 {filename}")
    
    def load(self, filename, env=None):
        """
        加载模型参数
        """
        checkpoint = torch.load(filename, map_location=self.device)
        self.actor.load_state_dict(checkpoint['actor_state_dict'])
        self.critic.load_state_dict(checkpoint['critic_state_dict'])
        self.actor_target.load_state_dict(checkpoint['actor_target_state_dict'])
        self.critic_target.load_state_dict(checkpoint['critic_target_state_dict'])
        self.actor_optimizer.load_state_dict(checkpoint['actor_optimizer_state_dict'])
        self.critic_optimizer.load_state_dict(checkpoint['critic_optimizer_state_dict'])
        self.total_it = checkpoint['total_it']
        
        # 如果环境对象和状态归一化参数都存在，则恢复状态归一化参数
        if env is not None and hasattr(env, 'state_rms') and 'state_rms_mean' in checkpoint:
            env.state_rms.mean = checkpoint['state_rms_mean']
            env.state_rms.std = checkpoint['state_rms_std']
            env.state_rms.n = checkpoint['state_rms_n']
            env.state_rms.S = checkpoint['state_rms_S']
            print("状态归一化参数已恢复")
        
        print(f"模型已从 {filename} 加载")

class ReplayBuffer:
    def __init__(self, state_dim, action_dim, max_size=int(1e6)):
        """
        经验回放缓冲区
        
        超参数说明:
        - max_size (int): 经验回放缓冲区最大容量
          作用: 控制存储经验的数量，影响学习效率和内存占用
          调整建议: 通常设置为1e6左右，较大的缓冲区可以存储更多样化的经验，但占用更多内存
        """
        self.max_size = max_size
        self.ptr = 0
        self.size = 0
        
        self.state = np.zeros((max_size, state_dim))
        self.action = np.zeros((max_size, action_dim))
        self.next_state = np.zeros((max_size, state_dim))
        self.reward = np.zeros((max_size, 1))
        self.not_done = np.zeros((max_size, 1))
        
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        
    def add(self, state, action, next_state, reward, done):
        """
        向经验回放缓冲区添加经验
        """
        self.state[self.ptr] = state
        self.action[self.ptr] = action
        self.next_state[self.ptr] = next_state
        self.reward[self.ptr] = reward
        self.not_done[self.ptr] = 1. - done
        
        self.ptr = (self.ptr + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)
        
    def sample(self, batch_size):
        """
        从经验回放缓冲区中采样一批经验
        
        参数:
        - batch_size (int): 采样批次大小
          作用: 控制每次训练使用的样本数量
          调整建议: 通常设置为128-512，需要在训练稳定性和计算效率之间平衡
        """
        ind = np.random.randint(0, self.size, size=batch_size)
        
        return (
            torch.FloatTensor(self.state[ind]).to(self.device),
            torch.FloatTensor(self.action[ind]).to(self.device),
            torch.FloatTensor(self.next_state[ind]).to(self.device),
            torch.FloatTensor(self.reward[ind]).to(self.device),
            torch.FloatTensor(self.not_done[ind]).to(self.device)
        )

## 定义训练和测试函数

In [None]:
def train_agent(env, agent, replay_buffer, num_episodes=1000, max_steps=5000, start_timesteps=50000, 
                save_freq=50, model_path="td3_robot_arm.pth", render=False, normalize_states=True):
    """
    训练智能体
    
    超参数说明:
    - num_episodes (int): 训练episode总数
      作用: 控制训练的总时长
      调整建议: 根据任务复杂度调整，简单任务可设置较小值，复杂任务需要更多episode
      
    - max_steps (int): 每个episode的最大步数
      作用: 防止episode无限进行下去
      调整建议: 根据任务特点设置，确保智能体有足够时间完成任务
      
    - start_timesteps (int): 开始学习前的随机探索步数
      作用: 收集初始经验，填充经验回放缓冲区
      调整建议: 通常设置为几千到几万步，确保缓冲区有足够多样化的经验
      
    - save_freq (int): 模型保存频率
      作用: 控制模型保存间隔，防止训练中断丢失进度
      调整建议: 根据训练时长设置，通常每隔几十到几百个episode保存一次
    
    - normalize_states (bool): 是否对状态进行归一化
      作用: 控制是否启用状态归一化功能
      调整建议: 默认启用，有助于提高训练稳定性
    """
    # 如果启用状态归一化，先收集状态统计数据
    if normalize_states:
        print("收集状态统计数据用于归一化...")
        random_states = collect_random_states(env, num_episodes=2000)
        env.state_rms.update(random_states)
        print(f"状态均值: {env.state_rms.mean}")
        print(f"状态标准差: {env.state_rms.std}")
    
    # 记录训练过程
    episode_rewards = []
    episode_lengths = []
    episode_times = []
    
    # 记录奖励成分的变化
    reward_components_history = {
        'density_reward': [],
        'distribution_reward': [],
        'smoothness_reward': [],
        'joint_speed_penalty': [],
        'action_penalty': [],
        'collision_penalty': [],
        'height_penalty': [],
        'vertical_reward': [],
        'step_penalty': []
    }
    
    # 用于存储最近100个episode的奖励
    recent_rewards = deque(maxlen=100)
    
    total_timesteps = 0
    
    # 创建模型保存目录
    os.makedirs(os.path.dirname(model_path) if os.path.dirname(model_path) else '.', exist_ok=True)
    start_time = time.time()
    
    for episode in range(num_episodes):
        # 重置环境
        state = env.reset()
        
        episode_reward = 0
        
        # 记录episode开始时间
        episode_start_time = time.time()
        
        # 为当前episode初始化奖励成分记录
        episode_reward_components = {key: 0 for key in reward_components_history.keys()}
        
        for step in range(max_steps):
            # 选择动作
            if total_timesteps < start_timesteps:
                # 在开始阶段使用随机动作进行探索
                action = np.random.uniform(-env.action_high, env.action_high, size=env.nu)
            else:
                action = agent.select_action(state)
            
            # 执行动作
            next_state, reward, done, info = env.step(action)
            
            # 可视化训练过程
            if render:
                env.render()
                time.sleep(0.01)  # 控制渲染速度
            
            # 存储经验
            replay_buffer.add(state, action, next_state, reward, done)
            
            # 训练智能体
            if total_timesteps >= start_timesteps:
                loss = agent.train(replay_buffer)
            
            episode_reward += reward
            state = next_state
            total_timesteps += 1
            
            # 累加奖励成分
            if 'reward_info' in info:
                for key in episode_reward_components.keys():
                    if key in info['reward_info']:
                        episode_reward_components[key] += info['reward_info'][key]
            
            if done:
                break
        
        # 记录统计信息
        episode_rewards.append(episode_reward)
        episode_lengths.append(step + 1)
        recent_rewards.append(episode_reward)
        
        # 记录episode的奖励成分
        for key in reward_components_history.keys():
            reward_components_history[key].append(episode_reward_components[key])
        
        # 计算episode耗时
        episode_time = time.time() - episode_start_time
        episode_times.append(episode_time)
        
        # 打印进度
        if episode % 10 == 0:
            avg_reward = np.mean(recent_rewards)
            avg_step = np.mean(episode_lengths)
            avg_episode_time = np.mean(episode_times)
            print(f"Episode {episode}, 平均奖励: {avg_reward:.2f}, 平均Episode长度: {avg_step:.2f}, 平均耗时: {avg_episode_time:.2f}秒, 总步数: {total_timesteps}, 运行时间: {time.time() - start_time:.2f}秒")
            recent_rewards.clear()
            episode_lengths.clear()
            episode_times.clear()

        # 定期保存模型，同时保存状态归一化参数
        if episode % save_freq == 0:
            if normalize_states:
                agent.save(f"{model_path}_episode_{episode}.pth", state_rms=env.state_rms)
            else:
                agent.save(f"{model_path}_episode_{episode}.pth")
    
    # 保存最终模型
    if normalize_states:
        agent.save(f"{model_path}_final.pth", state_rms=env.state_rms)
    else:
        agent.save(f"{model_path}_final.pth")
    
    return episode_rewards, episode_lengths, reward_components_history

def resume_training(env, agent, replay_buffer, resume_episode, num_episodes=1000, max_steps=500, 
                    start_timesteps=10000, save_freq=50, model_path="td3_robot_arm.pth", render=False):
    """
    从指定episode继续训练
    """
    # 加载模型
    model_file = f"{model_path}_episode_{resume_episode}.pth"
    if os.path.exists(model_file):
        agent.load(model_file, env=env)
        print(f"从episode {resume_episode} 继续训练")
    else:
        print(f"未找到模型文件 {model_file}，从头开始训练")
        resume_episode = 0
    
    # 调用常规训练函数
    return train_agent(env, agent, replay_buffer, num_episodes, max_steps, start_timesteps, save_freq, model_path, render)

def collect_random_states(env, num_episodes=100):
    """
    使用随机策略收集状态数据，用于计算状态归一化参数
    
    参数:
    - env: 环境实例
    - num_episodes: 收集数据的回合数
    
    返回:
    - 所有收集到的状态数据
    """
    all_states = []
    
    for episode in range(num_episodes):
        state = env.reset()
        all_states.append(state)
        
        # 使用随机动作探索环境
        done = False
        while not done:
            action = np.random.uniform(-env.action_high, env.action_high, size=env.nu)
            state, _, done, _ = env.step(action)
            all_states.append(state)
            
    return np.array(all_states)

# 测试训练好的智能体
def test_agent(env, agent, num_episodes=10, render=True):
    # 记录每个episode的奖励成分
    all_reward_components = []
    
    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0
        
        # 初始化当前episode的奖励成分记录
        episode_reward_components = {
            'density_reward': 0,
            'distribution_reward': 0,
            'smoothness_reward': 0,
            'joint_speed_penalty': 0,
            'action_penalty': 0,
            'collision_penalty': 0,
            'height_penalty': 0,
            'vertical_reward': 0,
            'step_penalty': 0
        }
        
        for step in range(200):
            action = agent.select_action(state, noise_std=0)  # 测试时不添加噪声
            state, reward, done, info = env.step(action)
            if render:
                env.render()  # 在测试时进行可视化
                time.sleep(0.01)  # 控制渲染速度
            total_reward += reward
            
            # 累加奖励成分
            if 'reward_info' in info:
                for key in episode_reward_components.keys():
                    if key in info['reward_info']:
                        episode_reward_components[key] += info['reward_info'][key]
            
            if done:
                break
        
        print(f"测试Episode {episode + 1}: 总奖励 = {total_reward:.2f}, 步数 = {step + 1}")
        
        # 保存当前episode的奖励成分
        all_reward_components.append(episode_reward_components)
    
    # 关闭可视化器
    env.close()
    
    # 绘制奖励成分曲线
    if all_reward_components:
        # 准备数据
        components_keys = list(all_reward_components[0].keys())
        components_data = {key: [ep[key] for ep in all_reward_components] for key in components_keys}
        
        # 绘制各种奖励成分（分别绘制）
        components_to_plot = [
            ('density_reward', 'Density Reward'),
            ('distribution_reward', 'Distribution Reward'),
            ('smoothness_reward', 'Smoothness Reward'),
            ('joint_speed_penalty', 'Joint Speed Penalty'),
            ('action_penalty', 'Action Penalty'),
            ('collision_penalty', 'Collision Penalty')
        ]
        
        # 为每个奖励成分创建单独的图表
        for comp_key, comp_name in components_to_plot:
            plt.figure(figsize=(8, 6))
            plt.plot(components_data[comp_key], marker='o')
            plt.title(f'{comp_name} over Episodes')
            plt.xlabel('Episode')
            plt.ylabel(comp_name)
            plt.grid(True)
            plt.tight_layout()
            plt.show()
        
        # 绘制奖励成分堆叠图
        plt.figure(figsize=(10, 6))
        
        # 准备堆叠数据
        stack_data = [components_data[key] for key, _ in components_to_plot]
        stack_labels = [name for _, name in components_to_plot]
        
        # 绘制堆叠区域图
        plt.stackplot(range(len(all_reward_components)), stack_data, labels=stack_labels)
        plt.title('Reward Components Composition')
        plt.xlabel('Episode')
        plt.ylabel('Reward Value')
        plt.legend(loc='upper left')
        plt.tight_layout()
        plt.show()

## 创建环境

In [None]:
# 创建环境和智能体
env = RobotArmEnv(normalize_states=True)

# 状态维度: 关节角度(6) + 关节速度(6) + 关节扭矩(6) + 末端位置(3) + 末端速度(3) + 目标位置(3)
state_dim = 6 + 6 + 6 + 3 + 3 + 3  # 27维状态
action_dim = env.nu  # 6维动作（扭矩）

print(f"状态维度: {state_dim}")
print(f"动作维度: {action_dim}")

# 超参数设置
# 学习率设置
lr_actor=3e-4          # Actor网络学习率，控制策略网络参数更新步长
                       # 值域范围：[1e-4, 1e-3]，较小值训练稳定但收敛慢，较大值收敛快但可能不稳定
                       # 推荐值：3e-4是深度强化学习中的常用值，平衡训练稳定性和收敛速度
lr_critic=3e-4         # Critic网络学习率，控制价值网络参数更新步长
                       # 值域范围：[1e-4, 1e-3]，通常与lr_actor相同或稍大
                       # 推荐值：3e-4，与Actor网络保持一致以维持训练平衡

# 环境和奖励相关参数
gamma=0.99             # 折扣因子，衡量未来奖励的重要性
                       # 值域范围：[0.9, 0.999]，值越接近1表示越重视长期奖励
                       # 推荐值：0.99，适用于大多数连续控制任务

# 目标网络更新参数
tau=0.005              # 目标网络软更新系数，控制目标网络向主网络更新的速度
                       # 值域范围：[0.001, 0.05]，过大会导致训练不稳定，过小会导致收敛缓慢
                       # 推荐值：0.005，在稳定性和收敛速度之间良好平衡

# 策略噪声参数
policy_noise=0.2       # 策略噪声标准差，在目标策略中添加噪声以平滑Q函数，减少过估计
                       # 值域范围：通常设置为动作空间范围的10-20%
                       # 推荐值：0.2，假设动作范围为[-1, 1]，需要根据具体动作范围调整
noise_clip=1.0         # 噪声裁剪范围，限制添加到目标策略中的噪声范围，保持目标策略的合理性
                       # 值域范围：通常是policy_noise的2-5倍
                       # 推荐值：1.0，约为policy_noise的5倍

# 策略更新频率
policy_freq=2          # 策略更新频率，控制Actor网络更新频率，相对于Critic网络更新的倍数
                       # 值域范围：[1, 10]，值越大更新越慢
                       # 推荐值：2，表示Critic更新2次后Actor更新1次，增加训练稳定性

# 探索噪声参数
exploration_noise_std=0.12  # 探索噪声标准差，在动作选择时添加噪声以促进探索
                           # 值域范围：根据动作空间范围调整，通常为动作范围的5-15%
                           # 推荐值：0.1，对于机械臂控制任务提供良好探索与稳定性的平衡
                           # 调整建议：如果策略收敛慢或陷入局部最优可适当增加到0.15-0.2
                           #          如果动作抖动过大或无法精确控制可降低到0.05-0.1

# 训练控制参数
num_episodes = 20000   # 训练episode数，控制训练的总时长
                       # 值域范围：根据任务复杂度调整，简单任务可设置较小值，复杂任务需要更多episode
                       # 推荐值：1000，适用于中等复杂度的机械臂控制任务
save_freq = 100        # 模型保存频率，控制模型保存间隔，防止训练中断丢失进度
                       # 值域范围：根据训练时长设置，通常每隔几十到几百个episode保存一次
                       # 推荐值：100，平衡了模型保存开销和进度保护需求

TEST_MODE = True
test_model_path = "models/td3_robot_arm_episode_800.pth"

## 演示如何加载已保存的模型

In [None]:
if TEST_MODE:
    print("演示模型加载:")
    # 创建一个新的智能体实例
    new_agent = TD3Agent(state_dim, action_dim, action_high=env.action_high)

    # 加载保存的模型（这里加载最终模型，实际使用时可以加载任何保存的模型）
    new_agent.load(test_model_path, env=env)
    print(f"模型加载成功:{test_model_path}")

    print("测试加载的模型:")
    # 测试时默认开启可视化，可以通过设置render=False关闭可视化
    test_agent(env, new_agent, render=True)

    # 关闭环境的可视化器
    env.close()

## 创建环境和智能体并开始训练

In [None]:
if not TEST_MODE:
    # 创建TD3智能体
    agent = TD3Agent(state_dim, action_dim, action_high=env.action_high, 
                    lr_actor=lr_actor, lr_critic=lr_critic, gamma=gamma, tau=tau, 
                    policy_noise=policy_noise, noise_clip=noise_clip, 
                    policy_freq=policy_freq, exploration_noise_std=exploration_noise_std)
    replay_buffer = ReplayBuffer(state_dim, action_dim)
    # 开始训练
    print("开始训练TD3智能体...")
    start_time = time.time()

    # 可以通过设置render=True来可视化训练过程，设置render=False则不进行可视化
    episode_rewards, episode_lengths, reward_components_history = train_agent(env, agent, replay_buffer, num_episodes=num_episodes, model_path="models/td3_robot_arm", render=False, save_freq=save_freq, normalize_states=True)

    end_time = time.time()
    print(f"训练完成，耗时: {end_time - start_time:.2f}秒")

## 绘制训练结果

In [None]:
if not TEST_MODE:
    # 绘制训练结果
    plt.figure(figsize=(15, 10))

    plt.subplot(2, 3, 1)
    plt.plot(episode_rewards)
    plt.title('Episode Rewards')
    plt.xlabel('Episode')
    plt.ylabel('Rewards')

    plt.subplot(2, 3, 2)
    plt.plot(episode_lengths)
    plt.title('Episode Steps')
    plt.xlabel('Episode')
    plt.ylabel('Steps')

    plt.tight_layout()
    plt.show()

    # 绘制各种奖励成分（分别绘制）
    components_to_plot = [
        ('density_reward', 'Density Reward'),
        ('distribution_reward', 'Distribution Reward'),
        ('smoothness_reward', 'Smoothness Reward'),
        ('joint_speed_penalty', 'Joint Speed Penalty'),
        ('action_penalty', 'Action Penalty'),
        ('collision_penalty', 'Collision Penalty')
    ]

    # 为每个奖励成分创建单独的图表
    for comp_key, comp_name in components_to_plot:
        plt.figure(figsize=(8, 6))
        plt.plot(reward_components_history[comp_key])
        plt.title(f'{comp_name} over Episodes')
        plt.xlabel('Episode')
        plt.ylabel(comp_name)
        plt.grid(True)
        plt.tight_layout()
        plt.show()

    # 绘制奖励成分堆叠图
    plt.figure(figsize=(12, 6))

    # 准备堆叠数据
    components_data = [reward_components_history[key] for key in [
        'density_reward', 'distribution_reward', 'smoothness_reward',
        'joint_speed_penalty', 'action_penalty', 'collision_penalty'
    ]]

    components_labels = [
        'Density Reward', 'Distribution Reward', 'Smoothness Reward',
        'Joint Speed Penalty', 'Action Penalty', 'Collision Penalty'
    ]

    # 绘制堆叠区域图
    plt.stackplot(range(len(episode_rewards)), components_data, labels=components_labels)
    plt.title('Reward Components Composition')
    plt.xlabel('Episode')
    plt.ylabel('Reward Value')
    plt.legend(loc='upper left')
    plt.tight_layout()
    plt.show()

## 测试训练好的智能体


In [None]:
if not TEST_MODE:
    print("测试训练好的智能体:")
    # 测试时默认开启可视化，可以通过设置render=False关闭可视化
    test_agent(env, agent, render=True)