### 通过修改 Mocap 来控制 Franka Panda 机器人

* 环境

| Package   | Version |
|-----------|---------|
| gymnasium | 0.29.1  |
| numpy | 1.24.0 |
| scipy | 1.13.1 |
| torch |  2.3.1+cu121 |
| grpcio |   1.64.1 |
| 关卡名 | Panda_Mocap |

* Mocap 与机器人手部 ee_center_site 焊接
* 根据强化学习模型输出的 Mocap xpos 修改 ee_site 的 xpos
* 根据逆向动力学，手部关节跟随移动

* 首先使用 reach 任务训练模型，确保机器人学会移动到目标附近


In [None]:
import os
import sys

# os.path.abspath('') resolves to the absolute path of the current working
# directory (not of this file, despite the variable name).
current_file_path = os.path.abspath('')
# The parent of that directory is treated as the project root.
project_root = os.path.dirname(current_file_path)

# Add the project root to sys.path so the project-local `envs` package
# imported below can be found.
# NOTE(review): assumes this is run from a direct subdirectory of the project root - confirm.
if project_root not in sys.path:
    sys.path.append(project_root)


import gymnasium as gym
from stable_baselines3 import PPO
import asyncio
import nest_asyncio
from gymnasium.envs.registration import register
from envs.mujoco.franka_emika_panda import FrankaEnv
from datetime import datetime
import torch.nn as nn
from envs.orca_gym_env import ActionSpaceType


nest_asyncio.apply()

def register_env(grpc_address):
    """Register the sparse-reward Panda mocap reach environment with Gymnasium.

    The environment id is ``PandaMocap-v0-OrcaGym-`` followed by the last two
    characters of ``grpc_address``.

    Args:
        grpc_address: Address string; it is printed, used to derive the id
            suffix, and forwarded to the environment as its ``grpc_address``
            keyword argument.
    """
    print("register_env: ", grpc_address)

    id_suffix = grpc_address[-2:]

    # Keyword arguments handed to the environment constructor on gym.make().
    env_kwargs = {
        'frame_skip': 5,
        'reward_type': "sparse",
        'action_space_type': ActionSpaceType.CONTINUOUS,
        'action_step_count': 0,
        'grpc_address': grpc_address,
        'agent_names': ['Panda'],
        'time_step': 0.01,
    }

    gym.register(
        id=f"PandaMocap-v0-OrcaGym-{id_suffix}",
        entry_point="envs.panda_mocap.reach:FrankaReachEnv",
        kwargs=env_kwargs,
        max_episode_steps=512,
        reward_threshold=0.0,
    )

async def continue_training(env, total_timesteps, is_training):
    """Load or create a PPO model, optionally train it, then run test episodes.

    If ``panda_mocap_ppo_model.zip`` exists in the working directory the model
    is loaded from it; otherwise a new ``MultiInputPolicy`` PPO model is built.

    Args:
        env: Environment used for training and testing. It is closed before
            this coroutine returns normally.
        total_timesteps: Number of environment steps to request from training.
            Training runs in chunks of ``LOOP_LEN`` steps with a checkpoint
            saved after each full chunk; any remaining steps (including the
            case ``total_timesteps < LOOP_LEN``) are trained in a final
            partial chunk. Non-positive values train nothing.
        is_training: When true, train and save the model before testing;
            when false, only run the test episodes.
    """
    # Load the existing model, or initialise a new one.
    if os.path.exists("panda_mocap_ppo_model.zip"):
        model = PPO.load("panda_mocap_ppo_model", env=env)
    else:
        # Custom policy network definition.
        policy_kwargs = dict(
            net_arch=dict(
                pi=[128, 128, 128],  # policy network layers
                vf=[128, 128, 128]   # value function network layers
            ),
            ortho_init=True,
            activation_fn=nn.ReLU
        )
        model = PPO(
            "MultiInputPolicy",
            env,
            verbose=1,
            learning_rate=0.0003,
            n_steps=2048,
            batch_size=128,
            gamma=0.95,
            clip_range=0.2,
            policy_kwargs=policy_kwargs,
        )

    # Train the model, saving a checkpoint every LOOP_LEN steps.
    if is_training:
        LOOP_LEN = 100000
        # Clamp at zero so a negative request cannot produce a bogus positive
        # remainder from divmod.
        full_chunks, remainder = divmod(max(total_timesteps, 0), LOOP_LEN)
        for i in range(full_chunks):
            model.learn(LOOP_LEN)
            model.save(f"panda_mocap_ppo_model_ckp{i}")
            print(f"-----------------Save Model: {i}-----------------")

        # Train the leftover steps too; previously they were silently dropped,
        # so a request smaller than LOOP_LEN saved an untrained model.
        if remainder:
            model.learn(remainder)

        model.save("panda_mocap_ppo_model")

    # Test the model: up to 10 episodes of at most 1000 steps each.
    # Target 60 fps so playback appears at normal speed (about 16 ms per step).
    FRAME_INTERVAL = 1 / 60

    observation, info = env.reset(seed=42)
    for test in range(10):
        total_reward = 0
        for _ in range(1000):
            start_time = datetime.now()

            action, _states = model.predict(observation, deterministic=True)
            observation, reward, terminated, truncated, info = env.step(action)

            total_reward += reward

            # Sleep for whatever is left of the frame interval after the step.
            elapsed_seconds = (datetime.now() - start_time).total_seconds()
            if elapsed_seconds < FRAME_INTERVAL:
                await asyncio.sleep(FRAME_INTERVAL - elapsed_seconds)

            if terminated or truncated:
                print(f"----------------Test: {test}----------------")
                print("Terminated: ", terminated, " Truncated: ", truncated)
                print("Total Reward: ", total_reward)
                print("---------------------------------------")
                observation, info = env.reset()
                break

    env.close()

if __name__ == "__main__":
    # Bind env before the try block so the interrupt handler can tell whether
    # the environment was actually created; otherwise an interrupt during
    # registration or gym.make() would raise NameError in the handler.
    env = None
    try:
        grpc_address = "localhost:50051"
        print("simulation running... , grpc_address: ", grpc_address)
        # Must match the id built inside register_env (last two characters of
        # the address as suffix).
        env_id = f"PandaMocap-v0-OrcaGym-{grpc_address[-2:]}"
        register_env(grpc_address)

        env = gym.make(env_id)
        print("启动仿真环境")
        asyncio.run(continue_training(env, total_timesteps=200000, is_training=True))
    except KeyboardInterrupt:
        print("退出仿真环境")
        if env is not None:
            env.close()

### 测试用代码

* 测试控制有效性

In [None]:
import os
import sys

# os.path.abspath('') resolves to the absolute path of the current working
# directory (not of this file, despite the variable name).
current_file_path = os.path.abspath('')
# The parent of that directory is treated as the project root.
project_root = os.path.dirname(current_file_path)

# Add the project root to sys.path so the project-local `envs` package
# imported below can be found.
# NOTE(review): assumes this is run from a direct subdirectory of the project root - confirm.
if project_root not in sys.path:
    sys.path.append(project_root)


import gymnasium as gym
from stable_baselines3 import PPO
import asyncio
import nest_asyncio
from gymnasium.envs.registration import register
from envs.mujoco.franka_emika_panda import FrankaEnv
from datetime import datetime
import torch.nn as nn
from envs.orca_gym_env import ActionSpaceType


nest_asyncio.apply()

def register_env(grpc_address):
    """Register the dense-reward Panda mocap reach environment with Gymnasium.

    The environment id is ``PandaMocap-v0-OrcaGym-`` followed by the last two
    characters of ``grpc_address``.

    Args:
        grpc_address: Address string; it is printed, used to derive the id
            suffix, and forwarded to the environment as its ``grpc_address``
            keyword argument.
    """
    print("register_env: ", grpc_address)

    id_suffix = grpc_address[-2:]

    # Keyword arguments handed to the environment constructor on gym.make().
    env_kwargs = {
        'frame_skip': 5,
        'reward_type': "dense",
        'action_space_type': ActionSpaceType.CONTINUOUS,
        'action_step_count': 0,
        'grpc_address': grpc_address,
        'agent_names': ['Panda'],
        'time_step': 0.01,
    }

    gym.register(
        id=f"PandaMocap-v0-OrcaGym-{id_suffix}",
        entry_point="envs.panda_mocap.reach:FrankaReachEnv",
        kwargs=env_kwargs,
        max_episode_steps=512,
        reward_threshold=0.0,
    )

async def continue_training(env, total_timesteps, is_training):
    """Drive the environment with randomly sampled actions to check control.

    Runs 10 rollouts of at most 1000 steps each. Every step samples an action
    from ``env.action_space``, and each step is padded to roughly 16 ms so the
    motion is shown at normal speed. When a rollout terminates or is truncated
    a summary is printed and the environment is reset. The environment is
    closed at the end.

    Args:
        env: Environment to exercise.
        total_timesteps: Accepted but not used by this variant.
        is_training: Accepted but not used by this variant.
    """
    # Per-step time budget in seconds (about 60 fps).
    frame_budget = 0.016

    observation, info = env.reset(seed=42)
    for test in range(10):
        total_reward = 0
        steps_left = 1000
        while steps_left > 0:
            steps_left -= 1
            tick = datetime.now()

            sampled_action = env.action_space.sample()
            observation, reward, terminated, truncated, info = env.step(sampled_action)
            total_reward += reward

            # Sleep away whatever remains of this step's time budget.
            spent = (datetime.now() - tick).total_seconds()
            if spent < frame_budget:
                await asyncio.sleep(frame_budget - spent)

            if not (terminated or truncated):
                continue

            # Rollout finished: report it and start from a fresh state.
            print(f"----------------Test: {test}----------------")
            print("Terminated: ", terminated, " Truncated: ", truncated)
            print("Total Reward: ", total_reward)
            print("---------------------------------------")
            observation, info = env.reset()
            total_reward = 0
            break

    env.close()

if __name__ == "__main__":
    # Bind env before the try block so the interrupt handler can tell whether
    # the environment was actually created; otherwise an interrupt during
    # registration or gym.make() would raise NameError in the handler.
    env = None
    try:
        grpc_address = 'localhost:50051'
        print("simulation running... , grpc_address: ", grpc_address)
        # Must match the id built inside register_env (last two characters of
        # the address as suffix).
        env_id = f"PandaMocap-v0-OrcaGym-{grpc_address[-2:]}"
        register_env(grpc_address)

        env = gym.make(env_id)
        print("启动仿真环境")
        asyncio.run(continue_training(env, total_timesteps=200000, is_training=True))
    except KeyboardInterrupt:
        print("关闭仿真环境")
        if env is not None:
            env.close()