-
Notifications
You must be signed in to change notification settings - Fork 308
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This commit fixes the gripper issues for the sawyer mujoco model with a model that is more inspired by gym fetch_env. The SawyerEnv class is also updated so that all sawyer environments just need to specify a start configuration, a goal configuration, a reward function and a success function. A wrapper of goal env is added so that goal environments can be used with garage algorithms like ppo, trpo and ddpg
- Loading branch information
1 parent
8157b57
commit a124edc
Showing
8 changed files
with
506 additions
and
470 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,235 +1,75 @@ | ||
from gym.envs.robotics import rotations | ||
from gym.envs.robotics.utils import ctrl_set_action, mocap_set_action | ||
from gym.spaces import Box | ||
import numpy as np | ||
|
||
from garage.core import Serializable | ||
from garage.envs import Step | ||
from garage.envs.mujoco.sawyer.sawyer_env import SawyerEnv | ||
from garage.envs.mujoco.sawyer.sawyer_env import Configuration, SawyerEnv | ||
from garage.misc.overrides import overrides | ||
|
||
|
||
class PickAndPlaceEnv(SawyerEnv, Serializable): | ||
|
||
FILE = 'pick_and_place.xml' | ||
|
||
def __init__(self, | ||
initial_goal=None, | ||
initial_qpos=None, | ||
distance_threshold=0.05, | ||
target_range=0.15, | ||
sparse_reward=False, | ||
control_method='position_control', | ||
*args, | ||
**kwargs): | ||
Serializable.__init__(self, *args, **kwargs) | ||
if initial_goal is None: | ||
initial_goal = np.array([0.8, 0.0, 0.]) | ||
if initial_qpos is None: | ||
initial_qpos = { | ||
'right_j0': -0.140923828125, | ||
'right_j1': -1.2789248046875, | ||
'right_j2': -3.043166015625, | ||
'right_j3': -2.139623046875, | ||
'right_j4': -0.047607421875, | ||
'right_j5': -0.7052822265625, | ||
'right_j6': -1.4102060546875, | ||
} | ||
self._distance_threshold = distance_threshold | ||
self._target_range = target_range | ||
self._sparse_reward = sparse_reward | ||
self._control_method = control_method | ||
self._grasped = False | ||
SawyerEnv.__init__( | ||
self, | ||
initial_goal=initial_goal, | ||
initial_qpos=initial_qpos, | ||
target_range=target_range, | ||
*args, | ||
class PickAndPlaceEnv(SawyerEnv): | ||
def __init__(self, **kwargs): | ||
def start_goal_config(): | ||
center = self.sim.data.get_geom_xpos('target2') | ||
start = Configuration( | ||
gripper_pos=np.concatenate([center[:2], [0.35]]), | ||
gripper_state=1, | ||
object_grasped=False, | ||
object_pos=np.concatenate([center[:2], [0.03]])) | ||
goal = Configuration( | ||
gripper_pos=np.concatenate([center[:2], [0.105]]), | ||
gripper_state=0, | ||
object_grasped=True, | ||
object_pos=np.concatenate([center[:2], [0.1]])) | ||
return start, goal | ||
|
||
def success_fn(env: SawyerEnv, _achieved_goal, _desired_goal, | ||
_info: dict): | ||
return env.has_object and env.object_position[2] >= self._goal_configuration.object_pos[2] # noqa: E501 | ||
|
||
super(PickAndPlaceEnv, self).__init__( | ||
start_goal_config=start_goal_config, | ||
success_fn=success_fn, | ||
**kwargs) | ||
|
||
@overrides | ||
def step(self, action: np.ndarray): | ||
action = np.clip(action, self.action_space.low, self.action_space.high) | ||
if self._control_method == 'torque_control': | ||
self.forward_dynamics(action) | ||
elif self._control_method == 'position_control': | ||
assert action.shape == (4, ) | ||
action = action.copy() | ||
pos_ctrl, gripper_ctrl = action[:3], action[3] | ||
pos_ctrl *= 0.1 # limit the action | ||
rot_ctrl = np.array([0., 1., 1., 0.]) | ||
gripper_ctrl = -50 if gripper_ctrl < 0 else 50 | ||
gripper_ctrl = np.array([gripper_ctrl, -gripper_ctrl]) | ||
action = np.concatenate([pos_ctrl, rot_ctrl, gripper_ctrl]) | ||
ctrl_set_action(self.sim, action) # For gripper | ||
mocap_set_action(self.sim, | ||
action) # For pos control of the end effector | ||
self.sim.step() | ||
|
||
obs = self.get_current_obs() | ||
next_obs = obs['observation'] | ||
achieved_goal = obs['achieved_goal'] | ||
goal = obs['desired_goal'] | ||
gripper_pos = obs['gripper_pos'] | ||
reward = self.compute_reward( | ||
achieved_goal, goal, dict(gripper_pos=gripper_pos)) | ||
collided = self._is_collided() | ||
if collided: | ||
reward -= 5 | ||
done = (self._goal_distance(achieved_goal, goal) < | ||
self._distance_threshold) | ||
|
||
return Step(next_obs, reward, done) | ||
|
||
def compute_reward(self, achieved_goal, goal, info): | ||
# Compute distance between goal and the achieved goal. | ||
grasped = self._grasp() | ||
reward = 0 | ||
gripper_pos = info['gripper_pos'] | ||
if not grasped: | ||
# first phase: move towards the object | ||
d = self._goal_distance(gripper_pos, achieved_goal) | ||
else: | ||
d = self._goal_distance(achieved_goal, goal) | ||
reward += 30 | ||
self._grasped = True | ||
|
||
if self._sparse_reward: | ||
reward += -(d > self._distance_threshold).astype(np.float32) | ||
else: | ||
reward += -d | ||
if grasped and d < self._distance_threshold: | ||
reward += 4200 | ||
return reward | ||
|
||
def _grasp(self): | ||
contacts = tuple() | ||
for coni in range(self.sim.data.ncon): | ||
con = self.sim.data.contact[coni] | ||
contacts += ((con.geom1, con.geom2), ) | ||
|
||
finger_id_1 = self.sim.model.geom_name2id('finger_tip_1') | ||
finger_id_2 = self.sim.model.geom_name2id('finger_tip_2') | ||
object_id = self.sim.model.geom_name2id('object0') | ||
if ((finger_id_1, object_id) in contacts or | ||
(object_id, finger_id_1) in contacts) and ( | ||
(finger_id_2, object_id) in contacts or | ||
(finger_id_2, object_id) in contacts): | ||
return True | ||
else: | ||
return False | ||
|
||
def sample_goal(self): | ||
""" | ||
Sample goals | ||
:return: the new sampled goal | ||
""" | ||
goal = self._initial_goal.copy() | ||
|
||
random_goal_delta = np.random.uniform( | ||
-self._target_range, self._target_range, size=2) | ||
goal[:2] += random_goal_delta | ||
self._goal = goal | ||
return goal | ||
|
||
@overrides | ||
@property | ||
def observation_space(self): | ||
""" | ||
Returns a Space object | ||
""" | ||
return Box( | ||
-np.inf, | ||
np.inf, | ||
shape=self.get_current_obs()['observation'].shape, | ||
dtype=np.float32) | ||
|
||
@overrides | ||
def get_current_obs(self): | ||
grip_pos = self.sim.data.get_site_xpos('grip') | ||
def get_obs(self): | ||
gripper_pos = self.gripper_position | ||
dt = self.sim.nsubsteps * self.sim.model.opt.timestep | ||
grip_velp = self.sim.data.get_site_xvelp('grip') * dt | ||
|
||
qpos = self.sim.data.qpos | ||
qvel = self.sim.data.qvel | ||
|
||
object_pos = self.sim.data.get_geom_xpos('object0') | ||
object_rot = rotations.mat2euler( | ||
self.sim.data.get_geom_xmat('object0')) | ||
object_velp = self.sim.data.get_geom_xvelp('object0') * dt | ||
object_velr = self.sim.data.get_geom_xvelr('object0') * dt | ||
object_rel_pos = object_pos - grip_pos | ||
object_pos = self.object_position | ||
object_velp = self.sim.data.get_site_xvelp('object0') * dt | ||
object_velp -= grip_velp | ||
grasped = self.has_object | ||
obs = np.concatenate([gripper_pos, object_pos]) | ||
|
||
achieved_goal = self._achieved_goal_fn(self) | ||
desired_goal = self._desired_goal_fn(self) | ||
|
||
achieved_goal = np.squeeze(object_pos.copy()) | ||
if self._control_method == 'position_control': | ||
obs = np.concatenate([ | ||
grip_pos, | ||
object_pos.ravel(), | ||
object_rel_pos.ravel(), | ||
object_rot.ravel(), | ||
object_velp.ravel(), | ||
object_velr.ravel(), | ||
grip_velp, | ||
]) | ||
elif self._control_method == 'torque_control': | ||
obs = np.concatenate([ | ||
object_pos.ravel(), | ||
object_rel_pos.ravel(), | ||
object_rot.ravel(), | ||
object_velp.ravel(), | ||
object_velr.ravel(), | ||
qpos, | ||
qvel, | ||
]) | ||
else: | ||
raise NotImplementedError | ||
achieved_goal_qpos = np.concatenate((achieved_goal, [1, 0, 0, 0])) | ||
self.sim.data.set_joint_qpos('achieved_goal:joint', achieved_goal_qpos) | ||
desired_goal_qpos = np.concatenate((desired_goal, [1, 0, 0, 0])) | ||
self.sim.data.set_joint_qpos('desired_goal:joint', desired_goal_qpos) | ||
|
||
return { | ||
'observation': obs.copy(), | ||
'achieved_goal': achieved_goal.copy(), | ||
'desired_goal': self._goal, | ||
'gripper_pos': grip_pos, | ||
'achieved_goal': achieved_goal, | ||
'desired_goal': desired_goal, | ||
'gripper_state': self.gripper_state, | ||
'gripper_pos': gripper_pos.copy(), | ||
'has_object': grasped, | ||
'object_pos': object_pos.copy() | ||
} | ||
|
||
@staticmethod | ||
def _goal_distance(goal_a, goal_b): | ||
assert goal_a.shape == goal_b.shape | ||
return np.linalg.norm(goal_a - goal_b, axis=-1) | ||
|
||
@overrides | ||
@property | ||
def action_space(self): | ||
if self._control_method == 'torque_control': | ||
return super(PickAndPlaceEnv, self).action_space() | ||
elif self._control_method == 'position_control': | ||
return Box( | ||
np.array([-0.1, -0.1, -0.1, -100]), | ||
np.array([0.1, 0.1, 0.1, 100]), | ||
dtype=np.float32) | ||
else: | ||
raise NotImplementedError | ||
|
||
def _reset_target_visualization(self): | ||
site_id = self.sim.model.site_name2id('target_pos') | ||
self.sim.model.site_pos[site_id] = self._goal | ||
self.sim.forward() | ||
|
||
@overrides | ||
def reset(self, init_state=None): | ||
self._grasped = False | ||
self._reset_target_visualization() | ||
return super(PickAndPlaceEnv, self).reset(init_state)['observation'] | ||
return Box( | ||
np.array([-0.1, -0.1, -0.1, -1.]), | ||
np.array([0.1, 0.1, 0.1, 1.]), | ||
dtype=np.float32) | ||
|
||
def _is_collided(self): | ||
"""Detect collision""" | ||
d = self.sim.data | ||
table_id = self.sim.model.geom_name2id('table') | ||
def compute_reward(self, achieved_goal, desired_goal, info: dict): | ||
d = np.linalg.norm(achieved_goal - desired_goal, axis=-1) | ||
if self._reward_type == 'sparse': | ||
return (d < self._distance_threshold).astype(np.float32) | ||
|
||
for i in range(d.ncon): | ||
con = d.contact[i] | ||
if table_id == con.geom1 and con.geom2 != 40: | ||
return True | ||
if table_id == con.geom2 and con.geom1 != 40: | ||
return True | ||
return False | ||
return -d |
Oops, something went wrong.