4 changes: 3 additions & 1 deletion sim/envs/humanoids/stompymicro_config.py
@@ -1,5 +1,7 @@
"""Defines the environment configuration for the Getting up task"""

import numpy as np

from sim.env import robot_urdf_path
from sim.envs.base.legged_robot_config import ( # type: ignore
    LeggedRobotCfg,
@@ -148,7 +150,7 @@ class ranges:
            lin_vel_x = [-0.05, 0.23]  # min max [m/s]
            lin_vel_y = [-0.04, 0.04]  # min max [m/s]
            ang_vel_yaw = [-0.1, 0.1]  # min max [rad/s]
            heading = [-3.14, 3.14]
            heading = [-np.pi, np.pi]  # min max [rad]

    class rewards:
        base_height_target = Robot.height
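The switch from the 3.14 literal to np.pi is more than cosmetic: legged_gym-style configs typically convert a sampled heading command into a yaw-rate command through a wrap-to-pi error term, so the range endpoints should coincide exactly with ±π. A minimal sketch of that convention (wrap_to_pi and the 0.5 gain follow the upstream legged_gym pattern and are not code shown in this diff):

import numpy as np

def wrap_to_pi(angle: float) -> float:
    # Wrap an angle into [-pi, pi).
    return (angle + np.pi) % (2 * np.pi) - np.pi

# With the range capped at exactly +/-pi, a commanded heading of pi and a
# base heading of -pi describe the same orientation, so the yaw error is zero.
yaw_rate_cmd = np.clip(0.5 * wrap_to_pi(np.pi - (-np.pi)), -1.0, 1.0)
print(yaw_rate_cmd)  # 0.0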
71 changes: 30 additions & 41 deletions sim/play.py
@@ -8,49 +8,27 @@
# mypy: ignore-errors

import argparse
import copy
import logging
import os
from datetime import datetime
from typing import Any, Union

import cv2
import h5py
import numpy as np
from isaacgym import gymapi
from tqdm import tqdm

logger = logging.getLogger(__name__)

from sim.env import run_dir # noqa: E402
from sim.envs import task_registry # noqa: E402
from sim.utils.helpers import get_args # noqa: E402
from sim.utils.cmd_manager import CommandManager # noqa: E402
from sim.utils.helpers import (  # noqa: E402
    export_policy_as_jit,
    export_policy_as_onnx,
    get_args,
)
from sim.utils.logger import Logger # noqa: E402

import torch # isort: skip


def export_policy_as_jit(actor_critic: Any, path: Union[str, os.PathLike]) -> None:
    os.makedirs(path, exist_ok=True)
    path = os.path.join(path, "policy_1.pt")
    model = copy.deepcopy(actor_critic.actor).to("cpu")
    traced_script_module = torch.jit.script(model)
    traced_script_module.save(path)


def export_policy_as_onnx(actor_critic, path):
    os.makedirs(path, exist_ok=True)
    path = os.path.join(path, "policy_1.onnx")
    model = copy.deepcopy(actor_critic.actor).to("cpu")

    # Get the input dimension from the first layer of the model
    first_layer = next(model.parameters())
    input_dim = first_layer.shape[1]

    # Create a dummy input tensor with the correct dimensions
    dummy_input = torch.randn(1, input_dim)

    torch.onnx.export(model, dummy_input, path)
logger = logging.getLogger(__name__)


def play(args: argparse.Namespace) -> None:
@@ -86,7 +64,6 @@ def play(args: argparse.Namespace) -> None:
    policy = ppo_runner.get_inference_policy(device=env.device)

    # Export policy if needed
    EXPORT_POLICY = True
    if EXPORT_POLICY:
        path = os.path.join(".")
        export_policy_as_jit(ppo_runner.alg.actor_critic, path)
@@ -104,6 +81,7 @@ def play(args: argparse.Namespace) -> None:
    joint_index = 1
    stop_state_log = 1000
    now = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

    if args.log_h5:
        h5_file = h5py.File(f"data{now}.h5", "w")

@@ -136,7 +114,7 @@ def play(args: argparse.Namespace) -> None:
"observations/euler", (max_timesteps, buf_len, 3), dtype=np.float32
) # root orientation

if RENDER:
if not args.headless:
camera_properties = gymapi.CameraProperties()
camera_properties.width = 1920
camera_properties.height = 1080
@@ -165,26 +143,36 @@ def play(args: argparse.Namespace) -> None:
            os.mkdir(experiment_dir)
        video = cv2.VideoWriter(dir, fourcc, 50.0, (1920, 1080))

    cmd_manager = CommandManager(
        num_envs=env_cfg.env.num_envs,
        mode=CMD_MODE,
        default_cmd=DEFAULT_COMMAND,
        device=env.device,
        env_cfg=env_cfg,
    )

    for t in tqdm(range(stop_state_log)):
        actions = policy(obs.detach())
        if args.log_h5:
            dset_actions[t] = actions.detach().numpy()

        if FIX_COMMAND:
            env.commands[:, 0] = 0.2
            env.commands[:, 1] = 0.0
            env.commands[:, 2] = 0.0
            env.commands[:, 3] = 0.0
        env.commands[:] = cmd_manager.update(env.dt)

        obs, critic_obs, rews, dones, infos = env.step(actions.detach())
        print(f"IMU: {obs[0, (3 * env.num_actions + 5) + 3 : (3 * env.num_actions + 5) + 2 * 3]}")

        if RENDER:
        if not args.headless:
            env.gym.fetch_results(env.sim, True)
            env.gym.step_graphics(env.sim)
            env.gym.render_all_camera_sensors(env.sim)
            img = env.gym.get_camera_image(env.sim, env.envs[0], h1, gymapi.IMAGE_COLOR)
            img = np.reshape(img, (1080, 1920, 4))
            img = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR)
            robot_positions = env.root_states[:, 0:3].cpu().numpy()
            actual_vels = np.stack([env.base_lin_vel[:, 0].cpu().numpy(), env.base_lin_vel[:, 1].cpu().numpy()], axis=1)

            if args.command_arrow:
                cmd_manager.draw(env.gym, env.viewer, env.envs, robot_positions, actual_vels)

            video.write(img[..., :3])

@@ -237,7 +225,7 @@ def play(args: argparse.Namespace) -> None:
    env_logger.print_rewards()
    env_logger.plot_states()

    if RENDER:
    if not args.headless:
        video.release()

    if args.log_h5:
@@ -246,11 +234,12 @@


if __name__ == "__main__":
    RENDER = True
    FIX_COMMAND = True

    EXPORT_POLICY = True
    EXPORT_ONNX = True

    DEFAULT_COMMAND = [0.3, 0.0, 0.0, 0.0]
    CMD_MODE = "random"  # options: "fixed", "oscillating", "random", "keyboard"

    base_args = get_args()
    parser = argparse.ArgumentParser(description="Extend base arguments with log_h5")
    parser.add_argument("--log_h5", action="store_true", help="Enable HDF5 logging")
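With the inline exporters removed, play.py now imports export_policy_as_jit and export_policy_as_onnx from sim.utils.helpers. A minimal sketch of consuming the exported network with onnxruntime, assuming the helper keeps the policy_1.onnx filename and the single fixed-shape input that the removed inline exporter produced (onnxruntime itself is not part of this diff):

import numpy as np
import onnxruntime as ort

session = ort.InferenceSession("policy_1.onnx")
inp = session.get_inputs()[0]

# The exporter traced the actor with a (1, input_dim) dummy tensor, so the
# graph expects a single flat observation batch of that shape.
obs = np.zeros((1, inp.shape[1]), dtype=np.float32)  # placeholder observation
actions = session.run(None, {inp.name: obs})[0]
print(actions.shape)  # (1, num_actions)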
2 changes: 1 addition & 1 deletion sim/resources/stompymicro/joints.py
@@ -101,7 +101,7 @@ def default_standing(cls) -> Dict[str, float]:
            cls.legs.left.knee_pitch: -0.741,
            cls.legs.left.hip_yaw: 0,
            cls.legs.left.hip_roll: 0,
            cls.legs.left.ankle_pitch: -0.5,
            cls.legs.left.ankle_pitch: -0.5,
            cls.legs.right.hip_pitch: -0.23,
            cls.legs.right.knee_pitch: 0.741,
            cls.legs.right.ankle_pitch: 0.5,
2 changes: 1 addition & 1 deletion sim/sim2sim.py
@@ -316,7 +316,7 @@ def get_policy_output(policy, input_data):
        dt=0.001,
        decimation=10,
        cycle_time=0.4,
        tau_factor=4.,
        tau_factor=4.0,
    )

    run_mujoco(policy, cfg, args.keyboard_use)
146 changes: 146 additions & 0 deletions sim/utils/cmd_manager.py
@@ -0,0 +1,146 @@
from enum import Enum
from typing import List

import numpy as np
import torch

from sim.utils.helpers import draw_vector


class CommandMode(Enum):
    FIXED = "fixed"
    OSCILLATING = "oscillating"
    KEYBOARD = "keyboard"
    RANDOM = "random"


class CommandManager:
    """Manages robot commands"""

    def __init__(
        self,
        num_envs: int = 1,
        mode: str = "fixed",
        default_cmd: List[float] = [0.3, 0.0, 0.0, 0.0],
        device="cpu",
        env_cfg=None,
    ):
        self.num_envs = num_envs
        self.mode = CommandMode(mode)
        self.device = device
        self.default_cmd = torch.tensor(default_cmd, device=self.device)
        self.commands = self.default_cmd.repeat(num_envs, 1)
        self.time = 0
        self.env_cfg = env_cfg

        # Mode-specific parameters
        if self.mode == CommandMode.OSCILLATING:
            self.osc_period = 5.0  # secs
            self.min_x_vel = env_cfg.commands.ranges.lin_vel_x[0] if env_cfg else 0.0
            self.max_x_vel = env_cfg.commands.ranges.lin_vel_x[1] if env_cfg else 0.3
            self.osc_amplitude = (self.max_x_vel - self.min_x_vel) / 2
            self.osc_offset = (self.max_x_vel + self.min_x_vel) / 2
        elif self.mode == CommandMode.RANDOM:
            if env_cfg:
                self.cmd_ranges = {
                    "lin_vel_x": env_cfg.commands.ranges.lin_vel_x,
                    "lin_vel_y": env_cfg.commands.ranges.lin_vel_y,
                    "ang_vel_yaw": env_cfg.commands.ranges.ang_vel_yaw,
                    "heading": env_cfg.commands.ranges.heading,
                }
            else:
                self.cmd_ranges = {
                    "lin_vel_x": [-0.05, 0.23],
                    "lin_vel_y": [-0.05, 0.05],
                    "ang_vel_yaw": [-0.5, 0.5],
                    "heading": [-np.pi, np.pi],
                }
            self.resampling_time = env_cfg.commands.resampling_time if env_cfg else 8.0
            self.last_sample_time = 0.0
        elif self.mode == CommandMode.KEYBOARD:
            try:
                import pygame

                pygame.init()
                pygame.display.set_mode((100, 100))
                self.x_vel_cmd = 0.0
                self.y_vel_cmd = 0.0
                self.yaw_vel_cmd = 0.0
            except ImportError:
                print("WARNING: pygame not found, falling back to fixed commands")
                self.mode = CommandMode.FIXED

    def update(self, dt: float) -> torch.Tensor:
        """Updates and returns commands based on current mode."""
        self.time += dt

        if self.mode == CommandMode.FIXED:
            return self.commands
        elif self.mode == CommandMode.OSCILLATING:
            # Oscillate x velocity between min and max
            x_vel = self.osc_offset + self.osc_amplitude * torch.sin(
                torch.tensor(2 * np.pi * self.time / self.osc_period)
            )
            self.commands[:, 0] = x_vel.to(self.device)
        elif self.mode == CommandMode.RANDOM:
            if self.time - self.last_sample_time >= self.resampling_time:
                self.last_sample_time = self.time
                # Generate random commands within training ranges
                if self.env_cfg and self.env_cfg.commands.heading_command:
                    new_commands = torch.tensor(
                        [
                            np.random.uniform(*self.cmd_ranges["lin_vel_x"]),
                            np.random.uniform(*self.cmd_ranges["lin_vel_y"]),
                            0.0,
                            np.random.uniform(*self.cmd_ranges["heading"]),
                        ],
                        device=self.device,
                    )
                else:
                    new_commands = torch.tensor(
                        [
                            np.random.uniform(*self.cmd_ranges["lin_vel_x"]),
                            np.random.uniform(*self.cmd_ranges["lin_vel_y"]),
                            np.random.uniform(*self.cmd_ranges["ang_vel_yaw"]),
                            0.0,
                        ],
                        device=self.device,
                    )
                self.commands = new_commands.repeat(self.num_envs, 1)
        elif self.mode == CommandMode.KEYBOARD:
            self._handle_keyboard_input()
            self.commands[:, 0] = torch.tensor(self.x_vel_cmd, device=self.device)
            self.commands[:, 1] = torch.tensor(self.y_vel_cmd, device=self.device)
            self.commands[:, 2] = torch.tensor(self.yaw_vel_cmd, device=self.device)

        return self.commands

    def draw(self, gym, viewer, env_handles, robot_positions, actual_vels) -> None:
        """Draws command and actual velocity arrows for all robots."""
        if viewer is None:
            return

        gym.clear_lines(viewer)
        cmd_vels = self.commands[:, :2].cpu().numpy()
        for env_handle, robot_pos, cmd_vel, actual_vel in zip(env_handles, robot_positions, cmd_vels, actual_vels):
            draw_vector(gym, viewer, env_handle, robot_pos, cmd_vel, (0.0, 1.0, 0.0))  # cmd vector (green)
            draw_vector(gym, viewer, env_handle, robot_pos, actual_vel, (1.0, 0.0, 0.0))  # vel vector (red)

    def _handle_keyboard_input(self):
        """Handles keyboard input for command updates."""
        import pygame

        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()

        keys = pygame.key.get_pressed()

        # Update movement commands based on arrow keys
        if keys[pygame.K_UP]:
            self.x_vel_cmd = min(self.x_vel_cmd + 0.0005, 0.5)
        if keys[pygame.K_DOWN]:
            self.x_vel_cmd = max(self.x_vel_cmd - 0.0005, -0.5)
        if keys[pygame.K_LEFT]:
            self.y_vel_cmd = min(self.y_vel_cmd + 0.0005, 0.5)
        if keys[pygame.K_RIGHT]:
            self.y_vel_cmd = max(self.y_vel_cmd - 0.0005, -0.5)

        # Yaw control
        if keys[pygame.K_a]:
            self.yaw_vel_cmd = min(self.yaw_vel_cmd + 0.001, 0.5)
        if keys[pygame.K_z]:
            self.yaw_vel_cmd = max(self.yaw_vel_cmd - 0.001, -0.5)

        # Reset commands
        if keys[pygame.K_SPACE]:
            self.x_vel_cmd = 0.0
            self.y_vel_cmd = 0.0
            self.yaw_vel_cmd = 0.0
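A minimal sketch of driving CommandManager on its own, outside play.py (the values are hypothetical; with env_cfg=None the oscillating mode falls back to the built-in 0.0-0.3 m/s x-velocity range):

from sim.utils.cmd_manager import CommandManager

cmd_manager = CommandManager(num_envs=2, mode="oscillating", device="cpu")

dt = 0.02  # hypothetical control-loop timestep
for _ in range(250):  # integrates to one full 5 s oscillation period
    commands = cmd_manager.update(dt)  # shape: (num_envs, 4)

# x velocity has completed one full sweep and is back at the mid-range
# offset (~0.15 m/s); lateral, yaw, and heading entries stay at the default.
print(commands[0, 0].item())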