# LeRobot Policy Integration with PhysicalAI

This notebook demonstrates how PhysicalAI integrates with [LeRobot](https://github.com/huggingface/lerobot) policies for end-to-end robot learning workflows. You'll learn:

1. **LeRobotPolicy Wrapper** - Universal wrapper that works with any LeRobot policy
2. **Explicit Policy Wrappers** - Type-safe wrappers for specific policies (ACT, Diffusion, Groot, etc.)
3. **Training Workflow** - End-to-end training with the `Trainer` class
4. **Inference** - Running trained policies for robot control

## Prerequisites

- PhysicalAI library installed (`pip install -e library/`)
- LeRobot installed (`pip install lerobot`)
- A dataset (we'll use `lerobot/pusht` from the Hugging Face Hub)

## 1. Import Required Libraries

In [1]:
# Core imports
import torch

# PhysicalAI policy wrappers
from physicalai.policies.lerobot import (
    LeRobotPolicy,  # Universal wrapper for any LeRobot policy
    ACT,  # Explicit ACT policy wrapper
    Diffusion,  # Explicit Diffusion policy wrapper
    Groot,  # Explicit Groot (GR00T-N1) policy wrapper
)

# Data and training utilities
from physicalai.data.lerobot import LeRobotDataModule, get_delta_timestamps_from_policy
from physicalai.train import Trainer

print("✅ All imports successful!")

✅ All imports successful!


## 2. Understanding LeRobotPolicy Wrapper

`LeRobotPolicy` is a universal wrapper that can load **any** LeRobot policy by name. It provides:

- **Automatic configuration loading** from LeRobot's policy registry
- **PyTorch Lightning integration** for seamless training
- **Unified interface** across all policy types

### Available Policies

LeRobot supports several policy architectures. PhysicalAI provides **explicit wrappers** for:
- **ACT (Action Chunking Transformer)** - Predicts action sequences using transformers with a VAE
- **Diffusion Policy** - Uses denoising diffusion for action generation
- **Groot (GR00T-N1)** - NVIDIA's vision-language-action foundation model for humanoid robots

For other policies (VQ-BeT, TDMPC, Pi0, etc.), use the universal `LeRobotPolicy` wrapper.

In [35]:
# Create a LeRobotPolicy using the universal wrapper
# Just specify the policy name - configuration is loaded automatically

act_policy = LeRobotPolicy(policy_name="act")
print(f"Policy type: {type(act_policy)}")
print(f"Policy name: {act_policy.policy_name}")

# Note: The underlying LeRobot policy is created during setup() when features
# are available from the DataModule. For immediate initialization, use from_dataset():
act_policy_eager = LeRobotPolicy.from_dataset("act", "lerobot/pusht")
print("\nEager initialized policy config (first 10 keys):")
for i, (k, v) in enumerate(vars(act_policy_eager._config).items()):
    if i >= 10:
        print("  ...")
        break
    print(f"  {k}: {v}")



Policy type: <class 'physicalai.policies.lerobot.universal.LeRobotPolicy'>
Policy name: act

Eager initialized policy config (first 10 keys):
  n_obs_steps: 1
  input_features: {'observation.image': PolicyFeature(type=<FeatureType.VISUAL: 'VISUAL'>, shape=(3, 96, 96)), 'observation.state': PolicyFeature(type=<FeatureType.STATE: 'STATE'>, shape=(2,))}
  output_features: {'action': PolicyFeature(type=<FeatureType.ACTION: 'ACTION'>, shape=(2,))}
  device: cuda
  use_amp: False
  push_to_hub: True
  repo_id: None
  private: None
  tags: None
  license: None
  ...
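
The difference between deferred and eager initialization can be sketched in plain Python. This is only an illustration of the pattern, not PhysicalAI's implementation: `POLICY_REGISTRY`, `LazyPolicySketch`, and `from_features` are hypothetical names, and the "model" is a plain dict standing in for a real LeRobot policy.

```python
from typing import Any, Callable, Dict, Optional

# Hypothetical registry mapping policy names to factory functions.
# A real registry would build LeRobot policy objects; these factories
# only return a dict describing what would be built.
POLICY_REGISTRY: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]] = {
    "act": lambda features: {"kind": "act", "features": features},
    "diffusion": lambda features: {"kind": "diffusion", "features": features},
}


class LazyPolicySketch:
    """Illustrative wrapper that defers model construction until setup()."""

    def __init__(self, policy_name: str) -> None:
        if policy_name not in POLICY_REGISTRY:
            raise ValueError(f"Unknown policy: {policy_name!r}")
        self.policy_name = policy_name
        self.model: Optional[Dict[str, Any]] = None  # built later, in setup()

    def setup(self, features: Dict[str, Any]) -> None:
        """Build the underlying model once the dataset features are known."""
        if self.model is None:
            self.model = POLICY_REGISTRY[self.policy_name](features)

    @classmethod
    def from_features(cls, policy_name: str, features: Dict[str, Any]) -> "LazyPolicySketch":
        """Eager path: construct and set up in a single call."""
        policy = cls(policy_name)
        policy.setup(features)
        return policy


features = {"observation.state": (2,), "action": (2,)}
lazy = LazyPolicySketch("act")  # no model yet
eager = LazyPolicySketch.from_features("act", features)  # model built immediately
print(lazy.model is None, eager.model is not None)
```

Deferring construction lets the same policy object be declared before the dataset is known, which is presumably why the wrapper waits for `setup()` unless `from_dataset()` is used.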


## 3. Explicit Policy Wrappers

For type-safety and IDE autocompletion, PhysicalAI provides **explicit wrappers** for specific policy types:
- `ACT` - Action Chunking Transformer
- `Diffusion` - Diffusion Policy  
- `Groot` - NVIDIA's GR00T-N1 Foundation Model

These wrappers expose policy-specific parameters directly as constructor arguments.
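
To see why named constructor arguments help, here is a minimal sketch contrasting the two styles. The classes `UniversalSketch` and `ACTSketch` are hypothetical stand-ins, and the default values are illustrative; the real wrappers may be structured differently.

```python
from typing import Any, Dict


class UniversalSketch:
    """Hypothetical universal wrapper: accepts arbitrary keyword overrides."""

    def __init__(self, policy_name: str, **overrides: Any) -> None:
        self.policy_name = policy_name
        self._policy_config: Dict[str, Any] = dict(overrides)


class ACTSketch(UniversalSketch):
    """Hypothetical explicit wrapper: named, typed parameters with defaults."""

    def __init__(
        self,
        chunk_size: int = 100,
        n_obs_steps: int = 1,
        optimizer_lr: float = 1e-5,
    ) -> None:
        # Forward the named parameters to the universal wrapper.
        super().__init__(
            "act",
            chunk_size=chunk_size,
            n_obs_steps=n_obs_steps,
            optimizer_lr=optimizer_lr,
        )


act_sketch = ACTSketch(chunk_size=50)
print(act_sketch.policy_name, act_sketch._policy_config["chunk_size"])

# A misspelled keyword is stored silently by the universal sketch ...
loose = UniversalSketch("act", chunk_sise=50)
print("chunk_sise" in loose._policy_config)

# ... but rejected immediately by the explicit sketch.
try:
    ACTSketch(chunk_sise=50)
except TypeError as err:
    print(f"Rejected: {err}")
```

In this sketch, the explicit signature is what gives editors something to autocomplete and lets typos fail at construction time rather than later in training.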

In [None]:
# ACT Policy - Action Chunking Transformer
# Predicts sequences of actions using a transformer with VAE latent space

act = ACT(
    chunk_size=100,  # Number of future actions to predict (action chunking)
    n_obs_steps=1,  # Number of observation history steps (1 = current only)
    dim_model=256,  # Transformer hidden dimension
    n_heads=8,  # Number of attention heads
    n_encoder_layers=4,
    n_decoder_layers=1,
    optimizer_lr=1e-5,
)
print("ACT Policy created with 100-step action chunks")
print(f"  - chunk_size: {act._policy_config['chunk_size']}")
print(f"  - n_obs_steps: {act._policy_config['n_obs_steps']}")

ACT Policy created with 100-step action chunks
  - chunk_size: 100
  - n_obs_steps: 1


In [None]:
# Diffusion Policy - Denoising Diffusion for Action Generation
# Uses iterative denoising to generate smooth action trajectories

diffusion = Diffusion(
    n_action_steps=8,  # Actions to execute per inference
    horizon=16,  # Total planning horizon
    n_obs_steps=2,  # Observation history length
    num_inference_steps=10,  # Denoising steps during inference
    optimizer_lr=1e-4,
)
print("Diffusion Policy created")
print(f"  - horizon: {diffusion._policy_config['horizon']}")
print(f"  - n_obs_steps: {diffusion._policy_config['n_obs_steps']}")

Diffusion Policy created
  - horizon: 16
  - n_obs_steps: 2


In [None]:
# Groot Policy - NVIDIA's GR00T-N1.5 Foundation Model
# Vision-language-action model for generalist humanoid robots
# Note: Requires 24GB+ VRAM and `pip install physicalai[groot]`

groot = Groot(
    chunk_size=50,  # Action sequence length
    n_action_steps=50,  # Actions to execute per inference
    tune_projector=True,  # Fine-tune vision-to-LLM projector
    tune_diffusion_model=True,  # Fine-tune diffusion action head
    tune_llm=False,  # Keep LLM frozen (saves memory)
    optimizer_lr=1e-4,
)
print("Groot Policy created")
print(f"  - chunk_size: {groot._policy_config['chunk_size']}")
print(f"  - tune_projector: {groot._policy_config['tune_projector']}")

Groot Policy created
  - chunk_size: 50
  - tune_projector: True


## 4. Delta Timestamps: Handling Action Chunking

Policies like ACT predict multiple future actions (action chunking). The `delta_timestamps` parameter tells the dataset which time offsets, in seconds relative to the current frame, to load for each key.

**Key concept**: If ACT predicts 100 future actions at 10 FPS, it needs actions spanning 10 seconds (100 × 0.1 s), at offsets from 0.0 s to 9.9 s.

In [41]:
# Generate delta timestamps automatically from policy configuration
fps = 10  # Dataset recording frequency

# For ACT with chunk_size=100
act_delta_timestamps = get_delta_timestamps_from_policy("act", fps=fps)
print("ACT delta_timestamps:")
print(f"  action: {len(act_delta_timestamps['action'])} values")
print(f"  First 5: {act_delta_timestamps['action'][:5]}")
print(f"  Last 5: {act_delta_timestamps['action'][-5:]}")

# For Diffusion with n_obs_steps=2 and horizon=16
diffusion_delta_timestamps = get_delta_timestamps_from_policy("diffusion", fps=fps)
print("\nDiffusion delta_timestamps:")
for key, values in diffusion_delta_timestamps.items():
    print(f"  {key}: {values}")



ACT delta_timestamps:
  action: 100 values
  First 5: [0.0, 0.1, 0.2, 0.3, 0.4]
  Last 5: [9.5, 9.6, 9.7, 9.8, 9.9]

Diffusion delta_timestamps:
  observation.images.top: [-0.1, 0.0]
  observation.state: [-0.1, 0.0]
  action: [-0.1, 0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4]
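
The offsets printed above follow a simple pattern: each offset is a frame index divided by the FPS. The sketch below reproduces those values by hand. It is an assumption about how such offsets can be derived, not the actual body of `get_delta_timestamps_from_policy`; the helper names and the `observation_keys` argument are hypothetical.

```python
from typing import Dict, List, Sequence


def act_style_offsets(chunk_size: int, fps: int) -> Dict[str, List[float]]:
    """Offsets for a policy that predicts `chunk_size` actions starting at the current frame."""
    return {"action": [i / fps for i in range(chunk_size)]}


def diffusion_style_offsets(
    n_obs_steps: int,
    horizon: int,
    fps: int,
    observation_keys: Sequence[str],
) -> Dict[str, List[float]]:
    """Offsets for a policy that reads `n_obs_steps` past frames and plans `horizon` actions."""
    first = 1 - n_obs_steps  # index of the oldest observation frame (0 = current frame)
    observation_offsets = [i / fps for i in range(first, 1)]
    offsets = {key: list(observation_offsets) for key in observation_keys}
    # The action window starts at the oldest observation and spans `horizon` frames.
    offsets["action"] = [i / fps for i in range(first, first + horizon)]
    return offsets


act_offsets = act_style_offsets(chunk_size=100, fps=10)
diffusion_offsets = diffusion_style_offsets(
    n_obs_steps=2,
    horizon=16,
    fps=10,
    observation_keys=["observation.images.top", "observation.state"],
)
print(act_offsets["action"][:5])
print(diffusion_offsets["observation.state"])
print(len(diffusion_offsets["action"]))
```

Note that the Diffusion action window starts one frame in the past because `n_obs_steps=2`, which is why its first action offset is -0.1 rather than 0.0.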


## 5. Setting Up the DataModule

The `LeRobotDataModule` handles data loading and preprocessing. It supports two data formats:

- **`"physicalai"`** - Uses the `Observation` dataclass (recommended for PhysicalAI workflows)
- **`"lerobot"`** - Native LeRobot dictionary format (for compatibility)
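
The relationship between the two formats can be illustrated with a small conversion sketch. `ObservationSketch` and `from_lerobot_dict` are hypothetical stand-ins written for this example; the real `Observation` class has its own fields and conversion logic, and plain lists are used here in place of tensors.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class ObservationSketch:
    """Hypothetical stand-in for the `Observation` dataclass."""

    state: Any
    images: Any
    action: Optional[Any] = None


def from_lerobot_dict(sample: Dict[str, Any]) -> ObservationSketch:
    """Convert a flat, dotted-key dict into the dataclass sketch.

    Assumes LeRobot-style keys such as "observation.state",
    "observation.image", or "observation.images.<camera>".
    """
    image_keys = [key for key in sample if key.startswith("observation.image")]
    if len(image_keys) == 1:
        images = sample[image_keys[0]]  # single camera: keep the value as-is
    else:
        # Several cameras: map the camera name (last key segment) to its image.
        images = {key.split(".")[-1]: sample[key] for key in image_keys}
    return ObservationSketch(
        state=sample["observation.state"],
        images=images,
        action=sample.get("action"),
    )


lerobot_sample = {
    "observation.image": [[0.0]],
    "observation.state": [0.5, 0.25],
    "action": [[1.0, 2.0]],
}
obs = from_lerobot_dict(lerobot_sample)
print(obs.state, obs.action)
```

Attribute access such as `obs.state` is easier to type-check than string keys, which is one reason a dataclass format can be preferable inside a single library.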

In [2]:
# Create the DataModule with a HuggingFace dataset
# Using lerobot/pusht as an example dataset

fps = 10  # Recording frequency of the dataset

# Note: video_backend="pyav" for better compatibility with various video codecs
datamodule = LeRobotDataModule(
    repo_id="lerobot/pusht",  # HuggingFace dataset
    train_batch_size=8,
    data_format="physicalai",  # Use Observation dataclass
    delta_timestamps=get_delta_timestamps_from_policy("act", fps=fps),
    video_backend="pyav",  # Use pyav for video decoding
)

print("DataModule created:")
print(f"  - train_batch_size: {datamodule.train_batch_size}")
print(f"  - data_format: {datamodule.data_format}")



DataModule created:
  - train_batch_size: 8
  - data_format: physicalai


In [3]:
# Inspect the dataset structure
datamodule.setup("fit")
dataset = datamodule.train_dataset

print(f"Dataset size: {len(dataset)} samples")

# Get a sample - the physicalai format returns Observation dataclass
sample = dataset[0]
print(f"\nSample type: {type(sample).__name__}")
print(f"State shape: {sample.state.shape}")
print(f"Action shape: {sample.action.shape}")
if isinstance(sample.images, dict):
    for name, img in sample.images.items():
        print(f"Image '{name}' shape: {img.shape}")
else:
    print(f"Images shape: {sample.images.shape}")

Dataset size: 25650 samples

Sample type: Observation
State shape: torch.Size([2])
Action shape: torch.Size([100, 2])
Images shape: torch.Size([3, 96, 96])




## 6. End-to-End Training with Trainer

The `Trainer` class is a PyTorch Lightning-based trainer that handles:

- Policy initialization based on dataset features
- Automatic callback injection (e.g., `PolicyDatasetInteraction`)
- Logging, checkpointing, and distributed training

In [4]:
# Create the policy for training
policy = ACT(
    chunk_size=100,
    n_obs_steps=1,
    optimizer_lr=1e-5,
)

# Create the trainer
trainer = Trainer(
    max_steps=5,  # Just 5 steps for demo
    accelerator="auto",  # Automatically select GPU/CPU
    enable_checkpointing=False,  # Disable for demo
    logger=False,  # Disable logging for demo
    enable_progress_bar=True,
)

print("Trainer and Policy ready for training!")
print(f"  - max_steps: {trainer.max_steps}")

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores


Trainer and Policy ready for training!
  - max_steps: 5


In [5]:
# Start training!
# The Trainer automatically:
# 1. Calls policy.setup() with dataset features
# 2. Injects PolicyDatasetInteraction callback
# 3. Handles training loop

trainer.fit(model=policy, datamodule=datamodule)
print("\n✅ Training complete!")

You are using a CUDA device ('NVIDIA RTX A6000') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name            | Type      | Params | Mode  | FLOPs
--------------------------------------------------------------
0 | val_rollout     | Rollout   | 0      | train | 0    
1 | test_rollout    | Rollout   | 0      | train | 0    
2 | _lerobot_policy | ACTPolicy | 51.6 M | train | 0    
--------------------------------------------------------------
51.6 M    Trainable params
0         Non-trainable params
51.6 M    Total params
206.356   Total estimated model params size (MB)
184       Modules in train mode
0         Modules in eval mode
0         T

Epoch 0:   0%|          | 0/5 [00:00<?, ?it/s]

/home/sakcay/projects/geti/geti-action/library/.venv/lib/python3.12/site-packages/lightning/pytorch/trainer/connectors/data_connector.py:378: You have overridden `transfer_batch_to_device` in `LightningModule` but have passed in a `LightningDataModule`. It will use the implementation from `LightningModule` instance.


Epoch 0: 100%|██████████| 5/5 [00:00<00:00,  6.37it/s, train/loss=25.10]

`Trainer.fit` stopped: `max_steps=5` reached.


Epoch 0: 100%|██████████| 5/5 [00:00<00:00,  6.36it/s, train/loss=25.10]


✅ Training complete!



## 7. Running Inference

After training, use the policy for inference to predict actions from observations.

In [None]:
# Set policy to evaluation mode and move to device
def get_device() -> torch.device:
    """Get the best available device (XPU > CUDA > CPU)."""
    if hasattr(torch, "xpu") and torch.xpu.is_available():
        return torch.device("xpu")
    if torch.cuda.is_available():
        return torch.device("cuda")
    return torch.device("cpu")


device = get_device()
policy = policy.to(device)
policy.eval()

print(f"Policy device: {device}")

# Get a sample observation from the dataset (unbatched, single sample)
sample = dataset[0]

# Create observation for inference
# NOTE: The LeRobot preprocessor pipeline automatically handles:
#   1. AddBatchDimensionProcessorStep - adds batch dim to unbatched inputs
#   2. DeviceProcessorStep - moves tensors to the policy's device
#   3. NormalizerProcessorStep - normalizes inputs using dataset statistics
# So we pass raw unbatched data - no need for manual .unsqueeze(0) or .to(device)
from physicalai.data.observation import Observation

observation = Observation(
    state=sample.state,  # Shape: [2] -> preprocessor adds batch -> [1, 2]
    images=sample.images,  # Shape: [C, H, W] -> preprocessor adds batch -> [1, C, H, W]
    action=None,
)

# Run inference using select_action (which applies the preprocessor pipeline)
with torch.no_grad():
    action = policy.select_action(observation)

print(f"\nInput state shape (before preprocessing): {sample.state.shape}")
print(f"Input images shape (before preprocessing): {sample.images.shape}")
print(f"Predicted action shape: {action.shape}")
print(f"\nPredicted action:\n{action}")

Policy device: cuda

Input state shape (before preprocessing): torch.Size([2])
Input images shape (before preprocessing): torch.Size([3, 96, 96])
Predicted action shape: torch.Size([1, 2])

Predicted action:
tensor([[197.6160, 250.0277]])
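
Note that the predicted action has shape `[1, 2]`, while the training target had shape `[100, 2]`. A common way for chunking policies to bridge this gap is to predict a whole chunk once, queue it, and hand out one action per call. The sketch below illustrates that idea under stated assumptions: `ChunkedActionQueue` and `fake_model` are hypothetical, and the wrapped policy may manage its queue differently.

```python
from collections import deque
from typing import Callable, Deque, List

Action = List[float]


class ChunkedActionQueue:
    """Illustrative action queue: one forward pass serves many control steps."""

    def __init__(self, predict_chunk: Callable[[], List[Action]], n_action_steps: int) -> None:
        self._predict_chunk = predict_chunk
        self._n_action_steps = n_action_steps
        self._queue: Deque[Action] = deque()
        self.n_forward_passes = 0

    def select_action(self) -> Action:
        """Return the next action, refilling the queue only when it is empty."""
        if not self._queue:
            chunk = self._predict_chunk()
            self.n_forward_passes += 1
            self._queue.extend(chunk[: self._n_action_steps])
        return self._queue.popleft()


def fake_model() -> List[Action]:
    """Stand-in for a network forward pass: 100 two-dimensional actions."""
    return [[float(t), float(t)] for t in range(100)]


queue = ChunkedActionQueue(fake_model, n_action_steps=100)
actions = [queue.select_action() for _ in range(150)]
print(queue.n_forward_passes)  # 2: one pass for steps 0-99, one for steps 100-149
```

With this pattern, the cost of a forward pass is amortized over many control steps, at the price of reacting to new observations only when the queue is refilled.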


## 8. Comparing Wrapper Approaches

| Approach | Pros | Cons |
|----------|------|------|
| **Universal `LeRobotPolicy`** | Works with any LeRobot policy by name | No IDE autocompletion for policy-specific params |
| | Good for dynamic policy selection | |
| **Explicit Wrappers (`ACT`, `Diffusion`, `Groot`)** | Type-safe with IDE autocompletion | Need to import specific class |
| | Policy-specific parameters as constructor args | |
| | Better for production code | |

In [48]:
# Universal wrapper - flexible, works with any policy
universal_act = LeRobotPolicy(policy_name="act")
universal_diffusion = LeRobotPolicy(policy_name="diffusion")

# Explicit wrappers - type-safe, IDE autocompletion
explicit_act = ACT(chunk_size=100)
explicit_diffusion = Diffusion(horizon=16)
# explicit_groot = Groot(chunk_size=50)  # Requires GPU with 24GB+ VRAM

print("Both approaches create equivalent policies:")
print(f"  universal_act.policy_name = '{universal_act.policy_name}'")
print(f"  explicit_act.policy_name = '{explicit_act.policy_name}'")

Both approaches create equivalent policies:
  universal_act.policy_name = 'act'
  explicit_act.policy_name = 'act'


## 9. Summary

This notebook demonstrated:

1. **LeRobotPolicy** - Universal wrapper for any LeRobot policy
2. **Explicit Wrappers** - Type-safe `ACT`, `Diffusion`, `Groot`
3. **Delta Timestamps** - Automatic generation via `get_delta_timestamps_from_policy()`
4. **LeRobotDataModule** - Data loading with "physicalai" or "lerobot" format
5. **Trainer** - PyTorch Lightning-based training with automatic policy setup
6. **Inference** - Predicting actions from a raw, unbatched `Observation` via `select_action()`

### Next Steps

- Explore different policies for your task
- Use image observations with the `images` field in `Observation`
- Try distributed training with `Trainer(devices=2, strategy="ddp")`
- Export trained policies for deployment

### Resources

- [LeRobot Documentation](https://github.com/huggingface/lerobot)
- [PhysicalAI Library](../../README.md)
- [Policy Design Document](../../docs/design/policies.md)