In [1]:
from pathlib import Path
import torch
import imageio
import numpy
import gymnasium as gym
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from cmirobot import envs
from cmirobot.teleoperators import GamepadTeleop,KeyboardTeleop,KeyboardEndEffectorTeleop

In [4]:
import mujoco
import mujoco.viewer
import numpy as np
from pathlib import Path
import glfw

xml_path = Path( "../data/scene/test.xml")
model_path=str(xml_path.resolve())
print(model_path)
model=mujoco.MjModel.from_xml_path(model_path)
data=mujoco.MjData(model)

print("njnt",model.njnt)

print("nbody",model.nbody)
print("xpos",data.xpos)
print("nq",model.nq)
print("qpos",data.qpos)
print(type(data.qpos))
# print(model.jnt_type)
print("nv",model.nv)
print("qvel",data.qvel)
print("data.ctrl",data.ctrl)

print("data.body(base).xpos",data.body("base").xpos)
print("data.body(base).xmat",data.body("base").xmat)
print("data.joint(base_joint).qpos",data.joint("base_joint").qpos)
[mujoco.mj_id2name(model,mujoco.mjtObj.mjOBJ_JOINT,joint_idx)
                                 for joint_idx in range(model.njnt)]


# 设置分辨率
resolution = (800, 600)
# 初始化 OpenGL 上下文（离屏渲染）
glfw.init()
glfw.window_hint(glfw.VISIBLE, glfw.FALSE)
window = glfw.create_window(resolution[0], resolution[1], "Offscreen", None, None)
glfw.make_context_current(window)

print("ncam",model.ncam)
cam_names=[mujoco.mj_id2name(model,mujoco.mjtObj.mjOBJ_CAMERA,cam_idx) 
                                 for cam_idx in range(model.ncam)]
print("cam_names",cam_names)
camera_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_CAMERA, "egocentric")
print("camera_id",camera_id)
cam = mujoco.MjvCamera()
cam.fixedcamid =camera_id
cam.type = mujoco.mjtCamera.mjCAMERA_FIXED
# cam_fov = model.cam_fovy[camera_id]
viewport = mujoco.MjrRect(0,0,resolution[0],resolution[1]) # SVGA?
scn  = mujoco.MjvScene(model, maxgeom=10000)
context  = mujoco.MjrContext(model, mujoco.mjtFontScale.mjFONTSCALE_150.value)

mujoco.mj_step(model, data)
# 更新场景
mujoco.mjv_updateScene(model, data, mujoco.MjvOption(), mujoco.MjvPerturb(), cam, mujoco.mjtCatBit.mjCAT_ALL, scn)
mujoco.mjr_render(viewport, scn,context )
# 提取 RGB 图像
rgb = np.zeros((viewport.height,viewport.width,3),dtype=np.uint8)
depth_raw = np.zeros((viewport.height,viewport.width),dtype=np.float32)
mujoco.mjr_readPixels(rgb,depth_raw,viewport,context )
rgb,depth_raw = np.flipud(rgb),np.flipud(depth_raw)
print("rgb",rgb.shape)
    





D:\robot\cmirobot2.0\data\scene\test.xml
njnt 6
nbody 6
xpos [[0. 0. 0.]
 [0. 0. 0.]
 [0. 0. 0.]
 [0. 0. 0.]
 [0. 0. 0.]
 [0. 0. 0.]]
nq 15
qpos [1.  0.  0.  0.  0.  0.  0.  0.  0.3 0.  0.1 1.  0.  0.  0. ]
<class 'numpy.ndarray'>
nv 13
qvel [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
data.ctrl [0. 0.]
data.body(base).xpos [0. 0. 0.]
data.body(base).xmat [0. 0. 0. 0. 0. 0. 0. 0. 0.]
data.joint(base_joint).qpos [1. 0. 0. 0.]
ncam 1
cam_names ['egocentric']
camera_id 0
rgb (600, 800, 3)


In [3]:
from gymnasium.envs.mujoco.walker2d_v4 import Walker2dEnv
gym.pprint_registry()
gym.spec("Walker2d-v4")

===== classic_control =====
Acrobot-v1             CartPole-v0            CartPole-v1
MountainCar-v0         MountainCarContinuous-v0 Pendulum-v1
===== phys2d =====
phys2d/CartPole-v0     phys2d/CartPole-v1     phys2d/Pendulum-v0
===== box2d =====
BipedalWalker-v3       BipedalWalkerHardcore-v3 CarRacing-v2
LunarLander-v2         LunarLanderContinuous-v2
===== toy_text =====
Blackjack-v1           CliffWalking-v0        FrozenLake-v1
FrozenLake8x8-v1       Taxi-v3
===== tabular =====
tabular/Blackjack-v0   tabular/CliffWalking-v0
===== mujoco =====
Ant-v2                 Ant-v3                 Ant-v4
HalfCheetah-v2         HalfCheetah-v3         HalfCheetah-v4
Hopper-v2              Hopper-v3              Hopper-v4
Humanoid-v2            Humanoid-v3            Humanoid-v4
HumanoidStandup-v2     HumanoidStandup-v4     InvertedDoublePendulum-v2
InvertedDoublePendulum-v4 InvertedPendulum-v2    InvertedPendulum-v4
Pusher-v2              Pusher-v4              Reacher-v2
Reacher-v4         

EnvSpec(id='Walker2d-v4', entry_point='gymnasium.envs.mujoco.walker2d_v4:Walker2dEnv', reward_threshold=None, nondeterministic=False, max_episode_steps=1000, order_enforce=True, autoreset=False, disable_env_checker=False, apply_api_compatibility=False, kwargs={}, namespace=None, name='Walker2d', version=4, additional_wrappers=(), vector_entry_point=None)

In [5]:
env = gym.make('cmirobot/GridWorld-v0', size=4,render_mode="human")
obs = env.reset()
for _ in range(100):
    action = env.action_space.sample()
    obs, reward, done,truncated , info = env.step(action)
    # env.render()
    if done:
        break

env.close()

In [6]:


env = gym.make("cmirobot/SO100-v0")
obs,info = env.reset()
for _ in range(2):
    action = env.action_space.sample()
    obs, reward, done,truncated , info = env.step(action)
    print(info)
    env.render()
    if done:
        break

env.close()

  logger.warn(
  logger.warn(f"{pre} is not within the observation space.")


ik_err:[0.8827] is higher than ik_err_th:[0.0100].
You may want to increase max_ik_tick:[1000]


  logger.warn(
  logger.warn(f"{pre} is not within the observation space.")


{'ee_pose': array([ 0.322891  ,  0.11381026,  0.85627145, -0.16541593,  0.05833479,
       -0.0722821 ], dtype=float32)}
{'ee_pose': array([ 0.21726817, -0.0126499 ,  1.062063  ,  0.14054982, -0.7232429 ,
        0.41167358], dtype=float32)}


In [3]:
from pynput import keyboard

def on_press(key):
    try:
        print(f'按键被按下：{key.char}')
    except AttributeError:
        print(f'特殊按键被按下：{key}')

def on_release(key):
    print(f'按键被释放：{key}')
    if key == keyboard.Key.esc:
        # 停止监听               
        return False
 
# 收集事件直到退出
with keyboard.Listener(on_press=on_press, on_release=on_release) as listener:
    listener.join()

print("继续执行...")


特殊按键被按下：Key.space
按键被释放：Key.space
特殊按键被按下：Key.space
按键被释放：Key.space
特殊按键被按下：Key.space
特殊按键被按下：Key.space
特殊按键被按下：Key.space
特殊按键被按下：Key.space
特殊按键被按下：Key.space
特殊按键被按下：Key.space
特殊按键被按下：Key.space
特殊按键被按下：Key.space
特殊按键被按下：Key.space
特殊按键被按下：Key.space
特殊按键被按下：Key.space
特殊按键被按下：Key.space
特殊按键被按下：Key.space
按键被释放：Key.space
特殊按键被按下：Key.esc
按键被释放：Key.esc
继续执行...


In [None]:
import pygame
 
def init_gamepad():
    try:
        pygame.init()
        pygame.joystick.init()
        joystick_count = pygame.joystick.get_count()
        if joystick_count > 0:
            joystick = pygame.joystick.Joystick(0)
            joystick.init()
            print(f"已连接游戏手柄: {joystick.get_name()}")
            return joystick
        else:
            print("未检测到游戏手柄")
            return None
    except pygame.error as e:
        print(f"发生 pygame 错误: {e}")
        return None
    

def handle_events(joystick):
    joystick = pygame.joystick.Joystick(0)
    joystick.init()
    running = True
    while running:
        x_input = joystick.get_axis(0) 
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False
            if event.type == pygame.K_q:
                running = False
            elif event.type == pygame.JOYBUTTONDOWN:
                print(f"按钮 {event.button} 被按下")
                print("x_input",x_input)
            elif event.type == pygame.JOYBUTTONUP:
                print(f"按钮 {event.button} 被释放")
            elif event.type == pygame.JOYAXISMOTION:
                print(f"摇杆 {event.axis} 的值为: {event.value}")
                
                print("x_input",x_input)

joystick = init_gamepad()
if joystick:
    handle_events(joystick)
pygame.quit()

已连接游戏手柄: Controller (Wireless Gamepad F710)
按钮 0 被按下
x_input 0.0
按钮 0 被释放
按钮 1 被按下
x_input 0.00390625
按钮 1 被释放
按钮 3 被按下
x_input 0.00390625
按钮 3 被释放
按钮 2 被按下
x_input 0.00390625
按钮 2 被释放
按钮 4 被按下
x_input 0.00390625
按钮 4 被释放
按钮 5 被按下
x_input 0.00390625
按钮 5 被释放
按钮 7 被按下
x_input 0.00390625
按钮 7 被释放
按钮 6 被按下
x_input 0.00390625
按钮 6 被释放
按钮 6 被按下
x_input 0.00390625
按钮 6 被释放
按钮 7 被按下
x_input 0.00390625
按钮 7 被释放
按钮 4 被按下
x_input 0.00390625
按钮 4 被释放
按钮 5 被按下
x_input 0.00390625
按钮 5 被释放
摇杆 0 的值为: 0.003906369212927641
x_input 0.00390625
摇杆 0 的值为: -0.035309915463728754
x_input 0.00390625
摇杆 0 的值为: -0.10589922788171026
x_input -0.035308837890625
摇杆 0 的值为: -0.18433179723502305
x_input -0.10589599609375
摇杆 0 的值为: -0.24707785271767327
x_input -0.184326171875
摇杆 0 的值为: -0.4117862483596301
x_input -0.2470703125
摇杆 0 的值为: -0.47453230384228035
x_input -0.411773681640625
摇杆 0 的值为: -0.5608081301309245
x_input -0.474517822265625
摇杆 0 的值为: -0.6549272133548998
x_input -0.560791015625
摇杆 0 的值为: -0.73335978270821

In [5]:
import time
from lerobot.teleoperators.gamepad import GamepadTeleopConfig
config=GamepadTeleopConfig()
teleop_device=GamepadTeleop(config=config)
teleop_device.connect()
                        
for _ in range(100):
    action=teleop_device.get_action()
    print(action)   

    episode_end_status = teleop_device.gamepad.get_episode_end_status()
    print("episode_end_status",episode_end_status)
    time.sleep(1)
    
if teleop_device.is_connected: 
    teleop_device.disconnect()

Gamepad controls:
  Left analog stick: Move in X-Y plane
  Right analog stick (vertical): Move in Z axis
  B/Circle button: Exit
  Y/Triangle button: End episode with SUCCESS
  A/Cross button: End episode with FAILURE
  X/Square button: Rerecord episode
{'delta_x': np.float32(0.0), 'delta_y': np.float32(0.0), 'delta_z': np.float32(0.0), 'gripper': 1}
episode_end_status None
{'delta_x': np.float32(0.0), 'delta_y': np.float32(0.0), 'delta_z': np.float32(0.0), 'gripper': 1}
episode_end_status None
{'delta_x': np.float32(0.0), 'delta_y': np.float32(0.0), 'delta_z': np.float32(-0.113708496), 'gripper': 1}
episode_end_status None
{'delta_x': np.float32(0.8980408), 'delta_y': np.float32(-0.6784363), 'delta_z': np.float32(-0.99212646), 'gripper': 1}
episode_end_status None
{'delta_x': np.float32(0.0), 'delta_y': np.float32(0.0), 'delta_z': np.float32(0.0), 'gripper': 1}
episode_end_status None
{'delta_x': np.float32(0.0), 'delta_y': np.float32(0.0), 'delta_z': np.float32(-0.9058533), 'gripper'

In [2]:
import numpy as np
from lerobot.teleoperators.gamepad import GamepadTeleopConfig
config=GamepadTeleopConfig()
teleop_device=GamepadTeleop(config=config)
teleop_device.connect()

env = gym.make("cmirobot/SO100-v0")
obs,info = env.reset()
# for _ in range(20):
while True:
    teleop_action=teleop_device.get_action()
    print(teleop_action)
    teleop_action = np.array(list(teleop_action.values()))
    action = np.zeros((7,))#env.action_space.sample()
    action[0:3]=teleop_action[0:3]
    print(type(action),action)
    obs, reward, done,truncated , info = env.step(action)
    env.render()
    
    episode_end_status =teleop_device.gamepad.get_episode_end_status()
    done = episode_end_status is not None
    if done:
        break

env.close()


if teleop_device.is_connected: 
    teleop_device.disconnect()

Gamepad controls:
  Left analog stick: Move in X-Y plane
  Right analog stick (vertical): Move in Z axis
  B/Circle button: Exit
  Y/Triangle button: End episode with SUCCESS
  A/Cross button: End episode with FAILURE
  X/Square button: Rerecord episode


  logger.warn(
  logger.warn(f"{pre} is not within the observation space.")


{'delta_x': np.float32(0.0), 'delta_y': np.float32(0.0), 'delta_z': np.float32(0.0), 'gripper': 1}
<class 'numpy.ndarray'> [0. 0. 0. 0. 0. 0. 0.]


  logger.warn(
  logger.warn(f"{pre} is not within the observation space.")


{'delta_x': np.float32(0.0), 'delta_y': np.float32(0.0), 'delta_z': np.float32(0.0), 'gripper': 1}
<class 'numpy.ndarray'> [0. 0. 0. 0. 0. 0. 0.]
{'delta_x': np.float32(0.0), 'delta_y': np.float32(0.0), 'delta_z': np.float32(0.0), 'gripper': 1}
<class 'numpy.ndarray'> [0. 0. 0. 0. 0. 0. 0.]
{'delta_x': np.float32(0.0), 'delta_y': np.float32(0.0), 'delta_z': np.float32(0.0), 'gripper': 1}
<class 'numpy.ndarray'> [0. 0. 0. 0. 0. 0. 0.]
{'delta_x': np.float32(0.0), 'delta_y': np.float32(0.0), 'delta_z': np.float32(0.0), 'gripper': 1}
<class 'numpy.ndarray'> [0. 0. 0. 0. 0. 0. 0.]
{'delta_x': np.float32(0.0), 'delta_y': np.float32(0.0), 'delta_z': np.float32(0.0), 'gripper': 1}
<class 'numpy.ndarray'> [0. 0. 0. 0. 0. 0. 0.]
{'delta_x': np.float32(0.0), 'delta_y': np.float32(0.0), 'delta_z': np.float32(0.0), 'gripper': 1}
<class 'numpy.ndarray'> [0. 0. 0. 0. 0. 0. 0.]
{'delta_x': np.float32(0.0), 'delta_y': np.float32(0.0), 'delta_z': np.float32(0.0), 'gripper': 1}
<class 'numpy.ndarray'> [

In [None]:
data_root = "./data/demo_data"
repo_id="cmirobot"
dataset = LeRobotDataset.create(
                repo_id=repo_id,
                root = data_root, 
                robot_type="omy",
                fps=20, # 20 frames per second
                features={
                    "observation.image": {
                        "dtype": "image",
                        "shape": (256, 256, 3),
                        "names": ["height", "width", "channels"],
                    },
                    "observation.wrist_image": {
                        "dtype": "image",
                        "shape": (256, 256, 3),
                        "names": ["height", "width", "channel"],
                    },
                    "observation.state": {
                        "dtype": "float32",
                        "shape": (6,),
                        "names": ["state"], # x, y, z, roll, pitch, yaw
                    },
                    "action": {
                        "dtype": "float32",
                        "shape": (7,),
                        "names": ["action"], # 6 joint angles and 1 gripper
                    },
                    "obj_init": {
                        "dtype": "float32",
                        "shape": (6,),
                        "names": ["obj_init"], # just the initial position of the object. Not used in training.
                    },
                },
                image_writer_threads=10,
                image_writer_processes=5,
        )

TypeError: MujocoEnv.__init__() missing 1 required positional argument: 'observation_space' was raised from the environment creator for cmirobot/Grab-v0 with kwargs ({'xml_file': '../data/sene/example_scene_y.xml'})

In [None]:
policy = DiffusionPolicy.from_pretrained("lerobot/diffusion_pusht")


In [None]:
print("policy.config.input_features",policy.config.input_features)
print("env.observation_space",env.observation_space)
print("policy.config.output_features",policy.config.output_features)
print("env.action_space",env.action_space)

In [None]:
policy.reset()
numpy_observation, info = env.reset(seed=42)

In [None]:
device = torch.device("cuda")
rewards = []
frames = []

# Render frame of the initial state
frames.append(env.render())

# Create a directory to store the video of the evaluation
output_directory = Path("outputs/eval/example_pusht_diffusion")
output_directory.mkdir(parents=True, exist_ok=True)

In [None]:
step = 0
done = False
while not done:
    # Prepare observation for the policy running in Pytorch
    state = torch.from_numpy(numpy_observation["agent_pos"])
    image = torch.from_numpy(numpy_observation["pixels"])

    # Convert to float32 with image from channel first in [0,255]
    # to channel last in [0,1]
    state = state.to(torch.float32)
    image = image.to(torch.float32) / 255
    image = image.permute(2, 0, 1)

    # Send data tensors from CPU to GPU
    state = state.to(device, non_blocking=True)
    image = image.to(device, non_blocking=True)

    # Add extra (empty) batch dimension, required to forward the policy
    state = state.unsqueeze(0)
    image = image.unsqueeze(0)

    # Create the policy input dictionary
    observation = {
        "observation.state": state,
        "observation.image": image,
    }

    # Predict the next action with respect to the current observation
    with torch.inference_mode():
        action = policy.select_action(observation)

    # Prepare the action for the environment
    numpy_action = action.squeeze(0).to("cpu").numpy()

    # Step through the environment and receive a new observation
    numpy_observation, reward, terminated, truncated, info = env.step(numpy_action)
    print(f"{step=} {reward=} {terminated=}")

    # Keep track of all the rewards and frames
    rewards.append(reward)
    frames.append(env.render())

    # The rollout is considered done when the success state is reached (i.e. terminated is True),
    # or the maximum number of iterations is reached (i.e. truncated is True)
    done = terminated | truncated | done
    step += 1

if terminated:
    print("Success!")
else:
    print("Failure!")

# Get the speed of environment (i.e. its number of frames per second).
fps = env.metadata["render_fps"]

# Encode all frames into a mp4 video.
video_path = output_directory / "rollout.mp4"
imageio.mimsave(str(video_path), numpy.stack(frames), fps=fps)

print(f"Video of the evaluation is available in '{video_path}'.")