In [3]:
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import clear_output, display
import time

bridge_trajs = np.load("assets/bridge_v2_10_trajs.npy", allow_pickle=True)

In [None]:
from vlm_autoeval_robot_benchmark.utils.ecot_primitives import ecot_primitive_movements, inverse_ecot_primitive_movements

In [20]:
def show_trajectory_video(descriptors, delay=0.2):
    """Show observations as a video with delay between frames."""
    plt.figure(figsize=(10, 10))
    for i, obs in enumerate(descriptors["obs"]):
        clear_output(wait=True)
        plt.imshow(obs)
        title = f"Frame {i} - {descriptors['task_language_instruction']}\n ECOT Move: {descriptors['moves'][i][0]}"
        plt.title(title)
        plt.axis('off')
        display(plt.gcf())
        time.sleep(delay)
    plt.close()


In [6]:
def repackage_to_episode(traj):
    steps = []
    for i in range(len(traj["observations"])):
        step = {}
        step["observation"] = traj["observations"][i]
        step["action"] = traj["actions"][i]
        steps.append(step)
    return dict(steps=steps)

In [22]:
def get_descriptors(traj):
    move_primitives = ecot_primitive_movements.get_move_primitives_episode(repackage_to_episode(traj), threshold=0.00)
    obs_list = [t["images0"] for t in traj["observations"]]  # list of obs
    gt_actions = traj["actions"]  # list of ground truth actions
    task_language_instruction = traj["language"][0] if "language" in traj else None
    return dict(moves=move_primitives, obs=obs_list, gt_actions=gt_actions, task_language_instruction=task_language_instruction)

In [None]:
traj_idx = 5  # Change this to visualize different trajectories
episode_descriptors = get_descriptors(bridge_trajs[traj_idx])
show_trajectory_video(episode_descriptors, delay=0.15)