In [1]:
import gymnasium as gym
import numpy as np
import pandas as pd

# Get prompts

In [2]:
def obs_to_text(obs):
    """
    Render an environment observation as a multi-line, human-readable string.

    Entries 0-13 of ``obs`` are written one per line under a fixed label and
    entries 14-23 are written as lidar readings together with the angle of
    each ray. Every number is formatted with two decimals.

    Args:
        obs: Indexable, sliceable sequence of numbers with at least 14 entries
            (24 are needed for all ten lidar rays to be reported).

    Returns:
        The header line, the labelled values and the lidar lines joined by newlines.
    """
    # Labels in the same order as the first 14 observation entries
    body_labels = (
        "Hull angle",
        "Angular velocity",
        "X velocity",
        "Y velocity",
        "Back revolute joint angle",
        "Back revolute joint speed",
        "Back lower leg angle",
        "Back lower leg speed",
        "Back leg ground contact flag",
        "Front revolute joint angle",
        "Front revolute joint speed",
        "Front lower leg angle",
        "Front lower leg speed",
        "Front leg ground contact flag",
    )

    lines = ["Observation from last step: "]
    for position, label in enumerate(body_labels):
        lines.append(f"{label}: {obs[position]:.2f}")

    # Lidars: ray angles in radians starting from the top of the hull
    ray_angles = [0.0, 0.15, 0.3, 0.45, 0.6, 0.75, 0.9, 1.05, 1.2, 1.35]
    ray_distances = obs[14:24]
    for ray_number, (ray_angle, ray_distance) in enumerate(zip(ray_angles, ray_distances), start=1):
        lines.append(f"Lidar {ray_number} ({ray_angle:.2f} rad): {ray_distance:.2f}")

    return "\n".join(lines)

def reward_to_text(reward):
    """Return the sentence reporting the last step's reward, formatted with two decimals."""
    formatted_reward = format(reward, ".2f")
    return "The reward from the last step was: " + formatted_reward

def text_to_action(text):
    """
        Given an output by the LLM in the form:
            Move Back revolute joint {value}, Back lower leg {value}, Front revolute joint {value}, Front lower leg {value}
        This function will return the corresponding action values for the environment

        The text is split on commas and the last number found in each piece is used, so
        surrounding quotes, a leading "Action:" label, a trailing period or missing spaces
        after the commas do not break parsing. Pieces without any number are ignored.

        Args:
            text: Raw answer produced by the LLM.

        Returns:
            A list of four action values. If fewer than four values are found, the
            remaining entries are left at 0.

        Raises:
            ValueError: If no numeric value is found, or if more than four are found
                (the answer is then ambiguous).
    """
    import re  # local import keeps this definition self-contained within its cell

    number_pattern = re.compile(r"[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?")

    values = []
    for segment in text.split(","):
        numbers_in_segment = number_pattern.findall(segment)
        if numbers_in_segment:
            # The value follows the joint name, so the last number in the piece is the one we want
            values.append(float(numbers_in_segment[-1]))

    if not values:
        raise ValueError(f"No action values found in LLM output: {text!r}")
    if len(values) > 4:
        raise ValueError(f"Expected at most 4 action values, found {len(values)} in LLM output: {text!r}")

    # Unspecified joints keep a neutral 0 so the result always has four entries
    return values + [0] * (4 - len(values))
    

In [3]:
# Base prompt sent to the LLM: task description, required answer format and one worked example.
# The run cell appends observation blocks to this string as the episode progresses.
# The "\n" escapes inside the example are interpreted by Python (the string is not raw), so the
# model sees the example observation as separate lines.
# NOTE(review): the format line mixes backticks and single quotes around the `value` placeholders,
# and the example writes "Score:" while the observation templates write "score:" — confirm whether
# these inconsistencies are intended before changing the prompt text.
primer = """
Imagine you are an expert at controlling a bipedal machine, and you have been given the task of controlling a bipedal machine in a reinforcement learning environment. The challenge is to walk a bipedal machine as far as possible without falling over. The machine has a hull, two legs, a back and front revolute joint, and lower leg. The machine has 10 lidar sensors that can detect the distance to objects in front of it. 

You will be given scenarios in the form of observations from the environment and the reward based on the last action and the Current Total Score. Your goal is to provide the next action in the following format:

'Move Back revolute joint `value`, Back lower leg 'value', Front revolute joint 'value', Front lower leg 'value''

The values must be in the range [-1, 1], and your goal is to maximize the reward and move the machine forward effectively. Only provide the action and no additional explanation.

Example
'Observation from last step: \nHull angle: -0.02\nAngular velocity: -0.03\nX velocity: -0.03\nY velocity: -0.01\nBack revolute joint angle: 0.48\nBack revolute joint speed: 1.00\nBack lower leg angle: 0.07\nBack lower leg speed: -1.00\nBack leg ground contact flag: 1.00\nFront revolute joint angle: 0.38\nFront revolute joint speed: 1.00\nFront lower leg angle: 0.08\nFront lower leg speed: -1.00\nFront leg ground contact flag: 1.00\nLidar 1 (0.00 rad): 0.45\nLidar 2 (0.15 rad): 0.45\nLidar 3 (0.30 rad): 0.47\nLidar 4 (0.45 rad): 0.50\nLidar 5 (0.60 rad): 0.54\nLidar 6 (0.75 rad): 0.61\nLidar 7 (0.90 rad): 0.72\nLidar 8 (1.05 rad): 0.90\nLidar 9 (1.20 rad): 1.00\nLidar 10 (1.35 rad): 1.00'
'The reward from the last step was: -0.25'
Score: -0.25

Action: Move Back revolute joint 0.5, Back lower leg -0.3, Front revolute joint 0.2, Front lower leg 0.7
"""

# Template for one observation block appended to the prompt after each environment step.
# Placeholders: {obs_step} (observation text), {reward_step} (reward sentence), {score_step} (running score).
# The line holding a single "_" is the delimiter the run cell splits on when it trims old
# observations from the prompt, so the filled-in text must not contain any other underscore.
obs_skeleton = """
_
{obs_step}
{reward_step}
score: {score_step}

Action: 
"""

# Template for the very first observation of an episode (right after env.reset()).
# Only {obs_step} is filled in; reward and score are fixed text because no action has been taken yet.
# It starts with the same "_" delimiter line as the per-step template.
obs_skeleton_reset = """
Now, given the following observation, rewards and score give me the next action for the following scenario:
_
{obs_step}
reward: None as it is the first step
score: 0

Action:
"""

## Training testing

In [None]:
import openai
import os
import time

# API key read from the environment. It is not passed to anything in the visible cells;
# presumably the OpenAI client picks it up from the environment on its own — TODO confirm.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
STEPS = 50 # Upper bound on environment steps (one LLM call per step) in the run loop
OBS_TEXT_LENGTH = 800 # Approx. characters one observation block adds to the prompt
PRIMER_STARTER_LENGTH = 2000 # Approx. characters in the base prompt
NUMBER_OF_OBS_MEMORY = 3 # Number of observation blocks budgeted for when the prompt is trimmed
save_dir = "runs/bipedal_walker/" # Not referenced anywhere in the visible notebook

In [None]:
# Init environment
# render_mode="human" requests on-screen rendering while the episode runs.
env = gym.make("BipedalWalker-v3", render_mode="human")
# Initial reset; the run cell resets the environment again before stepping, so this
# observation is not the one the episode actually starts from.
obs, _ = env.reset()

# Init OpenAI
# Client built without explicit arguments — presumably it reads its credentials from the
# environment (TODO confirm).
client = openai.OpenAI()
def get_action(primer, client, model="gpt-4o", temperature=0.2, max_tokens=200):
    """
    Send the prompt to a chat-completion model and return its raw text answer.

    Args:
        primer: Full prompt text; sent as a single user message.
        client: Object exposing ``chat.completions.create(...)`` (an OpenAI client).
        model: Name of the chat model to query.
        temperature: Sampling temperature forwarded to the API.
        max_tokens: Upper bound on the number of tokens in the answer.

    Returns:
        The text content of the first returned choice.

    Raises:
        ValueError: If the response has no choices or the first choice carries no text,
            so the failure is reported here instead of surfacing later as an unrelated
            error while parsing the action.
    """
    print("Getting action from LLM")
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": primer}],
        response_format={"type": "text"},
        temperature=temperature,
        max_tokens=max_tokens,
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0,
    )

    if not response.choices:
        raise ValueError("The LLM response contained no choices")

    action_text = response.choices[0].message.content
    if action_text is None:
        # The content field is optional in the API response (e.g. when the model refuses)
        raise ValueError("The LLM response contained no text content")
    return action_text

def save_obs(obs, reward, score, step, store=None):
    """
    Record one step of the episode in ``store`` under the key ``step``.

    Args:
        obs: Observation passed to ``obs_to_text``.
        reward: Reward passed to ``reward_to_text``.
        score: Running total score at this step.
        step: Key under which the record is stored.
        store: Dictionary to write into. When omitted, a fresh dictionary is created
            for this call instead of sharing one hidden dictionary across calls.

    Returns:
        The dictionary that was written to (the one passed in, or the new one).
    """
    if store is None:
        store = {}

    obs_text = obs_to_text(obs)
    reward_text = reward_to_text(reward)
    # The template already supplies the "score: " label, so only the bare value is inserted
    obs_skeleton_filled = obs_skeleton.format(obs_step=obs_text, reward_step=reward_text, score_step=score)

    store[step] = {
        "obs": obs,
        "obs_text": obs_text,
        "reward": reward,
        "reward_text": reward_text,
        "score": score,
        "skeleton": obs_skeleton_filled
    }
    return store


In [None]:
score, reward, done, store = 0, 0, False, {}
obs, _ = env.reset()

# Make first primer
# NOTE: `primer` is extended in place, so re-running this cell without re-running the cell
# that defines `primer` stacks a new episode on top of the previous prompt.
obs_text = obs_to_text(obs)
primer += obs_skeleton_reset.format(obs_step=obs_text)

# Prompt budget in characters: the configured memory budget, capped at 4096
max_primer_chars = min(PRIMER_STARTER_LENGTH + OBS_TEXT_LENGTH * NUMBER_OF_OBS_MEMORY, 4096)

for i in range(STEPS):
    # sleep to not exceed the rate limit
    time.sleep(5)

    # Get action from gpt
    action_text = get_action(primer, client)
    action = text_to_action(action_text)

    # Record the state the decision was based on (observation, last reward, running score)
    save_obs(obs, reward, score, i, store)

    # Perform action; an episode ends on termination (fall) or on truncation (time limit)
    obs, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated

    # Update score before reporting so the final step's reward is part of the total
    score += reward

    if done:
        print("The bipedal machine has fallen over/took too long. The total score was: ", score)
        break

    # Generate new primer
    obs_text = obs_to_text(obs)
    reward_text = reward_to_text(reward)
    obs_step = obs_skeleton.format(obs_step=obs_text, reward_step=reward_text, score_step=score)
    primer += action_text + "\n" + obs_step

    # Check length of primer: drop the oldest observation block until the prompt fits
    while len(primer) > max_primer_chars:
        # split at "_" get the primer front and the continuation
        primer_splits = primer.split("_")
        if len(primer_splits) <= 2:
            # Only the newest observation is left; nothing more can be dropped
            break
        # Re-join with "_" so the remaining blocks keep their delimiters for the next trim
        primer = primer_splits[0] + "_" + "_".join(primer_splits[2:])

# Save the store as df
df = pd.DataFrame(store).T

Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action from LLM
Getting action f

## Test with other LLMs
- OpenAI API too slow due to rate limits

In [10]:
import torch
from transformers import pipeline
from huggingface_hub import InferenceClient
from openai import OpenAI
import os

## Inference API

# Running the model locally
- Works but will have to choose a smaller model

In [19]:
# Minimal single-turn chat used to check that the locally loaded model responds
messages = [
    {"role": "user", "content": "Who are you?"},
]
# Text-generation pipeline for the 3B instruct checkpoint.
# NOTE(review): device is hard-coded to "cuda" — presumably this fails on a machine without a
# CUDA device; confirm that is acceptable for this notebook.
pipe = pipeline("text-generation", model="meta-llama/Llama-3.2-3B-Instruct", device="cuda")

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [22]:
# Run the chat smoke test and show the full conversation (prompt + answer)
pipe(
    messages,
    max_new_tokens=100,
    # Pad with the loaded tokenizer's own end-of-sequence id; a fixed id such as 50256
    # belongs to a different vocabulary (GPT-2) and is an arbitrary token for this model
    pad_token_id=pipe.tokenizer.eos_token_id,
    return_full_text=True
)

[{'generated_text': [{'role': 'user', 'content': 'Who are you?'},
   {'role': 'assistant',
    'content': 'I\'m an artificial intelligence model known as Llama. Llama stands for "Large Language Model Meta AI."'}]}]

In [12]:
# Llama 1B
# Rebinds `pipe`, replacing the 3B instruct pipeline created in the earlier cell.
# NOTE(review): this checkpoint name has no "-Instruct" suffix, so it is presumably the base
# model — confirm that is intended for an instruction-style prompt.
pipe = pipeline("text-generation", model="meta-llama/Llama-3.2-1B", device="cuda")

In [34]:
# Generate a continuation of the prompt.
# max_new_tokens limits only the newly generated tokens (the prompt is not counted), so it is
# sized for one short action line rather than derived from the prompt's character length.
generated = pipe(
    primer,
    # Pad with the loaded tokenizer's own end-of-sequence id instead of an id from another vocabulary
    pad_token_id=pipe.tokenizer.eos_token_id,
    max_new_tokens=100
)

In [36]:
# Print the text of the first returned sequence; the output below starts with the prompt itself
print(generated[0]['generated_text'])


Imagine you are an expert at controlling a bipedal machine, and you have been given the task of controlling a bipedal machine in a reinforcement learning environment. The challenge is to walk a bipedal machine as far as possible without falling over. The machine has a hull, two legs, a back and front revolute joint, and lower leg. The machine has 10 lidar sensors that can detect the distance to objects in front of it. 

You will be given scenarios in the form of observations from the environment and the reward based on the last action and the Current Total Score. Your goal is to provide the next action in the following format:

'Move Back revolute joint `value`, Back lower leg 'value', Front revolute joint 'value', Front lower leg 'value''

The values must be in the range [-1, 1], and your goal is to maximize the reward and move the machine forward effectively. Only provide the action and no additional explanation.

Example
'Observation from last step: 
Hull angle: -0.02
Angular veloc

In [33]:
# Character count of a representative action string (evaluates to 97)
len("Move Back revolute joint -0.4, Back lower leg 0.4, Front revolute joint 0.0, Front lower leg -0.6")

97

In [30]:
# Inspect the current prompt text exactly as it would be sent to the model
print(primer)


Imagine you are an expert at controlling a bipedal machine, and you have been given the task of controlling a bipedal machine in a reinforcement learning environment. The challenge is to walk a bipedal machine as far as possible without falling over. The machine has a hull, two legs, a back and front revolute joint, and lower leg. The machine has 10 lidar sensors that can detect the distance to objects in front of it. 

You will be given scenarios in the form of observations from the environment and the reward based on the last action and the Current Total Score. Your goal is to provide the next action in the following format:

'Move Back revolute joint `value`, Back lower leg 'value', Front revolute joint 'value', Front lower leg 'value''

The values must be in the range [-1, 1], and your goal is to maximize the reward and move the machine forward effectively. Only provide the action and no additional explanation.

Example
'Observation from last step: 
Hull angle: -0.02
Angular veloc