In [None]:
# 1. Install all system dependencies (with -y to auto-confirm)
!apt-get update
!apt-get install -y xvfb x11-xserver-utils
!apt-get install -y \
    libx11-6 \
    libxau6 \
    libxdmcp6 \
    libxcb1 \
    libxext6 \
    libx11-xcb1 \
    libvulkan1 \
    vulkan-utils \
    libvulkan-dev \
    mesa-vulkan-drivers

# 2. Install Python packages
!pip install --upgrade mani_skill tyro pyvirtualdisplay

# 3. Verify Xvfb is installed
!which Xvfb

# 4. Setup virtual display
from pyvirtualdisplay import Display
virtual_display = Display(visible=0, size=(1024, 768))
virtual_display.start()

# Other added
!pip install ipywidgets
!jupyter nbextension enable --py widgetsnbextension

In [2]:
from typing import Any, Dict, Union

import numpy as np
import sapien
import torch
import torch.random
from transforms3d.euler import euler2quat

from mani_skill.agents.robots import Fetch, Panda
from mani_skill.envs.sapien_env import BaseEnv
from mani_skill.sensors.camera import CameraConfig
from mani_skill.utils import common, sapien_utils
from mani_skill.utils.building import actors
from mani_skill.utils.registration import register_env
from mani_skill.utils.scene_builder.table import TableSceneBuilder
from mani_skill.utils.structs import Pose
from mani_skill.utils.structs.types import Array, GPUMemoryConfig, SimConfig

  warn("Failed to find system libvulkan. Fallback to SAPIEN builtin libvulkan.")
  warn(


In [19]:
from typing import Any, Dict


from torch import Tensor


@register_env("PushCube-v1", max_episode_steps=50)
class PushCubeEnv(BaseEnv):

    SUPPORTED_ROBOTS = ["panda","fetch"]

    agent: Union[Panda,Fetch]
    goal_radius = 0.1
    cube_half_size = 0.02

    def __init__(self,*args,robot_uids="panda",robot_init_qpos_noise=0.02,**kwargs):
        self.robot_init_qpos_noise = robot_init_qpos_noise
        super().__init__(*args,robot_uids=robot_uids,**kwargs) 

    def _load_scene(self,options:dict):
        self.table_scene = TableSceneBuilder(
            env=self,robot_init_qpos_noise=self.robot_init_qpos_noise
        )
        self.table_scene.build() # what does this line do?


        self.obj = actors.build_cube(
            self.scene,
            half_size = self.cube_half_size,
            color = np.array([12,42,160,255])/255,
            name="cube",
            body_type="dynamic",
            initial_pose=sapien.Pose(p=[0,0,self.cube_half_size])
        )

        self.goal_region = actors.build_red_white_target(
            self.scene,
            radius = self.goal_radius,
            thickness = 1e-5,
            name="goal_region",
            add_collision=False,
            body_type="kinematic",
            initial_pose=sapien.Pose(p=[0,0,1e-3])
        )

    def _load_agent(self,options:dict):
        super()._load_agent(options,sapien.Pose(p=[-0.615,0,0]))

    def _initialize_episode(self,env_idx:torch.Tensor,options:dict):
        with torch.device(self.device):
            # why would you only want to reinitialize some of the envs
            b = len(env_idx)
            self.table_scene.initialize(env_idx)

            # xy coordinates are randomized and z is set to half_size
            xyz = torch.zeros((b,3))
            xyz[...,:2] = torch.rand((b,2)) * 0.2 - 0.1
            xyz[...,2] = self.cube_half_size
            q = [1,0,0,0]

            obj_pose = Pose.create_from_pq(p=xyz,q=q)
            self.obj.set_pose(obj_pose)

            # some weird math positioning
            # need to read up on why quarternions are better for representing rotations
            target_region_xyz = xyz + torch.tensor([0.1 + self.goal_radius,0,0])
            target_region_xyz[...,2] = 1e-3
            self.goal_region.set_pose(
                Pose.create_from_pq(
                    p=target_region_xyz,
                    q=euler2quat(0,np.pi/2,0)
                )
            )


    def evaluate(self):
        is_obj_placed = (
            torch.linalg.norm(
                self.obj.pose.p[...,:2] - self.goal_region.pose.p[...,:2],axis=1
            ) < self.goal_radius
        )

        # implementing fail condition
        is_failed = (
            self.obj.pose.p[...,2]<1
        )
    

        return {
            "success": is_obj_placed,
            "fail": is_failed,
            }

    def _get_obs_extra(self,info:Dict):
        obs = dict(
            tcp_pose=self.agent.tcp.pose.raw_pose,
        )
        if self.obs_mode in ["state","state_dict"]:
            obs.update(
                goal_pos = self.goal_region.pose.p,
                obj_pose = self.obj.pose.raw_pose,
            )
        return obs

    def compute_dense_reward(self, obs: Any, action: Array, info: Dict):
        # We also create a pose marking where the robot should push the cube from that is easiest (pushing from behind the cube)
        tcp_push_pose = Pose.create_from_pq(
            p=self.obj.pose.p
            + torch.tensor([-self.cube_half_size - 0.005, 0, 0], device=self.device)
        )
        tcp_to_push_pose = tcp_push_pose.p - self.agent.tcp.pose.p
        tcp_to_push_pose_dist = torch.linalg.norm(tcp_to_push_pose, axis=1)
        reaching_reward = 1 - torch.tanh(5 * tcp_to_push_pose_dist)
        reward = reaching_reward

        # compute a placement reward to encourage robot to move the cube to the center of the goal region
        # we further multiply the place_reward by a mask reached so we only add the place reward if the robot has reached the desired push pose
        # This reward design helps train RL agents faster by staging the reward out.
        reached = tcp_to_push_pose_dist < 0.01
        obj_to_goal_dist = torch.linalg.norm(
            self.obj.pose.p[..., :2] - self.goal_region.pose.p[..., :2], axis=1
        )
        place_reward = 1 - torch.tanh(5 * obj_to_goal_dist)
        reward += place_reward * reached

        # assign rewards to parallel environments that achieved success to the maximum of 3.
        reward[info["success"]] = 3
        return reward

    def compute_normalized_dense_reward(self, obs: Any, action: Array, info: Dict):
        # this should be equal to compute_dense_reward / max possible reward
        max_reward = 3.0
        return self.compute_dense_reward(obs=obs, action=action, info=info) / max_reward



In [22]:
from pyvirtualdisplay import Display
virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()

env = PushCubeEnv()
env.reset()
env.render_mode = "rgb_array"


for episode in range(3):
    obs = env.reset()
    ep_reward = 0
    
    for step in range(50):
        action = env.action_space.sample()
        obs,reward,terminated,truncated,info = env.step(action)
        ep_reward += reward
        
        env.render()
        
        if terminated:
            print(f"Episode {episode + 1} finished with reward: {ep_reward}")
            print(f"Success: {info['success']}, Failed: {info['fail']}")
            break
env.close()

Episode 1 finished with reward: tensor([0.1192])
Success: tensor([False]), Failed: tensor([True])
Episode 2 finished with reward: tensor([0.0981])
Success: tensor([False]), Failed: tensor([True])
Episode 3 finished with reward: tensor([0.1181])
Success: tensor([False]), Failed: tensor([True])
