In [1]:
from pathlib import Path

import gymnasium as gym
import imageio
import numpy
import torch
from huggingface_hub import snapshot_download

from lerobot.common.policies.act.modeling_act import ACTPolicy

# Evaluation artifacts (the rollout video) are written under this directory.
output_directory = Path("outputs/eval/example_act")
output_directory.mkdir(parents=True, exist_ok=True)

# Fetch the pretrained ACT checkpoint snapshot via huggingface_hub and load
# the policy from the local copy.
pretrained_policy_path = Path(
    snapshot_download("lerobot/act_aloha_sim_transfer_cube_human")
)
policy = ACTPolicy.from_pretrained(pretrained_policy_path)

# Dynamic int8 quantization: `quantize_dynamic` is given the original policy,
# the set of layer types to target, and the quantized dtype.
# NOTE(review): PyTorch's dynamic quantization docs list Linear and recurrent
# layers as supported; confirm whether listing Conv2d here has any effect.
quantizable_layers = {torch.nn.Linear, torch.nn.Conv2d}
policy_int8 = torch.ao.quantization.quantize_dynamic(
    policy,
    quantizable_layers,
    dtype=torch.qint8,
)

# Put the quantized policy in evaluation mode.
policy_int8.eval()

# The experiments run on CPU only.
device = torch.device("cpu")
policy_int8.to(device)

# Select the MuJoCo rendering backend through the MUJOCO_GL environment
# variable. It is set before `gym_aloha` is imported so that it is already in
# place when that package loads.
# NOTE(review): "egl" is presumably chosen for headless (off-screen) rendering
# -- confirm against the MuJoCo documentation for the target machine.
import os

os.environ["MUJOCO_GL"] = "egl"

# Imported for its side effects only (the name is not used in this script);
# presumably this registers the "gym_aloha/..." environment ids with gymnasium.
import gym_aloha  # noqa: E402,F401

# Build the cube-transfer task; observations are requested in the
# "pixels_agent_pos" format.
env_id = "gym_aloha/AlohaTransferCube-v0"
env = gym.make(env_id, obs_type="pixels_agent_pos")

# Reset the policy and the environment (with a fixed seed) before the rollout.
policy_int8.reset()
numpy_observation, info = env.reset(seed=43)

# Collect every reward and every rendered frame of the episode, from the
# initial state to the final state. The first frame is the initial state.
rewards = []
frames = [env.render()]

step = 0
done = False
while not done:
    # Pull the two inputs the policy consumes out of the observation dict.
    agent_pos = numpy_observation["agent_pos"]
    top_camera = numpy_observation["pixels"]["top"]

    # Convert to float32 torch tensors. The image is scaled by 1/255 and its
    # last axis is moved to the front (presumably HWC uint8 pixels -> CHW
    # floats in [0, 1] -- confirm against the environment's observation space).
    state = torch.from_numpy(agent_pos).to(torch.float32)
    image = (torch.from_numpy(top_camera).to(torch.float32) / 255).permute(2, 0, 1)

    # Move the tensors to the configured `device`, then add a leading batch
    # dimension of size 1, required to forward the policy.
    state = state.to(device, non_blocking=True).unsqueeze(0)
    image = image.to(device, non_blocking=True).unsqueeze(0)

    # Policy input dictionary, keyed by the feature names the policy expects.
    observation = {
        "observation.images.top": image,
        "observation.state": state,
    }

    # Predict the next action for the current observation, without autograd.
    with torch.inference_mode():
        action = policy_int8.select_action(observation)

    # Drop the batch dimension and hand the action to the environment as numpy.
    numpy_action = action.squeeze(0).to("cpu").numpy()

    # Step the environment and receive the next observation.
    numpy_observation, reward, terminated, truncated, info = env.step(numpy_action)
    # print(f"{step=} {reward=} {terminated=}")

    # Keep track of all the rewards and frames.
    rewards.append(reward)
    frames.append(env.render())

    # The rollout is done when the success state is reached (terminated is
    # True) or the maximum number of iterations is reached (truncated is True).
    done = terminated | truncated | done
    step += 1

# `terminated` is the flag returned by the last `env.step()` call of the
# rollout loop: True means the episode ended in the success state, otherwise
# the loop stopped because the episode was truncated.
if terminated:
    print("Success!")
else:
    print("Failure!")

# Get the speed of the environment (i.e. its number of frames per second).
fps = env.metadata["render_fps"]

# The environment is not needed past this point (all frames are already
# collected in `frames`), so release its simulator/renderer resources now
# instead of leaving them open until the interpreter exits.
env.close()

# Encode all frames into an mp4 video.
video_path = output_directory / "rollout_int8.mp4"
imageio.mimsave(str(video_path), numpy.stack(frames), fps=fps)

print(f"Video of the evaluation is available in '{video_path}'.")

  from .autonotebook import tqdm as notebook_tqdm
Fetching 10 files: 100%|██████████████████████████████████████████████████████████████████| 10/10 [00:00<00:00, 127100.12it/s]


Loading weights from local directory
Success!
Video of the evaluation is available in 'outputs/eval/example_act/rollout_int8.mp4'.
