# üß™ Fake Deploy - Test Policy with Dataset Observations

This notebook guides you through testing a trained policy on a physical robot using observations from a recorded dataset instead of live robot observations.

### Process:
1. **Configure**: Set the path to your trained model, dataset, and the robot's server address.
2. **Load**: The `policy_loader` automatically loads the model and its training configuration.
3. **Deploy**: The script replays observations from the dataset through the policy and sends actions to the robot.

This is useful for debugging and understanding how the policy behaves when given the exact observations it was trained on.

## 1. Configuration

First, specify the necessary parameters for deployment. **You must edit these values.**

In [None]:
import pathlib

# TODO: Change to the directory containing your trained policy checkpoint.
# Example: "outputs/2025-09-14/12-00-00"
CHECKPOINT_DIR = pathlib.Path("outputs/<policy_checkpoint_dir>")

# TODO: Change to the dataset directory containing the episodes to replay.
# Example: "data/my_dataset"
DATA_DIR = pathlib.Path("data/<dataset_dir>")

# TODO: Change to the robot's IP address.
SERVER_ENDPOINT = "<robot_ip_address>:50051"

# Episode index to replay from the dataset
EPISODE_INDEX: int = 0

# Inference frequency in Hz. Should match the dataset recording frequency.
INFERENCE_FREQUENCY_HZ: float = 10.0

# Set to True to manually step through each action
STEP_BY_STEP: bool = False

print(f"Attempting to load policy from: {CHECKPOINT_DIR}")
print(f"Dataset directory: {DATA_DIR}")
print(f"Episode index: {EPISODE_INDEX}")
print(f"Robot server endpoint: {SERVER_ENDPOINT}")
print(f"Inference frequency: {INFERENCE_FREQUENCY_HZ} Hz")
print(f"Step-by-step mode: {STEP_BY_STEP}")

## 2. Load the Policy

Now, we load the policy from the specified checkpoint directory. The loader will find the latest checkpoint and its corresponding configuration file.

In [None]:
from example_policies.robot_deploy import policy_loader
import torch

policy, cfg = policy_loader.load_policy(CHECKPOINT_DIR)

# Select device
device = "cpu" if not torch.cuda.is_available() else "cuda"
cfg.device = device
policy.to(device)

print("‚úÖ Policy loaded successfully!")
print(f"Device: {device}")

## 3. (Optional) Modify Policy Attributes

Before deployment, you can override policy attributes for experimentation.

In [None]:
# Uncomment and modify the lines below to change policy attributes.
# For available options, refer to the lerobot policy's config documentation.

# policy.n_action_steps = 15  # Number of actions to predict in each forward pass

# print(f"Action steps set to: {policy.n_action_steps}")

## 4. Fake Deploy to Robot

Finally, execute the cell below to start replaying dataset observations through the policy and sending commands to the robot.

‚ö†Ô∏è **Warning**: This will move the physical robot. Ensure the robot has a clear and safe workspace.

**Note**: The robot will receive actions generated from dataset observations, not live observations. This is useful for debugging whether the issue is with observation processing or policy behavior.

In [None]:
import grpc
from example_policies.robot_deploy.robot_io.robot_interface import RobotClient
from example_policies.robot_deploy.robot_io.robot_service import robot_service_pb2_grpc
from example_policies.robot_deploy.debug_helpers.fake_deploy import inference_loop

# CART_WAYPOINT is most stable and responsive. For legacy behaviour, use CART_QUEUE.
# JOINT_DIRECT and CART_DIRECT are less stable and not recommended at the moment
controller = RobotClient.CART_WAYPOINT

# Connect to robot service
channel = grpc.insecure_channel(SERVER_ENDPOINT)
stub = robot_service_pb2_grpc.RobotServiceStub(channel)

try:
    inference_loop(
        policy=policy,
        cfg=cfg,
        data_dir=DATA_DIR,
        hz=INFERENCE_FREQUENCY_HZ,
        service_stub=stub,
        controller=controller,
        ep_index=EPISODE_INDEX,
        ask_for_input=STEP_BY_STEP,
    )
except Exception as e:
    print(f"Error occurred: {e}")
    raise e
finally:
    channel.close()
    print("Connection closed.")