# Learning Code as Policy for Metaworld

This notebook shows a basic example of using the optimizer to follow language feedback online during interaction. We decorate the gym env to make it traceable and then run the optimizer for every time step.

In [1]:
# Experiment configuration (shared by every run below)

# Which task to load and how to seed it.
env_name = "llf-metaworld-pick-place-v2"  # environment id handed to llfbench.make
seed = 0  # seeds `random`, numpy, the environment, and its action space

# Rollout / optimization settings.
horizon = 30  # maximum number of environment steps per rollout
stepsize = 1  # forwarded to FunctionOptimizer(stepsize=...)

In [2]:
import llfbench
import autogen.trace as trace
from autogen.trace.optimizers import FunctionOptimizer
import random
import numpy as np


class TracedEnv:
    """Thin wrapper around an llfbench environment whose reset/step go through trace_op.

    The wrapped environment is stored on ``self.env``; callers in this notebook
    reach through it for ``action_space`` and ``expert_action``.

    NOTE(review): ``reset`` and ``step`` are decorated with ``trace.trace_op``;
    presumably the decorator records their docstrings (and possibly source) as
    the operator description, so treat that text as behavior — confirm against
    the trace library before rewording it.
    """

    def __init__(self, env_name: str, seed: int = 0) -> None:
        """Seed the global RNGs, build the environment, and seed it as well.

        Args:
            env_name: Environment id passed to ``llfbench.make``.
            seed: Seed applied to ``random``, ``numpy.random``, the environment
                reset, and the environment's action space.
        """
        # Global RNGs are seeded before the environment is constructed.
        random.seed(seed)
        np.random.seed(seed)
        self.env = llfbench.make(env_name)
        # Seeded reset here; the traced reset() below calls env.reset() without a seed.
        self.env.reset(seed=seed)
        # Makes action_space.sample() (used for random actions) reproducible.
        self.env.action_space.seed(seed)

    @trace.trace_op(n_outputs=2)
    def reset(self):
        """
        Reset the environment and return the initial observation and info.
        """
        return self.env.reset()  # obs, info

    @trace.trace_op(n_outputs=5)
    def step(self, action):
        """
        Take action in the environment and return the next observation, reward, termination, truncation, and info.
        """
        return self.env.step(action)


def user_feedback(obs, action, next_obs):
    """
    Build the textual feedback given to the optimizer for one transition.

    Args:
        obs: Mapping with an "observation" entry (object exposing ``.data``)
            describing the state before the action.
        action: Object exposing ``.data``; the action that was taken.
        next_obs: Mapping with "observation" and "feedback" entries (objects
            exposing ``.data``) describing the state after the action.

    Returns:
        A single sentence pair describing the transition and the feedback.
    """
    taken = action.data
    before = obs["observation"].data
    after = next_obs["observation"].data
    advice = next_obs["feedback"].data
    transition = f"Taking action {taken} at observation {before} resulted in next observation {after}."
    return f"{transition} Recieved feedback {advice}."

First, we run the expert policy for the environment to get a sense of oracle performance.

In [3]:
def expert_run(env, horizon):
    """Roll out the environment's expert action to gauge oracle performance.

    The first action is sampled from ``env.env.action_space``; every later
    action is whatever ``env.env.expert_action`` held after the previous step.
    The rollout stops after ``horizon`` steps, or earlier on termination,
    truncation, or when ``info.data["success"]`` is truthy.

    Args:
        env: Environment wrapper exposing ``reset()``, ``step(action)`` and the
            underlying environment as ``env.env``. Values returned by
            ``reset``/``step`` are expected to expose ``.data``.
        horizon: Maximum number of environment steps.

    Returns:
        Tuple ``(sum_of_rewards, success, termination, truncation)`` holding the
        unwrapped ``.data`` values. ``termination`` and ``truncation`` are
        ``None`` when no step was taken (``horizon <= 0``).
    """

    # Initialize the environment (the initial observation is not needed here)
    _, info = env.reset()

    # Pre-bind so the summary below does not raise UnboundLocalError when the
    # loop body never runs.
    termination = truncation = None

    # Rollout
    sum_of_rewards = 0
    t = 0
    expert_action = None
    while t < horizon:
        # No expert action is available before the first step, so start randomly.
        action = env.env.action_space.sample() if expert_action is None else expert_action
        _, reward, termination, truncation, info = env.step(action)
        # Read after stepping; used as the action for the next iteration.
        expert_action = env.env.expert_action

        sum_of_rewards += reward.data  # not traced
        t += 1
        if termination or truncation or info.data["success"]:
            break

    success = info.data["success"]
    terminated = None if termination is None else termination.data
    truncated = None if truncation is None else truncation.data

    print("Sum of rewards:", sum_of_rewards)
    print("Success:", success)
    print("Termination:", terminated)
    print("Truncation:", truncated)
    print("# of time steps:", t)

    return sum_of_rewards, success, terminated, truncated


# Expert baseline: fresh seeded environment, rolled out for at most `horizon` steps.
env = TracedEnv(env_name=env_name, seed=seed)
sum_of_rewards, success, termination, truncation = expert_run(env=env, horizon=horizon)

  logger.warn(
  logger.warn(f"{pre} is not within the observation space.")
  logger.warn(
  gym.logger.warn("Casting input x to numpy array.")
  logger.warn(f"{pre} is not within the observation space.")
  logger.warn(
  logger.warn(


Sum of rewards: 16.13824992438523
Success: True
Termination: False
Truncation: False
# of time steps: 7


Next, we see that the optimizer uses code as actions to follow the language feedback.

In [4]:
def single_step(env, horizon, user_feedback, controller, optimizer, max_iter=None):
    """Run optimizer step for every time step.

    Each loop iteration calls ``controller`` on the current observation and
    steps ``env``. On success the feedback comes from ``user_feedback`` and is
    propagated from the next observation; on ``trace.TraceExecutionError`` the
    error text is propagated from ``error.exception_node`` instead (self
    debugging) and the time step is not advanced. The loop ends after
    ``horizon`` successful steps, after ``max_iter`` iterations, or as soon as
    the episode terminates, truncates, or reports success.

    Args:
        env: Environment wrapper exposing ``reset()`` and ``step(action)``.
        horizon: Maximum number of successful environment steps.
        user_feedback: Callable ``(obs, action, next_obs) -> str``.
        controller: Callable mapping an observation to an action; must expose
            ``parameter.data`` (printed after each update).
        optimizer: Object exposing ``zero_feedback()``, ``backward(target,
            feedback)`` and ``step(verbose=...)``.
        max_iter: Maximum number of loop iterations (successful or failed).
            Defaults to ``horizon * 2`` when ``None``.

    Returns:
        Tuple ``(sum_of_rewards, success, termination, truncation, optimizer)``
        with unwrapped ``.data`` values. ``termination`` and ``truncation`` are
        ``None`` when no environment step ever succeeded.
    """

    # Only fill in the default when nothing was passed: `max_iter or ...` would
    # also discard an explicit budget of 0.
    if max_iter is None:
        max_iter = horizon * 2

    # Initialize the environment
    obs, info = env.reset()

    # Pre-bind so the summary below does not raise UnboundLocalError when every
    # iteration fails or the loop body never runs.
    termination = truncation = None

    # Rollout
    sum_of_rewards = 0
    t = 0
    i = 0
    while t < horizon and i < max_iter:
        error = None
        try:
            # Detach; otherwise, it would be back-propagated across time.
            action = controller(obs["observation"].detach())
            next_obs, reward, termination, truncation, info = env.step(action)
        except trace.TraceExecutionError as e:
            error = e

        if error is None:
            feedback = user_feedback(obs, action, next_obs)  # not traced
            obs = next_obs
            target = next_obs["observation"]
            # Log
            sum_of_rewards += reward.data  # not traced
            t += 1  # time step
            # Episode finished: leave without a further optimizer update.
            if termination or truncation or info.data["success"]:
                break
        else:  # Self debugging
            feedback = str(error)
            target = error.exception_node

        # Optimization step: propagate the feedback from the next observation
        # (or from the failing node) and update the controller.
        optimizer.zero_feedback()
        optimizer.backward(target, feedback)
        optimizer.step(verbose="output")
        i += 1  # optimization iteration

        print(f"Time Step: {t} of {horizon}")
        print(f"Iteration: {i}")
        print(f"Feedback: {feedback}")
        print(f"Variable:\n {controller.parameter.data}")

    success = info.data["success"]
    terminated = None if termination is None else termination.data
    truncated = None if truncation is None else truncation.data

    print("Sum of rewards:", sum_of_rewards)
    print("Success:", success)
    print("Termination:", terminated)
    print("Truncation:", truncated)
    print("# of optimization iterations:", i)
    print("# of time steps:", t)

    return sum_of_rewards, success, terminated, truncated, optimizer


# Fresh, seeded environment for the optimization run (separate from the expert run's instance).
env = TracedEnv(env_name, seed=seed)


# Module-level handle so the controller below can sample actions without taking the env as input.
action_space = env.env.action_space

# Declare the controller to be trainable
# NOTE(review): the optimizer output shows the whole function (signature, docstring, body)
# being rewritten as "__code0", so the text of this definition appears to be the trainable
# parameter itself — presumably any edit to it changes the starting point; confirm before rewording.


@trace.trace_op(trainable=True)
def controller(obs):
    """
    The controller takes in an observation and returns an action.
    """
    return action_space.sample()


# Create an optimizer
# It updates the controller's parameters; `stepsize` comes from the configuration cell.
optimizer = trace.optimizers.FunctionOptimizer(controller.parameters(), stepsize=stepsize)
# Roll out for at most `horizon` steps, running one optimizer update per loop iteration.
sum_of_rewards, success, termination, truncation, optimizer = single_step(
    env, horizon=horizon, controller=controller, user_feedback=user_feedback, optimizer=optimizer
)

LLM response:
 {
"reasoning": "Based on the feedback, the action taken as a result of the current controller's logic resulted in an undesired state transition and the suggestion indicates a specific action vector [-0.07, 0.68, 0.12, 0.0] that should be taken instead. Since the feedback step size is 1, we fully trust the feedback and directly incorporate the recommended action into the controller logic. Therefore, the controller function needs to be adjusted to return this specific action vector, rather than using action_space.sample().",
"suggestion": {
    "__code0": "def controller(obs):\n    \"\"\"\n    The controller takes in an observation and returns an action.\n    Adjusted to provide specific action based on feedback suggesting a more efficient move.\n    \"\"\"\n    return [-0.07, 0.68, 0.12, 0.0]"
}
}
Time Step: 1 of 30
Iteration: 1
Feedback: Taking action [ 0.27392337 -0.46042657 -0.91805295 -0.96694473] at observation {"hand_pos": "[0.006 0.6   0.194]", "gripper_distance_ap