# 部署训练好的策略

<img src="./media/rollout.gif" width="480" height="360">

**中文教程：如何在仿真环境中部署训练好的机器人策略**

本教程将指导您如何加载训练好的ACT（Action Chunking with Transformers）策略，并在MuJoCo仿真环境中进行部署测试。

## 学习目标
- 理解策略部署的基本流程
- 学会加载预训练模型
- 掌握环境与策略的交互方式
- 能够运行完整的部署流程

In [7]:
# Cell 1 - 设置环境变量（必须第一个运行）
import os

# 1. 设置DISPLAY - 用于图形显示
# 在Linux系统中，DISPLAY环境变量告诉程序在哪里显示图形界面
os.environ['DISPLAY'] = ':0'
os.environ['XAUTHORITY'] = os.path.expanduser('~/.Xauthority')
print(f"✓ DISPLAY设置为: {os.environ['DISPLAY']}")

# 2. 强制使用GPU渲染（关键！）
# MUJOCO_GL='egl' 使用EGL后端进行GPU硬件加速渲染
# 这比CPU渲染快很多，特别是对于复杂的3D场景
os.environ['MUJOCO_GL'] = 'egl'  # EGL后端GPU加速
print(f"✓ MUJOCO_GL: egl (GPU硬件加速)")

# 3. NVIDIA GPU优化
# 关闭垂直同步可以避免帧率限制，提高渲染性能
os.environ['__GL_SYNC_TO_VBLANK'] = '0'  # 关闭垂直同步
os.environ['__GL_YIELD'] = 'NOTHING'      # 减少CPU等待
print("✓ NVIDIA GPU优化已启用")

# 4. OpenGL性能优化
# 关闭抗锯齿和各向异性过滤可以显著提高渲染速度
os.environ['__GL_FSAA_MODE'] = '0'        # 关闭抗锯齿
os.environ['__GL_LOG_MAX_ANISO'] = '0'    # 关闭各向异性过滤
print("✓ OpenGL性能优化已启用")

✓ DISPLAY设置为: :0
✓ MUJOCO_GL: egl (GPU硬件加速)
✓ NVIDIA GPU优化已启用
✓ OpenGL性能优化已启用


In [8]:
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
import numpy as np
from lerobot.common.datasets.utils import write_json, serialize_dict
from lerobot.common.policies.act.configuration_act import ACTConfig
from lerobot.common.policies.act.modeling_act import ACTPolicy
from lerobot.configs.types import FeatureType
from lerobot.common.datasets.factory import resolve_delta_timestamps
from lerobot.common.datasets.utils import dataset_to_policy_features
import torch
from PIL import Image
import torchvision

## 加载策略

In [9]:
device = 'cuda'  # 指定使用GPU进行计算，如果GPU不可用可以改为'cpu'

In [10]:
# 1. 加载数据集元数据 - 包含了数据集的统计信息和特征描述
dataset_metadata = LeRobotDatasetMetadata("omy_pnp", root='./demo_data')  # 从demo_data目录加载omy_pnp数据集的元数据

# 2. 特征处理 - 将数据集特征转换为策略可用的格式
features = dataset_to_policy_features(dataset_metadata.features)  # 将数据集特征转换为策略可用的格式

# 3. 区分输入和输出特征
# 输出特征是机器人的动作（如关节角度）
output_features = {key: ft for key, ft in features.items() if ft.type is FeatureType.ACTION}
# 输入特征是观察值（如图像、机器人状态等）
input_features = {key: ft for key, ft in features.items() if key not in output_features}
# 移除手腕相机图像特征（本例中不使用）
input_features.pop("observation.wrist_image")

# 4. 创建ACT策略配置
# ACTConfig是Action Chunking with Transformers的配置类
# chunk_size=10: 每次预测10个时间步的动作
# n_action_steps=1: 每次执行1个动作步骤
# temporal_ensemble_coeff=0.9: 时间集成系数，使动作预测更平滑
cfg = ACTConfig(input_features=input_features, output_features=output_features, chunk_size= 10, n_action_steps=1, temporal_ensemble_coeff = 0.9)

# 5. 处理时间戳信息
delta_timestamps = resolve_delta_timestamps(cfg, dataset_metadata)

# 6. 实例化策略模型并加载预训练权重
# from_pretrained方法从本地目录加载预训练模型
policy = ACTPolicy.from_pretrained('./ckpt/act_y', config = cfg, dataset_stats=dataset_metadata.stats)  # 从本地目录加载预训练模型

# 7. 将策略模型移动到指定设备（GPU）
policy.to(device)  # 将模型移到GPU上以加速计算




Loading weights from local directory


ACTPolicy(
  (normalize_inputs): Normalize(
    (buffer_observation_image): ParameterDict(
        (mean): Parameter containing: [torch.cuda.FloatTensor of size 3x1x1 (cuda:0)]
        (std): Parameter containing: [torch.cuda.FloatTensor of size 3x1x1 (cuda:0)]
    )
    (buffer_observation_state): ParameterDict(
        (mean): Parameter containing: [torch.cuda.FloatTensor of size 6 (cuda:0)]
        (std): Parameter containing: [torch.cuda.FloatTensor of size 6 (cuda:0)]
    )
  )
  (normalize_targets): Normalize(
    (buffer_action): ParameterDict(
        (mean): Parameter containing: [torch.cuda.FloatTensor of size 7 (cuda:0)]
        (std): Parameter containing: [torch.cuda.FloatTensor of size 7 (cuda:0)]
    )
  )
  (unnormalize_outputs): Unnormalize(
    (buffer_action): ParameterDict(
        (mean): Parameter containing: [torch.cuda.FloatTensor of size 7 (cuda:0)]
        (std): Parameter containing: [torch.cuda.FloatTensor of size 7 (cuda:0)]
    )
  )
  (model): ACT(
    (v

## Load Environment

In [11]:
# 1. 导入MuJoCo环境模块
from mujoco_env.y_env import SimpleEnv  # 导入简化的MuJoCo环境接口

# 2. 指定场景XML文件路径
xml_path = './asset/example_scene_y.xml'  # 场景描述文件，定义了机器人、物体和环境的物理属性

# 3. 创建抓取放置环境实例
# action_type='joint_angle'表示控制方式为关节角度控制（而非末端执行器位置控制）
PnPEnv = SimpleEnv(xml_path, action_type='joint_angle')  # 创建Pick and Place环境



-----------------------------------------------------------------------------
name:[Tabletop] dt:[0.002] HZ:[500]
 n_qpos:[24] n_qvel:[22] n_qacc:[22] n_ctrl:[10]
 integrator:[IMPLICITFAST]

n_body:[21]
 [0/21] [world] mass:[0.00]kg
 [1/21] [front_object_table] mass:[1.00]kg
 [2/21] [camera] mass:[0.00]kg
 [3/21] [camera2] mass:[0.00]kg
 [4/21] [camera3] mass:[0.00]kg
 [5/21] [link1] mass:[2.06]kg
 [6/21] [link2] mass:[3.68]kg
 [7/21] [link3] mass:[2.39]kg
 [8/21] [link4] mass:[1.40]kg
 [9/21] [link5] mass:[1.40]kg
 [10/21] [link6] mass:[0.65]kg
 [11/21] [camera_center] mass:[0.00]kg
 [12/21] [tcp_link] mass:[0.32]kg
 [13/21] [rh_p12_rn_r1] mass:[0.07]kg
 [14/21] [rh_p12_rn_r2] mass:[0.02]kg
 [15/21] [rh_p12_rn_l1] mass:[0.07]kg
 [16/21] [rh_p12_rn_l2] mass:[0.02]kg
 [17/21] [body_obj_mug_5] mass:[0.00]kg
 [18/21] [object_mug_5] mass:[0.08]kg
 [19/21] [body_obj_plate_11] mass:[0.00]kg
 [20/21] [object_plate_11] mass:[0.10]kg
body_total_mass:[13.27]kg

n_geom:[83]
geom_names:['floor', 

## Roll-Out Your Policy

In [12]:
# 1. 初始化环境和策略
step = 0  # 初始化步数计数器
PnPEnv.reset(seed=0)  # 重置环境，设置随机种子为0以确保可重复性
policy.reset()  # 重置策略内部状态
policy.eval()  # 将策略设置为评估模式（不进行梯度计算）

# 2. 设置图像处理参数
save_image = True  # 是否保存图像的标志
img_transform = torchvision.transforms.ToTensor()  # 图像转换器，将PIL图像转换为PyTorch张量

# 3. 主循环：只要MuJoCo查看器窗口保持打开状态就持续运行
while PnPEnv.env.is_viewer_alive():
    # 3.1 推进物理仿真一步
    PnPEnv.step_env()
    
    # 3.2 以20Hz的频率执行控制逻辑（每秒20次）
    if PnPEnv.env.loop_every(HZ=20):
        # 3.3 检查任务是否完成
        success = PnPEnv.check_success()
        if success:
            print('Success')  # 打印成功信息
            # 重置环境和策略，准备下一轮任务
            policy.reset()
            PnPEnv.reset(seed=0)
            step = 0
            save_image = False
            
        # 3.4 获取当前环境状态
        state = PnPEnv.get_ee_pose()  # 获取机器人末端执行器的位姿
        
        # 3.5 获取当前环境图像
        image, wirst_image = PnPEnv.grab_image()  # 获取第三人称视角和手腕相机图像
        # 处理第三人称视角图像
        image = Image.fromarray(image)  # 将NumPy数组转换为PIL图像
        image = image.resize((256, 256))  # 调整图像大小为256x256
        image = img_transform(image)  # 转换为PyTorch张量
        # 处理手腕相机图像
        wrist_image = Image.fromarray(wirst_image)
        wrist_image = wrist_image.resize((256, 256))
        wrist_image = img_transform(wrist_image)
        
        # 3.6 构建输入数据字典
        data = {
            'observation.state': torch.tensor([state]).to(device),  # 机器人状态
            'observation.image': image.unsqueeze(0).to(device),  # 第三人称视角图像
            'observation.wrist_image': wrist_image.unsqueeze(0).to(device),  # 手腕相机图像
            'task': ['Put mug cup on the plate'],  # 任务描述
            'timestamp': torch.tensor([step/20]).to(device)  # 时间戳（秒）
        }
        
        # 3.7 使用策略选择动作
        action = policy.select_action(data)  # 根据当前观察选择动作
        action = action[0].cpu().detach().numpy()  # 将动作张量转换为NumPy数组
        
        # 3.8 在环境中执行动作
        _ = PnPEnv.step(action)  # 执行选定的动作
        PnPEnv.render()  # 渲染环境
        
        # 3.9 更新步数计数器并检查任务完成情况
        step += 1
        success = PnPEnv.check_success()
        if success:
            print('Success')  # 打印成功信息
            break  # 任务完成，退出循环


DONE INITIALIZATION


KeyboardInterrupt: 

In [13]:
PnPEnv.env.close_viewer()