# Expert Notebook: Low-Level Embodied Intelligence with Foundation Models

### Inspired by Fei Xia's CS25 Lecture & "A Survey on Robotics with Foundation Models"

**Objective:** This notebook is designed to build a PhD-level intuition for how foundation models are revolutionizing low-level robotic control. We will go far beyond the lecture's surface, diving into the core mechanisms with from-scratch code implementations, interactive visualizations, and mathematical deep dives. Our goal is not just to understand *that* these methods work, but *why* and *how* they work at a fundamental level, including their limitations and future potential.

## Section 1: The Grand Challenge of Embodied AI

Before we build, we must understand the problem. Embodied AI is fundamentally different from the "Internet AI" that foundation models like GPT were originally built for. The physical world is messy, unforgiving, and governed by the laws of physics, not just the statistics of language.

### 1.1. Moravec's Paradox: Why Robotics is Hard

**Formal Definition:** Moravec's paradox is the observation that, contrary to traditional assumptions, high-level reasoning (like playing chess or proving theorems) requires relatively little computation, while low-level sensorimotor skills (like perception, object manipulation, and mobility) require enormous computational resources.

**Intuitive Explanation:** For humans, walking across a room is effortless, but multiplying large numbers is hard. For a computer, the opposite is true. Evolution has spent hundreds of millions of years optimizing animal sensorimotor systems, so physical interaction feels easy to us, while abstract reasoning is a comparatively recent skill. For AI, abstract reasoning is a more natural fit for its computational structure, while interacting with the unstructured physical world is a monumental challenge.

*   **Internet AI (e.g., ChatGPT):** Operates on a world of discrete, symbolic tokens (text). The 'physics' of this world are grammatical rules and semantic relationships.
*   **Embodied AI (e.g., a robot arm):** Operates on a world of continuous states, partial observability (occlusions), sensor noise, and complex physical dynamics (friction, inertia, collisions). An action isn't just outputting a token; it's a stream of motor torques that must be perfectly timed and calibrated.

In [1]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
import time

# Visualization of Moravec's Paradox
fig = make_subplots(rows=1, cols=2, subplot_titles=("Human Difficulty", "Traditional AI Difficulty"))

tasks = ['Picking up a cup', 'Walking on uneven ground', 'Recognizing a face', 'Playing Chess', 'Calculus Integration']
human_difficulty = [1, 2, 0.5, 8, 9]  # Arbitrary low numbers for sensorimotor, high for reasoning
ai_difficulty = [9, 10, 7, 2, 1]      # Arbitrary high numbers for sensorimotor, low for reasoning

fig.add_trace(go.Bar(x=tasks, y=human_difficulty, name='Human', marker_color='blue'), row=1, col=1)
fig.add_trace(go.Bar(x=tasks, y=ai_difficulty, name='AI', marker_color='red'), row=1, col=2)

fig.update_layout(title_text="Visualizing Moravec's Paradox", height=500, showlegend=False)
fig.update_yaxes(title_text="Perceived Difficulty", range=[0, 10])
fig.show()

### 1.2. The Foundation Model "Recipe" for Robotics

Fei Xia's lecture and the survey paper lay out a prevailing recipe for tackling this challenge:

$$ \text{Embodied Intelligence} \approx \underbrace{\text{Large-Scale Data}}_{\text{Web-scale + Robotics Data}} + \underbrace{\text{High-Capacity Model}}_{\text{e.g., Transformer}} + \underbrace{\text{Language as Glue}}_{\text{Universal Task Interface}} $$

We will spend this notebook deconstructing each part of this recipe, focusing on the low-level control aspect.

## Section 2: The Action Space as a Language

The first major breakthrough discussed is the idea behind RT-1 and RT-2: if a Transformer is a universal sequence processor, can we treat a robot's actions as just another sequence—a new language? This section provides a deep, hands-on dive into this concept.

### 2.1. Theory: From Continuous Control to Discrete Tokens

A robot's action is typically a continuous vector in a multi-dimensional space. For a 6-DoF (Degrees of Freedom) arm with a gripper, this could be six end-effector pose deltas plus one gripper command:
$$ a = [\Delta x, \Delta y, \Delta z, \Delta \text{roll}, \Delta \text{pitch}, \Delta \text{yaw}, \text{gripper_state}] $$
where each component is a floating-point number.

Large Language Models, however, operate on a discrete vocabulary of integer tokens. The core idea of RT-1/RT-2 is to **discretize** the continuous action space.

**Mathematical Derivation (Discretization):**
For each dimension `d` of the action vector `a`, we define a range `[min_d, max_d]` and a number of bins `B` (e.g., 256).

1.  **Normalization:** First, we normalize the continuous value `a_d` to a range of `[0, 1]`.
    $$ a_{d, \text{norm}} = \frac{a_d - \text{min}_d}{\text{max}_d - \text{min}_d} $$

2.  **Binning:** We then scale this normalized value by the number of bins and round to the nearest integer to get the token `t_d`.
    $$ t_d = \text{round}(a_{d, \text{norm}} \times (B - 1)) $$
    This results in an integer token $t_d \in \{0, 1, \dots, B-1\}$.

**Decoding (De-tokenization):** To convert a token back to a continuous action:
1.  **De-normalize the token:**
    $$ a_{d, \text{norm}} = \frac{t_d}{B-1} $$
2.  **Scale back to the original action range:**
    $$ a_d = a_{d, \text{norm}} \times (\text{max}_d - \text{min}_d) + \text{min}_d $$

By doing this for every dimension, the continuous action vector `a` becomes a sequence of integer tokens `[t_1, t_2, ..., t_D]`, which looks just like a sentence to a Transformer.
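
As a quick worked example of the encode and decode formulas above (the numbers are chosen purely for illustration and are not taken from RT-1), take $[\text{min}_d, \text{max}_d] = [-0.05, 0.05]$, $B = 256$, and $a_d = 0.012$:

$$ a_{d, \text{norm}} = \frac{0.012 - (-0.05)}{0.05 - (-0.05)} = 0.62, \qquad t_d = \text{round}(0.62 \times 255) = \text{round}(158.1) = 158 $$

$$ a_{d, \text{norm}} = \frac{158}{255} \approx 0.61961, \qquad a_d \approx 0.61961 \times 0.1 - 0.05 \approx 0.01196 $$

The decoded value differs from the input by roughly $4 \times 10^{-5}$, about one tenth of a bin width ($0.1 / 255 \approx 3.9 \times 10^{-4}$).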

In [2]:
# Code Implementation: Action Tokenizer (From Scratch)
import numpy as np
import ipywidgets as widgets
from IPython.display import display

class ActionTokenizer:
    """Implements the discretization and de-tokenization of a continuous action space."""
    def __init__(self, action_dims_config, num_bins=256):
        """
        action_dims_config: A dictionary mapping dimension name to (min, max) range.
        e.g., {'delta_x': (-0.05, 0.05), 'gripper': (0, 1)}
        """
        self.dims_config = action_dims_config
        self.dim_names = list(action_dims_config.keys())
        self.num_bins = num_bins
        self.num_dims = len(self.dim_names)

    def encode(self, continuous_actions):
        """Converts a dictionary of continuous actions to a list of integer tokens."""
        tokens = []
        for dim_name in self.dim_names:
            val = continuous_actions.get(dim_name, 0.0)
            min_val, max_val = self.dims_config[dim_name]

            # 1. Clamp the value to be within the defined range
            val_clamped = np.clip(val, min_val, max_val)

            # 2. Normalize to [0, 1]
            val_norm = (val_clamped - min_val) / (max_val - min_val)

            # 3. Scale to bin range and convert to integer token
            token = int(round(val_norm * (self.num_bins - 1)))
            tokens.append(token)
        return tokens

    def decode(self, tokens):
        """Converts a list of integer tokens back to a dictionary of continuous actions."""
        if len(tokens) != self.num_dims:
            raise ValueError(f"Expected {self.num_dims} tokens, but got {len(tokens)}")

        continuous_actions = {}
        for i, token in enumerate(tokens):
            dim_name = self.dim_names[i]
            min_val, max_val = self.dims_config[dim_name]

            # 1. De-normalize token to [0, 1]
            val_norm = token / (self.num_bins - 1)

            # 2. Scale back to original action range
            val = val_norm * (max_val - min_val) + min_val
            continuous_actions[dim_name] = val
        return continuous_actions

# --- Interactive Exploration ---
rt1_action_config = {
    'world_vector_x': (-0.05, 0.05), # Delta X in meters
    'world_vector_y': (-0.05, 0.05), # Delta Y in meters
    'world_vector_z': (-0.05, 0.05), # Delta Z in meters
    'rot_axis_angle_x': (-np.pi / 4, np.pi / 4), # Delta Roll
    'rot_axis_angle_y': (-np.pi / 4, np.pi / 4), # Delta Pitch
    'rot_axis_angle_z': (-np.pi / 4, np.pi / 4), # Delta Yaw
    'gripper': (0.0, 1.0), # 0=closed, 1=open
    'terminate': (0.0, 1.0) # Not really continuous, but can be binned
}

tokenizer = ActionTokenizer(rt1_action_config)

print("### Interactive Action Tokenizer ###")
print("Move the sliders to see how continuous robot actions become discrete tokens.")

sliders = {name: widgets.FloatSlider(min=min_v, max=max_v, step=(max_v-min_v)/100, description=name) for name, (min_v, max_v) in rt1_action_config.items()}
output_tokens_label = widgets.Label(value="Tokens: ")
decoded_actions_label = widgets.Label(value="Decoded: ")

def update_tokenizer_visualization(*args):
    continuous_vals = {name: slider.value for name, slider in sliders.items()}
    tokens = tokenizer.encode(continuous_vals)
    decoded_vals = tokenizer.decode(tokens)

    output_tokens_label.value = f"Action Tokens: {tokens}"
    decoded_str = ", ".join([f"{k}: {v:.3f}" for k, v in decoded_vals.items()])
    decoded_actions_label.value = f"Decoded (note quantization error):\n{decoded_str}"

for slider in sliders.values():
    slider.observe(update_tokenizer_visualization, names='value')

update_tokenizer_visualization()
display(widgets.VBox(list(sliders.values()) + [output_tokens_label, decoded_actions_label]))


**Expert Insight:** The interactive tokenizer above reveals a critical concept: **quantization error**. When we decode the tokens back into continuous actions, the result is generally not identical to the original input; with rounding to the nearest bin, the round-trip error in each dimension is at most half a bin width, $(\text{max}_d - \text{min}_d) / (2(B-1))$. This is the trade-off for fitting the problem into a language model's framework: the model's predictions are inherently coarse, limited by `num_bins`. Increasing `num_bins` allows finer control but enlarges the action vocabulary the model must learn, which can make training harder.
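
To make the trade-off concrete, the following minimal sketch measures the round-trip error of the same normalize, round, and rescale scheme for several bin counts, and compares it with half a bin width. The helper `quantize_roundtrip`, the range `[-0.05, 0.05]`, and the list of bin counts are illustrative assumptions, not part of RT-1.

```python
import numpy as np

def quantize_roundtrip(values, lo, hi, num_bins):
    """Encode values to integer bins and decode them back (hypothetical helper)."""
    norm = (np.clip(values, lo, hi) - lo) / (hi - lo)      # normalize to [0, 1]
    tokens = np.rint(norm * (num_bins - 1)).astype(int)    # nearest bin index
    return tokens / (num_bins - 1) * (hi - lo) + lo        # back to the action range

lo, hi = -0.05, 0.05                      # assumed range, e.g. a delta-x in meters
values = np.linspace(lo, hi, 100_001)     # dense sweep over the whole range

max_errors = {}
for num_bins in (16, 64, 256, 1024):
    err = np.abs(quantize_roundtrip(values, lo, hi, num_bins) - values)
    half_bin = (hi - lo) / (2 * (num_bins - 1))            # theoretical worst case
    max_errors[num_bins] = (err.max(), half_bin)
    print(f"bins={num_bins:5d}  max error={err.max():.2e}  half bin width={half_bin:.2e}")
```

Each fourfold increase in bins cuts the worst-case error by roughly a factor of four, at the cost of four times as many action tokens per dimension.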

### 2.2. The Vision-Language-Action (VLA) Model Architecture

With actions represented as a language, we can now use a Vision-Language Model (VLM) and fine-tune it to become a Vision-Language-Action (VLA) model like RT-2.

**Conceptual Data Flow:**

1.  **Inputs:**
    *   **Image Observation:** A camera image from the robot's point of view. This is passed through a Vision Transformer (ViT) to be converted into a sequence of image patch embeddings.
    *   **Text Instruction:** A natural language command, e.g., "pick up the red block". This is passed through a standard text tokenizer and embedding layer.

2.  **Fusion:** The image embeddings and text embeddings are concatenated into a single input sequence for the main Transformer model.
    `[CLS] [img_patch_1] ... [img_patch_N] [SEP] [text_token_1] ... [text_token_M]`

3.  **Transformer Processing:** This combined sequence is processed by a standard Transformer (encoder-decoder or decoder-only architecture).

4.  **Output Generation:** The model is trained to auto-regressively predict the sequence of action tokens we defined above.
    `[action_token_1] [action_token_2] ... [action_token_8]`

5.  **Loss Function:** The training objective is a standard cross-entropy loss between the predicted action token probabilities and the ground-truth action tokens from the expert demonstration data.
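
The loss in step 5 can be sketched in a framework-agnostic way with NumPy. This is only an illustration of token-level cross-entropy under assumed shapes (`batch=2`, 8 action dimensions, a 256-token action vocabulary); the function name `action_token_cross_entropy` is hypothetical, and a real model would use its framework's built-in loss.

```python
import numpy as np

def action_token_cross_entropy(logits, targets):
    """Mean cross-entropy over all action tokens.

    logits:  (batch, num_action_dims, vocab_size) unnormalized scores
    targets: (batch, num_action_dims) integer ground-truth tokens
    """
    # Numerically stable log-softmax over the vocabulary axis
    shifted = logits - logits.max(axis=-1, keepdims=True)
    log_probs = shifted - np.log(np.exp(shifted).sum(axis=-1, keepdims=True))
    # Pick the log-probability assigned to each ground-truth token
    picked = np.take_along_axis(log_probs, targets[..., None], axis=-1)
    return -picked.mean()

rng = np.random.default_rng(0)
batch, num_action_dims, vocab_size = 2, 8, 256   # assumed sizes for illustration
logits = rng.normal(size=(batch, num_action_dims, vocab_size))
targets = rng.integers(0, vocab_size, size=(batch, num_action_dims))

loss = action_token_cross_entropy(logits, targets)
# A model that predicts a uniform distribution scores exactly log(vocab_size)
uniform_loss = action_token_cross_entropy(np.zeros_like(logits), targets)
print(f"random-logit loss: {loss:.3f}, uniform baseline: {uniform_loss:.3f}")
```

The uniform baseline, $\log 256 \approx 5.55$, is a useful sanity check: a training loss that stays near this value means the model has learned nothing about the action distribution.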

Below is a conceptual PyTorch-like implementation to illustrate the structure.

In [13]:
# Code Implementation: Conceptual VLA Model Structure (PyTorch-style)
import torch
import torch.nn as nn
import torch.nn.functional as F
# We'll use pre-trained models from Hugging Face as stand-in backbones.
# In a real implementation like RT-2, these would be much larger VLMs (PaLI-X, PaLM-E).
from transformers import ViTModel, T5ForConditionalGeneration, T5Tokenizer

class ConceptualVLA(nn.Module):
    """A simplified VLA model to illustrate the RT-2 concept."""
    def __init__(self, action_vocab_size=256, num_action_dims=8):
        super().__init__()
        self.action_vocab_size = action_vocab_size
        self.num_action_dims = num_action_dims

        # 1. Vision Encoder: Use a pre-trained ViT. It is left trainable here;
        # uncomment the two lines below to freeze it and keep its pre-trained features fixed.
        self.vision_encoder = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k')
        # for param in self.vision_encoder.parameters():
        #    param.requires_grad = False # Freeze the vision backbone

        # 2. Language Model: Use a pre-trained model like T5. This will be our main component.
        self.language_model = T5ForConditionalGeneration.from_pretrained('t5-small')
        self.tokenizer = T5Tokenizer.from_pretrained('t5-small')

        # The challenge is fusing vision into the language model.
        # A simple approach is to project vision embeddings into the text embedding space.
        self.vision_projector = nn.Linear(self.vision_encoder.config.hidden_size,
                                          self.language_model.config.d_model)

    def forward(self, image_input, text_instruction, target_action_tokens=None):
        """
        image_input: (batch_size, 3, 224, 224)
        text_instruction: list of strings
        target_action_tokens: (batch_size, num_action_dims)
        """
        # 1. Encode image to get patch embeddings
        vision_outputs = self.vision_encoder(pixel_values=image_input)
        image_embeddings = vision_outputs.last_hidden_state # (batch, num_patches+1, vision_dim)

        # 2. Project vision embeddings to match language model's dimension
        projected_image_embeddings = self.vision_projector(image_embeddings) # (batch, num_patches+1, lm_dim)

        # 3. Tokenize text instruction and get text embeddings
        tokenized = self.tokenizer(text_instruction, return_tensors='pt', padding=True).to(projected_image_embeddings.device)
        text_embeddings = self.language_model.encoder.embed_tokens(tokenized.input_ids) # (batch, text_len, lm_dim)

        # 4. Combine image and text embeddings into a single input sequence for the encoder
        # This is a critical and complex step. Simple concatenation is one way.
        encoder_inputs_embeddings = torch.cat([projected_image_embeddings, text_embeddings], dim=1)
        # Image patches are always attended to; the text part reuses the tokenizer's
        # attention mask so that padding tokens are ignored.
        image_mask = torch.ones(projected_image_embeddings.shape[:2],
                                dtype=tokenized.attention_mask.dtype,
                                device=projected_image_embeddings.device)
        encoder_attention_mask = torch.cat([image_mask, tokenized.attention_mask], dim=1)

        # 5. Training: pass the target action tokens as labels. T5 shifts them right
        # internally to build the decoder inputs and returns the cross-entropy loss.
        # For inference, decoding starts from the <pad> token and proceeds auto-regressively.
        # Simplification: the raw bin indices are used directly as T5 token IDs, so they
        # collide with existing text tokens. A real setup would add dedicated action
        # tokens to the vocabulary.
        if target_action_tokens is not None:
            outputs = self.language_model(
                inputs_embeds=encoder_inputs_embeddings,
                attention_mask=encoder_attention_mask,
                labels=target_action_tokens
            )
            return outputs.loss, outputs.logits
        else:
            # Inference mode
            # The generate method handles auto-regressive decoding
            generated_ids = self.language_model.generate(
                inputs_embeds=encoder_inputs_embeddings,
                attention_mask=encoder_attention_mask,
                max_new_tokens=self.num_action_dims
            )
            return generated_ids


# Note: Instantiating the model (commented out below) downloads pre-trained weights and may take a moment.
print("Conceptual VLA Model defined. This is a structural blueprint, not a runnable training script.")
# model = ConceptualVLA()
# print(model)
print("To truly implement this, one would need to:")
print("1. Create a custom tokenizer that includes action tokens.")
print("2. Resize the language_model's token embedding layer to include the new action tokens.")
print("3. Gather a large dataset of (image, instruction, action_token_sequence) tuples.")
print("4. Write a full training loop to fine-tune the model.")

Conceptual VLA Model defined. This is a structural blueprint, not a runnable training script.
To truly implement this, one would need to:
1. Create a custom tokenizer that includes action tokens.
2. Resize the language_model's token embedding layer to include the new action tokens.
3. Gather a large dataset of (image, instruction, action_token_sequence) tuples.
4. Write a full training loop to fine-tune the model.


In [16]:
# Code Implementation: Conceptual VLA Model Structure & Demonstration (mocked backbones)

import torch
import torch.nn as nn
from unittest.mock import Mock

# --- Mocking the HuggingFace models for a fast, dependency-free demonstration ---
# This avoids downloading large models and makes the example run instantly.
mock_vit = Mock()
mock_vit.config.hidden_size = 768  # Standard ViT base model hidden size

mock_t5 = Mock()
mock_t5.config.d_model = 512       # Standard T5-small model hidden size
# Mock the embedding layer to return a tensor of the correct shape
mock_t5.encoder.embed_tokens.return_value = torch.rand(2, 7, 512)

mock_tokenizer = Mock()

# Temporarily monkey-patch the from_pretrained methods to return our mocks
# (a common testing technique). Import first so the originals can always be
# saved here and restored in the cleanup step at the end of the cell.
from transformers import ViTModel, T5ForConditionalGeneration, T5Tokenizer
ViTModel_original = ViTModel.from_pretrained
T5ForConditionalGeneration_original = T5ForConditionalGeneration.from_pretrained
T5Tokenizer_original = T5Tokenizer.from_pretrained

ViTModel.from_pretrained = lambda x: mock_vit
T5ForConditionalGeneration.from_pretrained = lambda x: mock_t5
T5Tokenizer.from_pretrained = lambda x: mock_tokenizer
# --- End Mocking ---


class ConceptualVLA(nn.Module):
    """A simplified VLA model to illustrate the RT-2 concept."""
    # d_model must match the language model's hidden size (512 for T5-small).
    def __init__(self, d_model=512, action_vocab_size=256, num_action_dims=8):
        super().__init__()
        self.d_model = d_model
        self.action_vocab_size = action_vocab_size
        self.num_action_dims = num_action_dims

        self.vision_encoder = ViTModel.from_pretrained('mock')
        self.language_model = T5ForConditionalGeneration.from_pretrained('mock')
        self.tokenizer = T5Tokenizer.from_pretrained('mock')

        # Project vision features into the language model's embedding dimension (d_model).
        self.vision_projector = nn.Linear(self.vision_encoder.config.hidden_size, self.d_model)

    def forward(self, image_input, text_instruction, target_action_tokens=None):
        vision_outputs = self.vision_encoder(pixel_values=image_input)
        image_embeddings = vision_outputs.last_hidden_state
        projected_image_embeddings = self.vision_projector(image_embeddings)

        instruction_tokens = self.tokenizer(text_instruction, return_tensors='pt', padding=True).input_ids.to(projected_image_embeddings.device)
        text_embeddings = self.language_model.encoder.embed_tokens(instruction_tokens)

        encoder_inputs_embeddings = torch.cat([projected_image_embeddings, text_embeddings], dim=1)
        encoder_attention_mask = torch.ones(encoder_inputs_embeddings.shape[:2], device=encoder_inputs_embeddings.device)

        if target_action_tokens is not None:
            outputs = self.language_model(
                inputs_embeds=encoder_inputs_embeddings,
                attention_mask=encoder_attention_mask,
                labels=target_action_tokens
            )
            return outputs.loss, outputs.logits
        else:
            generated_ids = self.language_model.generate(
                inputs_embeds=encoder_inputs_embeddings,
                attention_mask=encoder_attention_mask,
                max_new_tokens=self.num_action_dims
            )
            return generated_ids


# --- Runnable Demonstration of the VLA structure ---
print("--- Demonstrating the VLA model data flow ---")

# 1. Instantiate the model.
model = ConceptualVLA(d_model=512)

# 2. Create dummy input data
batch_size = 2
dummy_images = torch.rand(batch_size, 3, 224, 224)
dummy_instructions = ["pick up the block", "open the drawer"]
dummy_actions = torch.randint(0, 256, (batch_size, 8))

# 3. Demonstrate INFERENCE mode (generates actions)
print("\n--- Running in INFERENCE mode ---")
# Mock the model's return values for this specific run
mock_vit.return_value.last_hidden_state = torch.rand(batch_size, 197, 768)
mock_t5.generate.return_value = torch.randint(0, 256, (batch_size, 8))

generated_tokens = model(image_input=dummy_images, text_instruction=dummy_instructions)
print(f"Input image shape: {dummy_images.shape}")
print(f"Input instructions: {dummy_instructions}")
print(f"Shape of GENERATED action tokens: {generated_tokens.shape}")
print(f"Generated action tokens (dummy):\n{generated_tokens}")


# 4. Demonstrate TRAINING mode (calculates loss)
print("\n--- Running in TRAINING mode ---")
# Mock the model's return values for this specific run
mock_t5.return_value = Mock(loss=torch.tensor(2.5), logits=torch.rand(batch_size, 8, 512))

loss, logits = model(image_input=dummy_images, text_instruction=dummy_instructions, target_action_tokens=dummy_actions)
print(f"Input image shape: {dummy_images.shape}")
print(f"Input instructions: {dummy_instructions}")
print(f"Target action shape: {dummy_actions.shape}")
print(f"Calculated Loss (dummy): {loss.item():.4f}")
print(f"Shape of output Logits: {logits.shape}")


# --- Cleanup: Restore original methods if they existed ---
if ViTModel_original: ViTModel.from_pretrained = ViTModel_original
if T5ForConditionalGeneration_original: T5ForConditionalGeneration.from_pretrained = T5ForConditionalGeneration_original
if T5Tokenizer_original: T5Tokenizer.from_pretrained = T5Tokenizer_original
print("\n(Mocking has been cleaned up)")

--- Demonstrating the VLA model data flow ---

--- Running in INFERENCE mode ---
Input image shape: torch.Size([2, 3, 224, 224])
Input instructions: ['pick up the block', 'open the drawer']
Shape of GENERATED action tokens: torch.Size([2, 8])
Generated action tokens (dummy):
tensor([[ 95, 137,  62, 225,   9, 238,  42, 254],
        [152, 131, 232,  29, 161,  53,   8,  23]])

--- Running in TRAINING mode ---
Input image shape: torch.Size([2, 3, 224, 224])
Input instructions: ['pick up the block', 'open the drawer']
Target action shape: torch.Size([2, 8])
Calculated Loss (dummy): 2.5000
Shape of output Logits: torch.Size([2, 8, 512])

(Mocking has been cleaned up)


### 2.3. Positive Transfer: The Magic of Co-Training

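A key finding from the RT-2 work is the importance of **positive transfer**: knowledge acquired from web-scale vision-language data improves performance on robotics tasks. Fine-tuning a VLM on *only* robotics data puts that transfer at risk. It often leads to catastrophic forgetting, where the model loses its vast web-scale knowledge and overfits to the much smaller robotics dataset.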

**Co-training:** The successful recipe involves training on a *mixture* of data:
`Training Batch = p_robot * [Robotics Data] + p_vqa * [VQA Data] + p_caption * [Captioning Data] + ...`

**Why does this work?**
*   **Knowledge Retention:** The web-scale tasks (like Visual Question Answering and captioning) force the model to keep using and maintaining its general semantic and visual understanding.
*   **Regularization:** The diverse web data acts as a powerful regularizer, preventing the model from overfitting to the idiosyncrasies of the (relatively small) robotics dataset.
*   **Emergent Capabilities:** The model can leverage its web knowledge for robotics tasks in surprising ways. For the prompt `"pick up the extinct animal"`, the reasoning `extinct animal -> dinosaur` comes from the web data, while the ability to `pick up -> [action tokens]` comes from the robotics data. Co-training allows the model to connect these two domains.
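
The mixture above can be sketched as per-example sampling of a data source. The weights below are hypothetical placeholders (the source does not state the actual ratios), and `sample_batch_sources` is an illustrative helper rather than part of any RT-2 codebase.

```python
import numpy as np

# Hypothetical mixture weights: p_robot, p_vqa, p_caption (assumed, not from the paper)
mixture = {"robot": 0.5, "vqa": 0.3, "caption": 0.2}

def sample_batch_sources(mixture, batch_size, rng):
    """Pick which dataset each example in a training batch is drawn from."""
    names = list(mixture)
    probs = np.array([mixture[name] for name in names], dtype=float)
    probs /= probs.sum()                      # tolerate weights that do not sum to 1
    return rng.choice(names, size=batch_size, p=probs)

rng = np.random.default_rng(0)
sources = sample_batch_sources(mixture, batch_size=1000, rng=rng)
counts = {name: int((sources == name).sum()) for name in mixture}
print(counts)  # roughly proportional to the mixture weights
```

In a real pipeline, each sampled source name would select the dataset iterator from which the next example is drawn, so every gradient step sees both robotics and web data.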


In [4]:
# Visualization: The Effect of Co-Training on Generalization
epochs = np.arange(1, 51)

# Simulated validation loss curves
# Robotics-only fine-tuning: overfits quickly
loss_robot_only = 2.5 * np.exp(-epochs/5) + np.random.normal(0, 0.1, 50) + 0.01 * epochs

# Co-training with web data: generalizes better
loss_cotraining = 2.5 * np.exp(-epochs/10) + np.random.normal(0, 0.08, 50) + 0.5

# In-domain success rate (on tasks similar to training)
rate_in_domain_robot = 1 - (loss_robot_only / 3)
rate_in_domain_cotrain = 1 - (loss_cotraining / 3)

# Out-of-domain success rate (on new, unseen tasks - measures generalization)
rate_ood_robot = 0.5 * (1 - (loss_robot_only / 1.5)) - 0.2 # Generalization collapses
rate_ood_cotrain = 0.8 * (1 - (loss_cotraining / 2.5)) # Generalization is maintained

fig = make_subplots(rows=1, cols=2, subplot_titles=("In-Domain Performance", "Generalization to New Tasks (OOD)"))

fig.add_trace(go.Scatter(x=epochs, y=rate_in_domain_robot, mode='lines', name='Robotics-Only Fine-tuning', line=dict(color='red')), row=1, col=1)
fig.add_trace(go.Scatter(x=epochs, y=rate_in_domain_cotrain, mode='lines', name='Co-training with Web Data', line=dict(color='green')), row=1, col=1)

fig.add_trace(go.Scatter(x=epochs, y=rate_ood_robot, mode='lines', name='Robotics-Only Fine-tuning', line=dict(color='red', dash='dash'), showlegend=False), row=1, col=2)
fig.add_trace(go.Scatter(x=epochs, y=rate_ood_cotrain, mode='lines', name='Co-training with Web Data', line=dict(color='green', dash='dash'), showlegend=False), row=1, col=2)

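fig.update_layout(title_text="Conceptual Visualization of Positive Transfer (synthetic data)")
# update_xaxes/update_yaxes apply to every subplot; layout-level xaxis_title/yaxis_title
# would only label the first one.
fig.update_xaxes(title_text='Training Epochs')
fig.update_yaxes(title_text='Success Rate', range=[0, 1])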
fig.show()

## Section 3: New Interfaces for LLMs - Decoupling Reasoning from Control

While VLA models are powerful, they require fine-tuning and bake the low-level control directly into the LLM's weights. The second part of the lecture explores an alternative: using LLMs as high-level **reasoners** that interface with separate, more traditional **low-level controllers**. This decouples the 'what' from the 'how'.

### 3.1. Language to Reward (L2R): LLMs as Goal Setters

**Core Idea:** Instead of generating action tokens, the LLM generates a **reward function** as executable code. A separate, specialized optimizer (like a Model Predictive Controller) then finds the actions that maximize this reward.

**Analogy:** You are a project manager (the LLM). You don't tell your expert engineer (the Optimizer) exactly which keys to press. Instead, you define the project's success metrics (the reward function): `"Success = deliver feature X on time AND stay under budget Y"`. The engineer then uses their expertise to figure out the best way to achieve those metrics.

**Conceptual Data Flow:**
1.  **Prompt:** User gives a high-level command: `"Move the block to the green circle."`
2.  **LLM as Reward Coder:** The LLM is prompted to translate this command into a Python function.
    `LLM_Output => "def get_reward(state):\n  block_pos = state['block_pos']\n  target_pos = state['green_circle_pos']\n  distance = np.linalg.norm(block_pos - target_pos)\n  return -distance"`
3.  **Low-Level Optimizer (e.g., MPC):** This controller now has a clear, mathematical objective. It operates in a tight loop:
    1.  From the current state, imagine thousands of possible short action sequences.
    2.  For each imagined sequence, predict the final state.
    3.  Use the LLM-generated `get_reward` function to score each predicted final state.
    4.  Choose the action sequence that leads to the highest reward.
    5.  Execute the *first* action from that best sequence.
    6.  Repeat from the new state.

**Advantages:**
*   **Zero-shot from LLM:** No robotics fine-tuning is needed for the LLM.
*   **Leverages existing controllers:** We can use powerful, well-understood optimizers for the low-level part.
*   **Expressiveness & Safety:** The reward code can be complex, including penalties for getting too close to obstacles, encouraging smooth motion, etc., which is hard to express with action tokens. Safety constraints can be hard-coded.
*   **Debuggability:** The reward function is human-readable code, making it easier to debug than a black-box VLA model.
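
To illustrate the expressiveness point, here is a minimal sketch of a richer reward that an LLM might plausibly emit: a goal term plus an obstacle-clearance penalty and a smoothness penalty. The state keys (`target_pos`, `obstacle_pos`), the extra `action` argument, and all weights are assumptions made for this example, not the format used in the L2R work.

```python
import numpy as np

def get_reward(state, action, w_obstacle=5.0, w_smooth=0.1, safe_radius=0.15):
    """Hypothetical composite reward: reach the goal, keep clear of an obstacle, move smoothly."""
    agent_pos = state["agent_pos"]
    # Goal term: negative distance to the target (closer is better)
    goal_term = -np.linalg.norm(agent_pos - state["target_pos"])
    # Obstacle term: penalize only when inside the safety radius
    dist_to_obstacle = np.linalg.norm(agent_pos - state["obstacle_pos"])
    obstacle_penalty = max(0.0, safe_radius - dist_to_obstacle)
    # Smoothness term: penalize large control inputs
    smoothness_penalty = float(np.sum(np.square(action)))
    return goal_term - w_obstacle * obstacle_penalty - w_smooth * smoothness_penalty

action = np.array([0.1, 0.0])
state_clear = {
    "agent_pos": np.array([0.5, 0.5]),
    "target_pos": np.array([1.0, 1.0]),
    "obstacle_pos": np.array([0.0, 0.0]),   # far away: no penalty
}
state_near = {**state_clear, "obstacle_pos": np.array([0.5, 0.45])}  # inside safety radius

r_clear = get_reward(state_clear, action)
r_near = get_reward(state_near, action)
print(f"clear of obstacle: {r_clear:.3f}, near obstacle: {r_near:.3f}")
```

Because each term is a separate, named line of code, a human can inspect or reweight it directly, which is the debuggability advantage described above.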

In [14]:
# Code Implementation: Interactive Language-to-Reward System
import numpy as np
import plotly.graph_objects as go
import ipywidgets as widgets
from IPython.display import display

# --- SETUP: The Simulated Environment ---
class Simple2DEnv:
    def __init__(self):
        self.agent_pos = np.array([0.5, 0.5])
        self.dt = 0.1

    def step(self, action):
        action_scaled = np.array(action) * 0.3
        self.agent_pos += np.clip(action_scaled, -0.1, 0.1) * self.dt
        self.agent_pos = np.clip(self.agent_pos, 0, 1)
        return self.get_state()

    def get_state(self):
        return {'agent_pos': self.agent_pos.copy()}

    def reset(self):
        self.agent_pos = np.array([0.5, 0.5])

# --- Mock LLM Reward Coder ---
def mock_llm_reward_coder(prompt):
    prompt = prompt.lower()
    if "top right" in prompt:
        target_str = "np.array([1.0, 1.0])"
    elif "bottom left" in prompt:
        target_str = "np.array([0.0, 0.0])"
    elif "center" in prompt:
        target_str = "np.array([0.5, 0.5])"
    else:
        reward_code = "# Unknown command\ndef get_reward(state): return 0.0"
        return reward_code, lambda state: 0.0, None

    reward_code = f"""\
import numpy as np
def get_reward(state):
    agent_pos = state['agent_pos']
    target_pos = {target_str}
    # Reward is negative distance (we want to maximize reward, so minimize distance)
    distance = np.linalg.norm(agent_pos - target_pos)
    return -distance
"""
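    # Execute the generated code in its own namespace so that its imports are visible
    # to get_reward. Caution: exec/eval on real LLM output should only run in a sandbox.
    namespace = {}
    exec(reward_code, namespace)
    target_pos_val = eval(target_str)
    return reward_code, namespace['get_reward'], target_pos_val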

# --- Low-Level Optimizer (Simple Random Shooting MPC) ---
class SimpleMPCOptimizer:
    def __init__(self, reward_func, planning_horizon=10, num_samples=500):
        self.reward_func = reward_func
        self.horizon = planning_horizon
        self.num_samples = num_samples
        self.action_dim = 2

    def get_best_action(self, current_state):
        if self.reward_func is None: return np.zeros(self.action_dim)
        random_action_sequences = np.random.uniform(-1.0, 1.0, size=(self.num_samples, self.horizon, self.action_dim))
        final_rewards = np.zeros(self.num_samples)
        current_agent_pos = current_state['agent_pos']
        for i in range(self.num_samples):
            sim_pos = current_agent_pos.copy()
            for t in range(self.horizon):
                action_scaled = np.array(random_action_sequences[i, t]) * 0.3
                sim_pos += np.clip(action_scaled, -0.1, 0.1) * 0.1
                sim_pos = np.clip(sim_pos, 0, 1)
            final_rewards[i] = self.reward_func({'agent_pos': sim_pos})
        best_sequence_idx = np.argmax(final_rewards)
        return random_action_sequences[best_sequence_idx, 0]

# --- Interactive Visualization Setup ---
prompt_input = widgets.Text(value='Move to the top right', description='Prompt:', layout={'width': '400px'})
run_button = widgets.Button(description="Run Simulation")
reward_code_output = widgets.HTML(value="")

# Use go.FigureWidget so the plot can be updated in place inside the widget layout
l2r_fig_widget = go.FigureWidget(layout=go.Layout(
    xaxis=dict(range=[0, 1]), yaxis=dict(range=[0, 1], scaleanchor="x", scaleratio=1),
    width=500, height=500, showlegend=True
))

def run_simulation(b):
    env = Simple2DEnv()
    code, reward_func, target_pos = mock_llm_reward_coder(prompt_input.value)
    reward_code_output.value = f"<b>LLM-Generated Reward Code:</b><pre>{code}</pre>"

    if reward_func:
        optimizer = SimpleMPCOptimizer(reward_func)
        trajectory = [env.agent_pos.copy()]
        for _ in range(50):
            action = optimizer.get_best_action(env.get_state())
            env.step(action)
            trajectory.append(env.agent_pos.copy())
        trajectory = np.array(trajectory)

        # Update the FigureWidget's traces in place instead of recreating the figure
        with l2r_fig_widget.batch_update():
            l2r_fig_widget.data = [] # Clear previous traces
            l2r_fig_widget.add_trace(go.Scatter(x=trajectory[:,0], y=trajectory[:,1], mode='lines+markers', name='Agent Path'))
            if target_pos is not None:
                l2r_fig_widget.add_trace(go.Scatter(x=[target_pos[0]], y=[target_pos[1]], mode='markers', name='Target', marker=dict(color='green', size=15, symbol='star')))
            l2r_fig_widget.layout.title = f'L2R Simulation for: "{prompt_input.value}"'

run_button.on_click(run_simulation)
display(widgets.VBox([widgets.HBox([prompt_input, run_button]), reward_code_output, l2r_fig_widget]))

# Trigger the first run to show an initial plot
run_simulation(None)

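
The per-sample Python loop in `SimpleMPCOptimizer` can also be written with the rollouts vectorized across samples. The sketch below is one way to do that, not the L2R implementation: the function name `plan_random_shooting`, the fixed target, and the seed are assumptions made for illustration, and the dynamics are copied from `Simple2DEnv` (scale by 0.3, clip to ±0.1, integrate with `dt = 0.1`).

```python
import numpy as np

def plan_random_shooting(pos, reward_fn, horizon=10, num_samples=500, rng=None):
    """Random-shooting MPC: sample action sequences, roll them out, return the
    first action of the sequence with the best terminal reward."""
    rng = np.random.default_rng() if rng is None else rng
    actions = rng.uniform(-1.0, 1.0, size=(num_samples, horizon, 2))
    # Per-step displacement under the toy dynamics (assumed to match Simple2DEnv).
    deltas = np.clip(actions * 0.3, -0.1, 0.1) * 0.1
    sim_pos = np.tile(pos, (num_samples, 1))
    for t in range(horizon):
        # All samples advance together; positions are clipped to the unit square each step.
        sim_pos = np.clip(sim_pos + deltas[:, t], 0.0, 1.0)
    rewards = np.array([reward_fn({'agent_pos': p}) for p in sim_pos])
    return actions[np.argmax(rewards), 0]

# Hypothetical target and reward, standing in for the LLM-generated get_reward.
target = np.array([0.9, 0.9])
reward_fn = lambda state: -np.linalg.norm(state['agent_pos'] - target)

rng = np.random.default_rng(0)
pos = np.array([0.5, 0.5])
start_dist = np.linalg.norm(pos - target)
for _ in range(50):
    a = plan_random_shooting(pos, reward_fn, rng=rng)
    pos = np.clip(pos + np.clip(a * 0.3, -0.1, 0.1) * 0.1, 0.0, 1.0)
end_dist = np.linalg.norm(pos - target)
print(f"distance to target: {start_dist:.3f} -> {end_dist:.3f}")
```

Only the first action of the winning sequence is executed before replanning, which is what makes this receding-horizon control rather than open-loop planning.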

### 3.2. In-Context Reinforcement Learning: LLMs as Pattern Machines

This is perhaps the most forward-looking idea: can the LLM's context window itself act as a replay buffer for reinforcement learning? The paper "Large Language Models as General Pattern Machines" explores this.

**Core Idea:** Instead of training a separate policy network (as in traditional RL) or fine-tuning the LLM, we leverage the LLM's pattern-matching ability. We structure the prompt as a series of past attempts, each a trajectory of `(state, action, next state)` steps labeled with its reward, and ask the model to "complete the pattern" by generating a new action sequence that leads to a higher reward.

**Prompt Structure for In-Context RL:**
```
Here are some attempts to solve the task. Lower reward is worse, higher is better.

--- Attempt 1 ---
Reward: -5.8
Trajectory:
State: [0.5, 0.5], Action: [0.1, -0.2], Next State: [0.51, 0.48]
State: [0.51, 0.48], Action: [-0.3, 0.1], Next State: [0.48, 0.49]
...

--- Attempt 2 ---
Reward: -2.1
Trajectory:
State: [0.5, 0.5], Action: [0.4, 0.3], Next State: [0.54, 0.53]
State: [0.54, 0.53], Action: [0.2, 0.5], Next State: [0.56, 0.58]
...

--- My Best Attempt ---
Reward: -0.5
Trajectory:
State: [0.5, 0.5], Action: [?, ?], Next State: [?, ?]
...
```
By asking the LLM to complete the prompt for "My Best Attempt" with a target reward higher than any seen before, we coax it into performing policy improvement. The lecture's "clicker training" is a simplified, interactive version of this.
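
A prompt in this format can be assembled from logged attempts with a few lines of Python. This is a minimal sketch under stated assumptions: the helper names `format_attempt` and `build_icrl_prompt` are hypothetical, attempts are sorted from worst to best so that the highest-reward example sits closest to the completion point (a design choice, not something the paper prescribes), and no truncation for context-length limits is attempted.

```python
def format_attempt(title, reward, trajectory):
    """Render one attempt as text. trajectory is a list of (state, action, next_state)."""
    lines = [f"--- {title} ---", f"Reward: {reward:.1f}", "Trajectory:"]
    for state, action, next_state in trajectory:
        lines.append(f"State: {state}, Action: {action}, Next State: {next_state}")
    return "\n".join(lines)

def build_icrl_prompt(attempts, start_state, target_reward):
    """attempts is a list of (reward, trajectory) pairs; returns the prompt string."""
    header = ("Here are some attempts to solve the task. "
              "Lower reward is worse, higher is better.")
    ordered = sorted(attempts, key=lambda item: item[0])  # worst first, best last
    blocks = [format_attempt(f"Attempt {i}", reward, traj)
              for i, (reward, traj) in enumerate(ordered, start=1)]
    # The final block states a reward better than any seen and is left open
    # so that the model continues with an action.
    tail = "\n".join([
        "--- My Best Attempt ---",
        f"Reward: {target_reward:.1f}",
        "Trajectory:",
        f"State: {start_state}, Action:",
    ])
    return "\n\n".join([header, *blocks, tail])

attempts = [
    (-2.1, [([0.5, 0.5], [0.4, 0.3], [0.54, 0.53])]),
    (-5.8, [([0.5, 0.5], [0.1, -0.2], [0.51, 0.48])]),
]
prompt = build_icrl_prompt(attempts, start_state=[0.5, 0.5], target_reward=-0.5)
print(prompt)
```

The returned string would be sent to whatever LLM completion endpoint is in use; the model's continuation is then parsed back into an action and executed.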

**Advantages:**
*   **No Gradients:** No weights are updated; improvement comes entirely from conditioning the LLM's forward pass on a growing history of attempts.
*   **Highly Interactive:** Humans can provide real-time feedback (the clicks/rewards) to shape behavior.
*   **Leverages LLM's Pattern Matching:** Instead of learning from scratch, it uses the LLM's pre-trained ability to find correlations and extrapolate patterns.

In [11]:
# Code Implementation: Interactive In-Context "Clicker Training"
import numpy as np
import plotly.graph_objects as go
import ipywidgets as widgets
from IPython.display import display

# --- SETUP: The Simulated Environment ---
class Simple2DEnv:
    def __init__(self):
        self.agent_pos = np.array([0.5, 0.5])
        self.dt = 0.1

    def step(self, action):
        # Scale the raw action, clip it to a per-axis speed limit, then integrate over dt
        action_scaled = np.array(action) * 0.3
        self.agent_pos += np.clip(action_scaled, -0.1, 0.1) * self.dt
        self.agent_pos = np.clip(self.agent_pos, 0, 1)
        return self.get_state()

    def get_state(self):
        return {'agent_pos': self.agent_pos.copy()}

    def reset(self):
        self.agent_pos = np.array([0.5, 0.5])
        return self.get_state()

# --- Global State for the Interactive Session ---
click_env = Simple2DEnv()
click_history = []
last_state = click_env.get_state()
last_action = np.zeros(2)

# --- Mock LLM Pattern Machine ---
def mock_llm_in_context_rl(history):
    """Mock policy: repeat the best-rewarded action in history with added noise, or explore randomly if none was rewarded positively."""
    if not history:
        return np.random.uniform(-1.0, 1.0, size=2)

    best_action = None
    max_reward = -np.inf
    for state, action, reward, next_state in history:
        if reward > max_reward:
            max_reward = reward
            best_action = action

    if best_action is not None and max_reward > 0:
        # If we have a good action, repeat it with some exploration
        return np.array(best_action) + np.random.normal(0, 0.2, size=2)
    # If no good action found yet, explore randomly
    return np.random.uniform(-1.0, 1.0, size=2)

# --- Interactive Visualization Setup ---
click_good_button = widgets.Button(description="Good! (+1 Reward)")
click_bad_button = widgets.Button(description="Bad! (-1 Reward)")
click_step_button = widgets.Button(description="Take Agent Step")
reset_button = widgets.Button(description="Reset")
history_output = widgets.HTML(
    value="<b>History:</b><br>",
    layout={'border': '1px solid black', 'padding': '5px', 'width': '300px', 'height': '350px', 'overflow_y':'scroll'}
)

# Use go.FigureWidget for persistent, updatable plots
fig_widget = go.FigureWidget(
    data=[go.Scatter(x=[click_env.agent_pos[0]], y=[click_env.agent_pos[1]], mode='markers', marker=dict(size=20, color='red', symbol='x'))],
    layout=go.Layout(
        title=dict(text='Clicker Training Environment'),
        xaxis=dict(range=[0, 1], showgrid=False, zeroline=False),
        yaxis=dict(range=[0, 1], showgrid=False, zeroline=False, scaleanchor="x", scaleratio=1),
        width=400,
        height=400
    )
)

def update_plot_data():
    """Instead of recreating the figure, just update the data of the existing FigureWidget."""
    with fig_widget.batch_update():
        fig_widget.data[0].x = [click_env.agent_pos[0]]
        fig_widget.data[0].y = [click_env.agent_pos[1]]

def take_step(b):
    """Callback for the 'Take Agent Step' button."""
    global last_state, last_action
    last_state = click_env.get_state()
    last_action = mock_llm_in_context_rl(click_history)
    click_env.step(last_action)
    update_plot_data()

def add_feedback(reward):
    """Higher-order function to create callbacks for feedback buttons."""
    def on_button_clicked(b):
        global click_history, last_state, last_action
        next_state = click_env.get_state()
        click_history.append((last_state, last_action.tolist(), reward, next_state))
        history_output.value += f"Feedback ({'Good' if reward > 0 else 'Bad'}) given for last step.<br>"
        # Keep history bounded
        if len(click_history) > 10:
            click_history = click_history[-10:]
    return on_button_clicked

def reset_sim(b):
    """Callback for the 'Reset' button."""
    global click_history, last_state, last_action
    click_history = []
    click_env.reset()
    last_state = click_env.get_state()
    last_action = np.zeros(2)
    history_output.value = "<b>History:</b><br>--- RESET ---<br>"
    update_plot_data()

# Wire up the buttons to their functions
click_step_button.on_click(take_step)
reset_button.on_click(reset_sim)
click_good_button.on_click(add_feedback(1))
click_bad_button.on_click(add_feedback(-1))

# Display the user interface
print("Instructions: Click 'Take Agent Step'. The agent moves based on past feedback. Give it feedback with 'Good' or 'Bad' buttons. Use Reset to start over. Try to guide it to a corner!")
controls = widgets.VBox([
    widgets.HBox([click_step_button, reset_button]),
    widgets.HBox([click_good_button, click_bad_button])
])
display(widgets.HBox([widgets.VBox([controls, history_output]), fig_widget]))

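
The same loop can be exercised without widgets by replacing the human with a scripted trainer. The sketch below is an illustration only: the target corner, the "closer means good" feedback rule, the seed, and the helper names `policy_from_history` and `step_env` are assumptions, while the policy logic and dynamics mirror `mock_llm_in_context_rl` and `Simple2DEnv.step`.

```python
import numpy as np

def policy_from_history(history, rng):
    """Reuse the oldest best-rewarded action in the window (plus noise) if any
    action was rewarded positively; otherwise explore uniformly at random."""
    best_action, max_reward = None, -np.inf
    for action, reward in history:
        if reward > max_reward:
            best_action, max_reward = action, reward
    if best_action is not None and max_reward > 0:
        return best_action + rng.normal(0.0, 0.2, size=2)
    return rng.uniform(-1.0, 1.0, size=2)

def step_env(pos, action, dt=0.1):
    """Toy dynamics, assumed identical to Simple2DEnv.step."""
    return np.clip(pos + np.clip(action * 0.3, -0.1, 0.1) * dt, 0.0, 1.0)

rng = np.random.default_rng(0)
target = np.array([1.0, 1.0])  # hypothetical goal: the top-right corner
pos = np.array([0.5, 0.5])
start_dist = np.linalg.norm(pos - target)
history = []

for _ in range(300):
    action = policy_from_history(history, rng)
    new_pos = step_env(pos, action)
    # Scripted trainer: +1 if the step moved the agent closer to the target, else -1.
    closer = np.linalg.norm(new_pos - target) < np.linalg.norm(pos - target)
    history.append((action, 1 if closer else -1))
    history = history[-10:]  # same bounded window as click_history
    pos = new_pos

end_dist = np.linalg.norm(pos - target)
print(f"distance to target: {start_dist:.3f} -> {end_dist:.3f}")
```

Because the mock policy only copies the best remembered action, it cannot extrapolate beyond what it has seen; a real LLM is expected to do more than this, which is the point of the "pattern machine" framing.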

In [10]:
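# Colab only: enable third-party widget support. Run this cell before the widget cells so the FigureWidget plots render.
from google.colab import output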
output.enable_custom_widget_manager()


## Section 4: Conclusion - Challenges and the Path Forward

The integration of foundation models into robotics is one of the most exciting frontiers in AI. This lecture and the associated survey paper highlight two promising, yet distinct, paths:

1.  **VLA Models (e.g., RT-2):** This path treats robotics as a grand, unified sequence modeling problem. It aims to create a single end-to-end model that maps pixels and text directly to low-level actions.
    *   **Pros:** Enormous potential for positive transfer from web-scale data; emergent capabilities.
    *   **Cons:** Requires massive robotics datasets for fine-tuning; can be a 'black box'; discretizing continuous actions into tokens caps control resolution.

2.  **Decoupled Reasoner-Optimizer Models (e.g., L2R):** This path uses LLMs for what they do best—high-level semantic reasoning and code generation—while leaving low-level, physics-based optimization to specialized controllers.
    *   **Pros:** More modular, interpretable, and sample-efficient from the LLM's perspective; can leverage decades of research in optimal control.
    *   **Cons:** Potential for mismatch between LLM's 'understanding' and the optimizer's capabilities; the 'sim-to-real' gap for the optimizer is still a major hurdle.
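
To make the discretization limit concrete, the sketch below bins each action dimension uniformly and measures the round-trip error. The 256-bin setting follows what is reported for RT-1/RT-2; the action range of [-1, 1], the function names, and the random test data are assumptions for illustration.

```python
import numpy as np

def discretize(action, low=-1.0, high=1.0, num_bins=256):
    """Map continuous actions to integer bin indices in [0, num_bins - 1]."""
    action = np.clip(action, low, high)
    idx = np.floor((action - low) / (high - low) * num_bins).astype(int)
    return np.minimum(idx, num_bins - 1)  # action == high falls in the last bin

def undiscretize(idx, low=-1.0, high=1.0, num_bins=256):
    """Map bin indices back to the centers of their bins."""
    width = (high - low) / num_bins
    return low + (idx + 0.5) * width

rng = np.random.default_rng(0)
actions = rng.uniform(-1.0, 1.0, size=(10_000, 2))
tokens = discretize(actions)
recon = undiscretize(tokens)

bin_width = 2.0 / 256
max_err = np.abs(recon - actions).max()
print(f"bin width: {bin_width:.5f}, max round-trip error: {max_err:.5f}")
```

The round-trip error is bounded by half a bin width per dimension. Whether that resolution is adequate depends on the physical range each dimension covers, which is task-specific.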

**Critical Future Challenges (from Lecture and Paper):**
*   **Efficient Data Collection:** The primary bottleneck remains the collection of large-scale, diverse robotics data. RT-X is a major step, but it's still orders of magnitude smaller than web-scale text/image datasets.
*   **Safety and Hallucination:** An LLM hallucinating a fact is one thing; a robot hallucinating the absence of a person or obstacle is a critical safety failure. Robust safety protocols and reducing model hallucination are paramount.
*   **Synergy of Planning and Control:** Seamlessly combining high-level, long-horizon planning with reactive, real-time low-level control is the ultimate goal. In current systems the planner and the controller are still largely separate modules.
*   **Computational Efficiency:** Deploying billion-parameter models for real-time control on resource-constrained robots requires significant advances in model compression, quantization, and specialized hardware.

This field is moving at an incredible pace. By understanding these fundamental approaches and their trade-offs, you are now equipped to follow, critique, and contribute to the quest for truly intelligent embodied agents.