In [1]:
import time
import torch
import numpy as np
from pathlib import Path

# LeRobot imports
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.policies.act.modeling_act import ACTPolicy
from lerobot.policies.smolvla.modeling_smolvla import SmolVLAPolicy
from lerobot.robots.so100_follower.config_so100_follower import SO100FollowerConfig
from lerobot.robots.so100_follower.so100_follower import SO100Follower
from lerobot.robots.bi_so100_follower.config_bi_so100_follower import BiSO100FollowerConfig
from lerobot.robots.bi_so100_follower.bi_so100_follower import BiSO100Follower
from lerobot.utils.control_utils import init_keyboard_listener, predict_action
from lerobot.utils.utils import (log_say, get_safe_torch_device)


In [None]:
def write_calibration(robot):
    # Write calibration to motors BEFORE connecting
    if robot.left_arm.calibration:
        robot.left_arm.bus.connect()
        robot.left_arm.bus.write_calibration(robot.left_arm.calibration)
        # Verify it worked
        motor_cal = robot.left_arm.bus.read_calibration()
        if motor_cal == robot.left_arm.calibration:
            print("✓ Left arm calibration verified")
        else:
            print("⚠️ Left arm calibration mismatch!")
        robot.left_arm.bus.disconnect()

    if robot.right_arm.calibration:
        robot.right_arm.bus.connect()  
        robot.right_arm.bus.write_calibration(robot.right_arm.calibration)
        # Verify it worked
        motor_cal = robot.right_arm.bus.read_calibration()
        if motor_cal == robot.right_arm.calibration:
            print("✓ Right arm calibration verified")
        else:
            print("⚠️ Right arm calibration mismatch!")
        robot.right_arm.bus.disconnect()

    print("✓ Calibration written to motors")

In [2]:
INFERENCE_TIME_SEC = 30  # How long to run inference
FPS = 30                 # Control frequency
DEVICE = "mps"          # "cuda", "mps", or "cpu"
TASK_DESCRIPTION = "Put the red block in the blue bin"

LEFT_PORT = "/dev/tty.usbmodem58760435551"  # Update for your robot
RIGHT_PORT = "/dev/tty.usbmodem58FA0922121"

CAMERA_CONFIG = {
    "left": OpenCVCameraConfig(index_or_path=1, width=640, height=480, fps=FPS),
    "right": OpenCVCameraConfig(index_or_path=0, width=640, height=480, fps=FPS),
    "top": OpenCVCameraConfig(index_or_path=2, width=640, height=480, fps=FPS)
}

POLICY_TYPE = "smolvla"  # "act" or "smolvla"
#POLICY_PATH = "/Users/shreyas/Downloads/robots/policies/smolvla/020000/pretrained_model"
POLICY_PATH = "/Users/shreyas/Downloads/robots/policies/tedrake/020000/pretrained_model"

In [None]:
# Create robot configuration
robot_config = BiSO100FollowerConfig(
    left_arm_port=LEFT_PORT,
    right_arm_port=RIGHT_PORT,
    id="bimanual_follower", 
    cameras=CAMERA_CONFIG
)

# Initialize robot
robot = BiSO100Follower(robot_config)
print("✓ Robot configured")
#print(f"Left arm calibration path: {robot.left_arm.calibration_fpath}")
#print(f"Right arm calibration path: {robot.right_arm.calibration_fpath}")

✓ Robot configured
Left arm calibration path: /Users/shreyas/.cache/huggingface/lerobot/calibration/robots/so100_follower/bimanual_follower_left.json
Right arm calibration path: /Users/shreyas/.cache/huggingface/lerobot/calibration/robots/so100_follower/bimanual_follower_right.json
✓ Calibration written to motors




In [None]:
# Load policy (automatically handles Tedrake normalization if trained with it)
if POLICY_TYPE == "act":
    policy = ACTPolicy.from_pretrained(POLICY_PATH)
elif POLICY_TYPE == "smolvla":
    policy = SmolVLAPolicy.from_pretrained(POLICY_PATH)
else:
    raise ValueError(f"Unsupported policy type: {POLICY_TYPE}")

policy.to(DEVICE)
print(f"✓ Loaded {POLICY_TYPE} policy from: {POLICY_PATH}")
print(f"✓ Policy moved to device: {DEVICE}")


In [None]:
# Setup keyboard listener for real-time control
#_, events = init_keyboard_listener()
#print("✓ Keyboard listener initialized")
#print("   ESC: Stop | SPACE: Pause/Resume | R: Reset")


In [None]:
from lerobot.datasets.utils import build_dataset_frame, hw_to_dataset_features
# Configure the dataset features
action_features = hw_to_dataset_features(robot.action_features, "action")
obs_features = hw_to_dataset_features(robot.observation_features, "observation")
dataset_features = {**action_features, **obs_features}

In [None]:
# Connect to robot
robot.connect()
print("✓ Robot connected successfully")

# Utility function for precise timing
def busy_wait(duration):
    """Precise timing for control loop"""
    if duration > 0:
        end_time = time.perf_counter() + duration
        while time.perf_counter() < end_time:
            pass

print(f"Starting inference for {INFERENCE_TIME_SEC} seconds...")
print(f"Task: {TASK_DESCRIPTION}")

In [None]:
start_time = time.perf_counter()
num_steps = int(INFERENCE_TIME_SEC * FPS)
paused = False

try:
    for step in range(num_steps):
        loop_start = time.perf_counter()
        
        # Capture observation from robot
        observation = robot.get_observation()
        
        observation_frame = build_dataset_frame(dataset_features, observation, prefix="observation")
        action_values = predict_action(
                observation_frame,
                policy,
                get_safe_torch_device(policy.config.device),
                policy.config.use_amp,
                task=TASK_DESCRIPTION,
                robot_type=robot.robot_type,
            )
        action = {key: action_values[i].item() for i, key in enumerate(robot.action_features)}
        
        # Send action to robot
        robot.send_action(action)
        
        # Progress logging
        if step % (FPS * 5) == 0:  # Every 5 seconds
            elapsed = time.perf_counter() - start_time
            remaining = INFERENCE_TIME_SEC - elapsed
            print(f"Step {step}/{num_steps} | {elapsed:.1f}s elapsed | {remaining:.1f}s remaining")
        
        # Maintain precise timing
        dt = time.perf_counter() - loop_start
        busy_wait(1/FPS - dt)
        
except KeyboardInterrupt:
    print("\n Inference interrupted by user")
    
except Exception as e:
    print(f"\n Error during inference: {e}")
    
finally:
    # Always disconnect robot
    robot.disconnect()
    print("\n✓ Robot disconnected safely")
    
total_time = time.perf_counter() - start_time
print(f" Inference completed in {total_time:.1f} seconds")
print(f" Average FPS: {step / total_time:.1f}")


In [None]:
# Example: Add custom state information (like in robotbuilder inference)
def add_custom_state(observation, pick_target=None, place_target=None):
    """Add custom state information to observation"""
    if pick_target is not None and place_target is not None:
        # Normalize targets to [0,1] if needed
        norm_pick = torch.tensor(pick_target, dtype=torch.float32)
        norm_place = torch.tensor(place_target, dtype=torch.float32)
        
        # Append to state
        observation["observation.state"] = torch.cat([
            observation["observation.state"], 
            norm_pick, 
            norm_place
        ])
    
    return observation

# Usage in inference loop:
# observation = add_custom_state(observation, pick_target=[0.5, 0.3], place_target=[0.8, 0.7])
