diff --git a/sim/envs/humanoids/stompymicro_config.py b/sim/envs/humanoids/stompymicro_config.py index c9ad10b8..d0f3c2ae 100644 --- a/sim/envs/humanoids/stompymicro_config.py +++ b/sim/envs/humanoids/stompymicro_config.py @@ -1,5 +1,7 @@ """Defines the environment configuration for the Getting up task""" +import numpy as np + from sim.env import robot_urdf_path from sim.envs.base.legged_robot_config import ( # type: ignore LeggedRobotCfg, @@ -148,7 +150,7 @@ class ranges: lin_vel_x = [-0.05, 0.23] # min max [m/s] lin_vel_y = [-0.04, 0.04] # min max [m/s] ang_vel_yaw = [-0.1, 0.1] # min max [rad/s] - heading = [-3.14, 3.14] + heading = [-np.pi, np.pi] # min max [rad] class rewards: base_height_target = Robot.height diff --git a/sim/play.py b/sim/play.py index a227c3d6..32052bec 100755 --- a/sim/play.py +++ b/sim/play.py @@ -8,11 +8,9 @@ # mypy: ignore-errors import argparse -import copy import logging import os from datetime import datetime -from typing import Any, Union import cv2 import h5py @@ -20,37 +18,17 @@ from isaacgym import gymapi from tqdm import tqdm -logger = logging.getLogger(__name__) - from sim.env import run_dir # noqa: E402 from sim.envs import task_registry # noqa: E402 -from sim.utils.helpers import get_args # noqa: E402 +from sim.utils.cmd_manager import CommandManager # noqa: E402 +from sim.utils.helpers import ( # noqa: E402 + export_policy_as_jit, + export_policy_as_onnx, + get_args, +) from sim.utils.logger import Logger # noqa: E402 -import torch # isort: skip - - -def export_policy_as_jit(actor_critic: Any, path: Union[str, os.PathLike]) -> None: - os.makedirs(path, exist_ok=True) - path = os.path.join(path, "policy_1.pt") - model = copy.deepcopy(actor_critic.actor).to("cpu") - traced_script_module = torch.jit.script(model) - traced_script_module.save(path) - - -def export_policy_as_onnx(actor_critic, path): - os.makedirs(path, exist_ok=True) - path = os.path.join(path, "policy_1.onnx") - model = copy.deepcopy(actor_critic.actor).to("cpu") - - # Get the input dimension from the first layer of the model - first_layer = next(model.parameters()) - input_dim = first_layer.shape[1] - - # Create a dummy input tensor with the correct dimensions - dummy_input = torch.randn(1, input_dim) - - torch.onnx.export(model, dummy_input, path) +logger = logging.getLogger(__name__) def play(args: argparse.Namespace) -> None: @@ -86,7 +64,6 @@ def play(args: argparse.Namespace) -> None: policy = ppo_runner.get_inference_policy(device=env.device) # Export policy if needed - EXPORT_POLICY = True if EXPORT_POLICY: path = os.path.join(".") export_policy_as_jit(ppo_runner.alg.actor_critic, path) @@ -104,6 +81,7 @@ def play(args: argparse.Namespace) -> None: joint_index = 1 stop_state_log = 1000 now = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + if args.log_h5: h5_file = h5py.File(f"data{now}.h5", "w") @@ -136,7 +114,7 @@ def play(args: argparse.Namespace) -> None: "observations/euler", (max_timesteps, buf_len, 3), dtype=np.float32 ) # root orientation - if RENDER: + if not args.headless: camera_properties = gymapi.CameraProperties() camera_properties.width = 1920 camera_properties.height = 1080 @@ -165,26 +143,36 @@ def play(args: argparse.Namespace) -> None: os.mkdir(experiment_dir) video = cv2.VideoWriter(dir, fourcc, 50.0, (1920, 1080)) + cmd_manager = CommandManager( + num_envs=env_cfg.env.num_envs, + mode=CMD_MODE, + default_cmd=DEFAULT_COMMAND, + device=env.device, + env_cfg=env_cfg + ) + for t in tqdm(range(stop_state_log)): actions = policy(obs.detach()) if args.log_h5: dset_actions[t] = actions.detach().numpy() - if FIX_COMMAND: - env.commands[:, 0] = 0.2 - env.commands[:, 1] = 0.0 - env.commands[:, 2] = 0.0 - env.commands[:, 3] = 0.0 + env.commands[:] = cmd_manager.update(env.dt) + obs, critic_obs, rews, dones, infos = env.step(actions.detach()) print(f"IMU: {obs[0, (3 * env.num_actions + 5) + 3 : (3 * env.num_actions + 5) + 2 * 3]}") - if RENDER: + if not args.headless: env.gym.fetch_results(env.sim, True) env.gym.step_graphics(env.sim) env.gym.render_all_camera_sensors(env.sim) img = env.gym.get_camera_image(env.sim, env.envs[0], h1, gymapi.IMAGE_COLOR) img = np.reshape(img, (1080, 1920, 4)) img = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR) + robot_positions = env.root_states[:, 0:3].cpu().numpy() + actual_vels = np.stack([env.base_lin_vel[:, 0].cpu().numpy(), env.base_lin_vel[:, 1].cpu().numpy()], axis=1) + + if args.command_arrow: + cmd_manager.draw(env.gym, env.viewer, env.envs, robot_positions, actual_vels) video.write(img[..., :3]) @@ -237,7 +225,7 @@ def play(args: argparse.Namespace) -> None: env_logger.print_rewards() env_logger.plot_states() - if RENDER: + if not args.headless: video.release() if args.log_h5: @@ -246,11 +234,12 @@ def play(args: argparse.Namespace) -> None: if __name__ == "__main__": - RENDER = True - FIX_COMMAND = True - + EXPORT_POLICY = True EXPORT_ONNX = True + DEFAULT_COMMAND = [0.3, 0.0, 0.0, 0.0] + CMD_MODE = "random" # options: "fixed", "oscillating", "random", "keyboard" + base_args = get_args() parser = argparse.ArgumentParser(description="Extend base arguments with log_h5") parser.add_argument("--log_h5", action="store_true", help="Enable HDF5 logging") diff --git a/sim/resources/stompymicro/joints.py b/sim/resources/stompymicro/joints.py index b103a6bc..1f59ff1e 100644 --- a/sim/resources/stompymicro/joints.py +++ b/sim/resources/stompymicro/joints.py @@ -101,7 +101,7 @@ def default_standing(cls) -> Dict[str, float]: cls.legs.left.knee_pitch: -0.741, cls.legs.left.hip_yaw: 0, cls.legs.left.hip_roll: 0, - cls.legs.left.ankle_pitch: -0.5, + cls.legs.left.ankle_pitch: -0.5, cls.legs.right.hip_pitch: -0.23, cls.legs.right.knee_pitch: 0.741, cls.legs.right.ankle_pitch: 0.5, diff --git a/sim/sim2sim.py b/sim/sim2sim.py index f4e1cc1a..1fa30ef9 100755 --- a/sim/sim2sim.py +++ b/sim/sim2sim.py @@ -316,7 +316,7 @@ def get_policy_output(policy, input_data): dt=0.001, decimation=10, cycle_time=0.4, - tau_factor=4., + tau_factor=4.0, ) run_mujoco(policy, cfg, args.keyboard_use) diff --git a/sim/utils/cmd_manager.py b/sim/utils/cmd_manager.py new file mode 100644 index 00000000..d223539d --- /dev/null +++ b/sim/utils/cmd_manager.py @@ -0,0 +1,146 @@ +from enum import Enum +from typing import List + +import numpy as np +import torch + +from sim.utils.helpers import draw_vector + + +class CommandMode(Enum): + FIXED = "fixed" + OSCILLATING = "oscillating" + KEYBOARD = "keyboard" + RANDOM = "random" + + +class CommandManager: + """Manages robot commands""" + + def __init__( + self, + num_envs: int = 1, + mode: str = "fixed", + default_cmd: List[float] = [0.3, 0.0, 0.0, 0.0], + device="cpu", + env_cfg=None, + ): + self.num_envs = num_envs + self.mode = CommandMode(mode) + self.device = device + self.default_cmd = torch.tensor(default_cmd, device=self.device) + self.commands = self.default_cmd.repeat(num_envs, 1) + self.time = 0 + self.env_cfg = env_cfg + + # Mode-specific parameters + if self.mode == CommandMode.OSCILLATING: + self.osc_period = 5.0 # secs + self.min_x_vel = env_cfg.commands.ranges.lin_vel_x[0] if env_cfg else 0.0 + self.max_x_vel = env_cfg.commands.ranges.lin_vel_x[1] if env_cfg else 0.3 + self.osc_amplitude = (self.max_x_vel - self.min_x_vel) / 2 + self.osc_offset = (self.max_x_vel + self.min_x_vel) / 2 + elif self.mode == CommandMode.RANDOM: + self.cmd_ranges = { + 'lin_vel_x': env_cfg.commands.ranges.lin_vel_x, + 'lin_vel_y': env_cfg.commands.ranges.lin_vel_y, + 'ang_vel_yaw': env_cfg.commands.ranges.ang_vel_yaw, + 'heading': env_cfg.commands.ranges.heading + } if env_cfg else { + 'lin_vel_x': [-0.05, 0.23], + 'lin_vel_y': [-0.05, 0.05], + 'ang_vel_yaw': [-0.5, 0.5], + 'heading': [-np.pi, np.pi] + } + self.resampling_time = env_cfg.commands.resampling_time if env_cfg else 8.0 + self.last_sample_time = 0.0 + elif self.mode == CommandMode.KEYBOARD: + try: + import pygame + pygame.init() + pygame.display.set_mode((100, 100)) + self.x_vel_cmd = 0.0 + self.y_vel_cmd = 0.0 + self.yaw_vel_cmd = 0.0 + except ImportError: + print("WARNING: pygame not found, falling back to fixed commands") + self.mode = CommandMode.FIXED + + def update(self, dt: float) -> torch.Tensor: + """Updates and returns commands based on current mode.""" + self.time += dt + + if self.mode == CommandMode.FIXED: + return self.commands + elif self.mode == CommandMode.OSCILLATING: + # Oscillate x velocity between min and max + x_vel = self.osc_offset + self.osc_amplitude * torch.sin( + torch.tensor(2 * np.pi * self.time / self.osc_period) + ) + self.commands[:, 0] = x_vel.to(self.device) + elif self.mode == CommandMode.RANDOM: + if self.time - self.last_sample_time >= self.resampling_time: + self.last_sample_time = self.time + # Generate random commands within training ranges + new_commands = torch.tensor([ + np.random.uniform(*self.cmd_ranges['lin_vel_x']), + np.random.uniform(*self.cmd_ranges['lin_vel_y']), + 0.0, + np.random.uniform(*self.cmd_ranges['heading']) + ], device=self.device) if self.env_cfg and self.env_cfg.commands.heading_command else torch.tensor([ + np.random.uniform(*self.cmd_ranges['lin_vel_x']), + np.random.uniform(*self.cmd_ranges['lin_vel_y']), + np.random.uniform(*self.cmd_ranges['ang_vel_yaw']), + 0.0 + ], device=self.device) + self.commands = new_commands.repeat(self.num_envs, 1) + elif self.mode == CommandMode.KEYBOARD: + self._handle_keyboard_input() + self.commands[:, 0] = torch.tensor(self.x_vel_cmd, device=self.device) + self.commands[:, 1] = torch.tensor(self.y_vel_cmd, device=self.device) + self.commands[:, 2] = torch.tensor(self.yaw_vel_cmd, device=self.device) + + return self.commands + + def draw(self, gym, viewer, env_handles, robot_positions, actual_vels) -> None: + """Draws command and actual velocity arrows for all robots.""" + if viewer is None: + return + + gym.clear_lines(viewer) + cmd_vels = self.commands[:, :2].cpu().numpy() + for env_handle, robot_pos, cmd_vel, actual_vel in zip(env_handles, robot_positions, cmd_vels, actual_vels): + draw_vector(gym, viewer, env_handle, robot_pos, cmd_vel, (0.0, 1.0, 0.0)) # cmd vector (green) + draw_vector(gym, viewer, env_handle, robot_pos, actual_vel, (1.0, 0.0, 0.0)) # vel vector (red) + + def _handle_keyboard_input(self): + """Handles keyboard input for command updates.""" + import pygame + + for event in pygame.event.get(): + if event.type == pygame.QUIT: + pygame.quit() + + keys = pygame.key.get_pressed() + + # Update movement commands based on arrow keys + if keys[pygame.K_UP]: + self.x_vel_cmd = min(self.x_vel_cmd + 0.0005, 0.5) + if keys[pygame.K_DOWN]: + self.x_vel_cmd = max(self.x_vel_cmd - 0.0005, -0.5) + if keys[pygame.K_LEFT]: + self.y_vel_cmd = min(self.y_vel_cmd + 0.0005, 0.5) + if keys[pygame.K_RIGHT]: + self.y_vel_cmd = max(self.y_vel_cmd - 0.0005, -0.5) + + # Yaw control + if keys[pygame.K_a]: + self.yaw_vel_cmd = min(self.yaw_vel_cmd + 0.001, 0.5) + if keys[pygame.K_z]: + self.yaw_vel_cmd = max(self.yaw_vel_cmd - 0.001, -0.5) + + # Reset commands + if keys[pygame.K_SPACE]: + self.x_vel_cmd = 0.0 + self.y_vel_cmd = 0.0 + self.yaw_vel_cmd = 0.0 diff --git a/sim/utils/helpers.py b/sim/utils/helpers.py index 675fe070..f1530163 100755 --- a/sim/utils/helpers.py +++ b/sim/utils/helpers.py @@ -34,6 +34,7 @@ import copy import os import random +from typing import Any, Tuple, Union import numpy as np @@ -232,6 +233,12 @@ def get_args() -> argparse.Namespace: "default": False, "help": "Enable HDF5 logging", }, + { + "name": "--command-arrow", + "action": "store_true", + "default": False, + "help": "Draw command and velocity arrows during visualization", + }, ] # parse arguments args = gymutil.parse_arguments(description="RL Policy", custom_parameters=custom_parameters) @@ -244,7 +251,7 @@ def get_args() -> argparse.Namespace: return args -def export_policy_as_jit(actor_critic, path) -> None: +def export_policy_as_jit(actor_critic: Any, path: Union[str, os.PathLike]) -> None: os.makedirs(path, exist_ok=True) path = os.path.join(path, "policy_1.pt") model = copy.deepcopy(actor_critic.actor).to("cpu") @@ -265,3 +272,79 @@ def export_policy_as_onnx(actor_critic, path): dummy_input = torch.randn(1, input_dim) torch.onnx.export(model, dummy_input, path) + + +def draw_vector( + gym: gymapi.Gym, + viewer: gymapi.Viewer, + env_handle: gymapi.Env, + start_pos: np.ndarray, + direction: Tuple[float, float], + color: Tuple[float, float, float], + clear_lines: bool = False, + head_scale: float = 0.1, +) -> None: + """Draws a single vector with an arrowhead.""" + if viewer is None: + return + + # Unpack direction and create start position + vel_x, vel_y = direction + start = gymapi.Vec3(start_pos[0], start_pos[1], start_pos[2]) + + # Scale arrow length by magnitude + vel_magnitude = np.sqrt(vel_x**2 + vel_y**2) + if vel_magnitude > 0: + arrow_scale = np.clip(vel_magnitude, 0.1, 1.0) + normalized_x = vel_x / vel_magnitude + normalized_y = vel_y / vel_magnitude + else: + arrow_scale = 0.1 + normalized_x = 0 + normalized_y = 0 + + # Calculate end position and arrowhead + end = gymapi.Vec3(start.x + normalized_x * arrow_scale, start.y + normalized_y * arrow_scale, start.z) + + # Calculate perpendicular vector for arrowhead + perp_x = -normalized_y + perp_y = normalized_x + + head_left = gymapi.Vec3( + end.x - head_scale * (normalized_x * 0.7 + perp_x * 0.7), + end.y - head_scale * (normalized_y * 0.7 + perp_y * 0.7), + end.z, + ) + + head_right = gymapi.Vec3( + end.x - head_scale * (normalized_x * 0.7 - perp_x * 0.7), + end.y - head_scale * (normalized_y * 0.7 - perp_y * 0.7), + end.z, + ) + + # Create vertices and colors + verts = [ + start.x, + start.y, + start.z, + end.x, + end.y, + end.z, + end.x, + end.y, + end.z, + head_left.x, + head_left.y, + head_left.z, + end.x, + end.y, + end.z, + head_right.x, + head_right.y, + head_right.z, + ] + colors = [color[0], color[1], color[2]] * 6 + + if clear_lines: + gym.clear_lines(viewer) + gym.add_lines(viewer, env_handle, 3, verts, colors)