In [1]:
#import isaacgymenvs
import os
os.environ['MUJOCO_GL'] = 'egl'
#import isaacgym
import gym
import numpy as np
from copy import deepcopy
from hydra import compose, initialize_config_dir
from pathlib import Path
from robomimic.utils.vis_utils import make_model_img_feature_plot
from omegaconf import OmegaConf
import torch
import json
import h5py
import hydra
import robomimic
import robomimic.utils.file_utils as FileUtils
from robomimic.utils.file_utils import maybe_dict_from_checkpoint, config_from_checkpoint
import robomimic.utils.torch_utils as TorchUtils
import robomimic.utils.tensor_utils as TensorUtils
import robomimic.utils.env_utils as EnvUtils
from robomimic.algo import RolloutPolicy
import myosuite
from pprint import pprint

    No private macro file found!
    It is recommended to use a private macro file
    To setup, run: python /home/bsud/multi_task_experts/robomimic/robomimic/scripts/setup_macros.py
)[0m
MyoSuite:> Registering Myo Envs


In [2]:
def rollout(policy, env, horizon, render=False, video_writer=None, video_skip=5, camera_names=None):
    """
    Helper function to carry out rollouts. Supports on-screen rendering, off-screen rendering to a video, 
    and returns the rollout trajectory.
    Args:
        policy (instance of RolloutPolicy): policy loaded from a checkpoint
        env (instance of EnvBase): env loaded from a checkpoint or demonstration metadata
        horizon (int): maximum horizon for the rollout
        render (bool): whether to render rollout on-screen
        video_writer (imageio writer): if provided, use to write rollout to video
        video_skip (int): how often to write video frames
        camera_names (list): determines which camera(s) are used for rendering. Pass more than
            one to output a video with multiple camera views concatenated horizontally.
    Returns:
        stats (dict): some statistics for the rollout - such as return, horizon, and task success
    """
    # assert isinstance(env, EnvBase)
    assert isinstance(policy, RolloutPolicy)
    assert not (render and (video_writer is not None))

    policy.start_episode()
    obs = env.reset()
    obs_keys = set(policy.policy.nets.policy.obs_shapes.keys())
    # state_dict = env.get_state()

    # hack that is necessary for robosuite tasks for deterministic action playback
    # obs = env.reset_to(state_dict)

    results = {}
    video_count = 0  # video frame counter
    total_reward = 0.
    try:
        for step_i in range(horizon):
            obs_dict = {}
            pprint(vars(env))
            print(env._current_obs)
            if hasattr(env, 'task'):
                env_obs_dict = env.task.obs_dict
            else:
                env_obs_dict = env.obs_dict
            for k in obs_keys:
                if "camera" in k and env_obs_dict[k].shape[3] == 3:
                    obs_dict[k] = env_obs_dict[k].permute(0, 3, 1, 2)
                    print(obs_dict[k].shape)
                else:
                    obs_dict[k] = env_obs_dict[k]

            # get action from policy
            act = policy(ob=obs_dict, batched=True)

            # play action
            next_obs, r, done, info = env.step(torch.tensor(act, device=env.device, dtype=torch.float))

            # compute reward
            total_reward += r
            success_key = "success" if "success" in info else "successes"
            success = info[success_key].cpu().numpy()

            # visualization
            if render:
                env.render(mode="human", camera_name=camera_names[0])
            if video_writer is not None:
                if video_count % video_skip == 0:
                    video_img = []
                    for cam_name in camera_names:
                        video_img.append(env.obs_dict[cam_name].cpu().numpy()[0])
                    video_img = np.concatenate(video_img, axis=1) # concatenate horizontally
                    video_writer.append_data(video_img)
                video_count += 1

            # break if done or if success
            if done.any() or success.any():
                break

            # update for next iter
            obs = deepcopy(next_obs)
            state_dict = env.get_state()

    except env.rollout_exceptions as e:
        print("WARNING: got rollout exception {}".format(e))

    stats = dict(Return=total_reward, Horizon=(step_i + 1), Success_Rate=float(success))

    return stats

In [3]:
def eval_myo(ckpt_path):

    device = TorchUtils.get_torch_device(try_to_use_cuda=True)
    
    policy, ckpt_dict = FileUtils.policy_from_checkpoint(
        ckpt_path=ckpt_path, 
        device=device, 
        verbose=True
    )
     
    config, _ = config_from_checkpoint(algo_name=ckpt_dict["algo_name"], ckpt_dict=ckpt_dict, verbose=False)
    env_meta = FileUtils.get_env_metadata_from_dataset(dataset_path=config.train.data[0]["path"])
    env_meta['env_kwargs']['pad_to_shape'] = tuple(env_meta['env_kwargs']['pad_to_shape'])
    shape_meta = FileUtils.get_shape_metadata_from_dataset(
        dataset_path=config.train.data[0]["path"],
        action_keys=config.train.action_keys,
        all_obs_keys=config.all_obs_keys,
        verbose=True
    )
    env = EnvUtils.create_env_from_metadata(
        env_meta=env_meta, 
        render=False, 
        render_offscreen=True,
        use_image_obs=shape_meta["use_images"],
    )
    env = EnvUtils.wrap_env_from_config(env, config=config)
    #env = MyoSuiteCameraWrapper(env)
    rollout_horizon = 150
    np.random.seed(0)
    torch.manual_seed(0)
    video_path = "rollout.mp4"
    # video_writer = imageio.get_writer(video_path, fps=20)

    stats = rollout(
        policy=policy, 
        env=env, 
        horizon=rollout_horizon, 
        render=True, 
        # video_writer=video_writer, 
        # video_skip=5, 
        camera_names=["hand_camera"]
    )
    print(stats)
    # video_writer.close()

In [4]:
ckpt_path = "/home/bsud/multi_task_experts/robomimic/bc_trained_models/test/20240622210218/models/model_epoch_500.pth"

In [5]:
eval_myo(ckpt_path)

{
    "algo_name": "bc",
    "experiment": {
        "name": "test",
        "validate": false,
        "logging": {
            "terminal_output_to_txt": true,
            "log_tb": true,
            "log_wandb": false,
            "wandb_proj_name": "debug"
        },
        "mse": {
            "enabled": false,
            "every_n_epochs": 50,
            "on_save_ckpt": true,
            "num_samples": 20,
            "visualize": true
        },
        "save": {
            "enabled": true,
            "every_n_seconds": null,
            "every_n_epochs": 50,
            "epochs": [],
            "on_best_validation": false,
            "on_best_rollout_return": false,
            "on_best_rollout_success_rate": true
        },
        "epoch_every_n_steps": 100,
        "validation_epoch_every_n_steps": 10,
        "env": null,
        "additional_envs": null,
        "render": false,
        "render_video": true,
        "keep_all_videos": false,
        "video_skip": 5,
  



ObservationKeyToModalityDict: action not found, adding action to mapping with assumed low_dim modality!
BC (
  ModuleDict(
    (policy): ActorNetwork(
        action_dim=39
  
        encoder=ObservationGroupEncoder(
            group=obs
            ObservationEncoder(
                Key(
                    name=fixed_camera
                    shape=[3, 64, 64]
                    modality=rgb
                    randomizer=ModuleList(
                      (0): None
                    )
                    net=VisualCore(
                      input_shape=[3, 64, 64]
                      output_shape=[64]
                      backbone_net=ResNet18Conv(input_channel=3, input_coord_conv=False)
                      pool_net=SpatialSoftmax(num_kp=32, temperature=1.0, noise=0.0)
                    )
                    sharing_from=None
                )
                Key(
                    name=vec_obs
                    shape=[93]
                    modality=low_dim
      

AttributeError: 'EnvMyo' object has no attribute 'obs_dict'