Notebook for evaluating and RNN. Can also use `script_eval_rnn.sh` instead for a more sophisticated, interactive evaluation.

In [1]:
import numpy as np
import torch
from mocapact import utils
from mocapact.envs import tracking
from dm_control.locomotion.tasks.reference_pose import types
from mocapact.distillation import model


from mocapact.distillation import dataset
from mocapact.sb3.features_extractor import CmuHumanoidFeaturesExtractor
from mocapact import observables
from stable_baselines3.common.running_mean_std import RunningMeanStd

  if not hasattr(tensorboard, "__version__") or LooseVersion(
  ) < LooseVersion("1.15"):
  _PYTHON_LOWER_3_8 = LooseVersion(_PYTHON_VERSION) < LooseVersion("3.8")
  _PYTHON_LOWER_3_8 = LooseVersion(_PYTHON_VERSION) < LooseVersion("3.8")


## Jax predict

In [2]:
import jax
import jax.numpy as np

In [3]:
def predict_esn(params, C_bottleneck, ut, x_init = None, biased = False):
    # u_clock can be used both for clock and for input (for initialization)
    if x_init is None:
        x_init = params["x_ini"]
    
    x = x_init

    if biased == True:
        x = np.dot(C_bottleneck ,(1-params["a_dt"])*x + params["a_dt"]*np.tanh(
            np.dot(params['win'], ut) + np.dot(params['w'], x) + params['bias']))
    else:
        x = (1-params["a_dt"])*x + params["a_dt"]*np.tanh(
            np.dot(params['win'], ut) + np.dot(params['w'], x) + params['bias'])

    y = np.dot(
        params['wout'],x) + params['bias_out']

    return y, x

## Helper functions

In [4]:
dataset_path = './data/dataset/large/'

In [5]:


train_dataset = dataset.ExpertDatasetRNN(
    hdf5_fnames=[
        f'{dataset_path}/CMU_016_15.hdf5', 
        f'{dataset_path}/CMU_016_55.hdf5'
    ],
    observables=observables.MULTI_CLIP_OBSERVABLES_SANS_ID,
    metrics_path=f'{dataset_path}/dataset_metrics.npz',
    clip_ids=None,
    min_seq_steps=1,
    max_seq_steps=1,
    n_start_rollouts=-1,
    n_rsi_rollouts=-1,
    normalize_obs=False,
    clip_len_upsampling=False,
    clip_weighted=False,
    advantage_weights=False,
    temperature=4.0,
    concat_observables=False,
    keep_hdf5s_open=False
)

called _create_offsets() in ExpertDatasetRNN


In [6]:


# create feature extractor
model_observables = observables.HIGH_LEVEL_OBSERVABLES
obs_rms = {}
for obs_key, obs_indices in train_dataset.observable_indices.items():
    rms = RunningMeanStd(shape=obs_indices.shape)
    rms.mean = train_dataset.proprio_mean[obs_indices]
    rms.var = train_dataset.proprio_var[obs_indices]
    rms.count = train_dataset.count
    obs_rms[obs_key] = rms
feature_extractor = CmuHumanoidFeaturesExtractor(
    observation_space=train_dataset.full_observation_space,
    observable_keys=model_observables,
    obs_rms=obs_rms
)

In [7]:
from IPython.display import HTML
from IPython.display import display
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation


def visualize_images(images, show=True, to_file = None):
    fig, ax = plt.subplots()
    x = ax.imshow(images[0])

    def update(frame):
        x.set_data(images[frame])
        return x

    ani = FuncAnimation(fig, update, frames=range(len(images)), interval=50)
    if to_file is not None:
        ani.save(to_file, writer='imagemagick', fps=30)
    if show:
        display(HTML(ani.to_html5_video()))
    plt.close()

In [20]:
snippets = [
    'CMU_016_15-0-127',  # walking
    'CMU_016_55-0-47'    # running
]

env = tracking.MocapTrackingGymEnv(
    utils.make_clip_collection(snippets),
    ref_steps=list(range(1, 6)),
    act_noise=0.0,
    task_kwargs=dict(
        reward_type='comic',
        min_steps=9,
        ghost_offset=np.array([1., 0., 0.]),
        always_init_at_clip_start=False,
        termination_error_threshold=0.3
    )
)

In [21]:
# loading parameters and conceptor
params = np.load('params_esn_comp.npz', allow_pickle=True)
C_line = np.load('C_line.npz', allow_pickle=True)

params = params['arr_0'].item()
C_line = C_line['arr_0']

In [22]:
ims = []
state = None
C = np.eye(500)
with torch.no_grad():
    time_step = env._env.reset()

    while not time_step.last():
        if time_step.step_type == 0:
            state = None
        obs = env.get_observation(time_step)
        
        # translate obs from dict of array to dict of tensors
        obs_ = {k : torch.as_tensor(o.reshape(-1, *train_dataset.full_observation_space[k].shape)) for k, o in obs.items()}
        observation = feature_extractor(obs_)
        inp = np.array(observation)[0]

        action, state = predict_esn(params,C, inp, x_init = state, biased = False)
        time_step = env._env.step(action)
        img = env.render()
        ims.append(img)

In [23]:
visualize_images(ims, to_file="walkrun_example.gif")

