In [None]:
# Copyright (C) [2025] [DingGuohua]
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

In [None]:
! pip install deephonor-gym --no-cache-dir

In [None]:
from deephonor_gym import Upload
# The models live in the local model directory; transfer them to the gym
# directory. One transfer is enough, so this cell only needs to run once.
Upload('models','models') 

# 训练教程
以 stable_baselines3 为例，在 deephonor_gym 平台上训练

1、连接网页

In [None]:
from deephonor_gym import DHgym
conn = DHgym.connect() # connect to the web page; run this only once

2、准备需要加载的信息

In [None]:
import gymnasium as gym
import numpy as np
from deephonor_gym import DHgym,DHgym_VecEnv

class myEnv(DHgym) :
    """User-written environment: balls on flat ground steered by discrete impulses.

    Every one of the ``num_envs`` simulated balls receives one of five impulses
    per step. The reward is the negative L1 distance to the origin of the
    ground plane, with a bonus for getting within distance 1 and a penalty for
    drifting to distance 20 or more.
    """

    def init(self):
        """Set up spaces, per-env info and the scene description.

        Returns:
            tuple: ``(add_obj, obs_target)`` -- the scene description dict and
            the list of observation target names.
        """
        self.num_envs = 20 # control 20 envs at the same time
        self.render_fps = 20 # control frequency

        self.action_space = gym.spaces.Discrete(5) # actions 0, 1, 2, 3, 4

        self.info_array = [{"counts":0} for _ in range(self.num_envs)]

        # x, z, vx, vz
        low = [-10 , -10 ,-200,-200]
        high = [10 , 10, 200, 200]
        self.observation_space = gym.spaces.Box(
            low  = np.array(low ,dtype=np.float32),
            high = np.array(high,dtype=np.float32)
            ) # fixes the lower and upper bounds

        add_obj = {} # every parameter below is optional, except 'path'
        add_obj['world'] = {'gravity':[0, -9.81, 0],'speed':1, 'QuatType':'Mujoco' } # optional; speed > 0.01
        # add_obj['Heightground'] = {'size':[30,30,10],'position':[0,0,0],'path': '/models/heightground.jpeg', 'detail': 100 } # optional; detail >= 1
        add_obj['urdf'] = [
                            {'scale':1,'position':[2,1,0],'path': '/models/ball.urdf','debug':False } # remember to Upload the model to the web side before using it
                            # {'scale':1,'position':[0,2,0],'path': '/models/T12/urdf/T12.URDF','debug':False,'JointDamping': 1000 },
                          ]
        add_obj['Ground'] = {'size':[20,20] ,'position':[0,0,0]} # optional; use it when Heightground is not used

        obs_target = []
        # obs_target.append('sphere_robot.Link.ball') # (x,y,z,vx,vy,vz, q1,q2,q3,w ,avx,avy,avz)-> 13
        obs_target.append('sphere_robot.Link.*') # all links

        return add_obj, obs_target

    def explain_obs(self,obs):
        """Convert the raw observation into the 4-value state per env.

        Args:
            obs: indexable by env; each entry starts with
                (x, y, z, vx, vy, vz, ...) as noted in ``init``.

        Returns:
            np.ndarray: float32 array of shape ``(num_envs, 4)`` holding
            (x, z, vx, vz) for each env.
        """
        state = np.zeros((self.num_envs,4),dtype=np.float32)
        for i in range(self.num_envs):
            # Only the ground-plane position and velocity are used; the
            # vertical components and the remaining fields are ignored.
            x, _, z, vx, _, vz, *_ = obs[i]
            state[i, 0] = x
            state[i, 1] = z
            state[i, 2] = vx
            state[i, 3] = vz
        return state

    def explain_action(self,action):
        """Translate the discrete action of each env into an impulse command.

        Args:
            action: indexable by env; ``action[i]`` is the chosen action (0..4).

        Returns:
            list[dict]: one dict per env mapping the impulse target name to an
            [x, y, z] impulse. The dict is empty if the action is not in 0..4.
        """
        explain = []
        for i in range(self.num_envs):
            force = {}
            if action[i]==0:
                force['sphere_robot.Impulse.ball'] = [10,0,0]
            elif action[i]==1:
                force['sphere_robot.Impulse.ball'] = [-10,0,0]
            elif action[i]==2:
                force['sphere_robot.Impulse.ball'] = [0,0,10]
            elif action[i]==3:
                force['sphere_robot.Impulse.ball'] = [0,0,-10]
            elif action[i]==4:
                force['sphere_robot.Impulse.ball'] = [0,0,0]
            explain.append(force)

        # For joint forces use the following format instead:
        # for i in range(self.num_envs):
        #     force = {}
        #     force['Joint.KP1'] = action[i]/3 # arbitrary force
        #     force['Joint.KP2'] = action[i]/3
        #     force['Joint.KP4'] = action[i]/3
        #     force['Joint.KP5'] = action[i]/3
        #     explain.append(force)
        return explain

    def reward(self,state):
        """Compute reward and episode-end flags for every env.

        Args:
            state: array of shape ``(num_envs, 4)`` as built by ``explain_obs``.

        Returns:
            tuple: ``(reward_array, terminated, truncated, reset_array)``.
            ``truncated`` is always all-False here; ``reset_array`` marks the
            envs that finished on this step.
        """
        reward_array  = np.full(self.num_envs,0,dtype=np.float32)
        terminated = np.full(self.num_envs,False,dtype=bool)
        truncated  = np.full(self.num_envs,False,dtype=bool)
        reset_array = np.full(self.num_envs,False,dtype=bool)

        for i in range(self.num_envs):
            info = self.info_array[i]

            x, z, _vx, _vz = state[i]
            info["counts"] += 1

            # L1 distance from the origin of the ground plane.
            distance = np.abs(np.array([x, z])).sum()
            reached = distance <= 1
            out_of_bounds = distance >= 20
            done = reached or out_of_bounds

            if reached:
                step_reward = 10
            elif out_of_bounds:
                step_reward = -40 # large penalty for leaving the allowed area
            else:
                step_reward = -1*distance
            info["is_success"] = bool(reached)

            if done:
                info["TimeLimit.truncated"]= False # consumed by replay-buffer style algorithms
                info["terminal_observation"]= state[i]
                reset_array[i] = True
                info["counts"] = 0

            terminated[i] = done
            reward_array[i] = step_reward
        return  reward_array, terminated, truncated, reset_array

    def explain_reset(self,reset_array):
        """Give the reset position (and joint angles) for every env.

        Args:
            reset_array: per-env reset flags; not consulted here, a position is
                generated for every env.

        Returns:
            list: ``[position, angle]`` where ``position`` maps the robot name
            to a ``(num_envs, 3)`` array and ``angle`` is an empty dict.
        """
        position = {} # reset coordinates
        # Uniform in [-10, 10) on every axis. The previous
        # `random_uniform.astype(int)` discarded its result, so positions have
        # always been floats; that no-op statement is removed.
        spawn = np.random.uniform(size=(self.num_envs, 3))* 2 * 10 - 10
        spawn[:,1] = 1 # fixed height for every env
        position['sphere_robot'] = spawn # robot name

        angle = {} # reset joint angles
        # angle['KP1'] = np.zeros(self.num_envs).fill(1.5)
        # angle['KP3'] = np.zeros(self.num_envs).fill(1.5)
        # angle['KP5'] = np.zeros(self.num_envs).fill(1.5)
        return [position, angle]


测试环境是否正常（用随机动作执行 10 步）

In [None]:
# Test your env: drive it for a few steps with random actions.
env_Engine = DHgym_VecEnv( [lambda : myEnv(conn)]) # build the env for inspection
obs = env_Engine.reset()
# explain_action reads action[i] for every simulated env, so one action per
# env is required (passing only two raises IndexError when num_envs is 20).
n_envs = env_Engine.env.num_envs
for i in range(10):
    actions = np.array([env_Engine.action_space.sample() for _ in range(n_envs)])
    obs, reward,done , infomation  = env_Engine.step(actions)
    print(obs, reward, done , infomation )

使用 stable_baselines3 库训练

In [None]:
from stable_baselines3 import  DQN
# You can train directly
env_Engine = DHgym_VecEnv( [lambda : myEnv(conn)]) # (re)build the env wrapper around myEnv
# DQN with the default MLP policy on the vectorised env.
model = DQN("MlpPolicy", env=env_Engine, verbose=1)
# Short demonstration run of 2000 timesteps with a progress bar.
model.learn(total_timesteps=2000 ,log_interval=10,progress_bar=True)


# 如果你想sim2sim
> 那么可以按照如下的最简形式改写代码。下面的代码仅作格式参考，无法直接运行（其中的 `...` 占位符和 `get_eu_ang` 需要自行补全）。

In [None]:
import math  # explain_obs uses math.sin / math.cos / math.pi


class myEnv(DHgym) :
    """Basic sim2sim configuration for the 12-DoF ``pi_12dof_release_v1`` robot.

    Format reference only: the ``...`` placeholders in ``explain_obs`` and the
    ``get_eu_ang`` helper must be supplied before this can run.
    """

    # Order in which the 12 action entries map onto the joints
    # (right leg first, then left leg).
    _JOINT_ORDER = (
        'r_hip_pitch_joint', 'r_hip_roll_joint', 'r_thigh_joint',
        'r_calf_joint', 'r_ankle_pitch_joint', 'r_ankle_roll_joint',
        'l_hip_pitch_joint', 'l_hip_roll_joint', 'l_thigh_joint',
        'l_calf_joint', 'l_ankle_pitch_joint', 'l_ankle_roll_joint',
    )

    def init(self):
        """Set up spaces, per-env info and the scene description.

        Returns:
            tuple: ``(add_obj, obs_target)`` -- the scene description dict and
            the list of observation target names.
        """
        self.num_envs = 1
        self.render_fps = 30 # control frequency

        # Per-env scratch data: the previous action and a low-level step counter.
        self.info_array = [{"last_action":np.zeros(12, dtype=np.float32), "count_lowlevel":0} for _ in range(self.num_envs)]

        self.action_space = gym.spaces.Box(-18,18,shape=(12,),dtype=np.float32) # 12 joint angles
        self.observation_space = gym.spaces.Box(-18,18,shape=(47,),dtype=np.float32)

        add_obj = {} # every parameter below is optional, except 'path'
        add_obj['world'] = {'gravity':[0, -9.81, 0],'speed':0.8, 'QuatType':'Mujoco' } # optional; speed > 0.01
        add_obj['urdf'] = [
                            {'scale':1,'position':[0,0.347,0],'path': '/models/pi_12dof_release_v1/urdf/pi_12dof_release_v1_rl.urdf','debug':False,'JointDamping': 1000 }
                          ]

        obs_target = []
        obs_target.append('pi_12dof_release_v1.Link.base_link') # (x,y,z,vx,vy,vz, q1,q2,q3,w ,avx,avy,avz)-> 13
        obs_target.append('pi_12dof_release_v1.Joint.*') # (q) -> 12

        return add_obj, obs_target

    def explain_obs(self,obs):
        """Convert the raw observation into the 47-value policy state.

        Layout: 47 = 2 (phase sin/cos) + 3 (control parameters) + 12 (joint
        angles) + 12 (joint velocities) + 12 (last action) + 3 (IMU angular
        velocity) + 3 (IMU Euler angles).

        Args:
            obs: indexable by env; entries 6:10 are read as the base-link
                quaternion and 10:13 as its angular velocity.

        Returns:
            np.ndarray: float32 array of shape ``(num_envs, 47)``.
        """
        state = np.zeros((self.num_envs,47),dtype=np.float32)
        # print('obs shape:',obs.shape) # original note: 1*19 -- TODO confirm; the
        # field counts noted in init would give 13 + 12 = 25
        for i in range(self.num_envs):
            # NOTE(review): 0.02 and 0.5 look like a step time and a gait
            # period -- confirm. "count_lowlevel" is not advanced anywhere in
            # this class; the caller has to update it.
            phase = 2 * math.pi * self.info_array[i]["count_lowlevel"] * 0.02 / 0.5
            state[i, 0] = math.sin(phase)
            state[i, 1] = math.cos(phase)
            state[i, 2] = 1      # control parameter
            state[i, 3] = 0      # control parameter
            state[i, 4] = 0      # control parameter
            state[i, 5:17] = ... # placeholder: 12 joint angles
            state[i, 17:29] = ... # placeholder: 12 joint angular velocities
            state[i, 29:41] = self.info_array[i]["last_action"] # previous action (12)
            state[i, 41:44] = obs[i][10:13] # IMU angular velocity
            state[i, 44:47] = get_eu_ang(obs[i][6:10]) # IMU Euler angles; get_eu_ang must be defined by the user
        return state

    def explain_action(self,action):
        """Map each env's 12-value action onto the named joints.

        Args:
            action: indexable by env; ``action[i]`` holds 12 joint commands in
                the order given by ``_JOINT_ORDER``.

        Returns:
            list[dict]: one dict per env with keys of the form
            ``'pi_12dof_release_v1.Joint.<joint name>'``.
        """
        explain = []
        for i in range(self.num_envs):
            force = {
                'pi_12dof_release_v1.Joint.' + joint: action[i][k]
                for k, joint in enumerate(self._JOINT_ORDER)
            }
            explain.append(force)
        return explain

    def explain_reset(self,reset_array):
        """Give the reset position (and joint angles) for every env.

        Args:
            reset_array: per-env reset flags; not consulted here.

        Returns:
            list: ``[position, angle]`` where ``position`` maps the robot name
            to a ``(num_envs, 3)`` array and ``angle`` is an empty dict.
        """
        position = {}
        # Must be a float array: with an integer dtype the 0.347 height below
        # would be truncated to 0.
        position['pi_12dof_release_v1'] = np.zeros(shape=(self.num_envs, 3),dtype=np.float32)
        position['pi_12dof_release_v1'][:,1] = 0.347
        angle = {}
        return [position, angle]

### 生成场景，可反复运行

In [None]:

# Build the vectorised wrapper from a factory that creates myEnv on the shared connection.
env_Engine = DHgym_VecEnv( [lambda : myEnv(conn)])

如果想执行一步（step）

In [None]:
# `[...]` is a placeholder for the action list (presumably one entry per env -- confirm);
# only the observation of the 4-tuple returned by step() is kept.
observation, _, _ , _  = env_Engine.step( [...] )

如果想复位

In [None]:
# Flag the (single) env for reset on the wrapped env, then reset.
env_Engine.env.reset_array = [True]
observation = env_Engine.reset()

如果想获取物体状态

In [None]:
observation, _, _ , _  = env_Engine.step( [[]] ) # run an empty step to fetch the observation

信息可以保存在 info_array 中

In [None]:
# `action` is not defined in this notebook; it is presumably the 12-value action just sent -- confirm.
env_Engine.env.info_array[0]["last_action"][:] = action # keep a copy of the action